
    Using storm-kafka

    1. Introduction to storm-kafka

    storm-kafka is the Kafka client that ships with Storm for reading messages from Kafka.
    It provides both a plain Storm spout and a Trident spout implementation for consuming data from Kafka.
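
    As a minimal sketch of the Trident side (the ZooKeeper address and topic name below are placeholders, and the actual stream operations are omitted), a Trident spout can be wired up like this:

    ```java
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;
    import storm.kafka.trident.OpaqueTridentKafkaSpout;
    import storm.kafka.trident.TridentKafkaConfig;
    import storm.trident.TridentTopology;
    import backtype.storm.spout.SchemeAsMultiScheme;

    public class TridentKafkaSketch {
        public static void main(String[] args) {
            // placeholder ZooKeeper address and topic name
            TridentKafkaConfig kafkaConf =
                    new TridentKafkaConfig(new ZkHosts("zk1:2181"), "my_topic");
            kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());

            // the opaque spout re-reads batches on failure, which is what
            // gives Trident its exactly-once processing guarantee
            OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(kafkaConf);

            TridentTopology topology = new TridentTopology();
            // each()/groupBy()/persistentAggregate() calls would follow here
            topology.newStream("kafka-stream", spout);
        }
    }
    ```

    This sketch is not runnable on its own; it assumes a reachable ZooKeeper ensemble and an existing topic.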

    2. A storm-kafka usage example

    The Maven dependency configuration is shown below. Note that storm-kafka uses Kafka's low-level API, so you must also declare the kafka artifact itself. If you leave it out, the project still compiles, but the topology fails at runtime; this is exactly the problem that kept tripping me up the first time I used it.

    		<dependency>
    			<groupId>org.apache.storm</groupId>
    			<artifactId>storm-core</artifactId>
    			<version>0.9.5</version>
    			<scope>provided</scope>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.storm</groupId>
    			<artifactId>storm-kafka</artifactId>
    			<version>0.9.5</version>
    			<scope>provided</scope>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.kafka</groupId>
    			<artifactId>kafka_2.10</artifactId>
    			<version>0.8.1.1</version>
    			<exclusions>
    				<exclusion>
    					<groupId>org.apache.zookeeper</groupId>
    					<artifactId>zookeeper</artifactId>
    				</exclusion>
    				<exclusion>
    					<groupId>log4j</groupId>
    					<artifactId>log4j</artifactId>
    				</exclusion>
    			</exclusions>
    		</dependency>
    

    Below is a demo I wrote. The steps are as follows:
    1. new BrokerHosts
    Requires the ZooKeeper connection string as its parameter.
    2. new SpoutConfig
    Build the SpoutConfig from the BrokerHosts, the Kafka topic, Storm's root path in ZooKeeper, a consumer id, and related parameters.
    3. new TopologyBuilder
    Register the spout and bolt on the TopologyBuilder to construct a topology.
    4. Create a Config, set its parameters, start a LocalCluster, and submit the topology.

    import java.util.Arrays;
    import java.util.Map;
    import storm.kafka.BrokerHosts;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;
    
    /**
     * @Description: storm-kafka usage example
     * @author: difeng
     * @time: 2015-11-18 10:18:31
     */
    public class StormKafkaConsumer {
    	
    	public static class PingCounter extends BaseRichBolt{
    		private static final long serialVersionUID = 1L;
    		private OutputCollector collector;
    		@Override
    		public void execute(Tuple input) {
    			String msg = input.getString(0);
    			System.out.println("---------------------" + msg + "-----------------");
    			collector.ack(input);
    		}
    
    		@Override
    		public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    			this.collector = collector;
    			System.out.println("++++++++++++++++++++prepare++++++++++++++++++++++++++++++++++");
    		}
    
    		@Override
    		public void declareOutputFields(OutputFieldsDeclarer declarer) {
    			// this bolt only prints and acks, so it declares no output fields
    			System.out.println("++++++++++++++++++++declareOutputFields+++++++++++++++++++++");
    		}
    
    	}
    
    	public static void main(String[] args) {
    		// ZooKeeper connection string
    		String zks = "192.168.1.50:2181,192.168.1.57:2181,192.168.1.58:2181";
    		// Kafka topic to consume
    		String topic = "data_icmp_ping";
    		// Storm's root path in ZooKeeper
    		String zkRoot = "/storm";
    		String id = "data_icmp_ping";
    		BrokerHosts brokerHosts = new ZkHosts(zks);
    		SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
    		spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    		// ignore any stored offsets and start from the beginning of the topic
    		spoutConf.forceFromStart = true;
    		spoutConf.zkServers = Arrays.asList(new String[] {"192.168.1.50", "192.168.1.57", "192.168.1.58"});
    		spoutConf.zkPort = 2181;
    		
    		TopologyBuilder builder = new TopologyBuilder();
    		builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 3);
    		builder.setBolt("ping-counter", new PingCounter(), 3).shuffleGrouping("kafka-reader");
    		Config conf = new Config();
            conf.setDebug(true);
            // cap each component at a single task
            conf.setMaxTaskParallelism(1);
            
            LocalCluster cluster = new LocalCluster();
    		cluster.submitTopology("test", conf, builder.createTopology());
    		try {
    			// let the topology run for a minute before shutting down
    			Thread.sleep(60000);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		cluster.shutdown();
    	}
    }
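
    The demo above runs inside a LocalCluster, which is convenient for testing. To run the same topology on a real cluster, the usual pattern is to submit it with StormSubmitter; the following is only a sketch, and the topology name, worker count, and jar name are placeholders:

    ```java
    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.topology.TopologyBuilder;

    public class RemoteSubmit {
        public static void main(String[] args)
                throws AlreadyAliveException, InvalidTopologyException {
            TopologyBuilder builder = new TopologyBuilder();
            // ... register the KafkaSpout and bolts exactly as in the LocalCluster demo ...

            Config conf = new Config();
            conf.setNumWorkers(3); // placeholder worker count

            // submits to the cluster configured in storm.yaml; typically launched with:
            //   storm jar your-topology.jar RemoteSubmit
            StormSubmitter.submitTopology("kafka-reader-topology", conf, builder.createTopology());
        }
    }
    ```

    When packaging for a real cluster, remember that the pom above marks storm-core and storm-kafka as scope provided, so the kafka client classes must still end up on the worker classpath (for example by shading them into the topology jar).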
    
  • Original article: https://www.cnblogs.com/difeng/p/5097220.html