  • JStorm Environment Setup

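    Before you start learning JStorm you need a cluster environment. This post walks through a single-node JStorm setup intended for learning only. A production deployment follows much the same steps, but for that you should consult the JStorm community and its documentation.
    I. Prerequisites
    JStorm's core is implemented in Java, so it depends on a Java runtime. Some JStorm scripts are written in Python, so Python is required as well.
    1. Java environment

    2. Python environment

    This walkthrough uses Java 1.6.0_35 and Python 2.6.5. If they are not installed by default, see the relevant documentation (www.java.com, www.python.org).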
    II. Versions
    zeromq-3.2.4
    zookeeper-3.4.5
    jstorm-0.7.1
    III. Setting up JStorm
    Like Storm, JStorm relies on zeromq/jzmq for its underlying message transport, and it uses zookeeper for shared state and coordination.
    1. Install zeromq
    wget http://download.zeromq.org/zeromq-3.2.4.tar.gz
    tar zxf zeromq-3.2.4.tar.gz
    cd zeromq-3.2.4
    ./configure
    make
    sudo make install
    sudo ldconfig
    2. Install jzmq
    wget https://github.com/zeromq/jzmq/tarball/master -O jzmq.tar.gz
    mkdir jzmq
    tar zxf jzmq.tar.gz -C jzmq --strip-components=1
    cd jzmq
    ./autogen.sh
    ./configure
    make
    sudo make install
    3. Install zookeeper
    wget http://apache.dataguru.cn/zookeeper/zookeeper-3.4.5/zookeeper-3.4.5.tar.gz
    tar zxf zookeeper-3.4.5.tar.gz
    cd zookeeper-3.4.5
    cp conf/zoo_sample.cfg conf/zoo.cfg   # zkServer.sh reads conf/zoo.cfg by default
    ./bin/zkServer.sh start
    ./bin/zkServer.sh stop
    4. Install jstorm
    wget http://42.121.19.155/jstorm/jstorm-0.7.1.zip
    unzip jstorm-0.7.1.zip
    Edit the configuration file conf/storm.yaml:
    storm.zookeeper.servers:
        - "localhost"
    nimbus.host: "localhost"
    storm.zookeeper.root: "/jstorm"
    storm.local.dir: "/tmp/jstorm"
    drpc.servers:
        - "localhost"
    On a development machine with little free memory, starting nimbus may fail with:
    Error occurred during initialization of VM
    Could not reserve enough space for object heap
    In that case, set smaller heap sizes in conf/storm.yaml:
    nimbus.childopts: "-Xmx256m"
    supervisor.childopts: "-Xmx256m"
    worker.childopts: "-Xmx128m"
    Adjust the sizes to suit your machine.
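To see what an -Xmx value means in practice, the following minimal sketch prints the maximum heap the current JVM will attempt to use. The class name HeapInfo is made up for this illustration and is not part of JStorm. The reported figure is usually somewhat below the -Xmx value, because the JVM excludes part of the heap from this number.

```java
class HeapInfo {
    /** Converts a byte count to whole megabytes (1 MB = 1024 * 1024 bytes). */
    static long toMegabytes(long bytes) {
        return bytes / (1024L * 1024L);
    }

    public static void main(String[] args) {
        // maxMemory() reflects the heap limit, which -Xmx controls
        long maxBytes = Runtime.getRuntime().maxMemory();
        System.out.println("Max heap for this JVM: " + toMegabytes(maxBytes) + " MB");
    }
}
```

Running it as `java -Xmx256m HeapInfo` and again as `java -Xmx128m HeapInfo` shows how the limit changes. If the JVM cannot reserve the requested heap at all, it exits with the same "Could not reserve enough space for object heap" error shown above.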
    5. UI
    Prerequisite: Tomcat 7.0 or later.
    Copy jstorm-ui-0.7.1.war into Tomcat's webapps directory.
    6. Start JStorm
    Start zookeeper: in the zookeeper directory, run bin/zkServer.sh start
    Start Nimbus: in the JStorm directory, run bin/jstorm nimbus
    Start Supervisor: in the JStorm directory, run bin/jstorm supervisor
    Start Tomcat: in the Tomcat directory, run bin/startup.sh
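Once the four processes are started, a quick way to confirm they are up is to check that their TCP ports accept connections. The sketch below is only an illustration: the class name PortCheck is made up, and the ports are assumptions based on common defaults (2181 is the clientPort in ZooKeeper's zoo_sample.cfg, 6627 is the Nimbus Thrift port used in the HelloWorld topology, and 8080 is Tomcat's default HTTP connector). Adjust them if your configuration differs.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isPortOpen(String host, int port, int timeoutMs) {
        Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable
            return false;
        } finally {
            try {
                socket.close();
            } catch (IOException ignored) {
                // Nothing useful to do if close fails
            }
        }
    }

    public static void main(String[] args) {
        // Assumed default ports; change them to match your own setup
        String[] names = {"ZooKeeper", "Nimbus", "Tomcat"};
        int[] ports = {2181, 6627, 8080};
        for (int i = 0; i < names.length; i++) {
            boolean up = isPortOpen("localhost", ports[i], 1000);
            System.out.println(names[i] + " (port " + ports[i] + "): "
                    + (up ? "listening" : "not reachable"));
        }
    }
}
```

An open port only shows that something is listening there. The logs of each process remain the authoritative place to look when startup fails. The Supervisor is not covered by this check.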
    IV. JStorm HelloWorld
    1. Write the source code
    This example is taken from GitHub.
    HelloWorldTopology.java

    package storm.cookbook;
    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.utils.Utils;
    /**
     * Author: ashrith
     * Date: 8/26/13
     * Time: 12:03 PM
     * Desc: set up the topology and submit it to either a local or remote Storm cluster depending on the arguments
     *       passed to the main method.
     */
    public class HelloWorldTopology {
        /*
         * main class in which to define the topology and a LocalCluster object (enables you to test and debug the
         * topology locally). In conjunction with the Config object, LocalCluster allows you to try out different
         * cluster configurations.
         *
         * Create a topology using 'TopologyBuilder' (which will tell storm how the nodes are arranged and how they
         * exchange data)
         * The spout and the bolts are connected using 'ShuffleGroupings'
         *
         * Create a 'Config' object containing the topology configuration, which is merged with the cluster configuration
         * at runtime and sent to all nodes with the prepare method
         *
         * Create and run the topology using 'createTopology' and 'submitTopology'
         */
        public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("randomHelloWorld", new HelloWorldSpout(), 10);
            builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 1).shuffleGrouping("randomHelloWorld");
            Config conf = new Config();
            conf.put(Config.NIMBUS_HOST, "localhost");
            conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
            conf.setDebug(true);
            if(args!=null && args.length > 0) {
                conf.setNumWorkers(3);
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
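            } else {
                // No topology name was given, so there is nothing to submit.
                System.err.println("Usage: HelloWorldTopology <topology-name>");
            }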
        }
    }

    HelloWorldSpout.java

    package storm.cookbook;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import java.util.Map;
    import java.util.Random;
    /**
     * Author: ashrith
     * Date: 8/21/13
     * Time: 8:33 PM
     * Desc: spout essentially emits a stream containing 1 of 2 sentences 'Other Random Word' or 'Hello World' based on
     *       random probability. It works by generating a random number upon construction and then generating subsequent
     *       random numbers to test against the original member variable's value. When it matches "Hello World" is emitted,
     *       during the remaining executions the other sentence is emitted.
     */
    public class HelloWorldSpout extends BaseRichSpout{
        private SpoutOutputCollector collector;
        private int referenceRandom;
        private static final int MAX_RANDOM = 10;
        public HelloWorldSpout() {
            final Random rand = new Random();
            referenceRandom = rand.nextInt(MAX_RANDOM);
        }
        /*
         * declareOutputFields() => you need to tell the Storm cluster which fields this Spout emits within the
         *  declareOutputFields method.
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }
        /*
         * open() => The first method called in any spout is 'open'
         *           TopologyContext => contains all our topology data
         *           SpoutOutputCollector => enables us to emit the data that will be processed by the bolts
         *           conf => created in the topology definition
         */
        @Override
        public void open(Map conf, TopologyContext topologyContext, SpoutOutputCollector collector) {
            this.collector = collector;
        }
        /*
         * nextTuple() => Storm cluster will repeatedly call the nextTuple method which will do all the work of the spout.
         *  nextTuple() must release the control of the thread when there is no work to do so that the other methods have
         *  a chance to be called.
         */
        @Override
        public void nextTuple() {
            final Random rand = new Random();
            int instanceRandom = rand.nextInt(MAX_RANDOM);
            if(instanceRandom == referenceRandom){
                collector.emit(new Values("Hello World"));
            } else {
                collector.emit(new Values("Other Random Word"));
            }
        }
    }
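
The spout's selection logic can be tried without a cluster. The following plain-Java sketch has no Storm dependency, and its class and method names are made up for this illustration. It mirrors the comparison made in nextTuple() and counts how often "Hello World" would be emitted. With MAX_RANDOM = 10 each call matches the reference value with probability 1/10, so roughly one tuple in ten should be "Hello World".

```java
import java.util.Random;

class SpoutSimulation {
    static final int MAX_RANDOM = 10;

    /** Mirrors the spout's decision for one draw against the reference value. */
    static String sentenceFor(int instanceRandom, int referenceRandom) {
        return instanceRandom == referenceRandom ? "Hello World" : "Other Random Word";
    }

    /** Counts "Hello World" results over a number of simulated nextTuple() calls. */
    static int countHelloWorld(Random rand, int calls) {
        // Drawn once, as in the HelloWorldSpout constructor
        int referenceRandom = rand.nextInt(MAX_RANDOM);
        int hits = 0;
        for (int i = 0; i < calls; i++) {
            // Drawn on every call, as in nextTuple()
            int instanceRandom = rand.nextInt(MAX_RANDOM);
            if ("Hello World".equals(sentenceFor(instanceRandom, referenceRandom))) {
                hits++;
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        int calls = 100000;
        int hits = countHelloWorld(new Random(), calls);
        System.out.println("Hello World: " + hits + " of " + calls
                + " calls (about " + (calls / MAX_RANDOM) + " expected)");
    }
}
```

Unlike the spout, the simulation reuses one Random instance for every draw. The outcome is statistically the same, and it avoids allocating a new generator on each call.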

    HelloWorldBolt.java

    package storm.cookbook;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;
    import java.util.Map;
    /**
     * Author: ashrith
     * Date: 8/26/13
     * Time: 11:48 AM
     * Desc: This bolt will consume the produced Tuples from HelloWorldSpout and implement the required counting logic
     */
    public class HelloWorldBolt extends BaseRichBolt {
        private int myCount = 0;
        /*
         * prepare() => on create
         */
        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        }
        /*
         * execute() => most important method in the bolt is execute(Tuple input), which is called once per tuple received
         *  the bolt may emit several tuples for each tuple received
         */
        @Override
        public void execute(Tuple tuple) {
            String test = tuple.getStringByField("sentence");
            // Compare by content; == would only compare object references
            if ("Hello World".equals(test)) {
                myCount++;
                System.out.println("Found a Hello World! My Count is now: " + Integer.toString(myCount));
            }
        }
        /*
         * declareOutputFields => This bolt emits nothing hence no body for declareOutputFields()
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        }
    }
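
A note on comparing the sentence inside a bolt like this one: when a tuple crosses a worker boundary it is serialized and rebuilt, so the String that arrives is generally a different object from any literal in the bolt's code. Comparing by content with equals() is therefore the safe choice. The plain-Java sketch below has no Storm dependency, and its class name is made up for this illustration. It uses new String(...) to stand in for a value rebuilt by deserialization.

```java
class StringCompareDemo {
    /** Counts sentences whose content is "Hello World"; null entries are skipped safely. */
    static int countMatches(String[] sentences) {
        int count = 0;
        for (String s : sentences) {
            // Calling equals on the literal avoids a NullPointerException when s is null
            if ("Hello World".equals(s)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // A distinct object with the same characters as the literal
        String received = new String("Hello World");
        System.out.println(received == "Hello World");      // false: different objects
        System.out.println("Hello World".equals(received)); // true: same content
        System.out.println(countMatches(
                new String[] {received, "Other Random Word", null})); // 1
    }
}
```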

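    2. Submit the topology
    Compile and package the source above as Helloworld.jar, then submit it to the jstorm cluster:
    bin/jstorm jar Helloworld.jar storm.cookbook.HelloWorldTopology HelloWorld
    The final argument, HelloWorld, is the topology name.
    3. Check the topology's status
    You can follow the topology's execution through the UI, among other means.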
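    V. Conclusion
    This post briefly covered setting up a single-node JStorm environment, enough for a beginner to get JStorm running and write a HelloWorld topology. Treat it only as a rough reference for production clusters, and consult the relevant documentation for detailed configuration.
    VI. References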
    [1]https://github.com/alibaba/jstorm/wiki
    [2]https://github.com/nathanmarz/storm/wiki

    http://hexiaoqiao.sinaapp.com/2014/06/09/jstorm%E7%8E%AF%E5%A2%83%E6%90%AD%E5%BB%BA/

  • Original post: https://www.cnblogs.com/chen110xi/p/5655050.html