zoukankan      html  css  js  c++  java
  • Storm入门(2)--Storm编程

    以电信通话记录为例

    移动呼叫及其持续时间将作为对Apache Storm的输入,Storm将处理和分组在相同呼叫者和接收者之间的呼叫及其呼叫总数。

     Storm编程套路

    在storm中,把对数据的处理过程抽象成一个topology,这个topology包含的组件主要是spout、bolt,以及以tuple形式在组件之间传输的数据流。这个数据流在topology流一遍,就是对数据的一次处理。

    1、创建Spout类

    这一部分,是创建数据流的源头。

    创建一个类,实现IRichSpout接口,实现相应方法。其中几个方法的含义:

    • open -为Spout提供执行环境。执行器将运行此方法来初始化喷头。一般写一些第一次运行时要处理的逻辑
    • nextTuple -通过收集器发出生成的数据。核心,用于生成数据流
    • close -当spout将要关闭时调用此方法。
    • declareOutputFields -声明元组的输出模式。即,声明了从此spout出去的流都的数据格式
    • ack -确认处理了特定元组。
    • fail -指定不处理和不重新处理特定元组。
    open(Map conf, TopologyContext context, SpoutOutputCollector collector)
    • conf - 为此spout提供storm配置。
    • context - 提供有关拓扑中的spout位置,其任务ID,输入和输出信息的完整信息。
    • collector - 使我们能够发出将由bolts处理的元组。
    nextTuple()

    nextTuple()从与ack()和fail()方法相同的循环中定期调用。它必须释放线程的控制,当没有工作要做,以便其他方法有机会被调用。因此,nextTuple的第一行检查处理是否已完成。如果是这样,它应该休眠至少一毫秒,以减少处理器在返回之前的负载。

    declareOutputFields(OutputFieldsDeclarer declarer)

    declarer -它用于声明输出流id,输出字段等,此方法用于指定元组的输出模式。

    ack(Object msgId)

    该方法确认已经处理了特定元组。

    fail(Object o)

    此方法通知特定元组尚未完全处理。 Storm将重新处理特定的元组

    package com.jing.calllogdemo;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichSpout;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    
    /*
    spout类,负责产生数据流
     */
    public class CallLogSpout implements IRichSpout {
        //spout 输出收集器
        private SpoutOutputCollector collector;
        //是否完成
        private boolean completed = false;
        //上下文对象
        private TopologyContext context;
        //随机发生器
        private Random randomGenerator = new Random();
        //索引
        private Integer idx = 0;
    
        @Override
        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            //第一次运行要做的事
            this.context = topologyContext;
            this.collector = spoutOutputCollector;
    
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void activate() {
    
        }
    
        @Override
        public void deactivate() {
    
        }
    
        @Override
        public void nextTuple() {
            //产生第一条数据,
    
            if (this.idx <= 1000){
                List<String> mobileNumbers = new ArrayList<String>();
                mobileNumbers.add("1234123401");
                mobileNumbers.add("1234123402");
                mobileNumbers.add("1234123403");
                mobileNumbers.add("1234123404");
    
                Integer localIdx = 0;
                while (localIdx++ < 100 && this.idx++ <1000){
                    //取出主叫
                    String caller = mobileNumbers.get(randomGenerator.nextInt(4));
                    //取出被叫
                    String callee = mobileNumbers.get(randomGenerator.nextInt(4));
                    while (caller == callee){
                        //重新取出被叫
                        callee = mobileNumbers.get(randomGenerator.nextInt(4));
                    }
                    //模拟通话时长
                    Integer duration = randomGenerator.nextInt(60);
                    //输出元祖
                    this.collector.emit(new Values(caller,callee,duration));
                }
            }
    
        }
    
        @Override
        public void ack(Object o) {
    
        }
    
        @Override
        public void fail(Object o) {
    
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            //声明输出字段,定义元组的结构,定义输出字段名称
            outputFieldsDeclarer.declare(new Fields("from", "to", "duration"));
    
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }
    CallLogSpout

    2、创建Bolt类

    这一部分是完成对数据流的处理,Bolt把元组作为输入,对元组进行处理后,产生新的元组,bolt类可以有多个。

    创建一个类,实现IRichBolt接口,实现相应方法。

    • prepare -为bolt提供要执行的环境。执行器将运行此方法来初始化spout。
    • execute -处理单个元组的输入
    • cleanup -当spout要关闭时调用。
    • declareOutputFields -声明元组的输出模式。
    prepare(Map conf, TopologyContext context, OutputCollector collector)
    • conf -为此bolt提供Storm配置。
    • context -提供有关拓扑中的bolt位置,其任务ID,输入和输出信息等的完整信息。
    • collector -使我们能够发出处理的元组。
    execute(Tuple tuple)

    这是bolt的核心方法,这里的元组是要处理的输入元组。execute方法一次处理单个元组。元组数据可以通过Tuple类的getValue方法访问。不必立即处理输入元组。多元组可以被处理和输出为单个输出元组。处理的元组可以通过使用OutputCollector类发出。

    cleanup()
    declareOutputFields(OutputFieldsDeclarer declarer)

    这个方法用于指定元组的输出模式,参数declarer用于声明输出流id,输出字段等。

    这里有两个bolt

    呼叫日志创建者bolt接收呼叫日志元组。呼叫日志元组具有主叫方号码,接收方号码和呼叫持续时间。此bolt通过组合主叫方号码和接收方号码简单地创建一个新值。新值的格式为“来电号码 - 接收方号码”,并将其命名为新字段“呼叫”

    package com.jing.calllogdemo;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.Map;
    /*
    创建calllog日志的bolt
     */
    public class CallLogCreatorBolt implements IRichBolt {
        private OutputCollector collector;
    
        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.collector = outputCollector;
        }
    
        @Override
        public void execute(Tuple tuple) {
            //处理新的同话记录
            String from = tuple.getString(0);
            String to = tuple.getString(1);
            Integer duration = tuple.getInteger(2);
            //产生新的tuple
            String fromTO = from + "-" + to;
            collector.emit(new Values(fromTO, duration));
    
        }
    
        @Override
        public void cleanup() {
    
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            //设置输出字段的名称
            outputFieldsDeclarer.declare(new Fields("call", "duration"));
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }
    CallLogCreatorBolt

     呼叫日志创建者bolt接收呼叫日志元组。呼叫日志元组具有主叫方号码,接收方号码和呼叫持续时间。此bolt通过组合主叫方号码和接收方号码简单地创建一个新值。新值的格式为“来电号码 - 接收方号码”,并将其命名为新字段“呼叫”。

    package com.jing.calllogdemo;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    
    import java.util.HashMap;
    import java.util.Map;
    /*
    通话记录计数器bolt
     */
    public class CallLogCounterBolt implements IRichBolt {
        Map<String, Integer> counterMap;
        private OutputCollector collector;
        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.counterMap = new HashMap<String, Integer>();
            this.collector = outputCollector;
        }
    
        @Override
        public void execute(Tuple tuple) {
            String call = tuple.getString(0);
            Integer duration = tuple.getInteger(1);
            if(!counterMap.containsKey(call)){
                counterMap.put(call, 1);
            }else {
                Integer c = counterMap.get(call) + duration;
                counterMap.put(call, c);
            }
            collector.ack(tuple);
    
        }
    
        @Override
        public void cleanup() {
            for(Map.Entry<String, Integer> entry : counterMap.entrySet()){
                System.out.println(entry.getKey() + " : " + entry.getValue());
            }
    
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("call"));
    
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }
    CallLogCounterBolt

     3、创建执行入口类,构建Topology

    Storm拓扑基本上是一个Thrift结构。 TopologyBuilder类提供了简单而容易的方法来创建复杂的拓扑。TopologyBuilder类具有设置spout(setSpout)和设置bolt(setBolt)的方法。最后,TopologyBuilder有createTopology来创建拓扑。使用以下代码片段创建拓扑 -
    package com.jing.calllogdemo;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    
    public class App {
        public static void main(String[] args) throws InterruptedException, InvalidTopologyException, AuthorizationException, AlreadyAliveException {
            TopologyBuilder builder = new TopologyBuilder();
    
            //设置spout
            builder.setSpout("spout", new CallLogSpout());
            //设置creator-bolt
            builder.setBolt("creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("spout");
            //设置countor-bolt
            builder.setBolt("counter-bolt", new CallLogCounterBolt()).
                    fieldsGrouping("creator-bolt", new Fields("call"));
    
            Config config = new Config();
            config.setDebug(true);
    
            /*本地模式
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
    
             */
    
            StormSubmitter.submitTopology("myTop", config, builder.createTopology());
    
    
        }
    }
    App
    为了开发目的,我们可以使用“LocalCluster”对象创建本地集群,然后使用“LocalCluster”类的“submitTopology”方法提交拓扑。 “submitTopology”的参数之一是“Config”类的实例。“Config”类用于在提交拓扑之前设置配置选项。此配置选项将在运行时与集群配置合并,并使用prepare方法发送到所有任务(spout和bolt)。一旦拓扑提交到集群,我们将等待10秒钟,集群计算提交的拓扑,然后使用“LocalCluster”的“shutdown”方法关闭集群。完整的程序代码如下 -
     
     
    参考:

    作者:raincoffee
    链接:https://www.jianshu.com/p/7af9693d9ffc
    来源:简书
    简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

    生产环境的集群上运行topology

    1)修改提交方式,在代码中

    2)导出jar包 mvn

    3)在linux上运行topologys

    &>storm jar XXX.jar  full.class.name

  • 相关阅读:
    linux安装oracle
    echarts柱状图,改变柱状颜色
    JS实现键盘监听(包括组合键)
    css根据屏幕大小切换样式
    (转)Win10下PostgreSQL10与PostGIS安装
    navicat连接PostgreSQL报:column “rolcatupdate” does not exist ...错误的解决办法
    大屏FAQ
    大屏介绍
    大屏模板制作
    大屏做成这样,领导不重用你都难
  • 原文地址:https://www.cnblogs.com/Jing-Wang/p/11028749.html
Copyright © 2011-2022 走看看