zoukankan      html  css  js  c++  java
  • storm集群搭建和java应用

    1. vim /etc/hosts
    ssh免密登录
    192.168.132.154 c0
    192.168.132.156 c1
    192.168.132.155 c2

    storm集群:
    192.168.132.154 c0
    192.168.132.156 c1
    192.168.132.155 c2

    2. 请在官网下载,并解压。文末有文件下载地址
    tar -zxvf apache-storm-1.2.3.tar.gz

      配置环境变量(c0,c1,c2都要修改)

    vim /etc/profile
    # storm
    export STORM_HOME=/home/xiaozw/soft/java/storm
    export PATH=$PATH:${JAVA_PATH}:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZK_HOME/bin:$SPARK_HOME/bin:${STORM_HOME}/bin
    刷新生效
    source /etc/profile

    3. 修改storm/conf/storm.yaml配置文件

    storm.zookeeper.servers:

    - "192.168.132.154"
    - "192.168.132.156"
    - "192.168.132.155"
    nimbus.seeds: ["192.168.132.154"]
    storm.local.dir:"/home/xiaozw/soft/tmp/storm"
    supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

      

    创建storm目录(c0,c1,c2都要创建)

    mkdir /home/xiaozw/soft/tmp/storm

    4. 拷贝storm文件夹到其它集群上(c1,c2)
    scp -r /home/xiaozw/soft/java/storm root@c1:/home/xiaozw/soft/java/
    scp -r /home/xiaozw/soft/java/storm root@c2:/home/xiaozw/soft/java/

    先启动zookeeper,安装zookeeper请查看其它文档。
    zkServer.sh start
    5. 启动storm
    c0,nimbus上启动
    ./storm nimbus >> /dev/null 2>&1 &
    启动界面
    storm ui >> /dev/null 2>&1 &

      c1,c2 supervisor上启动

    ./storm supervisor >> /dev/null 2>&1 &

     界面查看,http://192.168.132.154:8080

     6. java wordcount程序。

    package com.xiaozw.demo4.storm;
    
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;
    
    public class WordCountTopology {
        /**
         * spout 继承一个基类,实现接口,这个里面主要是负责从数据源获取数据。
         * 简化从内部发射数据。
         */
        public static class RandomSentenceSpout extends BaseRichSpout{
    
            private static final long serialVersionUID = -8017609899644290351L;
    
            private SpoutOutputCollector collector;
    
            private Random random;
    
            /**
             * 对spout初始化,创建线程,数据库连接
             * @param conf
             * @param topologyContext
             * @param collector
             */
            @Override
            public void open(Map conf, TopologyContext topologyContext, SpoutOutputCollector collector) {
                //初始化数据,SpoutOutputCollector用来发射数据出去,
                this.collector=collector;
                this.random=new Random();
            }
    
            /**
             * 最终运行在task中,某个worker进程的某个executor线程内部。
             * 某个task负责无限循环调用nextTuple方法,
             * 形成数据流。
             */
            @Override
            public void nextTuple() {
                Utils.sleep(100);
                String[] sentences = new String[] {
                        "the cow jumped over the moon", "an apple a day keeps the doctor away",
                        "four score and seven years ago", "snow white and the seven dwarfs",
                        "i am at two with nature" };
                String sentence = sentences[random.nextInt(sentences.length)];
                System.err.println("【发射句子】sentence=" + sentence);
                collector.emit(new Values(sentence));
            }
    
            /**
             * 发射出去每个tuple中field名称是什么。
             * @param declarer
             */
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("sentence"));
            }
        }
    
        /**
         * 每个bolt同样是发送到worker某个executor的task中执行
         *
         * @author Administrator
         *
         */
        public static class SplitSentence extends BaseRichBolt {
    
            private static final long serialVersionUID = -1863792429350238883L;
    
            private OutputCollector collector;
    
            /**
             * 对于bolt来说,第一个方法就是prepare()方法。
             */
            @Override
            @SuppressWarnings("rawtypes")
            public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
                this.collector = collector;
            }
    
            /**
             * 每接收到一条数据后,就会交给executor方法来执行
             */
            @Override
            public void execute(Tuple tuple) {
                String sentence = tuple.getStringByField("sentence");
                if (sentence != null && "".equals(sentence) == false) {
                    String[] words = sentence.split(" ");
                    for (String word : words) {
                        collector.emit(new Values(word));
                    }
                }
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word"));
            }
    
        }
    
        /**
         * 单词计数bolt
         *
         * @author Administrator
         *
         */
        public static class WordCount extends BaseRichBolt {
    
            private static final long serialVersionUID = -8940950046975910504L;
    
            //private static final Logger LOGGER = LoggerFactory.getLogger(WordCount.class);
    
            private OutputCollector collector;
            private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
    
            @Override
            @SuppressWarnings("rawtypes")
            public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
                this.collector = collector;
            }
    
            @Override
            public void execute(Tuple input) {
                String word = input.getStringByField("word");
                Integer count = wordCounts.get(word);
                if (count == null) {
                    count = 0;
                }
                wordCounts.put(word, ++count);
                System.err.println("【单词计数】" + word + "出现的次数是" + count);
                collector.emit(new Values(word, count));
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word", "count"));
            }
    
        }
    
        public static void main(String[] args) throws Exception{
            // 在main方法中,会去将spout和bolts组合起来,构建成一个拓扑
            TopologyBuilder builder = new TopologyBuilder();
    
            // 第一个参数的意思,就是给这个spout设置一个名字
            // 第二个参数的意思,就是创建一个spout的对象
            // 第三个参数的意思,就是设置spout的executor有几个
            builder.setSpout("RandomSentence", new RandomSentenceSpout(), 5);
    
            builder.setBolt("SplitSentence", new SplitSentence(), 10).setNumTasks(20)
                    .shuffleGrouping("RandomSentence");
            // 这里设置fieldsGrouping很重要,相同的单词从SplitSentence发射出来时,一定会进入到下游的指定的同一个task中
            // 只有这样子,才能准确的统计出每个单词的数量
            builder.setBolt("WordCount", new WordCount(), 10).setNumTasks(20).fieldsGrouping(
                    "SplitSentence", new Fields("word"));
    
            Config config = new Config();
            if(args!=null && args.length>0){
                config.setNumWorkers(3);
                try{
                    StormSubmitter.submitTopologyWithProgressBar(args[0],config,builder.createTopology());
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
            else{
                config.setMaxTaskParallelism(20);
                // 在eclipse本地运行
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("WordCountTopology", config, builder.createTopology());
                Utils.sleep(60000);
                cluster.shutdown();
            }
        }
    }

    打包 mvn clean install -DskipTests
    上传jar包到测试服务器

      

      执行命令:

    storm jar demo4-1.0-SNAPSHOT.jar com.xiaozw.demo4.storm.WordCountTopology WordCountTopology

      查看UI界面 http://192.168.132.154:8080/index.html

     

     7. 源码和文件下载地址

    链接:https://pan.baidu.com/s/1RmBlhZ_p-30clHoUhxycBg
    提取码:ik6g

      

  • 相关阅读:
    Django models中的null和blank的区别
    微服务
    幂等性
    restful规范
    related_name
    数据库 引擎,数据类型,约束
    数据库 基本操作
    python 常见算法
    python if,循环的练习
    python数据类型、if判断语句
  • 原文地址:https://www.cnblogs.com/xiaozw/p/11904641.html
Copyright © 2011-2022 走看看