zoukankan      html  css  js  c++  java
  • Storm的 hello 输出流

    Spout (随机发射单词) -----> Exclamation (加感叹号) -----> Print (加问候语)

       

    TestWordSpout
    package cn.ljh.storm.helloworld;
    
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import java.util.Map;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    import java.util.Random;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    
    /**
     * Spout that repeatedly emits one word picked at random from a fixed list.
     *
     * <p>Every emitted tuple carries a single field named {@code "word"}. Tuples are emitted
     * through the single-argument {@code emit}, i.e. without a message id.
     */
    public class TestWordSpout extends BaseRichSpout {
        public static final Logger LOG = LoggerFactory.getLogger(TestWordSpout.class);

        /** Candidate words; built once rather than on every {@link #nextTuple()} call. */
        private static final String[] WORDS = {"nathan", "mike", "jackson", "golda", "bertels"};

        SpoutOutputCollector _collector;

        /** Random source shared by all {@link #nextTuple()} calls; created in {@code open}. */
        private Random _rand;

        /**
         * Stores the collector used for emitting and creates the random source.
         *
         * @param conf      topology configuration (unused)
         * @param context   topology context (unused)
         * @param collector collector that {@link #nextTuple()} emits through
         */
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            _rand = new Random();
        }

        /**
         * Calls {@code Utils.sleep(100)} to throttle emission, then emits one randomly
         * selected word from {@link #WORDS}.
         */
        public void nextTuple() {
            Utils.sleep(100);
            final String word = WORDS[_rand.nextInt(WORDS.length)];
            _collector.emit(new Values(word));
        }

        /**
         * Declares the single output field, {@code "word"}.
         *
         * @param declarer declarer that receives the output schema
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }
    ExclamationBolt
    package cn.ljh.storm.helloworld;
    
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    /**
     * Bolt that appends {@code "!!!"} to the first value of each incoming tuple and emits
     * the result as a one-field tuple named {@code "word"}.
     */
    public class ExclamationBolt extends BaseRichBolt {
        OutputCollector _collector;

        /**
         * Declares the name of the single output field.
         *
         * @param declarer declarer that receives the output schema
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Name of the one field carried by every emitted tuple.
            final Fields outputSchema = new Fields("word");
            declarer.declare(outputSchema);
        }

        /**
         * Remembers the collector so that {@link #execute(Tuple)} can emit and ack.
         *
         * @param conf      topology configuration (unused)
         * @param context   topology context (unused)
         * @param collector collector used for emitting and acknowledging tuples
         */
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this._collector = collector;
        }

        /**
         * Reads the string at index 0, appends the exclamation marks, emits the new value
         * (passing the input tuple as the first argument to {@code emit}) and then acks the
         * input tuple.
         *
         * @param tuple the incoming tuple; its value at index 0 is read as a string
         */
        public void execute(Tuple tuple) {
            final String exclaimed = tuple.getString(0) + "!!!";
            final Values outgoing = new Values(exclaimed);
            _collector.emit(tuple, outgoing);
            _collector.ack(tuple);
        }

    }

    PrintBolt

    package cn.ljh.storm.helloworld;
    
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * Bolt that logs the first value of each incoming tuple followed by
     * {@code " Hello World!"} and then acks the tuple. It declares no output fields.
     */
    public class PrintBolt extends BaseRichBolt {
        private static final Logger LOG = LoggerFactory.getLogger(PrintBolt.class);
        OutputCollector _collector;

        /**
         * Stores the collector so that {@link #execute(Tuple)} can ack tuples.
         *
         * @param conf      topology configuration (unused)
         * @param context   topology context (unused)
         * @param collector collector used to acknowledge tuples
         */
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        /**
         * Logs the greeting for the tuple's first value at INFO level and acks the tuple.
         *
         * @param tuple the incoming tuple; its value at index 0 is read as a string
         */
        public void execute(Tuple tuple) {
            // Parameterized logging: the message is only assembled when INFO is enabled.
            LOG.info("{} Hello World!", tuple.getString(0));
            _collector.ack(tuple);
        }

        /**
         * Intentionally empty: this bolt emits nothing downstream.
         *
         * @param declarer unused
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }
    ExclamationTopology
    package cn.ljh.storm.helloworld;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.utils.Utils;
    
    /**
     * Entry point that wires {@code TestWordSpout -> ExclamationBolt -> PrintBolt} into a
     * topology and runs it either on a real cluster or in an in-process local cluster.
     */
    public class ExclamationTopology {
        /** Topology name used when running in local mode. */
        private static final String LOCAL_TOPOLOGY_NAME = "test3";

        /**
         * Builds and launches the topology.
         *
         * <p>When at least one argument is given, {@code args[0]} is used as the topology name
         * and the topology is submitted through {@link StormSubmitter}. Otherwise the topology
         * runs in a {@link LocalCluster} for 20 seconds and is then killed.
         *
         * @param args optional; {@code args[0]} is the topology name for cluster submission
         * @throws Exception if submitting, killing or shutting down the topology fails
         */
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();

            // Register the spout under component id "word" with a parallelism hint of 1.
            builder.setSpout("word", new TestWordSpout(), 1);
            // "exclaim" subscribes to component "word" using shuffle grouping.
            builder.setBolt("exclaim", new ExclamationBolt(), 1).shuffleGrouping("word");
            // "print" subscribes to component "exclaim" using shuffle grouping.
            builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("exclaim");

            Config conf = new Config();
            conf.setDebug(true);

            if (args != null && args.length > 0) {
                conf.setNumWorkers(3);

                // The topology name is taken from the first command-line argument.
                StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
            }
            else {
                LocalCluster cluster = new LocalCluster();
                try {
                    cluster.submitTopology(LOCAL_TOPOLOGY_NAME, conf, builder.createTopology());
                    Utils.sleep(20000);
                    cluster.killTopology(LOCAL_TOPOLOGY_NAME);
                } finally {
                    // Always release the local cluster, even if submit or kill failed.
                    cluster.shutdown();
                }
            }
        }
    }
    RUSH B
  • 相关阅读:
    Mybatis(二) Mybatis通用的写法
    Mybatis(一)Mybatis相关概念
    NodeJS添加Jquery依赖
    安卓、IOS端AEC密钥加密 Java端密钥解密通用实现(16进制表现形式)
    关于博客园首页及详情页美化的代码
    MD5用户密码加密工具类 MD5Util
    .Net Core跨平台应用研究-CustomSerialPort(增强型跨平台串口类库)
    FtpServer穿透内网访问配置踩坑笔记
    .Net Core之编辑json配置文件
    玩转MQTT-阿里云之MQTT使用(下)
  • 原文地址:https://www.cnblogs.com/tangsonghuai/p/11123846.html
Copyright © 2011-2022 走看看