  • kafka-->storm-->mongodb

    Goal:

    Emit Kafka data through a spout, count the occurrences of each word in a bolt, and write those records to MongoDB.

    The spout's nextTuple method is effectively driven inside a continuous loop; each piece of data sent downstream triggers one call to a bolt's execute method.

    In short: a spout emits data, and a bolt processes it.

    MongoUtil: a small MongoDB helper class

    package storm;

    import com.mongodb.BasicDBObject;
    import com.mongodb.DB;
    import com.mongodb.DBCollection;
    import com.mongodb.DBObject;
    import com.mongodb.MongoClient;

    public class MongoUtil {
        private MongoUtil(){}
        private static MongoClient mongo;
        private static DB db;
        private static DBCollection collection;
        static {
            mongo = new MongoClient("192.168.170.185", 27017);
            db = mongo.getDB("mySpout");
            collection = db.getCollection("myBolt");
        }
        // Counts documents with _id = 1, i.e. tells us whether the single result record exists yet.
        public static Long getCount(){
            return collection.count(new BasicDBObject("_id", 1L));
        }
        public static void insert(String substring){
            DBObject obj = new BasicDBObject();
            obj.put("_id", 1L);
            obj.put("bolt", substring);
            collection.insert(obj);
        }
        public static void update(String substring){
            DBObject obj = new BasicDBObject();
            obj.put("_id", 1L);
            // Fetch the existing record, overwrite its "bolt" field, and write it back.
            DBObject obj2 = collection.findOne(obj);
            obj2.put("bolt", substring);
            collection.update(obj, obj2);
        }
    }
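
    For a quick sanity check outside of Storm, the helper can be exercised directly. This is just an illustrative sketch (the MongoUtilDemo class is hypothetical, not part of the original post) and assumes a mongod instance is reachable at 192.168.170.185:27017:

    package storm;

    // Hypothetical demo class, not part of the original topology.
    public class MongoUtilDemo {
        public static void main(String[] args) {
            if (MongoUtil.getCount() <= 0) {
                MongoUtil.insert("{hello:1}");   // first run: create the record
            } else {
                MongoUtil.update("{hello:2}");   // later runs: overwrite it
            }
            System.out.println("documents with _id=1: " + MongoUtil.getCount());
        }
    }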

    SentenceSpout: the spout that emits data, reading it from Kafka.

    package storm;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.serializer.StringDecoder;
    import kafka.utils.VerifiableProperties;

    public class SentenceSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        private ConsumerConnector consumer;
        private ConsumerIterator<String, String> it;
        private Map conf;

        @Override
        public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
            // Do all initialization in open(); spout instances are serialized before being
            // shipped to workers, so initializing elsewhere can cause errors.
            this.conf = map;
            this.collector = collector;
            Properties props = new Properties();

            // ZooKeeper address
            props.put("zookeeper.connect", "192.168.170.185:2181");

            // consumer group
            props.put("group.id", "testgroup");

            // ZooKeeper timeouts and offset behaviour
            props.put("zookeeper.session.timeout.ms", "4000");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            props.put("auto.offset.reset", "smallest");

            // serializer class
            props.put("serializer.class", "kafka.serializer.StringEncoder");

            ConsumerConfig config = new ConsumerConfig(props);
            this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);

            // createMessageStreams may only be called once per connector, so the stream is
            // built here rather than inside nextTuple(), which Storm calls repeatedly.
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put("helloworld", 1);

            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
            Map<String, List<KafkaStream<String, String>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
            KafkaStream<String, String> stream = consumerMap.get("helloworld").get(0);
            this.it = stream.iterator();
        }

        @Override
        public void nextTuple() {
            // Emit at most one Kafka message per call; Storm drives this method in a loop.
            // Note that hasNext() blocks until a message is available.
            if (it.hasNext()) {
                this.collector.emit(new Values(it.next().message()));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }
    }
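
    To feed the spout during testing, messages can be pushed to the helloworld topic with the old Kafka 0.8 producer API. A minimal sketch, assuming a broker listening at 192.168.170.185:9092 (the HelloWorldProducer class and broker port are assumptions; only the topic name and host come from the setup above):

    package storm;

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    // Hypothetical test producer, not part of the original topology.
    public class HelloWorldProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "192.168.170.185:9092"); // assumed broker address
            props.put("serializer.class", "kafka.serializer.StringEncoder");

            Producer<String, String> producer =
                    new Producer<String, String>(new ProducerConfig(props));
            // Each message becomes one "sentence" tuple in the topology.
            producer.send(new KeyedMessage<String, String>("helloworld", "hello storm hello kafka"));
            producer.close();
        }
    }
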
    SplitSentenceBolt: the bolt that splits each sentence into words

    package storm;

    import java.util.Map;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    public class SplitSentenceBolt extends BaseRichBolt {
        private OutputCollector collector;
        private Map stormConf;

        @Override
        public void prepare(Map map, TopologyContext context, OutputCollector collector) {
            this.stormConf = map;
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            // Split the incoming sentence on spaces and emit one tuple per word.
            String str = tuple.getStringByField("sentence");
            String[] split = str.split(" ");
            for (String word : split) {
                this.collector.emit(new Values(word));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    WordCountBolt: the counting bolt

    package storm;

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    public class WordCountBolt extends BaseRichBolt {
        private Map boltconf;
        private OutputCollector collector;
        private HashMap<String, Long> counts = null;

        @Override
        public void prepare(Map map, TopologyContext context, OutputCollector collector) {
            this.boltconf = map;
            this.collector = collector;
            this.counts = new HashMap<String, Long>();
        }

        @Override
        public void execute(Tuple tuple) {
            // Increment this word's running total and emit the updated (word, count) pair.
            String word = tuple.getStringByField("word");
            this.counts.put(word, this.counts.containsKey(word) ? this.counts.get(word) + 1 : 1L);
            this.collector.emit(new Values(word, counts.get(word)));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    ReportBolt: the bolt that prints the aggregated results and writes them to MongoDB

    package storm;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.storm.Config;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;

    public class ReportBolt extends BaseRichBolt {
        private HashMap<String, Long> counts = null;
        private Map boltconf;
        private StringBuffer buf = null;

        @Override
        public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
            this.boltconf = arg0;
            this.counts = new HashMap<String, Long>();
            this.buf = new StringBuffer();
        }

        @Override
        public void execute(Tuple tuple) {
            String word = tuple.getStringByField("word");
            Long count = tuple.getLongByField("count");
            this.counts.put(word, count);
            System.out.println("------ word counts ------");
            List<String> keys = new ArrayList<String>();
            keys.addAll(this.counts.keySet());

            // Build a string of the form {word1:count1,word2:count2}.
            buf.append("{");
            for (String key : keys) {
                buf.append(key + ":" + this.counts.get(key)).append(",");
                System.out.println(key + " : " + this.counts.get(key));
            }
            System.out.println("-------------------------");
            buf.append("}");
            // Delete the trailing comma left just before the closing brace.
            String substring = buf.delete(buf.length() - 2, buf.length() - 1).toString();

            // Insert the record on the first result, update it afterwards.
            long existing = MongoUtil.getCount();
            if (existing <= 0) {
                MongoUtil.insert(substring);
            } else {
                MongoUtil.update(substring);
            }
            buf = buf.delete(0, buf.length());
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer arg0) {
            // Terminal bolt: emits nothing.
        }

        /* Uncomment to receive a system tick tuple every 10 seconds:
        @Override
        public Map<String, Object> getComponentConfiguration() {
            HashMap<String, Object> hashMap = new HashMap<String, Object>();
            hashMap.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
            return hashMap;
        }*/
    }
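
    If the commented-out getComponentConfiguration above is enabled, ReportBolt will also receive a system tick tuple every 10 seconds, and execute should then distinguish ticks from data tuples. A minimal sketch of such a check (the TickTupleUtil class is an assumption, not part of the original post):

    package storm;

    import org.apache.storm.Constants;
    import org.apache.storm.tuple.Tuple;

    // Hypothetical helper, not part of the original post.
    public final class TickTupleUtil {
        private TickTupleUtil() {}

        // True when the tuple is a system tick rather than a data tuple.
        public static boolean isTickTuple(Tuple tuple) {
            return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                    && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
        }
    }

    With that in place, execute could flush to MongoDB only on ticks instead of on every tuple, which would cut the write traffic considerably.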

    WordCountTopology: the topology, where the Storm components are wired together

    package storm;

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;

    public class WordCountTopology {
        private static final String SENTENCE_SPOUT_ID = "sentence-spout";
        private static final String SPLIT_BOLT_ID = "split-bolt";
        private static final String COUNT_BOLT_ID = "count-bolt";
        private static final String REPORT_BOLT_ID = "report-bolt";
        private static final String TOPOLOGY_NAME = "word-count-topology";

        public static void main(String[] args) throws Exception {

            // Instantiate the spout and bolts.
            SentenceSpout spout = new SentenceSpout();
            SplitSentenceBolt splitBolt = new SplitSentenceBolt();
            WordCountBolt countBolt = new WordCountBolt();
            ReportBolt reportBolt = new ReportBolt();

            // Create the TopologyBuilder.
            TopologyBuilder builder = new TopologyBuilder();

            // Register SentenceSpout.
            builder.setSpout(SENTENCE_SPOUT_ID, spout);
            // Register SplitSentenceBolt, subscribing to SentenceSpout's tuples.
            // shuffleGrouping distributes tuples randomly and evenly across SplitSentenceBolt instances.
            builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
            // Register WordCountBolt, subscribing to SplitSentenceBolt's tuples.
            // fieldsGrouping routes tuples with the same value of the named field ("word")
            // to the same WordCountBolt instance.
            builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
            // Register ReportBolt, subscribing to WordCountBolt's tuples.
            // globalGrouping routes all tuples to the single ReportBolt instance.
            builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);

            // Create the configuration object.
            Config conf = new Config();

            // LocalCluster simulates a complete Storm cluster inside the local development
            // environment. Local mode is a simple way to develop and test, avoiding repeated
            // deployments to a distributed cluster, and makes breakpoint debugging very convenient.
            LocalCluster cluster = new LocalCluster();

            // Submit the topology to the cluster.
            cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());

            // Let the topology run (the sleep here is very long), then kill it and shut down.
            Thread.sleep(300000000);
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        }
    }
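
    To run the same topology on a real cluster rather than in local mode, the submission step would use StormSubmitter instead of LocalCluster. A minimal sketch (the WordCountClusterSubmit class and the worker count are assumptions, not from the original post):

    package storm;

    import org.apache.storm.Config;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;

    // Hypothetical cluster-mode submission; wire the TopologyBuilder exactly as above.
    public class WordCountClusterSubmit {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // ... setSpout/setBolt wiring identical to WordCountTopology ...
            Config conf = new Config();
            conf.setNumWorkers(2); // assumed worker count
            StormSubmitter.submitTopology("word-count-topology", conf, builder.createTopology());
        }
    }

    The jar would then be deployed with the storm CLI rather than run directly from the IDE.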
