  Kafka+Storm+HDFS Integration in Practice

    In many Hadoop-based application scenarios we need both offline and real-time analysis of our data. Offline analysis is easy to implement with Hive, but Hive is not a good fit for real-time requirements. For real-time scenarios we can use Storm, a real-time processing system that provides a computation model for stream-processing applications and is straightforward to program against. To unify offline and real-time computation, we usually want both to share the same data source as input and then fan the data out to the real-time system and the offline analysis system for separate processing. To do this, we can connect the data source (for example, logs collected with Flume) directly to a message broker such as Kafka. Flume and Kafka can be integrated so that Flume acts as the message Producer, publishing the data it produces (log data, business request data, and so on) to Kafka; a Storm Topology then consumes those messages via subscription, and inside the Storm cluster we handle the following two scenarios:

    • Use a Storm Topology to analyze and process the data in real time
    • Integrate Storm with HDFS, writing the processed messages into HDFS for offline analysis

    Real-time processing only requires developing a Topology that satisfies the business requirements, so it is not discussed further here. Instead, we focus on installing and configuring Kafka and Storm, and on integrating Kafka+Storm, Storm+HDFS, and finally Kafka+Storm+HDFS, to meet the requirements above. The software packages used in this walkthrough are:

    • zookeeper-3.4.5.tar.gz
    • kafka_2.9.2-0.8.1.1.tgz
    • apache-storm-0.9.2-incubating.tar.gz
    • hadoop-2.2.0.tar.gz

    The operating system used for configuring and running the programs is CentOS 5.11.

    Kafka Installation and Configuration

    We use three machines to build the Kafka cluster:

    192.168.4.142   h1
    192.168.4.143   h2
    192.168.4.144   h3

    Before installing the Kafka cluster, note that we do not use the ZooKeeper bundled with Kafka; instead we installed a standalone ZooKeeper cluster on the same three machines, and we make sure that ZooKeeper cluster is running properly. A typical ensemble configuration is sketched below.
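    For reference only, a minimal zoo.cfg for such a three-node ensemble might look like the following; the dataDir value is an assumption, not taken from this setup:

    # /usr/local/zookeeper/conf/zoo.cfg
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/var/lib/zookeeper      # assumed data directory; must contain a myid file (1, 2 or 3 per node)
    clientPort=2181
    server.1=h1:2888:3888
    server.2=h2:2888:3888
    server.3=h3:2888:3888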
    First, prepare the Kafka installation files on h1 by executing the following commands:

    cd /usr/local/
    tar xvzf kafka_2.9.2-0.8.1.1.tgz
    ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
    chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka

    Modify the configuration file /usr/local/kafka/config/server.properties, changing the following entries:

    broker.id=0
    zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka

    Note that by default Kafka uses ZooKeeper's root path /, which scatters Kafka-related ZooKeeper data directly under the root. If other applications share the same ZooKeeper cluster, browsing the data in ZooKeeper becomes confusing, so it is strongly recommended to specify a chroot path directly in the zookeeper.connect setting:

    zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka

    In addition, the /kafka path must be created manually in ZooKeeper. Use the following commands to connect to any one of the ZooKeeper servers:

    cd /usr/local/zookeeper
    bin/zkCli.sh

    Then run the following command in the ZooKeeper shell to create the chroot path:

    create /kafka ''

    From now on, every time you connect to the Kafka cluster (with the --zookeeper option) you must also use a connect string that includes the chroot path, as we will see later.
    Next, sync the configured installation files to the other nodes h2 and h3:

    scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h2:/usr/local/
    scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h3:/usr/local/

    Finally, configure h2 and h3 by executing the following commands:

    cd /usr/local/
    ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
    chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka

    and adjust broker.id in /usr/local/kafka/config/server.properties as follows:

    broker.id=1  # set on h2

    broker.id=2  # set on h3

    Kafka requires every Broker's id to be unique across the whole cluster, which is why this setting must be adjusted on each node. (On a single machine you can simulate a distributed Kafka cluster by running multiple Broker processes; their Broker ids must still be unique, and some directory and port settings must be changed as well, as sketched below.)
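    For example, to run a second Broker on the same host you could copy config/server.properties to a hypothetical config/server-1.properties and change at least the following entries; the port and log directory values here are assumptions, not part of this cluster setup:

    broker.id=1
    port=9093                      # each broker on the host needs its own port
    log.dirs=/tmp/kafka-logs-1     # and its own log directory

    # start the extra broker with its own properties file
    bin/kafka-server-start.sh config/server-1.properties &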
    Start Kafka on each of the three cluster nodes h1, h2, and h3 by executing the following command on each:

    bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

    You can check the logs or the process status to make sure the Kafka cluster started successfully, for example as shown below.
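    A quick sanity check, assuming the ZooKeeper client location used earlier:

    jps | grep Kafka                            # a Kafka process should show up on each broker node
    /usr/local/zookeeper/bin/zkCli.sh -server h1:2181
    ls /kafka/brokers/ids                       # should list [0, 1, 2] once all three brokers have registered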
    Now we create a Topic named my-replicated-topic5 with 5 partitions and a replication factor of 3 by executing the following command:

    bin/kafka-topics.sh --create --zookeeper h1:2181,h2:2181,h3:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5

    To view the Topic we just created, execute:

    bin/kafka-topics.sh --describe --zookeeper h1:2181,h2:2181,h3:2181/kafka --topic my-replicated-topic5

    The output looks like this:

    Topic:my-replicated-topic5     PartitionCount:5     ReplicationFactor:3     Configs:
         Topic: my-replicated-topic5     Partition: 0     Leader: 0     Replicas: 0,2,1     Isr: 0,2,1
         Topic: my-replicated-topic5     Partition: 1     Leader: 0     Replicas: 1,0,2     Isr: 0,2,1
         Topic: my-replicated-topic5     Partition: 2     Leader: 2     Replicas: 2,1,0     Isr: 2,0,1
         Topic: my-replicated-topic5     Partition: 3     Leader: 0     Replicas: 0,1,2     Isr: 0,2,1
         Topic: my-replicated-topic5     Partition: 4     Leader: 2     Replicas: 1,2,0     Isr: 2,0,1

    The fields Leader, Replicas and Isr above mean the following:

    Partition: the partition number
    Leader   : the node responsible for reads and writes of the given partition
    Replicas : the list of nodes that replicate the log of this partition
    Isr      : the "in-sync" replicas, the subset of replicas that are currently alive and eligible to become Leader

    We can use the bin/kafka-console-producer.sh and bin/kafka-console-consumer.sh scripts shipped with Kafka to demonstrate how to publish and consume messages.
    In one terminal, start a Producer that publishes messages to the Topic my-replicated-topic5 created above, using the following script:

    bin/kafka-console-producer.sh --broker-list h1:9092,h2:9092,h3:9092 --topic my-replicated-topic5

    In another terminal, start a Consumer that subscribes to the messages produced to my-replicated-topic5, using the following script:

    bin/kafka-console-consumer.sh --zookeeper h1:2181,h2:2181,h3:2181/kafka --from-beginning --topic my-replicated-topic5

    Type a line of text in the Producer terminal and press Enter, and you should see the consumed message appear in the Consumer terminal.
    You can also use Kafka's Producer and Consumer Java APIs to implement the message production and consumption logic in code; a minimal producer sketch follows.
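    As an illustration only, here is a minimal sketch against the Kafka 0.8.1.1 producer API; the class name and package are ours, and the property values simply mirror the cluster above:

    package org.shirdrn.storm.examples;   // hypothetical package, chosen to match the examples below

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class SimpleKafkaProducer {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "h1:9092,h2:9092,h3:9092"); // broker list, not the ZooKeeper connect string
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "1");

            Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
            for (int i = 0; i < 10; i++) {
                // messages sent without an explicit key are spread across the topic's partitions
                producer.send(new KeyedMessage<String, String>("my-replicated-topic5", "message-" + i));
            }
            producer.close();
        }
    }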

    Storm Installation and Configuration

    The Storm cluster also depends on a ZooKeeper cluster, so make sure ZooKeeper is running properly. Installing and configuring Storm is fairly simple; we again use the following three machines:

    192.168.4.142   h1
    192.168.4.143   h2
    192.168.4.144   h3

    First, install on node h1 by executing the following commands:

    cd /usr/local/
    tar xvzf apache-storm-0.9.2-incubating.tar.gz
    ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
    chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm

    Then modify the configuration file conf/storm.yaml as follows:

    storm.zookeeper.servers:
        - "h1"
        - "h2"
        - "h3"
    storm.zookeeper.port: 2181
    #
    nimbus.host: "h1"

    supervisor.slots.ports:
        - 6700
        - 6701
        - 6702
        - 6703

    storm.local.dir: "/tmp/storm"

    Distribute the configured installation files to the other nodes:

    scp -r /usr/local/apache-storm-0.9.2-incubating/ h2:/usr/local/
    scp -r /usr/local/apache-storm-0.9.2-incubating/ h3:/usr/local/

    Finally, configure h2 and h3 by executing the following commands:

    cd /usr/local/
    ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
    chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm

    The master node of a Storm cluster runs Nimbus and the worker nodes run Supervisors. We start the Nimbus service on h1 and the Supervisor service on the worker nodes h2 and h3:

    bin/storm nimbus &
    bin/storm supervisor &

    To make monitoring easier, you can start the Storm UI and watch the running state of Storm Topologies from a web page, for example on h2:

    bin/storm ui &

    You can then visit http://h2:8080/ to check the status of your Topologies.

    Integrating Kafka + Storm

    Messages reach the Kafka broker in various ways; for example, log data collected with Flume can be routed to and buffered in Kafka, and then analyzed in real time by Storm. In that case we need a Storm Spout that reads messages from Kafka and hands them to the downstream Bolt components for analysis. In fact, the apache-storm-0.9.2-incubating release already ships with storm-kafka, an external module that integrates with Kafka and can be used directly. For example, my Maven dependency configuration looks like this:

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.2-incubating</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>0.9.2-incubating</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.9.2</artifactId>
        <version>0.8.1.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    Below we develop a simple WordCount example that reads subscribed message lines from Kafka, splits each line into words on whitespace, and then counts word frequencies. The Topology is implemented as follows:

    package org.shirdrn.storm.examples;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    import storm.kafka.BrokerHosts;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class MyKafkaTopology {

        public static class KafkaWordSplitter extends BaseRichBolt {

            private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
            private static final long serialVersionUID = 886149197481637894L;
            private OutputCollector collector;

            @Override
            public void prepare(Map stormConf, TopologyContext context,
                    OutputCollector collector) {
                this.collector = collector;
            }

            @Override
            public void execute(Tuple input) {
                String line = input.getString(0);
                LOG.info("RECV[kafka -> splitter] " + line);
                String[] words = line.split("\\s+");
                for(String word : words) {
                    LOG.info("EMIT[splitter -> counter] " + word);
                    collector.emit(input, new Values(word, 1));
                }
                collector.ack(input);
            }

            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word", "count"));
            }

        }

        public static class WordCounter extends BaseRichBolt {

            private static final Log LOG = LogFactory.getLog(WordCounter.class);
            private static final long serialVersionUID = 886149197481637894L;
            private OutputCollector collector;
            private Map<String, AtomicInteger> counterMap;

            @Override
            public void prepare(Map stormConf, TopologyContext context,
                    OutputCollector collector) {
                this.collector = collector;
                this.counterMap = new HashMap<String, AtomicInteger>();
            }

            @Override
            public void execute(Tuple input) {
                String word = input.getString(0);
                int count = input.getInteger(1);
                LOG.info("RECV[splitter -> counter] " + word + " : " + count);
                AtomicInteger ai = this.counterMap.get(word);
                if(ai == null) {
                    ai = new AtomicInteger();
                    this.counterMap.put(word, ai);
                }
                ai.addAndGet(count);
                collector.ack(input);
                LOG.info("CHECK statistics map: " + this.counterMap);
            }

            @Override
            public void cleanup() {
                LOG.info("The final result:");
                Iterator<Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();
                while(iter.hasNext()) {
                    Entry<String, AtomicInteger> entry = iter.next();
                    LOG.info(entry.getKey() + " : " + entry.getValue().get());
                }
            }

            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word", "count"));
            }
        }

        public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
            String zks = "h1:2181,h2:2181,h3:2181";
            String topic = "my-replicated-topic5";
            String zkRoot = "/storm"; // default zookeeper root configuration for storm
            String id = "word";

            BrokerHosts brokerHosts = new ZkHosts(zks);
            SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
            spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
            spoutConf.forceFromStart = false;
            spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
            spoutConf.zkPort = 2181;

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // parallelism 5, matching the 5 partitions of the Kafka topic created earlier
            builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");
            builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));

            Config conf = new Config();

            String name = MyKafkaTopology.class.getSimpleName();
            if (args != null && args.length > 0) {
                // Nimbus host name passed from command line
                conf.put(Config.NIMBUS_HOST, args[0]);
                conf.setNumWorkers(3);
                StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
            } else {
                conf.setMaxTaskParallelism(3);
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology(name, conf, builder.createTopology());
                Thread.sleep(60000);
                cluster.shutdown();
            }
        }
    }

    For local debugging (using LocalCluster) the program above needs no arguments; when submitting it to a real cluster, one argument must be passed: the host name of the Nimbus.
    Build with Maven to produce a single jar that contains the dependencies (do not include the Storm dependencies), for example storm-examples-0.0.1-SNAPSHOT.jar. Because the Topology uses Kafka, before submitting it to the Storm cluster you also need to copy the following dependency jars into the lib directory of the Storm installation on the cluster nodes:

    cp /usr/local/kafka/libs/kafka_2.9.2-0.8.1.1.jar /usr/local/storm/lib/
    cp /usr/local/kafka/libs/scala-library-2.9.2.jar /usr/local/storm/lib/
    cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/
    cp /usr/local/kafka/libs/snappy-java-1.0.5.jar /usr/local/storm/lib/
    cp /usr/local/kafka/libs/zkclient-0.3.jar /usr/local/storm/lib/
    cp /usr/local/kafka/libs/log4j-1.2.15.jar /usr/local/storm/lib/
    cp /usr/local/kafka/libs/slf4j-api-1.7.2.jar /usr/local/storm/lib/
    cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/

    Then we can submit the Topology we developed:

    bin/storm jar /home/storm/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.MyKafkaTopology h1

    You can monitor the Topology by looking at the log files (under the logs/ directory) or through the Storm UI. If the program runs without errors, generate messages with the Kafka console Producer used earlier and you will see our Storm Topology receive and process them in real time, for example as shown below.
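    One way to watch the processing, assuming the default log layout of this Storm version (the exact file name depends on the worker's slot port):

    tail -f /usr/local/storm/logs/worker-6700.log | grep -E "RECV|EMIT|CHECK"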
    The Topology code above contains one important configuration object, SpoutConfig, with the following property:

    spoutConf.forceFromStart = false;

    This setting controls what happens when the Topology restarts after a failure: if forceFromStart=true, the Spout reads the subscribed Kafka Topic from the beginning, so previously processed Tuples are processed again; if false, it continues from the last processed position, so the data in the Kafka Topic is not reprocessed. The processing position is recorded as offset state on the data-source side.
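    Concretely, the KafkaSpout commits its consumed offsets to ZooKeeper under the zkRoot and id configured above (/storm and word in this example). A quick way to peek at that state, with the caveat that the exact node layout depends on the storm-kafka version:

    bin/zkCli.sh -server h1:2181
    ls /storm/word        # per-partition offset state written by the KafkaSpout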

    Integrating Storm + HDFS

    The Storm real-time computing cluster consumes messages from the Kafka broker; data with real-time requirements goes through the real-time processing path, while data that needs offline analysis can be written to HDFS. The following Topology implements the HDFS part:

    package org.shirdrn.storm.examples;

    import java.text.DateFormat;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Map;
    import java.util.Random;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.storm.hdfs.bolt.HdfsBolt;
    import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
    import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
    import org.apache.storm.hdfs.bolt.format.FileNameFormat;
    import org.apache.storm.hdfs.bolt.format.RecordFormat;
    import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
    import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
    import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
    import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
    import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import backtype.storm.utils.Utils;

    public class StormToHDFSTopology {

        public static class EventSpout extends BaseRichSpout {

            private static final Log LOG = LogFactory.getLog(EventSpout.class);
            private static final long serialVersionUID = 886149197481637894L;
            private SpoutOutputCollector collector;
            private Random rand;
            private String[] records;

            @Override
            public void open(Map conf, TopologyContext context,
                    SpoutOutputCollector collector) {
                this.collector = collector;
                rand = new Random();
                records = new String[] {
                        "10001     ef2da82d4c8b49c44199655dc14f39f6     4.2.1     HUAWEI G610-U00     HUAWEI     2     70:72:3c:73:8b:22     2014-10-13 12:36:35",
                        "10001     ffb52739a29348a67952e47c12da54ef     4.3     GT-I9300     samsung     2     50:CC:F8:E4:22:E2     2014-10-13 12:36:02",
                        "10001     ef2da82d4c8b49c44199655dc14f39f6     4.2.1     HUAWEI G610-U00     HUAWEI     2     70:72:3c:73:8b:22     2014-10-13 12:36:35"
                };
            }

            @Override
            public void nextTuple() {
                Utils.sleep(1000);
                DateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
                Date d = new Date(System.currentTimeMillis());
                String minute = df.format(d);
                String record = records[rand.nextInt(records.length)];
                LOG.info("EMIT[spout -> hdfs] " + minute + " : " + record);
                collector.emit(new Values(minute, record));
            }

            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("minute", "record"));
            }

        }

        public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
            // use " : " instead of the default "," as the field delimiter
            RecordFormat format = new DelimitedRecordFormat()
                    .withFieldDelimiter(" : ");

            // sync the filesystem after every 1k tuples
            SyncPolicy syncPolicy = new CountSyncPolicy(1000);

            // rotate files every minute
            FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);

            FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                    .withPath("/storm/").withPrefix("app_").withExtension(".log");

            HdfsBolt hdfsBolt = new HdfsBolt()
                    .withFsUrl("hdfs://h1:8020")
                    .withFileNameFormat(fileNameFormat)
                    .withRecordFormat(format)
                    .withRotationPolicy(rotationPolicy)
                    .withSyncPolicy(syncPolicy);

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("event-spout", new EventSpout(), 3);
            builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout", new Fields("minute"));

            Config conf = new Config();

            String name = StormToHDFSTopology.class.getSimpleName();
            if (args != null && args.length > 0) {
                conf.put(Config.NIMBUS_HOST, args[0]);
                conf.setNumWorkers(3);
                StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
            } else {
                conf.setMaxTaskParallelism(3);
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology(name, conf, builder.createTopology());
                Thread.sleep(60000);
                cluster.shutdown();
            }
        }

    }

    In the processing logic above, the HdfsBolt can be configured in more detail through FileNameFormat, SyncPolicy and FileRotationPolicy, which lets you decide when to roll over to a new log file, for example after a given time interval or once a file reaches a given size. For more options, refer to the storm-hdfs project; a size-based variant is sketched below.
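    For example, a size-based rotation could replace the time-based policy in the main() above. This is only a sketch: it reuses the fileNameFormat, format and syncPolicy variables from the listing, and the 128 MB threshold and /storm/done/ destination are made-up values.

    // additional imports needed for this variant:
    // import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
    // import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
    // import org.apache.storm.hdfs.common.rotation.MoveFileAction;

    // roll the output file once it reaches 128 MB (made-up threshold)
    FileRotationPolicy sizePolicy = new FileSizeRotationPolicy(128.0f, Units.MB);

    HdfsBolt sizeRotatedBolt = new HdfsBolt()
            .withFsUrl("hdfs://h1:8020")
            .withFileNameFormat(fileNameFormat)
            .withRecordFormat(format)
            .withRotationPolicy(sizePolicy)
            .withSyncPolicy(syncPolicy)
            // optionally move each completed file into a separate directory after rotation
            .addRotationAction(new MoveFileAction().toDestination("/storm/done/"));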
    Note that when packaging this code, the Maven packaging configuration that ships with storm-starter may cause errors when the Topology is deployed and run; you can use the maven-shade-plugin instead, configured as follows:

    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>1.4</version>
        <configuration>
            <createDependencyReducedPom>true</createDependencyReducedPom>
        </configuration>
        <executions>
            <execution>
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
                <configuration>
                    <transformers>
                        <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass></mainClass>
                        </transformer>
                    </transformers>
                </configuration>
            </execution>
        </executions>
    </plugin>

    Integrating Kafka + Storm + HDFS

    Above we integrated Kafka+Storm and Storm+HDFS separately. We can replace the Spout of the latter with the Kafka Spout of the former, so that messages are consumed from Kafka, lightly processed in Storm, and then written to HDFS, where the data can finally be analyzed offline on the Hadoop platform. The following simple example consumes messages from Kafka, processes them in Storm, and writes the results to HDFS:

    package org.shirdrn.storm.examples;

    import java.util.Arrays;
    import java.util.Map;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.storm.hdfs.bolt.HdfsBolt;
    import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
    import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
    import org.apache.storm.hdfs.bolt.format.FileNameFormat;
    import org.apache.storm.hdfs.bolt.format.RecordFormat;
    import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
    import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
    import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
    import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
    import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

    import storm.kafka.BrokerHosts;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class DistributeWordTopology {

        public static class KafkaWordToUpperCase extends BaseRichBolt {

            private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
            private static final long serialVersionUID = -5207232012035109026L;
            private OutputCollector collector;

            @Override
            public void prepare(Map stormConf, TopologyContext context,
                    OutputCollector collector) {
                this.collector = collector;
            }

            @Override
            public void execute(Tuple input) {
                String line = input.getString(0).trim();
                LOG.info("RECV[kafka -> splitter] " + line);
                if(!line.isEmpty()) {
                    String upperLine = line.toUpperCase();
                    LOG.info("EMIT[splitter -> counter] " + upperLine);
                    collector.emit(input, new Values(upperLine, upperLine.length()));
                }
                collector.ack(input);
            }

            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("line", "len"));
            }

        }

        public static class RealtimeBolt extends BaseRichBolt {

            private static final Log LOG = LogFactory.getLog(RealtimeBolt.class);
            private static final long serialVersionUID = -4115132557403913367L;
            private OutputCollector collector;

            @Override
            public void prepare(Map stormConf, TopologyContext context,
                    OutputCollector collector) {
                this.collector = collector;
            }

            @Override
            public void execute(Tuple input) {
                String line = input.getString(0).trim();
                LOG.info("REALTIME: " + line);
                collector.ack(input);
            }

            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {

            }

        }

        public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {

            // Configure Kafka
            String zks = "h1:2181,h2:2181,h3:2181";
            String topic = "my-replicated-topic5";
            String zkRoot = "/storm"; // default zookeeper root configuration for storm
            String id = "word";
            BrokerHosts brokerHosts = new ZkHosts(zks);
            SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
            spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
            spoutConf.forceFromStart = false;
            spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
            spoutConf.zkPort = 2181;

            // Configure HDFS bolt
            RecordFormat format = new DelimitedRecordFormat()
                    .withFieldDelimiter(" "); // use " " instead of "," for field delimiter
            SyncPolicy syncPolicy = new CountSyncPolicy(1000); // sync the filesystem after every 1k tuples
            FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); // rotate files every minute
            FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                    .withPath("/storm/").withPrefix("app_").withExtension(".log"); // set file name format
            HdfsBolt hdfsBolt = new HdfsBolt()
                    .withFsUrl("hdfs://h1:8020")
                    .withFileNameFormat(fileNameFormat)
                    .withRecordFormat(format)
                    .withRotationPolicy(rotationPolicy)
                    .withSyncPolicy(syncPolicy);

            // configure & build topology
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
            builder.setBolt("to-upper", new KafkaWordToUpperCase(), 3).shuffleGrouping("kafka-reader");
            builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping("to-upper");
            builder.setBolt("realtime", new RealtimeBolt(), 2).shuffleGrouping("to-upper");

            // submit topology
            Config conf = new Config();
            String name = DistributeWordTopology.class.getSimpleName();
            if (args != null && args.length > 0) {
                String nimbus = args[0];
                conf.put(Config.NIMBUS_HOST, nimbus);
                conf.setNumWorkers(3);
                StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
            } else {
                conf.setMaxTaskParallelism(3);
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology(name, conf, builder.createTopology());
                Thread.sleep(60000);
                cluster.shutdown();
            }
        }

    }

    In the code above, the Bolt named to-upper converts each received line to upper case and then sends a copy of the processed data to both of the downstream Bolts, hdfs-bolt and realtime, which handle it independently according to their needs (offline vs. real-time).
    After packaging, deploy and run this Topology on the Storm cluster:

    bin/storm jar ~/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.DistributeWordTopology h1

    You can check the running Topology in the Storm UI and inspect the data produced on HDFS, for example as shown below.
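    A quick check from any node with the Hadoop 2.2.0 client configured; the paths follow the /storm/ output directory and app_ prefix set in the Topology, everything else is generic:

    hdfs dfs -ls /storm/
    hdfs dfs -cat /storm/app_*.log | head -n 20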

    Original article: https://www.cnblogs.com/wanghuaijun/p/5603363.html