zoukankan      html  css  js  c++  java
  • storm中KafkaSpout的选择


    Storm最常用的消息源就是Kafka,在对接的时候大多需要使用KafkaSpout;

    在网上大概有两种KafkaSpout,一种是只有几十行,一种却有一大啪啦类文件。


    在kafka中,同一个partition中的消息只能被同一个组的一个consumer消费,不能并发,所以kafka的并发说的是多partition的并发;

    kafka的consumer API分为high level consumer和low level consumer,官方建议使用前者,以为不用关心partition、offset那些,但是后者也有其存在的意义:1.多次读取的时候;2.选择性读取部分消息;3.控制消费过程。


    写法比较简单的KafkaSpout:

     1 public class KafkaSpouttest implements IRichSpout {
     2 
     3     private static final long serialVersionUID = 1L;
     4     private SpoutOutputCollector collector;
     5     private ConsumerConnector consumer;
     6     private String topic;
     7 
     8     public KafkaSpouttest() {}
     9 
    10     public KafkaSpouttest(String topic) {
    11         this.topic = topic;
    12     }
    13 
    14     public void ack(Object arg0) {
    15 
    16 }
    17 
    18     private static ConsumerConfig createConsumerConfig() {
    19         Properties props = new Properties();
    20         // 设置zookeeper的链接地址
    21         props.put("zookeeper.connect", "localhost:2181");
    22         // 设置group id
    23         props.put("group.id", "1");
    24         // kafka的group 消费记录是保存在zookeeper上的, 但这个信息在zookeeper上不是实时更新的, 需要有个间隔时间更新
    25         props.put("auto.commit.interval.ms", "1000");
    26         props.put("zookeeper.session.timeout.ms", "10000");
    27         return new ConsumerConfig(props);
    28     }
    29 
    30     public void activate() {
    31         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
    32         Map < String,
    33         Integer > topickMap = new HashMap < String,
    34         Integer > ();
    35         topickMap.put(topic, 1);
    36 
    37         System.out.println("*********Results********topic:" + topic);
    38 
    39         Map < String,
    40         List < KafkaStream < byte[],
    41         byte[] >>> streamMap = consumer.createMessageStreams(topickMap);
    42         KafkaStream < byte[],
    43         byte[] > stream = streamMap.get(topic).get(0);
    44         ConsumerIterator < byte[],
    45         byte[] > it = stream.iterator();
    46         while (it.hasNext()) {
    47             String value = new String(it.next().message());
    48             SimpleDateFormat formatter = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss SSS");
    49             Date curDate = new Date(System.currentTimeMillis()); //获取当前时间      
    50             String str = formatter.format(curDate);
    51 
    52             System.out.println("storm接收到来自kafka的消息------->" + value);
    53 
    54             collector.emit(new Values(value, 1, str), value);
    55         }
    56     }
    57 
    58     public void close() {
    59         // TODO Auto-generated method stub
    60     }
    61 
    62     public void deactivate() {
    63         // TODO Auto-generated method stub
    64     }
    65 
    66     public void fail(Object arg0) {
    67         // TODO Auto-generated method stub
    68     }
    69 
    70     public void nextTuple() {
    71         // TODO Auto-generated method stub
    72     }
    73 
    74     public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
    75         this.collector = collector;
    76     }
    77 
    78     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    79         declarer.declare(new Fields("word", "id", "time"));
    80     }
    81 
    82     public Map < String,
    83     Object > getComponentConfiguration() {
    84         System.out.println("getComponentConfiguration被调用");
    85         topic = "admln";
    86         return null;
    87     }
    88 
    89 }

    方法相关的不解释,和本主题相关的一句话是:

    byte[] >>> streamMap = consumer.createMessageStreams(topickMap);

    想说的是它用的是High Level API


    复杂的代码就多了,在github上有好几个,最官方的还是apache storm自带的:

    里面和本主题相关的一句话是DynamicPartitionConnections.java中的60行:

    _connections.put(host, new ConnectionInfo(new SimpleConsumer(host.host, host.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId)));

    它用的是low level API


    apache KafkaSpout 在 topology 中的配置

    String zkConnString = "node1:2181,node2:2181,node3:2181";
            String topicName = "testtopic";
            BrokerHosts hosts = new ZkHosts(zkConnString);
            SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
            spoutConfig.forceFromStart = false;
            spoutConfig.zkPort = 2181;
            spoutConfig.zkServers = Arrays.asList(new String[]{"node1","node2","node3"});
            
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
            
            KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
            
            TopologyBuilder builder = new TopologyBuilder();
            // 构造NC数据流向图
            builder.setSpout("mrspout", kafkaSpout, 30);
            builder.setBolt("mrverifybolt", new MRVerifyBolt(), 30)
                    .shuffleGrouping("mrspout");
            builder.setBolt("mr2storagebolt", new MR2StorageBolt(), 30)
                    .shuffleGrouping("mrverifybolt");
            // 以类名作为STORM任务名
            String name = MRTopology.class.getSimpleName();
            // 传主机名则为集群运行模式,不传则为本地运行模式
            if (args != null && args.length > 0) {
                Config conf = new Config();
                // 通过指定nimbus主机
                conf.put(Config.NIMBUS_HOST, args[0]);
                conf.setNumWorkers(6);
                conf.setNumAckers(0);
                conf.setMaxSpoutPending(100000);
                StormSubmitter.submitTopologyWithProgressBar(name, conf,
                        builder.createTopology());
            } else {
                Map conf = new HashMap();
                conf.put(Config.TOPOLOGY_WORKERS, 1);
                conf.put(Config.TOPOLOGY_DEBUG, true);
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology(name, conf, builder.createTopology());
            }
        }

    关于 spoutConfig.servers 和 spoutConfig.port 在实际应用中其实不设置也可以,因为在集群中如果不设置 storm 默认会把 storm 配置中的 zookeeper 地址和端口,设置的用处是在 eclipse 中测试运行的时候因为是模拟 storm cluster, 所以主动设置。


    两者各有优劣,相同点性能,简单测试过,low level的要好点,但是相差不大(都在合适的配置下,小集群);

    不同点是high level 的代码简单,而low level的代码很多,配置也多,用着麻烦(也不是很麻烦);

    low level的优点是支持重读,就是配置中的 spoutConfig.forceFromStart = false; ,支持重读的另一个好处是和storm的acker结合,可以重发,防止丢数据,这一点比low level的要安全一点,另一个好处是配置多,使用就很难灵活,比如设置KafkaSpout的fetchSizeBytes,和kafka的bufferSizeBytes对应,是优化的一个手段。

    至于选择哪种,支持后者,反正storm中已经自带了,不需要自己写,配置就好,而且0.9.4中优化了很多KafkaSpout的问题。


  • 相关阅读:
    Django extra 和 annotate
    剑指offer——26反转链表
    剑指offer——25链表中环的入口节点
    剑指offer——24链表中倒数第k个结点
    剑指offer——23调整数组顺序使奇数位于偶数前面
    剑指offer——22表示数值的字符串
    剑指offer——21正则表达式匹配
    剑指offer——20删除链表中重复的结点
    剑指offer——19删除链表的节点
    剑指offer——18打印从1到最大的n位数
  • 原文地址:https://www.cnblogs.com/admln/p/storm-KafkaSpout-choose.html
Copyright © 2011-2022 走看看