zoukankan      html  css  js  c++  java
  • Storm+kafka的HelloWorld初体验

    从16年4月5号开始学习kafka,后来由于项目需要又涉及到了storm。

    经过几天的扫盲,到今天16年4月13日,磕磕碰碰的总算是写了一个kafka+storm的HelloWorld的例子。

    为了达到前人栽树后人乘凉的知识共享的目的,我尝试着梳理一下过程。

    ====实例需求

    由kafka消息队列源源不断生产数据,然后由storm进行实时消费。

    大家可以设想这些数据源是不同商品的用户行为操作行为,我们是不是就可以实时观测到用户关注商品的热点呢?

     

    ====环境准备

    (1)Linux:

    公司暂时没有多余的Linux主机,所以我只能在自己的电脑上建立的3台Linux虚拟机。

    虚拟机的建立方法我做了一个小白级别的手册,按照这个手册就可以建立起虚拟机了。

    百度云连接地址:http://pan.baidu.com/s/1hr3lVqG

    (2)JDK:

    我这里使用的是:jdk-7u80-linux-x64.tar.gz。

    在官方网站上下载,然后配置环境变量即可。

    (3)zookeeper集群:

    搭建方法省略。可以参照我的博客:http://www.cnblogs.com/quchunhui/p/5356511.html

    (4)kafka:

    搭建方法省略。可以参照我的博客:http://www.cnblogs.com/quchunhui/p/5356511.html

     (5)storm:

    我这里使用的版本是相对稳定的:apache-storm-0.9.5.tar.gz

    搭建方法省略,可以参照我的博客:http://www.cnblogs.com/quchunhui/p/5370191.html

    (6)Maven:

    开发环境的构建使用Maven。我这里使用的版本是:apache-maven-3.3.3.zip

    Maven的入门可以参考我的博客:http://www.cnblogs.com/quchunhui/p/5359293.html

    补充一下环境变量配置之后的图,以供小白参考。

    ====程序执行方式

    (1)kafka:

    需要手动编写kafka的生产者程序,然后通过eclipse等工具在Windows端启动,以达到生产消息的目的。

    (2)storm:

    可以进行两种方式的启动。一种是通过eclipse等工具在Windows端启动(俗称本地模式)

    另一种是将storm的消费者程序打成jar包发布到Linux环境上,通过Linux启动程序进行消费(俗称集群模式)。

     

    ====Storm框架前期理解

    从某位大神的QQ群组里下载了一篇关于storm的基本框架以及安装的文章

    我这里共享到了我的百度云盘上了,请大家在开始编程之前一定要看看。非常值得一看。

    百度云地址:https://pan.baidu.com/s/14q7HBAYtvHKtaTA4_dm62g

     

    那么后面我们就可以开始编写我们的程序了。首先需要编写的是kafka的生产者程序。

    ====kafka程序相关:

    我已经写好的代码共享到了Github上了:https://github.com/quchunhui/kafkaSample/

    这里只对目录结构以及重要部分进行说明:

    (1)src/main路径结构如下:

    +---common

    | Constants.java                   //这里统一定义了所有的常量,修改配置的时候只修改这里就可以。
    |
    +---consumer
    | +---group
    | | GroupConsumer.java        //kafka消费者程序。消费模型:分组消费
    | |
    | ---partition
    | PartitionConsumer.java       //kafka消费者程序。消费模型:分区消费
    |
    +---producer
    | +---async
    | | AsyncProduce.java           //kafka生产者程序。生产模型:异步生产(本次实例相关)
    | |
    | +---partiton
    | | SimplePartitioner.java       //message的序列化类
    | |
    | ---sync
    | SyncProduce.java              //kafka生产者程序。生产模型:同步生产
    |
    ---utilities
    CommonUtil.java                 //共通方法类。

    (2)实例所用的代码:

    本次实例中,仅仅使用了kafka进行消息的生产,同事考虑到异步生产性能更高一些,

    本次实例中使用了异步生产的代码,就是上面红色字标记的java程序(AsyncProduce.java)。

    代码本身比较简单,其中下面红色框的部分为【异步】的配置项,需要注意。

    各个配置项的说明请参考我的另一篇博客:http://www.cnblogs.com/quchunhui/p/5357040.html

    ====Storm程序相关:

    (1)拓扑设计

    【消息源(RandomSentenceSpout)】

    接入到从上面的kafka消息队列中,将kafka作为storm的消息源。

    【数据标准化(WordNormalizerBolt)】

    然后使用一个Bolt进行归一化(语句切分),句子切分成单词发射出去。(代码更新中。。。)

    【词频统计(WordCountBolt)】

    使用一个Bolt接受订阅切分的单词Tuple,进行单词统计,并且选择使用按字段分组的策略,词频实时排序,把TopN实时发射出去。(代码更新中。。。)

    【工具类(PrintBolt)】

    最后使用一哥Bolt将结果打印到Log中。(代码更新中。。。)

    ====实例代码

    我自己进行验证用的代码已经上传到Github上了,可以直接下载下来使用。

    这里只对代码的目录结构以及需要格外关注的点进行一些补充。

    Git地址:https://github.com/quchunhui/storm-kafka-plus-qch

    (1)目录结构

    srcmainjavacomdscnhelloworld


    | WordCountTopology.java         // Topology代码,程序入口,使用eclipse是需要执行该程序。
    |
    +---bolt
    | PrintBolt.java                          // 上面讲到的工具类(PrintBolt)类
    | WordCountBolt.java                // 上面讲到的词频统计(WordCountBolt)类
    | WordNormalizerBolt.java          // 上面讲到的数据标准化(WordNormalizerBolt)类
    |
    ---spout
    RandomSentenceSpout.java       // 未使用

    (2)重要代码说明

    由于源代码已经共享给大家了,Storm的接口的用法在下面的篇幅中单独罗列了一下,我这里不进行过多的阐述。

    在这里只将我碰到过的问题罗列出来、以问题&解决方法的形式分享。 

    【问题1】

    storm是如何实现与kafka的对接的

    【回答】

    Spout作为storm的消息源的接入点,在这里可以同构设置Storm官方提供【storm.kafka.SpoutConfig】类来指定消息源。

     

    ----------------

    //配置zookeeper集群地址,毕竟storm是需要集群支持的。

    BrokerHosts brokerHosts = new ZkHosts("192.168.93.128:2181,192.168.93.129:2181,192.168.93.130:2181");

    //配置Kafka订阅的Topic,以及zookeeper中数据节点目录和名字

    SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "qchlocaltest", "", "topo");

    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

    //如果该Topology因故障停止处理,下次正常运行时是否从Spout对应数据源Kafka中的该订阅Topic的起始位置开始读取
    spoutConfig.forceFromStart = true;

    //zookeeper集群host

    spoutConfig.zkServers = Arrays.asList(new String[] {"192.168.93.128", "192.168.93.129", "192.168.93.130"});

    //zookeeper集群port

    spoutConfig.zkPort = 2181;

    ----------------

    【问题2】

    我尝试着自己重新写代码配置开发环境(不是直接使用Github上的代码),

    编译时可以正常通过的,但是本地模式通过eclipse启动Topology的时候,出现了log4j和slf4j的冲突问题。

    【解决方法】

    问题原因是由于log4j和slf4j之间的重复调用,导致死循环而致使内存溢出。

    解决办法就是log4j和slf4j保留一个, 普遍上都是保留slf4j的。

    需要在Maven的pom.xml上将log4j的相关依赖移除。

    移除方法:

     

    可以通过【mvn dependency:tree】命令来查看修改之后的依赖关系。

    如果发现需要移除的包的时候,使用Maven的【exclusion】标签来移除依赖关系。

    填写exclusion标签的时候,下图中红色的部分是groupId,蓝色的部分是artifactId。

    【问题3】

    使用mvn install命令将程序打jar包上传到Linux的storm目录下,然后使用命令

    [storm jar test-0.1-jar-with-dependencies.jar com.dscn.helloworld.WordCountTopology 192.168.93.128]

    启动Topology的时候,出现了下面的提示错误。

    【解决方法】

    是Maven的pom.xml的配置出现了问题。详细请参考博客http://blog.csdn.net/luyee2010/article/details/18455237

    修改方法就是强storm的scope修改为provided。如下图所示:

     

    【问题4】

    将代码放到实际的集群运行环境(kafka+storm+hbase)中,发现storm接受不到消息。

    【原因】

    一直以来都是使用kafka的异步生产来生产消息,以为都正常的生产消息了。由于异步生产的时候,并没有消息确认机制,

    所以不能确保消息是否正确的进入到了消息队列之中,改用同步生产的代码尝试了一下,果然发生了一下的错误。

    【解决办法】

    通过网上搜索[kafka Failed to send messages]关键字,发现有可能是需要设置advertised.host.name这个属性。

    抱着尝试一下的心态试了一下,果然好使了。至于这个属性的真正意义还有待探索。(TODO)

    【问题5】

    代码在本地的时候好好的,通过storm jar命令发布到集群环境的时候,发生了Jar包冲突的问题。

    【解决方法】

    本来是认为自己的Maven环境的依赖有问题,也通过mvn dependency:tree查看了依赖关系,毫无问题。根本就诶有log4j-over-slf4j.jar这个包。

    头疼了很久,通过QQ群咨询了一些朋友,他们建议我确认集群环境中storm/lib下是否存在log4j-over-slf4j.jar,如果存在就把它删掉。

    尝试了一下之后,果然好使了。原来是我的程序的jar包和集群环境中会有冲突。详细请参考我的另一篇博客:

    http://www.cnblogs.com/quchunhui/p/5404168.html

    ====Storm接口详解:

    【IComponent接口】

    Spout和Bolt都是其Component。所以,Storm定义了一个名叫IComponent的总接口。

    IComponent的继承关系如下图所示:

    绿色部分是我们最常用、比较简单的部分。红色部分是与事务相关。

    BaseComponent 是Storm提供的“偷懒”的类。为什么这么说呢,它及其子类,都或多或少实现了其接口定义的部分方法。

    这样我们在用的时候,可以直接继承该类,而不是自己每次都写所有的方法。

    但值得一提的是,BaseXXX这种定义的类,它所实现的方法,都是空的,直接返回null。

    【Spout】

    类图如下图所示:

    接口如下图所示:

    各个接口说明:

    ①、open方法:

    是初始化动作。允许你在该spout初始化时做一些动作,传入了上下文,方便取上下文的一些数据。 

    ②、close方法

    在该spout关闭前执行,但是并不能得到保证其一定被执行。

    spout是作为task运行在worker内,在cluster模式下,supervisor会直接kill -9 woker的进程,这样它就无法执行了。

    而在本地模式下,只要不是kill -9, 如果是发送停止命令,是可以保证close的执行的。 

    ③、activatedeactivate方法 

    一个spout可以被暂时激活和关闭,这两个方法分别在对应的时刻被调用。 

    ④、nextTuple方法:

    负责消息的接入,执行数据发射。是Spout中的最重要方法。

    ⑤、ack(Object)方法:

    传入的Object其实是一个id,唯一表示一个tuple。该方法是这个id所对应的tuple被成功处理后执行。 

    ⑥、fail(Object)方法:

    同ack,只不过是tuple处理失败时执行。 

    我们的RandomSpout由于继承了BaseRichSpout,

    所以不用实现close、activate、deactivate、ack、fail和getComponentConfiguration方法,只关心最基本核心的部分。 

    结论:

    通常情况下(Shell和事务型的除外),实现一个Spout,可以直接实现接口IRichSpout,如果不想写多余的代码,可以直接继承BaseRichSpout。 

    【Bolt】

    类图如下图所示:

    这里可以看到一个奇怪的问题: 为什么IBasicBolt并没有继承IBolt? 我们带着问题往下看。 

    IBolt定义了三个方法: 

    ①、prepare方法:

    IBolt继承了java.io.Serializable,我们在nimbus上提交了topology以后,创建出来的bolt会序列化后发送到具体执行的worker上去。

    worker在执行该Bolt时,会先调用prepare方法传入当前执行的上下文。

    ②、execute方法:

    接受一个tuple进行处理,并用prepare方法传入的OutputCollector的ack方法(表示成功)或fail(表示失败)来反馈处理结果。

    ③、cleanup方法:

    同ISpout的close方法,在关闭前调用。同样不保证其一定执行。

    红色部分(execute方法)是Bolt实现时一定要注意的地方。

    而Storm提供了IBasicBolt接口,其目的就是实现该接口的Bolt不用在代码中提供反馈结果了,Storm内部会自动反馈成功。

    如果你确实要反馈失败,可以抛出FailedException。

     
    我们来再写一个Bolt继承BaseRichBolt替代ExclaimBasicBolt。代码如下:
    修改topology
    运行下,结果一致。
     
    结论:
    通常情况下,实现一个Bolt,可以实现IRichBolt接口或继承BaseRichBolt,
    如果不想自己处理结果反馈,可以实现IBasicBolt接口或继承BaseBasicBolt,它实际上相当于自动做掉了prepare方法和collector.emit.ack(inputTuple);

    ====推荐博客:

    【整合实战类】:

    http://shiyanjun.cn/archives/934.html

    http://www.tuicool.com/articles/NzyqAn

    http://itindex.net/detail/51477-storm-笔记-kafka

    【问题解决类】:

    http://www.aboutyun.com/thread-12590-1-1.html

    【Storm调优类】:

    http://blog.csdn.net/derekjiang/article/details/9040243

    http://www.51studyit.com/html/notes/20140329/45.html

    --END--

  • 相关阅读:
    String类型作为方法的形参
    [转] 为什么说 Java 程序员必须掌握 Spring Boot ?
    Centos打开、关闭、结束tomcat,及查看tomcat运行日志
    centos中iptables和firewall防火墙开启、关闭、查看状态、基本设置等
    防火墙没有关导致外部访问虚拟机的tomcat遇到的问题和解决方法
    可以ping通ip地址,但是访问80,或者8080报错
    JAVA的非对称加密算法RSA——加密和解密
    CA双向认证的时候,如果一开始下载的证书就有问题的,怎么保证以后的交易没有问题?
    图解HTTPS协议加密解密全过程
    https单向认证服务端发送到客户端到底会不会加密?
  • 原文地址:https://www.cnblogs.com/quchunhui/p/5380260.html
Copyright © 2011-2022 走看看