zoukankan      html  css  js  c++  java
  • storm kafka整合

    public class KafkaTopo {
    	
    	public static void main(String[] args) {
    		String zkRoot = "/kafka-storm";
    		String spoutId = "KafkaSpout";
    		BrokerHosts brokerHosts = new ZkHosts("m2:2181,m7:2181,m8:2181"); 
    		SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "test2", zkRoot, spoutId);
    		// spoutConfig.forceFromStart = true;
    		spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
    		TopologyBuilder builder = new TopologyBuilder();
    		//设置一个spout用来从kaflka消息队列中读取数据并发送给下一级的bolt组件,此处用的spout组件并非自定义的,而是storm中已经开发好的KafkaSpout
    		builder.setSpout("KafkaSpout", new KafkaSpout(spoutConfig));
    		builder.setBolt("UpperBolt", new UpperBolt()).shuffleGrouping("KafkaSpout");
    		builder.setBolt("ExtBolt", new ExtBolt(), 4).fieldsGrouping("UpperBolt", new Fields("name"));
    		Config conf = new Config();
    		conf.setNumWorkers(4);
    		conf.setNumAckers(0);
    		conf.setDebug(false);
    		
    		//LocalCluster用来将topology提交到本地模拟器运行,方便开发调试
    		LocalCluster cluster = new LocalCluster();
    		cluster.submitTopology("WordCount", conf, builder.createTopology());
    		
    		//提交topology到storm集群中运行
    //		StormSubmitter.submitTopology("sufei-topo", conf, builder.createTopology());
    	}
    	
    }
    

      

  • 相关阅读:
    node学习报错之883
    vue-cli4创建项目
    Decorator学习笔记
    合天网安实验室学习笔记----Linux基础
    IDF实验室解题学习笔记1
    QA笑话----杂思
    测试优先
    Python的IDE:Eclipse+PyDev配置
    JS实现浏览器的title闪烁
    JSTL实现分页
  • 原文地址:https://www.cnblogs.com/heml/p/6074960.html
Copyright © 2011-2022 走看看