zoukankan      html  css  js  c++  java
  • (03)Storm编程案例

      本篇记录一下Storm的编程案例:将手机型号转换成大写,并且加上当前时间,再输出到文件。

      1、所需jar包

      解压安装包apache-storm-0.9.2-incubating.tar.gz,在apache-storm-0.9.2-incubating/lib下

      2、创建一个Spout类

      负责实时读取数据,然后发送给后续的bolt组件进行处理

    package demo;
    
    import java.util.Map;
    import java.util.Random;
    
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import backtype.storm.utils.Utils;
    
    //负责实时读取数据,然后发送给后续的bolt组件进行处理
    public class DataSouceSpout extends BaseRichSpout {
        
        //模拟一些数据
        private String[] phones = {"iphone","xiaomi","moto","sunsumg","mate","huawei","nokia"};
        //得到上下文的信息
        private SpoutOutputCollector collector;
        
        //消息的处理方法
        public void nextTuple() {
            //模拟从外部读取数据
            Utils.sleep(500);
            //随机得到了一个手机的型号
            int index = new Random().nextInt(7);
            String phone = phones[index];
            //发送给后续的bolt
            this.collector.emit(new Values(phone));
        }
    
        //对这个spout进行初始化
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }
    
        //指定spout组件发送出去的数据的key
        public void declareOutputFields(OutputFieldsDeclarer declare) {
            declare.declare(new Fields("phone-name"));
        }
    }

      3、创建一个Bolt类

      将Spout发送过来的数据,转成大写

    package demo;
    
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    //将Spout发送过来的数据,转成大写
    public class MyBoltA extends BaseBasicBolt {
    
        //业务的处理逻辑方法
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            //从spout组件中获取数据
            //方式一:
            //String phone = tuple.getStringByField("phone-name");
            //方式二
            String phone = tuple.getString(0);
            //处理数据
            String upperPhone = phone.toUpperCase();
            //将数据发送给下一个组件继续处理
            collector.emit(new Values(upperPhone));
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declare) {
            declare.declare(new Fields("upperphone"));
        }
    
    }

      4、创建另一个Bolt类

      将BoltA发送过来的数据,加上时间,并且写到文件中

    package demo;
    
    import java.io.FileWriter;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Map;
    
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Tuple;
    
    //将BoltA发送过来的数据,加上时间,并且写到文件中
    public class MyBoltB extends BaseBasicBolt {
    
        private FileWriter fw = null;
        
        public void prepare(Map stormConf, TopologyContext context) {
            //对FileWriter进行初始化
            try {
                fw = new FileWriter("/usr/local/test/storm/mystormoutput.txt");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            //将boltA发送过来的数据,加上当前的时间
            //获取数据
            String upperPhoneName = tuple.getString(0);
            //业务逻辑
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String result = upperPhoneName +"   " + df.format(new Date());
            //将数据输出到文件系统中
            try {
                fw.write(result + "
    ");
                fw.flush();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declare) {
            //这是最后一个bolt组件
        }
    
    }

      5、创建一个组装类

      组装各个组件,并且提交任务到Storm集群

    package demo;
    
    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.topology.TopologyBuilder;
    
    //组装各个组件,并且提交任务到Storm集群
    public class SubmitClient {
    
        public static void main(String[] args) throws Exception {
            //得到一个topology的构造器
            TopologyBuilder builder = new TopologyBuilder();
            //指定spout
            builder.setSpout("datasource-spout", new DataSouceSpout());
            //指定Bolt组件,还需要指定数据的来源
            builder.setBolt("boltA", new MyBoltA()).shuffleGrouping("datasource-spout");
            builder.setBolt("boltB", new MyBoltB()).shuffleGrouping("boltA");
            //生成一个具体的任务
            StormTopology phoneTopo = builder.createTopology();
            //指明任务的一些参数
            Config config = new Config();
            //希望storm集群分配6个worker来执行任务
            config.setNumWorkers(6);
            //提交任务
            StormSubmitter.submitTopology("mystormdemo", config, phoneTopo);
        }
    }

      6、文件打包,发送服务器

      将这四个文件打成 stormDemo.jar,并且上传到Storm的服务器,临时存放在 /usr/local/test/storm

      7、运行程序

      首先启动Zookeeper和Storm,然后执行以下命令提交任务

    [root@localhost apache-storm-0.9.2-incubating]# bin/storm jar /usr/local/test/storm/stormDemo.jar demo.SubmitClient

      如下图所示:

      8、查看结果

    [root@localhost storm]# tail -f /usr/local/test/storm/mystormoutput.txt

      程序执行成功,文件一输出到 mystormoutput.txt

  • 相关阅读:
    Debian 9 更换源
    MySqlDataAdapter.Fill() 报异常‘给定关键字不在字典中’的解决方案
    阿里云函数计算 .NET Core 初体验
    TimeSpan 的 Milliseconds 和 TotalMilliseconds 有啥区别?
    使用 gitee 托管你的 go 模块
    markdown的css样式(自己写的)
    markdown的流程图实现和代码语法着色
    Python元组与字典详解
    centos7的防火墙(firewalld)
    centos7 安装java和tomcat9
  • 原文地址:https://www.cnblogs.com/javasl/p/12312983.html
Copyright © 2011-2022 走看看