zoukankan      html  css  js  c++  java
  • storm入门基础实例(无可靠性保证实例)

    本实例为入门篇无可靠性保证实例,关于storm的介绍,以及一些术语名词等,可以参考Storm介绍(一)Storm介绍(二)

    本案例是基于storm0.9.3版本

    1.案例结构
    案例:Word Count案例

    语句Spout --> 语句分隔Bolt --> 单词计数Bolt --> 上报Bolt

    2.语句生成Spout - SentenceSpout
    作为入门案例,我们直接从一个数组中不断读取语句,作为数据来源。
    SentenceSpout不断读取语句将其作为数据来源,组装成单值tuple(键名sentence,键值为祖父穿格式的语句)向后发射。
    {"sentence":"i am so shuai!"}

    3.代码结构

    话不多说,上代码:

     1 import backtype.storm.Config;
     2 import backtype.storm.LocalCluster;
     3 import backtype.storm.generated.StormTopology;
     4 import backtype.storm.topology.TopologyBuilder;
     5 import backtype.storm.tuple.Fields;
     6 
     7 public class WCTopologyDriver {
     8     public static void main(String[] args) throws Exception {
     9         //1.创建组件
    10         SentenceSpout sentenceSpout = new SentenceSpout();
    11         SplitSentenceBolt splitSentenceBolt = new SplitSentenceBolt();
    12         WordCountBolt wordCountBolt = new WordCountBolt();
    13         ReportBolt reportBolt = new ReportBolt();
    14         
    15         //2.创建构建者
    16         TopologyBuilder builder = new TopologyBuilder();
    17         
    18         //3.向构建者描述拓扑结构
    19         builder.setSpout("Sentence_Spout", sentenceSpout);
    20         builder.setBolt("Split_Sentence_Bolt", splitSentenceBolt)
    21         .shuffleGrouping("Sentence_Spout");
    22         builder.setBolt(" ", wordCountBolt)
    23         .fieldsGrouping("Split_Sentence_Bolt", new Fields("word"));
    24         builder.setBolt("Report_Bolt", reportBolt)
    25         .shuffleGrouping("Word_Count_Bolt");
    26         
    27         //4.通过构建者创建拓扑
    28         StormTopology topology = builder.createTopology();
    29         
    30         //5.将拓扑提交到集群中运行
    31         //Config conf = new Config();
    32         //StormSubmitter.submitTopology("WC_Topology", conf, topology);
    33         
    34         //5.创建本地集群 模拟运行拓扑 
    35         LocalCluster cluster = new LocalCluster();
    36         Config conf = new Config();
    37         cluster.submitTopology("WC_Topology", conf, topology);
    38         
    39         Thread.sleep(10 * 1000);
    40         cluster.killTopology("WC_Topology");
    41         cluster.shutdown();
    42     }
    43 }
     1 import java.util.Map;
     2 
     3 import backtype.storm.spout.SpoutOutputCollector;
     4 import backtype.storm.task.TopologyContext;
     5 import backtype.storm.topology.OutputFieldsDeclarer;
     6 import backtype.storm.topology.base.BaseRichSpout;
     7 import backtype.storm.tuple.Fields;
     8 import backtype.storm.tuple.Values;
     9 
    10 public class SentenceSpout extends BaseRichSpout {
    11 
    12     private String [] sentences = {
    13         "my name is park",
    14         "i am so shuai",
    15         "do you like me",
    16         "are you sure you do not like me",
    17         "ok i am sure"
    18     }; 
    19     
    20     private SpoutOutputCollector collector = null;
    21     
    22     /**
    23      * 初始化的方法
    24      * 当前组件初始化时 调用  执行初始化操作
    25      * conf:代表当前topology相关配置信息
    26      * context:代表上下文环境 可以用来获取 任务id 组件id 输入输出相关信息 等信息
    27      * collector:代表发送者 可以用来发送 拓扑 可以在任何时候发送 此对象线程安全  可以放心的保存在类的内部作为类的成员
    28      */
    29     @Override
    30     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    31         this.collector = collector;
    32     }
    33 
    34     /**
    35      * storm会在一个单一线程中不停的调用此方法 要求发送tuple
    36      * 如果有数据要发 直接发 如果没有数据要发 也不要阻塞这个方法 而是直接返回即可
    37      * 如果真的没有数据要发送 最好睡上一个很短的时间 以便释放cpu 不至于浪费过多资源
    38      */
    39     private int index = 0;
    40     @Override
    41     public void nextTuple() {
    42         if(index < sentences.length){
    43             collector.emit(new Values(sentences[index]));
    44             index++;
    45         }else{
    46             try {
    47                 Thread.sleep(1);
    48             } catch (InterruptedException e) {
    49                 e.printStackTrace();
    50             }
    51             return;
    52         }
    53     }
    54 
    55     /**
    56      * 用来声明输出信息
    57      * declarer:声明输出的流的编号 输出的tuple中的字段 以及是否是一个指向性的流
    58      * 要注意 组件发送的tuple的结构 都要现在此方法中声明
    59      */
    60     @Override
    61     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    62         declarer.declare(new Fields("sentence"));
    63     }
    64     
    65 }
     1 import java.util.Map;
     2 
     3 import backtype.storm.task.OutputCollector;
     4 import backtype.storm.task.TopologyContext;
     5 import backtype.storm.topology.OutputFieldsDeclarer;
     6 import backtype.storm.topology.base.BaseRichBolt;
     7 import backtype.storm.tuple.Fields;
     8 import backtype.storm.tuple.Tuple;
     9 import backtype.storm.tuple.Values;
    10 
    11 public class SplitSentenceBolt extends BaseRichBolt{
    12 
    13     private OutputCollector collector = null;
    14     
    15     /**
    16      * 初始化的方法
    17      * 当前组件初始化时 调用  执行初始化操作
    18      * conf:代表当前topology相关配置信息
    19      * context:代表上下文环境 可以用来获取 任务id 组件id 输入输出相关信息 等信息
    20      * collector:代表发送者 可以用来发送 拓扑 可以在任何时候发送 此对象线程安全  可以放心的保存在类的内部作为类的成员
    21      */
    22     @Override
    23     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    24         this.collector = collector;
    25     }
    26     
    27     /**
    28      *     对于输入的tuple 一个tuple触发一次此方法
    29      *    在这个方法中对tuple进行处理
    30      */    
    31     @Override
    32     public void execute(Tuple input) {
    33         String sentence = input.getStringByField("sentence");
    34         String []  words = sentence.split(" ");
    35         for(String word : words){
    36             collector.emit(new Values(word));
    37         }
    38     }
    39 
    40     /**
    41      * 用来声明输出信息
    42      * declarer:声明输出的流的编号 输出的tuple中的字段 以及是否是一个指向性的流
    43      * 要注意 组件发送的tuple的结构 都要现在此方法中声明
    44      */
    45     @Override
    46     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    47         declarer.declare(new Fields("word"));
    48     }
    49     
    50 }
     1 import java.util.HashMap;
     2 import java.util.Map;
     3 
     4 import backtype.storm.task.OutputCollector;
     5 import backtype.storm.task.TopologyContext;
     6 import backtype.storm.topology.OutputFieldsDeclarer;
     7 import backtype.storm.topology.base.BaseRichBolt;
     8 import backtype.storm.tuple.Fields;
     9 import backtype.storm.tuple.Tuple;
    10 import backtype.storm.tuple.Values;
    11 
    12 public class WordCountBolt extends BaseRichBolt {
    13 
    14     private OutputCollector collector = null;
    15     
    16     @Override
    17     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    18         this.collector = collector;
    19     }
    20 
    21     private Map<String,Integer> map = new HashMap<>();
    22     @Override
    23     public void execute(Tuple input) {
    24         String word = input.getStringByField("word");
    25         map.put(word, map.containsKey(word) ? map.get(word)+1 : 1);
    26         collector.emit(new Values(word,map.get(word)));
    27     }
    28 
    29     @Override
    30     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    31         declarer.declare(new Fields("word","count"));
    32     }
    33 
    34 }
     1 import java.util.Map;
     2 
     3 import backtype.storm.task.OutputCollector;
     4 import backtype.storm.task.TopologyContext;
     5 import backtype.storm.topology.OutputFieldsDeclarer;
     6 import backtype.storm.topology.base.BaseRichBolt;
     7 import backtype.storm.tuple.Tuple;
     8 
     9 public class ReportBolt extends BaseRichBolt {
    10 
    11     @Override
    12     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    13 
    14     }
    15 
    16     @Override
    17     public void execute(Tuple input) {
    18         String word = input.getStringByField("word");
    19         int count = input.getIntegerByField("count");
    20         System.out.println("--单词数量发生变化:"+word+"~"+count+"--");
    21     }
    22 
    23     @Override
    24     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    25 
    26     }
    27 
    28 }

     运行结果:

    补充,以下是本文案例用到的jar包,由于太大,没有上传,下载0.9.3的storm源码,解压后文件夹中的lib下的所有jar包:

  • 相关阅读:
    kubernetes上安装MongoDB-3.6.5集群副本集方式
    kubernetes Metrics-server 安装
    kubenetes 应用更新
    filebeat+logstash通过zabbix微信报警
    Redis持久化及复制
    kubernetes rabbitmq 集群安装配置
    kubernetes elasticsearch2.4 集群安装
    kubernetes --> kube-dns 安装
    详解Javascript中的Object对象
    提高代码质量:如何编写函数
  • 原文地址:https://www.cnblogs.com/XiOrang/p/10373729.html
Copyright © 2011-2022 走看看