zoukankan      html  css  js  c++  java
  • (04)Storm与Kafka结合使用简单案例

      在前面Storm的基本概念中,提到过Storm的Spout应该是源源不断的取数据,不能间断。那么,很显然,消息队列系统、分布式内存系统或内存数据库是作为其数据源的很好的选择。因此,Storm的发布包中也包含了一个集成jar,支持从kafka读出数据,供Storm应用使用。下面记录一下使用过程。

      本篇在上一篇随笔Storm编程案例的基础上改进

      1、添加额外jar包

      在java工程中新添加storm-kafka-0.9.2-incubating.jar,该jar可以从apache-storm-0.9.2-incubating/external/storm-kafka下获取

      2、修改组装类

     1 package demo;
     2 
     3 import java.util.UUID;
     4 
     5 import backtype.storm.Config;
     6 import backtype.storm.StormSubmitter;
     7 import backtype.storm.generated.StormTopology;
     8 import backtype.storm.spout.SchemeAsMultiScheme;
     9 import backtype.storm.topology.TopologyBuilder;
    10 import storm.kafka.BrokerHosts;
    11 import storm.kafka.KafkaSpout;
    12 import storm.kafka.SpoutConfig;
    13 import storm.kafka.StringScheme;
    14 import storm.kafka.ZkHosts;
    15 
    16 //组装各个组件,并且提交任务到Storm集群
    17 public class SubmitClient {
    18 
    19     public static void main(String[] args) throws Exception {
    20         //得到一个topology的构造器
    21         TopologyBuilder builder = new TopologyBuilder();
    22         //指定我们的spout
    23         builder.setSpout("datasource-spout",createKafkaSpout());
    24         //指定Bolt组件,还需要指定数据的来源
    25         builder.setBolt("boltA", new MyBoltA()).shuffleGrouping("datasource-spout");
    26         builder.setBolt("boltB", new MyBoltB()).shuffleGrouping("boltA");
    27         //生成一个具体的任务
    28         StormTopology phoneTopo = builder.createTopology();
    29         //指明任务的一些参数
    30         Config config = new Config();
    31         //希望storm集群分配6个worker来执行任务
    32         config.setNumWorkers(6);
    33         //提交任务
    34         StormSubmitter.submitTopology("mystormdemo", config, phoneTopo);
    35     }
    36 
    37     //支持从Kakfa消息系统中读取数据
    38     private static KafkaSpout createKafkaSpout() {
    39         BrokerHosts brokerHosts = new ZkHosts("192.168.7.151:2181,192.168.7.152:2181,192.168.7.153:2181");
    40         SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "mydemo1", "/mydemo1", UUID.randomUUID().toString());
    41         spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    42         //返回一个KafkaSpout
    43         return new KafkaSpout(spoutConfig);
    44     }
    45 }

      第23行指定spout为kafka,并且新增createKafkaSpout方法

      3、文件打包,发送服务器

      将这四个文件打成 stormDemo.jar,并且上传到Storm的服务器,临时存放在 /usr/local/test/storm

      4、服务器添加额外jar包

      向Storm服务添加与kafka相关的jar包,位置在kafka_2.9.2-0.8.1.1/libs添加到apache-storm-0.9.2-incubating/lib下

    [root@localhost storm-kafka]# cp storm-kafka-0.9.2-incubating.jar /usr/local/apache-storm-0.9.2-incubating/lib/
    [root@localhost libs]# cp kafka_2.9.2-0.8.1.1.jar /usr/local/apache-storm-0.9.2-incubating/lib/ [root@localhost libs]# cp scala-library-2.9.2.jar /usr/local/apache-storm-0.9.2-incubating/lib/ [root@localhost libs]# cp metrics-core-2.2.0.jar /usr/local/apache-storm-0.9.2-incubating/lib/ [root@localhost libs]# cp snappy-java-1.0.5.jar /usr/local/apache-storm-0.9.2-incubating/lib/ [root@localhost libs]# cp zkclient-0.3.jar /usr/local/apache-storm-0.9.2-incubating/lib/ [root@localhost libs]# cp log4j-1.2.15.jar /usr/local/apache-storm-0.9.2-incubating/lib/ [root@localhost libs]# cp slf4j-api-1.7.2.jar /usr/local/apache-storm-0.9.2-incubating/lib/ [root@localhost libs]# cp jopt-simple-3.2.jar /usr/local/apache-storm-0.9.2-incubating/lib/

      5、启动相关程序

      首先启动Zookeeper、Kafka、Storm服务,然后启动kafka的生产者客户端程序,分别参考前面的随笔

      安装zookeeper集群

      kafka单机多Broker(伪分布式)的基本配置

      单机安装配置Storm

      java程序连接kafka示例

      然后执行以下命令提交任务:

    [root@localhost apache-storm-0.9.2-incubating]# bin/storm jar /usr/local/test/storm/stormDemo.jar demo.SubmitClient
  • 相关阅读:
    asp.net mvc 4 json大数据异常 提示JSON字符长度超出限制的异常
    js禁止复制页面文字
    小度wifi在window server2008R2系统下创建不了
    spring boot 系列之一:spring boot 入门
    hibernate.hbm2ddl.auto=update不能自动生成表结构
    spring整合springmvc和hibernate
    如何使用maven搭建web项目
    移动端的拼图游戏
    AForge.net 使用之录像拍照功能实现
    AForge.net简介和认识
  • 原文地址:https://www.cnblogs.com/javasl/p/12315060.html
Copyright © 2011-2022 走看看