zoukankan      html  css  js  c++  java
  • [BD] Storm

    什么是实时计算

    • 离线计算:批处理,代表MapReduce、Spark Core,采集数据Sqoop、Flume
    • 实时计算:源源不断,代表Storm等,采集数据Flume
    • 框架
      • Apache Storm
      • Spark Streaming:把流式数据转换成离散数据,本质是离线计算
      • JStrom:阿里基于Strom开发
      • Flink

     环境搭建

    • 伪分布
      • storm nimbus &
      • storm supervisor &
      • storm ui &

    • 全分布

    编程案例 WordCount

    • 启用Debug,日志查看器,在网页上查看数据
      • "topology.eventlogger.executors": 1
      • /root/training/apache-storm-1.0.3/examples/storm-starter 
      • storm jar storm-starter-topologies-1.0.3.jar org.apache.storm.starter.WordCountTopology MyWC
      • storm logviewer &

    WordCountTopology.java

      1 package demo.wc;
      2 
      3 import org.apache.storm.Config;
      4 import org.apache.storm.LocalCluster;
      5 import org.apache.storm.StormSubmitter;
      6 import org.apache.storm.generated.StormTopology;
      7 import org.apache.storm.hdfs.bolt.HdfsBolt;
      8 import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
      9 import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
     10 import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
     11 import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
     12 import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
     13 import org.apache.storm.redis.bolt.RedisStoreBolt;
     14 import org.apache.storm.redis.common.config.JedisPoolConfig;
     15 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
     16 import org.apache.storm.redis.common.mapper.RedisStoreMapper;
     17 import org.apache.storm.topology.IRichBolt;
     18 import org.apache.storm.topology.TopologyBuilder;
     19 import org.apache.storm.tuple.Fields;
     20 import org.apache.storm.tuple.ITuple;
     21 
     22 
     23 //任务的主程序,创建任务:Topology
     24 public class WordCountTopology {
     25 
     26     public static void main(String[] args) throws Exception {
     27         TopologyBuilder builder = new TopologyBuilder();
     28         
     29         //设置任务的spout组件
     30         builder.setSpout("wordcount_spout", new WordCountSpout());
     31         
     32         //设置任务的单词拆分的bolt组件,是随机分组
     33         builder.setBolt("wordcount_split", new WordCountSplitBolt()).shuffleGrouping("wordcount_spout");
     34         
     35         //设置任务的单词计数的bolt组件,是按字段分组
     36         builder.setBolt("wordcount_total", new WordCountTotalBolt()).fieldsGrouping("wordcount_split", new Fields("word"));
     37         
     38         //设置任务的第三个Bolt组件,将结果保存到Redis,直接使用Storm提供的BOlt
     39         //builder.setBolt("wordcount_redis", createRedisBolt()).shuffleGrouping("wordcount_total");
     40         
     41         //设置任务的第三个Bolt组件,将结果保存到HDFS(文件),直接使用Storm提供的Bolt
     42         builder.setBolt("wordcount_hdfs", createHDFSBolt()).shuffleGrouping("wordcount_total");
     43         
     44         //设置任务的第三个Bolt组件,将结果保存到HBase中
     45         //builder.setBolt("wordcount_hbase", new WordCountHBaseBolt()).shuffleGrouping("wordcount_total");
     46         
     47         
     48         //创建一个任务:Topology
     49         StormTopology topology = builder.createTopology();
     50         
     51         //创建一个Config对象,保存配置信息
     52         Config conf = new Config();
     53         
     54         /*
     55          * 提交Storm的任务有两种方式
     56          * 1、本地模式
     57          * 2、集群模式
     58          */
     59         LocalCluster cluster = new LocalCluster();
     60         cluster.submitTopology("MyWordCount", conf, topology);
     61         
     62 //        StormSubmitter.submitTopology("MyWordCount", conf, topology);
     63 
     64     }
     65 
     66     private static IRichBolt createHDFSBolt() {
     67         // 将结果保存到HDFS 文件
     68         
     69         HdfsBolt bolt = new HdfsBolt();
     70         //设置HDFS的相关配置信息
     71         //HDFS的位置:NameNode的地址
     72         bolt.withFsUrl("hdfs://192.168.174.111:9000");
     73         
     74         //设置保存的HDFS的目录
     75         bolt.withFileNameFormat(new DefaultFileNameFormat().withPath("/stormdata"));
     76         
     77         //保存的是<key value>,设置数据保存到文件的时候,分隔符 |
     78         //举例:<Beijing,10>   ----> 结果: Beijing|10
     79         bolt.withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter("|"));
     80         
     81         //流式处理,多大的数据生成一个文件?
     82         //每5M的数据生成一个文件
     83         bolt.withRotationPolicy(new FileSizeRotationPolicy(5.0f, Units.MB));
     84         
     85         //当输出tuple达到了一定大小,就会跟HDFS进行一次同步
     86         bolt.withSyncPolicy(new CountSyncPolicy(1000));
     87         
     88         
     89         return bolt;
     90     }
     91 
     92     private static IRichBolt createRedisBolt() {
     93         //把单词计数是结果保存到Redis
     94         
     95         //创建Redis的连接池
     96         JedisPoolConfig.Builder builder = new JedisPoolConfig.Builder();
     97         builder.setHost("192.168.174.111");
     98         builder.setPort(6379);
     99         JedisPoolConfig poolConfig = builder.build();
    100         
    101         //参数:StoreMapper:用于指定存入Redis中的数据格式
    102         return new RedisStoreBolt(poolConfig, new RedisStoreMapper() {
    103             
    104             @Override
    105             public RedisDataTypeDescription getDataTypeDescription() {
    106                 //定义Redis中的数据类型:WordCount采用什么数据类型?
    107                 //使用Hash集合
    108                 return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH,
    109                                                     "wordcount");
    110             }
    111             
    112             @Override
    113             public String getValueFromTuple(ITuple tuple) {
    114                 // 从上一个Tuple中取出值:频率
    115                 return String.valueOf(tuple.getIntegerByField("total"));
    116             }
    117             
    118             @Override
    119             public String getKeyFromTuple(ITuple tuple) {
    120                 // 从上一个Tuple中取出key:单词
    121                 return tuple.getStringByField("word");
    122             }
    123         });
    124     }
    125 }
    View Code

    WordCountSpout.java

     1 package demo.wc;
     2 
     3 import java.util.Map;
     4 import java.util.Random;
     5 
     6 import org.apache.storm.spout.SpoutOutputCollector;
     7 import org.apache.storm.task.TopologyContext;
     8 import org.apache.storm.topology.OutputFieldsDeclarer;
     9 import org.apache.storm.topology.base.BaseRichSpout;
    10 import org.apache.storm.tuple.Fields;
    11 import org.apache.storm.tuple.Values;
    12 import org.apache.storm.utils.Utils;
    13 
    14 //第一级组件,作为任务的Spout组件,来采集数据
    15 //模拟一些数据,随机产生数据
    16 public class WordCountSpout extends BaseRichSpout {
    17 
    18     //定义我们要产生的数据
    19     private String[] datas = {"I love Beijing","I love China","Beijing is the capital of China"};
    20 
    21     //定义一个变量来保存输出流
    22     private SpoutOutputCollector collector;
    23     
    24     @Override
    25     public void nextTuple() {
    26         //每隔2秒采集一次
    27         Utils.sleep(2000);
    28         
    29         // 由Storm的框架调用,用于如何接受数据
    30         //产生一个3以内的随机数
    31         int random = (new Random()).nextInt(3);
    32         //数据
    33         String data = datas[random];
    34         
    35         //把数据发送给下一个组件
    36         //数据一定要遵循schema的结构
    37         System.out.println("采集的数据是:" + data);
    38         this.collector.emit(new Values(data));
    39     }
    40 
    41     @Override
    42     public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
    43         //相当于Spout初始化方法
    44         //参数:SpoutOutputCollector collector 相当于是输出流
    45         this.collector = collector;
    46     }
    47 
    48     @Override
    49     public void declareOutputFields(OutputFieldsDeclarer declare) {
    50         // 申明Tuple的格式,是Schema
    51         declare.declare(new Fields("sentence"));
    52     }
    53 }
    View Code

    WordCountSplitBolt.java

     1 package demo.wc;
     2 
     3 import java.util.Map;
     4 
     5 import org.apache.storm.task.OutputCollector;
     6 import org.apache.storm.task.TopologyContext;
     7 import org.apache.storm.topology.OutputFieldsDeclarer;
     8 import org.apache.storm.topology.base.BaseRichBolt;
     9 import org.apache.storm.tuple.Fields;
    10 import org.apache.storm.tuple.Tuple;
    11 import org.apache.storm.tuple.Values;
    12 
    13 //第二级组件,是bolt组件,用于单词的拆分
    14 public class WordCountSplitBolt extends BaseRichBolt{
    15 
    16     private OutputCollector collector;
    17     
    18     @Override
    19     public void execute(Tuple tuple) {
    20         //如何处理上一级组件发来的数据: I love Beijing
    21         String data = tuple.getStringByField("sentence");
    22         
    23         //分词
    24         String[] words = data.split(" ");
    25         
    26         //输出
    27         for(String w:words){
    28             collector.emit(new Values(w,1));
    29         }
    30     }
    31 
    32     @Override
    33     public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
    34         // 对Bolt进行初始化
    35         this.collector = collector;
    36     }
    37 
    38     @Override
    39     public void declareOutputFields(OutputFieldsDeclarer declare) {
    40         //申明Tuple的格式
    41         declare.declare(new Fields("word","count"));
    42         
    43     }
    44 }
    View Code

    WordCountTotalBolt.java

     1 package demo.wc;
     2 
     3 import java.util.HashMap;
     4 import java.util.Map;
     5 
     6 import org.apache.storm.task.OutputCollector;
     7 import org.apache.storm.task.TopologyContext;
     8 import org.apache.storm.topology.OutputFieldsDeclarer;
     9 import org.apache.storm.topology.base.BaseRichBolt;
    10 import org.apache.storm.tuple.Fields;
    11 import org.apache.storm.tuple.Tuple;
    12 import org.apache.storm.tuple.Values;
    13 
    14 //第三级组件,是bolt组件,用于单词的计数
    15 public class WordCountTotalBolt extends BaseRichBolt {
    16 
    17     private OutputCollector collector;
    18     
    19     //定义一个Map集合来保存结果
    20     private Map<String, Integer> result = new HashMap<>();
    21     
    22     @Override
    23     public void execute(Tuple tuple) {
    24         // 对每个单词进行计数
    25         //取出数据
    26         String word = tuple.getStringByField("word");
    27         int count = tuple.getIntegerByField("count");
    28         
    29         if(result.containsKey(word)){
    30             //如果包含,进行累加
    31             int total = result.get(word);
    32             result.put(word, total+count);
    33         }else{
    34             //这个单词第一次出现
    35             result.put(word, count);
    36         }
    37         
    38         //打印在屏幕上
    39         System.out.println("统计的结果是: " + result);
    40         
    41         //把结果继续发送给下一个bolt组件:  (单词,频率)
    42         this.collector.emit(new Values(word,result.get(word)));
    43     }
    44 
    45     @Override
    46     public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
    47         // TODO Auto-generated method stub
    48         this.collector = collector;
    49     }
    50 
    51     @Override
    52     public void declareOutputFields(OutputFieldsDeclarer declare) {
    53         // TODO Auto-generated method stub
    54         declare.declare(new Fields("word","total"));
    55     }
    56 }
    View Code

    编程模型

    • Topology:Storm中运行的一个实时应用程序
    • Stream:数据流向
    • Spout:在一个Topology中获取源数据流的组件
    • Bolt:接收数据然后执行处理的组件,可级联
    • Tuple:一次消息传递的基本单元
    • StreamGroup:数据分组策略
      • 随机分组:1-2之间
      • 按字段分组:2-3之间
      • 广播分组

    流式计算架构

    • Flume:获取数据
    • Kafka:临时保存数据
    • Storm:计算数据
    • Redis:保存数据

     原理分析

    • Storm在ZK中保存的数据

    • Storm提交任务的过程

     

    • Storm内部通信的机制

    外部集成

    • Redis
      • 添加依赖jar包,在WordCountTopology中编写Bolt组件
      • 创建连接池

    • JDBC
    • HDFS:storm-hdfs***.jar
    • HBase:自己开发一个Bolt组件
    • Kafka
    • Hive

    参考

    大数据实时计算框架

    https://www.csdn.net/gather_21/MtTacgxsMDI1Mi1ibG9n.html

  • 相关阅读:
    高效是如何来的
    find 删除指定日期的文件
    MySQL基础教程
    grep search information
    关于进程的问题
    linux useradd 命令
    host and ip 的关系
    git cherry-pick 教程
    正则练习
    正则表达式-获取
  • 原文地址:https://www.cnblogs.com/cxc1357/p/12713182.html
Copyright © 2011-2022 走看看