zoukankan      html  css  js  c++  java
  • jstorm开发指南-写个简单的jstorm应用

    jstorm开发指南-写个简单的jstorm应用

    jstorm 是阿里巴巴开源的基于storm采用Java重写的一套分布式实时流计算框架,使用简单,特点如下:

    • 开发非常迅速: 接口简单,容易上手,只要遵守Topology,Spout, Bolt的编程规范即可开发出一个扩展性极好的应用,底层rpc,worker之间冗余,数据分流之类的动作完全不用考虑。
    • 扩展性极好:当一级处理单元速度,直接配置一下并发数,即可线性扩展性能
    • 健壮:当worker失效或机器出现故障时, 自动分配新的worker替换失效worker
    • 数据准确性: 可以采用Acker机制,保证数据不丢失。 如果对精度有更多一步要求,采用事务机制,保证数据准确。

    为什么要选择jstorm,而不采用twitter的storm呢?jstorm对比storm有如下优点:

    • Nimbus 实现HA
    • 彻底解决Storm雪崩问题:底层RPC采用netty + disruptor保证发送速度和接受速度是匹配的
    • 新增supervisor、Supervisor shutdown时、提交新任务,worker数不够时,均不自动触发任务rebalance
    • 新topology不影响现有任务,新任务无需去抢占老任务的cpu,memory,disk和net
    • 减少对ZK的访问量:去掉大量无用的watch;task的心跳时间延长一倍;Task心跳检测无需全ZK扫描
    • Worker 内部全流水线模式:Spout nextTuple和ack/fail运行在不同线程
    • 性能:采用ZeroMq, 比storm快30%;采用netty时, 和storm快10%,并且稳定非常多

    总之,Jstorm 比Storm 更稳定,功能更强大,更快。而且Storm上跑的程序可以一行代码不变运行在Jstorm上,零成本,推荐所有使用storm的兄弟们搭建个jstorm集群缓过来。

    jstorm 集群的搭建过程,可以参考另一篇文章:分布式实时日志系统(一)环境搭建之 Jstorm 集群搭建过程/Jstorm集群一键安装部署

    jstorm 开发实例

    上面也说过了,jstorm使用起来很简单,遵循Topology,Spout, Bolt的编程规范就可以,在下面的例子中将一步步完成这些。例子也很简单,在spout中不断产生自增的int数组,bolt接受到数值后打印出日志,并插入到hbase中。(如果没有hbase环境的,这一步可以继续注释掉,不用打开,只看到跑到日志打印的地方就好了)

    spout 的开发只需要继承BaseRichSpout,实现继承的方法即可:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    public class TestSpout extends BaseRichSpout {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestSpout.class);
    static AtomicInteger sAtomicInteger = new AtomicInteger(0);
    static AtomicInteger pendNum = new AtomicInteger(0);
    private int sqnum;
    SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context,
    SpoutOutputCollector collector) {
    sqnum = sAtomicInteger.incrementAndGet();
    this.collector = collector;
    }

    @Override
    public void nextTuple() {
    while (true) {
    int a = pendNum.incrementAndGet();
    LOGGER.info(String.format("spount %d,pendNum %d", sqnum, a));
    this.collector.emit(new Values("xxxxx:"+a));

    try {
    Thread.sleep(10000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("log"));

    }

    /**
    * 启用 ack 机制,详情参考:https://github.com/alibaba/jstorm/wiki/Ack-%E6%9C%BA%E5%88%B6
    * @param msgId
    */
    @Override
    public void ack(Object msgId) {
    super.ack(msgId);
    }

    /**
    * 消息处理失败后需要自己处理
    * @param msgId
    */
    @Override
    public void fail(Object msgId) {
    super.fail(msgId);
    LOGGER.info("ack fail,msgId"+msgId);
    }

    }

    bolt 同理,继承 BaseRichBolt 实现其相应的方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    public  class TestBolt extends BaseRichBolt {

    private static final Logger LOGGER = CustomerLoggerFactory.LOGGER(TestBolt.class);
    OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
    OutputCollector collector) {
    this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
    String xx = input.getString(0);
    LOGGER.info(String.format("receive from spout ,num is : %d", xx));

    // 发送ack信息告知spout 完成处理的消息 ,如果下面的hbase的注释代码打开了,则必须等到插入hbase完毕后才能发送ack信息,这段代码需要删除
    this.collector.ack(input);
    try {
    Thread.sleep(10000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
    }

    topology 的开发同理:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public class TestTopology implements ILogTopology {
    @Override
    public void start(Properties properties) throws AlreadyAliveException, InvalidTopologyException, InterruptedException, IOException {

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("testspout", new TestSpout(), 1);
    builder.setBolt("testbolt", new TestBolt(), 2).shuffleGrouping("testspout");

    Config conf = ConfigUtils.getStormConfig(properties);
    conf.setNumAckers(1);

    StormSubmitter.submitTopology("testtopology", conf, builder.createTopology());
    System.out.println("storm cluster will start");
    }

    }

    经过上面的三个步骤,一个最简单的jstorm应用就开发完成了,接下来通过编译、打包完后,生成jar文件 jstorm-hbase-demo-0.1.jar ,将此jar文件在jstorm集群的nimbus机器上提交即可:jstorm jar jstorm-hbase-demo-0.1.jar com.xirong.demo.BootStrap config.properties

    demo运行效果

    从jstorm集群的监控图赏可以看到,对应topology的运行情况:

    http://static.ixirong.com/pic/jstormdemo/jstorm-topology.png

    bolt 的执行效率,及ack数量,占用机器内存等:

    http://static.ixirong.com/pic/jstormdemo/jstorm-testbolt.png

    源码已经上传到github上面,喜欢研究的同学,可以fork后自己修改练习。地址为:https://github.com/xirong/jstorm-hbase-demo
    源码中使用到的Phoenix组件,hbase上层的中间件,使得开发人员可以使用sql的方式来对hbase进行相应的操作,感兴趣的可以阅读:使用Phoenix通过sql语句更新操作hbase数据 ,此文中介绍了如何安装及使用。
    另外想对hbase的有所了解的可以查看:列式存储hbase系统架构学习

    原文http://www.ixirong.com/2015/07/18/develop-the-first-jstorm-demo/

  • 相关阅读:
    101. Symmetric Tree(js)
    100. Same Tree(js)
    99. Recover Binary Search Tree(js)
    98. Validate Binary Search Tree(js)
    97. Interleaving String(js)
    96. Unique Binary Search Trees(js)
    95. Unique Binary Search Trees II(js)
    94. Binary Tree Inorder Traversal(js)
    93. Restore IP Addresses(js)
    92. Reverse Linked List II(js)
  • 原文地址:https://www.cnblogs.com/yudar/p/4883348.html
Copyright © 2011-2022 走看看