Storm notes: a simple Storm + Kafka application

For work these past few days I needed to use Storm together with Kafka. The basic scenario: when an application hits an error, it sends a log message to a Kafka topic; Storm subscribes to that topic and handles the follow-up processing. A very simple setup, but while learning I ran into a puzzling problem: when reading topic data with KafkaSpout, no offset data was written to ZooKeeper, so every run re-read the topic from the very beginning.

After two days of head-scratching I finally stumbled on the cause: the bolt should extend BaseBasicBolt, not BaseRichBolt.

This post records that situation and then, based on the scenario above, walks through a few simple examples.

Since I am new to Storm and Kafka, see my earlier post storm笔记:storm基本概念 for the basic theory, or see Storm 简单介绍.


Basic subscription

Basic scenario: subscribe to a Kafka topic, prepend a custom string to each message read, then write the result back to another Kafka topic.

The spout that reads from Kafka is storm.kafka.KafkaSpout, and the bolt that writes back to Kafka is storm.kafka.bolt.KafkaBolt.

The bolt that processes the data in between is defined as TopicMsgBolt. Without further ado, the code:

    
    public class TopicMsgTopology {
        public static void main(String[] args) throws Exception {
            // Configure the ZooKeeper connect string
            BrokerHosts brokerHosts = new ZkHosts("zk1:2181,zk2:2281,zk3:2381");
            // Configure the Kafka topic to subscribe to, plus the ZooKeeper node path and id used for offset data
            SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "msgTopic1", "/topology/root", "topicMsgTopology");
            // Configure kafka.broker.properties for KafkaBolt
            Config conf = new Config();
            Properties props = new Properties();
            // Kafka broker address
            props.put("metadata.broker.list", "dev2_55.wfj-search:9092");
            // serializer.class is the message serializer class
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            conf.put("kafka.broker.properties", props);
            // The topic KafkaBolt produces to
            conf.put("topic", "msgTopic2");
            spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("msgKafkaSpout", new KafkaSpout(spoutConfig));
            builder.setBolt("msgSentenceBolt", new TopicMsgBolt()).shuffleGrouping("msgKafkaSpout");
            builder.setBolt("msgKafkaBolt", new KafkaBolt<String, Integer>()).shuffleGrouping("msgSentenceBolt");
            if (args.length == 0) {
                String topologyName = "kafkaTopicTopology";
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology(topologyName, conf, builder.createTopology());
                Utils.sleep(100000);
                cluster.killTopology(topologyName);
                cluster.shutdown();
            } else {
                conf.setNumWorkers(1);
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            }
        }
    }

The argument to the storm.kafka.ZkHosts constructor is a standard ZooKeeper connect string (for setting up ZooKeeper, see ZooKeeper安装部署). zk1, zk2 and zk3 are configured in the local hosts file; because the servers run in pseudo-distributed mode, the port numbers are not all the default 2181.

In the storm.kafka.SpoutConfig constructor, the first argument is the storm.kafka.ZkHosts object above, the second is the topic to subscribe to, the third (zkRoot) is the ZooKeeper node under which the topic's read offsets are written, and the fourth is the name of the child node under that root (some sources describe it as the spout's id).
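
With the values used above, the committed offsets end up under a ZooKeeper path built from zkRoot, the id and the partition number, roughly like this (the exact node layout depends on the storm-kafka version):

    /topology/root/topicMsgTopology/partition_0    # zkRoot + "/" + id + "/partition_" + N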

The backtype.storm.Config object carries the basic configuration the Storm topology needs.

The argument to the backtype.storm.spout.SchemeAsMultiScheme constructor tells the spout how to deserialize the subscribed Kafka data. The MessageScheme here is user-defined; the code is as follows:

    public class MessageScheme implements Scheme {
        private static final Logger logger = LoggerFactory.getLogger(MessageScheme.class);
    
        @Override
        public List<Object> deserialize(byte[] ser) {
            try {
                String msg = new String(ser, "UTF-8"); // decode the raw Kafka bytes as UTF-8
                logger.info("get one message is {}", msg);
                return new Values(msg);
            } catch (UnsupportedEncodingException ignored) {
                return null;
            }
        }
    
        @Override
        public Fields getOutputFields() {
            return new Fields("msg");
        }
    }

The getOutputFields method in MessageScheme names the field of the tuple (Storm's smallest unit of data transfer) that KafkaSpout emits downstream. That name must agree with what the receiving bolt uses (in this example they need not agree, because the bolt simply takes element 0 by position, but in the wordCount example later they must).
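
To make that concrete, here is a minimal sketch of the two ways a downstream bolt can read the tuple; only the second depends on the "msg" name declared in MessageScheme:

    // inside a bolt's execute(...) method
    String byIndex = (String) input.getValue(0);      // by position: ignores the declared name
    String byName = input.getStringByField("msg");    // by name: must match getOutputFields()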

TopicMsgBolt is the bolt that receives the data from storm.kafka.KafkaSpout, processes it, and passes it on to storm.kafka.bolt.KafkaBolt. The code is as follows:

    public class TopicMsgBolt extends BaseBasicBolt {
        private static final Logger logger = LoggerFactory.getLogger(TopicMsgBolt.class);
    
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String word = (String) input.getValue(0);
            String out = "Message got is '" + word + "'!";
            logger.info("out={}", out);
            collector.emit(new Values(out));
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("message"));
        }
    }

One thing needs special attention here: the bolt must extend backtype.storm.topology.base.BaseBasicBolt, otherwise no offset data is recorded in ZooKeeper.
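
The reason is that BaseBasicBolt acks every input tuple automatically after execute returns, and KafkaSpout only commits an offset once the tuple has been acked. For comparison, a minimal sketch (a hypothetical variant, not part of the topology above) of what a BaseRichBolt version would have to do by hand:

    public class TopicMsgRichBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String word = (String) input.getValue(0);
            // anchor the emitted tuple to the input so failures are replayed...
            collector.emit(input, new Values("Message got is '" + word + "'!"));
            // ...and ack explicitly: skip this line and KafkaSpout never records
            // the offset, which is exactly the read-from-the-beginning problem.
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("message"));
        }
    }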

All the code that needs writing is done; next, test it in the Storm and Kafka environments already set up:

    # create the topics
    ./bin/kafka-topics.sh --create --zookeeper zk1:2181,zk2:2281,zk3:2381 --replication-factor 1 --partitions 1 --topic msgTopic1
    ./bin/kafka-topics.sh --create --zookeeper zk1:2181,zk2:2281,zk3:2381 --replication-factor 1 --partitions 1 --topic msgTopic2

Next, start a producer and a consumer for msgTopic1 and msgTopic2 respectively:

    # start a producer for msgTopic1, used to send data
    ./bin/kafka-console-producer.sh --broker-list dev2_55.wfj-search:9092 --topic msgTopic1
    # start a consumer for msgTopic2, used to watch the processed results
    ./bin/kafka-console-consumer.sh --zookeeper zk1:2181,zk2:2281,zk3:2381 --topic msgTopic2 --from-beginning

Then upload the built jar to Storm's nimbus (either upload it remotely, or copy the jar to the nimbus host first and run the command locally):

    # ./bin/storm jar topology/TopicMsgTopology.jar cn.howardliu.demo.storm.kafka.topicMsg.TopicMsgTopology TopicMsgTopology

Once the corresponding workers are up, type data into the msgTopic1 producer terminal and watch the processed output appear in the msgTopic2 consumer terminal.
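
For example, given the TopicMsgBolt above, a producer line should come back through the consumer like this (the exact console framing may vary):

    # producer terminal (msgTopic1)
    hello storm
    # consumer terminal (msgTopic2)
    Message got is 'hello storm'!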

A few things to note:
1. The two topics msgTopic1 and msgTopic2 must be created beforehand;
2. The bolts must extend BaseBasicBolt, not BaseRichBolt, otherwise offsets are not recorded;
3. ZooKeeper is best run as a distributed or pseudo-distributed ensemble of at least three nodes, otherwise odd failures can occur;
4. Spout and bolt ids must be unique across the whole Storm deployment, otherwise exceptions are thrown;
5. As the last bolt before storm.kafka.bolt.KafkaBolt, TopicMsgBolt must name its output field "message", otherwise KafkaBolt cannot pick the data up (see the sketch after this list for an alternative).
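
On the last point: KafkaBolt's default tuple-to-message mapping reads the fields named "key" and "message". Depending on your storm-kafka version, those defaults can be overridden instead of renaming the bolt's output field; a hedged sketch (the builder methods below exist in newer storm-kafka releases, so check your version):

    KafkaBolt<String, String> kafkaBolt = new KafkaBolt<String, String>()
            .withTopicSelector(new DefaultTopicSelector("msgTopic2"))
            .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>("key", "message"));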

    wordCount

With the simple in-and-out example done, on to a slightly more involved scenario: subscribe to messages from one topic, split them on spaces, count the words, and push the running count for each incoming word to another topic.

First, plan the classes needed:
1. A backtype.storm.spout.Scheme subclass that receives and deserializes the data for KafkaSpout;
2. A sentence-splitting bolt: SplitSentenceBolt;
3. A counting bolt: WordCountBolt;
4. A reporting bolt: ReportBolt;
5. The topology definition: WordCountTopology;
6. And finally a bolt that echoes the subscribed data unchanged: SentenceBolt.

For the Scheme subclass we can reuse the MessageScheme defined above, so it is not repeated here.

SplitSentenceBolt splits the input, simply using String's split method, names each resulting word "word", and emits it downstream. The code is as follows:

    public class SplitSentenceBolt extends BaseBasicBolt {
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("word"));
        }
    
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String sentence = input.getStringByField("msg");
            String[] words = sentence.split(" ");
            Arrays.asList(words).forEach(word -> collector.emit(new Values(word)));
        }
    }

SentenceBolt receives data from KafkaSpout and outputs it as-is. In the topology graph the input forks: one branch goes into SplitSentenceBolt and the other into SentenceBolt. This kind of structure can be used in a Lambda architecture. The code is as follows:

    public class SentenceBolt extends BaseBasicBolt {
        private static final Logger logger = LoggerFactory.getLogger(SentenceBolt.class);
    
        @Override
        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            String msg = tuple.getStringByField("msg");
            logger.info("get one message is {}", msg);
            basicOutputCollector.emit(new Values(msg));
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("sentence"));
        }
    }

WordCountBolt aggregates and counts the received words, then emits each word ("word") together with its running count ("count"). The code is as follows:

    public class WordCountBolt extends BaseBasicBolt {
        // Per-task, in-memory counts; fieldsGrouping("word") in the topology routes
        // the same word to the same task, so each task's map is authoritative for its words.
        private Map<String, Long> counts = null;
    
        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            this.counts = new ConcurrentHashMap<>();
            super.prepare(stormConf, context);
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("word", "count"));
        }
    
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String word = input.getStringByField("word");
            Long count = this.counts.get(word);
            if (count == null) {
                count = 0L;
            }
            count++;
            this.counts.put(word, count);
            collector.emit(new Values(word, count));
        }
    }

ReportBolt takes each received word and its count, assembles them into a JSON-style string, and passes it on downstream. The code is as follows:

    public class ReportBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String word = input.getStringByField("word");
            Long count = input.getLongByField("count");
            // single quotes, so this is JSON-ish rather than strict JSON (see note below)
            String reportMessage = "{'word': '" + word + "', 'count': '" + count + "'}";
            collector.emit(new Values(reportMessage));
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("message"));
        }
    }
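
As the comment above notes, the single-quoted string is JSON-ish rather than strict JSON. If downstream consumers need valid JSON, a hedged alternative for the body of execute (assuming the Jackson library is on the topology's classpath):

    try {
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode node = mapper.createObjectNode();
        node.put("word", word);
        node.put("count", count);
        collector.emit(new Values(mapper.writeValueAsString(node))); // {"word":"...","count":N}
    } catch (JsonProcessingException e) {
        throw new FailedException(e); // lets BasicBoltExecutor fail (and replay) the tuple
    }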

Finally, the topology definition, WordCountTopology. The code is as follows:

    public class WordCountTopology {
        private static final String KAFKA_SPOUT_ID = "kafkaSpout";
        private static final String SENTENCE_BOLT_ID = "sentenceBolt";
        private static final String SPLIT_BOLT_ID = "sentenceSplitBolt";
        private static final String WORD_COUNT_BOLT_ID = "sentenceWordCountBolt";
        private static final String REPORT_BOLT_ID = "reportBolt";
        private static final String KAFKA_BOLT_ID = "kafkabolt";
        private static final String CONSUME_TOPIC = "sentenceTopic";
        private static final String PRODUCT_TOPIC = "wordCountTopic";
        private static final String ZK_ROOT = "/topology/root";
        private static final String ZK_ID = "wordCount";
        private static final String DEFAULT_TOPOLOGY_NAME = "sentenceWordCountKafka";
    
        public static void main(String[] args) throws Exception {
            // Configure the ZooKeeper connect string
            BrokerHosts brokerHosts = new ZkHosts("zk1:2181,zk2:2281,zk3:2381");
            // Configure the Kafka topic to subscribe to, plus the ZooKeeper node path and id used for offset data
            SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, CONSUME_TOPIC, ZK_ROOT, ZK_ID);
            spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
    
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout(KAFKA_SPOUT_ID, new KafkaSpout(spoutConfig));
            builder.setBolt(SENTENCE_BOLT_ID, new SentenceBolt()).shuffleGrouping(KAFKA_SPOUT_ID);
            builder.setBolt(SPLIT_BOLT_ID, new SplitSentenceBolt()).shuffleGrouping(KAFKA_SPOUT_ID);
            builder.setBolt(WORD_COUNT_BOLT_ID, new WordCountBolt()).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
            builder.setBolt(REPORT_BOLT_ID, new ReportBolt()).shuffleGrouping(WORD_COUNT_BOLT_ID);
            builder.setBolt(KAFKA_BOLT_ID, new KafkaBolt<String, Long>()).shuffleGrouping(REPORT_BOLT_ID);
    
            Config config = new Config();
            Map<String, String> map = new HashMap<>();
            map.put("metadata.broker.list", "dev2_55.wfj-search:9092");// 配置Kafka broker地址
            map.put("serializer.class", "kafka.serializer.StringEncoder");// serializer.class为消息的序列化类
            config.put("kafka.broker.properties", map);// 配置KafkaBolt中的kafka.broker.properties
            config.put("topic", PRODUCT_TOPIC);// 配置KafkaBolt生成的topic
    
            if (args.length == 0) {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology(DEFAULT_TOPOLOGY_NAME, config, builder.createTopology());
                Utils.sleep(100000);
                cluster.killTopology(DEFAULT_TOPOLOGY_NAME);
                cluster.shutdown();
            } else {
                config.setNumWorkers(1);
                StormSubmitter.submitTopology(args[0], config, builder.createTopology());
            }
        }
    }

Beyond the caveats already mentioned, note one more thing here: the zkRoot and id passed to storm.kafka.SpoutConfig should differ from those in the first example (at the very least the id must differ, otherwise the two topologies would record their offsets under the same ZooKeeper node).
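
Concretely, with the ids used in the two examples the offset nodes stay separate (paths approximate, as before):

    /topology/root/topicMsgTopology/partition_0    # first example
    /topology/root/wordCount/partition_0           # this wordCount topology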

Original post: https://www.cnblogs.com/gavanwanggw/p/7252163.html