zoukankan      html  css  js  c++  java
  • storm kafka整合

    public class KafkaTopo {
    	
    	public static void main(String[] args) {
    		String zkRoot = "/kafka-storm";
    		String spoutId = "KafkaSpout";
    		BrokerHosts brokerHosts = new ZkHosts("m2:2181,m7:2181,m8:2181"); 
    		SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "test2", zkRoot, spoutId);
    		// spoutConfig.forceFromStart = true;
    		spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
    		TopologyBuilder builder = new TopologyBuilder();
    		//设置一个spout用来从kaflka消息队列中读取数据并发送给下一级的bolt组件,此处用的spout组件并非自定义的,而是storm中已经开发好的KafkaSpout
    		builder.setSpout("KafkaSpout", new KafkaSpout(spoutConfig));
    		builder.setBolt("UpperBolt", new UpperBolt()).shuffleGrouping("KafkaSpout");
    		builder.setBolt("ExtBolt", new ExtBolt(), 4).fieldsGrouping("UpperBolt", new Fields("name"));
    		Config conf = new Config();
    		conf.setNumWorkers(4);
    		conf.setNumAckers(0);
    		conf.setDebug(false);
    		
    		//LocalCluster用来将topology提交到本地模拟器运行,方便开发调试
    		LocalCluster cluster = new LocalCluster();
    		cluster.submitTopology("WordCount", conf, builder.createTopology());
    		
    		//提交topology到storm集群中运行
    //		StormSubmitter.submitTopology("sufei-topo", conf, builder.createTopology());
    	}
    	
    }
    

      

  • 相关阅读:
    决策树理解
    堆排序
    glove理解
    PHP图片水印类
    宝塔nginx安装rtmp模块实现推拉流
    nginx安装配置
    结构体,位域,共用体
    指针
    升级mac Catalina版本后无操作权限
    脚本连接linux服务器
  • 原文地址:https://www.cnblogs.com/heml/p/6074960.html
Copyright © 2011-2022 走看看