zoukankan      html  css  js  c++  java
  • Kafka+Storm+HDFS整合实践

    在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实时的需求Hive就不合适了。实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处理。为了统一离线和实时计算,一般情况下,我们都希望将离线和实时计算的数据源的集合统一起来作为输入,然后将数据的流向分别经由实时系统和离线分析系统,分别进行分析处理,这时我们可以考虑将数据源(如使用Flume收集日志)直接连接一个消息中间件,如Kafka,可以整合Flume+Kafka,Flume作为消息的Producer,生产的消息数据(日志数据、业务请求数据等等)发布到Kafka中,然后通过订阅的方式,使用Storm的Topology作为消息的Consumer,在Storm集群中分别进行如下两个需求场景的处理:

    • 直接使用Storm的Topology对数据进行实时分析处理
    • 整合Storm+HDFS,将消息处理后写入HDFS进行离线分析处理

    实时处理,只要开发满足业务需要的Topology即可,不做过多说明。这里,我们主要从安装配置Kafka、Storm,以及整合Kafka+Storm、整合Storm+HDFS、整合Kafka+Storm+HDFS这几点来配置实践,满足上面提出的一些需求。配置实践使用的软件包如下所示:

    • zookeeper-3.4.5.tar.gz
    • kafka_2.9.2-0.8.1.1.tgz
    • apache-storm-0.9.2-incubating.tar.gz
    • hadoop-2.2.0.tar.gz

    程序配置运行所基于的操作系统为CentOS 5.11。

    Kafka安装配置

    我们使用3台机器搭建Kafka集群:

    1 192.168.4.142   h1
    2 192.168.4.143   h2
    3 192.168.4.144   h3

    在安装Kafka集群之前,这里没有使用Kafka自带的Zookeeper,而是独立安装了一个Zookeeper集群,也是使用这3台机器,保证Zookeeper集群正常运行。
    首先,在h1上准备Kafka安装文件,执行如下命令:

    1 cd /usr/local/
    3 tar xvzf kafka_2.9.2-0.8.1.1.tgz
    4 ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
    5 chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka

    修改配置文件/usr/local/kafka/config/server.properties,修改如下内容:

    1 broker.id=0
    2 zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka

    这里需要说明的是,默认Kafka会使用ZooKeeper默认的/路径,这样有关Kafka的ZooKeeper配置就会散落在根路径下面,如果你有其他的应用也在使用ZooKeeper集群,查看ZooKeeper中数据可能会不直观,所以强烈建议指定一个chroot路径,直接在zookeeper.connect配置项中指定:

    1 zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka

    而且,需要手动在ZooKeeper中创建路径/kafka,使用如下命令连接到任意一台ZooKeeper服务器:

    1 cd /usr/local/zookeeper
    2 bin/zkCli.sh

    在ZooKeeper执行如下命令创建chroot路径:

    1 create /kafka ''

    这样,每次连接Kafka集群的时候(使用--zookeeper选项),也必须使用带chroot路径的连接字符串,后面会看到。
    然后,将配置好的安装文件同步到其他的h2、h3节点上:

    1 scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h2:/usr/local/
    2 scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h3:/usr/local/

    最后,在h2、h3节点上配置,执行如下命令:

    1 cd /usr/local/
    2 ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
    3 chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka

    并修改配置文件/usr/local/kafka/config/server.properties内容如下所示:

    1 broker.id=1  # 在h1修改
    2  
    3 broker.id=2  # 在h2修改

    因为Kafka集群需要保证各个Broker的id在整个集群中必须唯一,需要调整这个配置项的值(如果在单机上,可以通过建立多个Broker进程来模拟分布式的Kafka集群,也需要Broker的id唯一,还需要修改一些配置目录的信息)。
    在集群中的h1、h2、h3这三个节点上分别启动Kafka,分别执行如下命令:

    1 bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

    可以通过查看日志,或者检查进程状态,保证Kafka集群启动成功。
    我们创建一个名称为my-replicated-topic5的Topic,5个分区,并且复制因子为3,执行如下命令:

    1 bin/kafka-topics.sh --create --zookeeper h1:2181,h2:2181,h3:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5

    查看创建的Topic,执行如下命令:

    1 bin/kafka-topics.sh --describe --zookeeper h1:2181,h2:2181,h3:2181/kafka --topic my-replicated-topic5

    结果信息如下所示:

    1 Topic:my-replicated-topic5     PartitionCount:5     ReplicationFactor:3     Configs:
    2      Topic: my-replicated-topic5     Partition: 0     Leader: 0     Replicas: 0,2,1     Isr: 0,2,1
    3      Topic: my-replicated-topic5     Partition: 1     Leader: 0     Replicas: 1,0,2     Isr: 0,2,1
    4      Topic: my-replicated-topic5     Partition: 2     Leader: 2     Replicas: 2,1,0     Isr: 2,0,1
    5      Topic: my-replicated-topic5     Partition: 3     Leader: 0     Replicas: 0,1,2     Isr: 0,2,1
    6      Topic: my-replicated-topic5     Partition: 4     Leader: 2     Replicas: 1,2,0     Isr: 2,0,1

    上面Leader、Replicas、Isr的含义如下:

    1 Partition: 分区
    2 Leader   : 负责读写指定分区的节点
    3 Replicas : 复制该分区log的节点列表
    4 Isr      : "in-sync" replicas,当前活跃的副本列表(是一个子集),并且可能成为Leader

    我们可以通过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示如果发布消息、消费消息。
    在一个终端,启动Producer,并向我们上面创建的名称为my-replicated-topic5的Topic中生产消息,执行如下脚本:

    1 bin/kafka-console-producer.sh --broker-list h1:9092,h2:9092,h3:9092 --topic my-replicated-topic5

    在另一个终端,启动Consumer,并订阅我们上面创建的名称为my-replicated-topic5的Topic中生产的消息,执行如下脚本:

    1 bin/kafka-console-consumer.sh --zookeeper h1:2181,h2:2181,h3:2181/kafka --from-beginning --topic my-replicated-topic5

    可以在Producer终端上输入字符串消息行,然后回车,就可以在Consumer终端上看到消费者消费的消息内容。
    也可以参考Kafka的Producer和Consumer的Java API,通过API编码的方式来实现消息生产和消费的处理逻辑。

    Storm安装配置

    Storm集群也依赖Zookeeper集群,要保证Zookeeper集群正常运行。Storm的安装配置比较简单,我们仍然使用下面3台机器搭建:

    1 192.168.4.142   h1
    2 192.168.4.143   h2
    3 192.168.4.144   h3

    首先,在h1节点上,执行如下命令安装:

    1 cd /usr/local/
    3 tar xvzf apache-storm-0.9.2-incubating.tar.gz
    4 ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
    5 chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm

    然后,修改配置文件conf/storm.yaml,内容如下所示:

    01 storm.zookeeper.servers:
    02      - "h1"
    03      - "h2"
    04      - "h3"
    05 storm.zookeeper.port: 2181
    06 #
    07 nimbus.host: "h1"
    08  
    09 supervisor.slots.ports:
    10     - 6700
    11     - 6701
    12     - 6702
    13     - 6703
    14  
    15 storm.local.dir: "/tmp/storm"

    将配置好的安装文件,分发到其他节点上:

    1 scp -r /usr/local/apache-storm-0.9.2-incubating/ h2:/usr/local/
    2 scp -r /usr/local/apache-storm-0.9.2-incubating/ h3:/usr/local/

    最后,在h2、h3节点上配置,执行如下命令:

    1 cd /usr/local/
    2 ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
    3 chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm

    Storm集群的主节点为Nimbus,从节点为Supervisor,我们需要在h1上启动Nimbus服务,在从节点h2、h3上启动Supervisor服务:

    1 bin/storm nimbus &
    2 bin/storm supervisor &

    为了方便监控,可以启动Storm UI,可以从Web页面上监控Storm Topology的运行状态,例如在h2上启动:

    1 bin/storm ui &

    这样可以通过访问http://h2:8080/来查看Topology的运行状况。

    整合Kafka+Storm

    消息通过各种方式进入到Kafka消息中间件,比如可以通过使用Flume来收集日志数据,然后在Kafka中路由暂存,然后再由实时计算程序Storm做实时分析,这时我们就需要将在Storm的Spout中读取Kafka中的消息,然后交由具体的Spot组件去分析处理。实际上,apache-storm-0.9.2-incubating这个版本的Storm已经自带了一个集成Kafka的外部插件程序storm-kafka,可以直接使用,例如我使用的Maven依赖配置,如下所示:

    01 <dependency>
    02      <groupId>org.apache.storm</groupId>
    03      <artifactId>storm-core</artifactId>
    04      <version>0.9.2-incubating</version>
    05      <scope>provided</scope>
    06 </dependency>
    07 <dependency>
    08      <groupId>org.apache.storm</groupId>
    09      <artifactId>storm-kafka</artifactId>
    10      <version>0.9.2-incubating</version>
    11 </dependency>
    12 <dependency>
    13      <groupId>org.apache.kafka</groupId>
    14      <artifactId>kafka_2.9.2</artifactId>
    15      <version>0.8.1.1</version>
    16      <exclusions>
    17           <exclusion>
    18                <groupId>org.apache.zookeeper</groupId>
    19                <artifactId>zookeeper</artifactId>
    20           </exclusion>
    21           <exclusion>
    22                <groupId>log4j</groupId>
    23                <artifactId>log4j</artifactId>
    24           </exclusion>
    25      </exclusions>
    26 </dependency>

    下面,我们开发了一个简单WordCount示例程序,从Kafka读取订阅的消息行,通过空格拆分出单个单词,然后再做词频统计计算,实现的Topology的代码,如下所示:

    001 package org.shirdrn.storm.examples;
    002  
    003 import java.util.Arrays;
    004 import java.util.HashMap;
    005 import java.util.Iterator;
    006 import java.util.Map;
    007 import java.util.Map.Entry;
    008 import java.util.concurrent.atomic.AtomicInteger;
    009  
    010 import org.apache.commons.logging.Log;
    011 import org.apache.commons.logging.LogFactory;
    012  
    013 import storm.kafka.BrokerHosts;
    014 import storm.kafka.KafkaSpout;
    015 import storm.kafka.SpoutConfig;
    016 import storm.kafka.StringScheme;
    017 import storm.kafka.ZkHosts;
    018 import backtype.storm.Config;
    019 import backtype.storm.LocalCluster;
    020 import backtype.storm.StormSubmitter;
    021 import backtype.storm.generated.AlreadyAliveException;
    022 import backtype.storm.generated.InvalidTopologyException;
    023 import backtype.storm.spout.SchemeAsMultiScheme;
    024 import backtype.storm.task.OutputCollector;
    025 import backtype.storm.task.TopologyContext;
    026 import backtype.storm.topology.OutputFieldsDeclarer;
    027 import backtype.storm.topology.TopologyBuilder;
    028 import backtype.storm.topology.base.BaseRichBolt;
    029 import backtype.storm.tuple.Fields;
    030 import backtype.storm.tuple.Tuple;
    031 import backtype.storm.tuple.Values;
    032  
    033 public class MyKafkaTopology {
    034  
    035      public static class KafkaWordSplitter extends BaseRichBolt {
    036  
    037           private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
    038           private static final long serialVersionUID = 886149197481637894L;
    039           private OutputCollector collector;
    040           
    041           @Override
    042           public void prepare(Map stormConf, TopologyContext context,
    043                     OutputCollector collector) {
    044                this.collector = collector;              
    045           }
    046  
    047           @Override
    048           public void execute(Tuple input) {
    049                String line = input.getString(0);
    050                LOG.info("RECV[kafka -> splitter] " + line);
    051                String[] words = line.split("\s+");
    052                for(String word : words) {
    053                     LOG.info("EMIT[splitter -> counter] " + word);
    054                     collector.emit(input, new Values(word, 1));
    055                }
    056                collector.ack(input);
    057           }
    058  
    059           @Override
    060           public void declareOutputFields(OutputFieldsDeclarer declarer) {
    061                declarer.declare(new Fields("word""count"));         
    062           }
    063           
    064      }
    065      
    066      public static class WordCounter extends BaseRichBolt {
    067  
    068           private static final Log LOG = LogFactory.getLog(WordCounter.class);
    069           private static final long serialVersionUID = 886149197481637894L;
    070           private OutputCollector collector;
    071           private Map<String, AtomicInteger> counterMap;
    072           
    073           @Override
    074           public void prepare(Map stormConf, TopologyContext context,
    075                     OutputCollector collector) {
    076                this.collector = collector;    
    077                this.counterMap = new HashMap<String, AtomicInteger>();
    078           }
    079  
    080           @Override
    081           public void execute(Tuple input) {
    082                String word = input.getString(0);
    083                int count = input.getInteger(1);
    084                LOG.info("RECV[splitter -> counter] " + word + " : " + count);
    085                AtomicInteger ai = this.counterMap.get(word);
    086                if(ai == null) {
    087                     ai = new AtomicInteger();
    088                     this.counterMap.put(word, ai);
    089                }
    090                ai.addAndGet(count);
    091                collector.ack(input);
    092                LOG.info("CHECK statistics map: " this.counterMap);
    093           }
    094  
    095           @Override
    096           public void cleanup() {
    097                LOG.info("The final result:");
    098                Iterator<Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();
    099                while(iter.hasNext()) {
    100                     Entry<String, AtomicInteger> entry = iter.next();
    101                     LOG.info(entry.getKey() + " : " + entry.getValue().get());
    102                }
    103                
    104           }
    105  
    106           @Override
    107           public void declareOutputFields(OutputFieldsDeclarer declarer) {
    108                declarer.declare(new Fields("word""count"));         
    109           }
    110      }
    111      
    112      public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
    113           String zks = "h1:2181,h2:2181,h3:2181";
    114           String topic = "my-replicated-topic5";
    115           String zkRoot = "/storm"// default zookeeper root configuration for storm
    116           String id = "word";
    117           
    118           BrokerHosts brokerHosts = new ZkHosts(zks);
    119           SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
    120           spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    121           spoutConf.forceFromStart = false;
    122           spoutConf.zkServers = Arrays.asList(new String[] {"h1""h2""h3"});
    123           spoutConf.zkPort = 2181;
    124           
    125           TopologyBuilder builder = new TopologyBuilder();
    126           builder.setSpout("kafka-reader"new KafkaSpout(spoutConf), 5); // Kafka我们创建了一个5分区的Topic,这里并行度设置为5
    127           builder.setBolt("word-splitter"new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");
    128           builder.setBolt("word-counter"new WordCounter()).fieldsGrouping("word-splitter"newFields("word"));
    129           
    130           Config conf = new Config();
    131           
    132           String name = MyKafkaTopology.class.getSimpleName();
    133           if (args != null && args.length > 0) {
    134                // Nimbus host name passed from command line
    135                conf.put(Config.NIMBUS_HOST, args[0]);
    136                conf.setNumWorkers(3);
    137                StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
    138           else {
    139                conf.setMaxTaskParallelism(3);
    140                LocalCluster cluster = new LocalCluster();
    141                cluster.submitTopology(name, conf, builder.createTopology());
    142                Thread.sleep(60000);
    143                cluster.shutdown();
    144           }
    145      }
    146 }

    上面程序,在本地调试(使用LocalCluster)不需要输入任何参数,提交到实际集群中运行时,需要传递一个参数,该参数为Nimbus的主机名称。
    通过Maven构建,生成一个包含依赖的single jar文件(不要把Storm的依赖包添加进去),例如storm-examples-0.0.1-SNAPSHOT.jar,在提交Topology程序到Storm集群之前,因为用到了Kafka,需要拷贝一下依赖jar文件到Storm集群中的lib目录下面:

    1 cp /usr/local/kafka/libs/kafka_2.9.2-0.8.1.1.jar /usr/local/storm/lib/
    2 cp /usr/local/kafka/libs/scala-library-2.9.2.jar /usr/local/storm/lib/
    3 cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/
    4 cp /usr/local/kafka/libs/snappy-java-1.0.5.jar /usr/local/storm/lib/
    5 cp /usr/local/kafka/libs/zkclient-0.3.jar /usr/local/storm/lib/
    6 cp /usr/local/kafka/libs/log4j-1.2.15.jar /usr/local/storm/lib/
    7 cp /usr/local/kafka/libs/slf4j-api-1.7.2.jar /usr/local/storm/lib/
    8 cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/

    然后,就可以提交我们开发的Topology程序了:

    1 bin/storm jar /home/storm/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.MyKafkaTopology h1

    可以通过查看日志文件(logs/目录下)或者Storm UI来监控Topology的运行状况。如果程序没有错误,可以使用前面我们使用的Kafka Producer来生成消息,就能看到我们开发的Storm Topology能够实时接收到并进行处理。
    上面Topology实现代码中,有一个很关键的配置对象SpoutConfig,配置属性如下所示:

    1 spoutConf.forceFromStart = false;

    该配置是指,如果该Topology因故障停止处理,下次正常运行时是否从Spout对应数据源Kafka中的该订阅Topic的起始位置开始读取,如果forceFromStart=true,则之前处理过的Tuple还要重新处理一遍,否则会从上次处理的位置继续处理,保证Kafka中的Topic数据不被重复处理,是在数据源的位置进行状态记录。

    整合Storm+HDFS

    Storm实时计算集群从Kafka消息中间件中消费消息,有实时处理需求的可以走实时处理程序,还有需要进行离线分析的需求,如写入到HDFS进行分析。下面实现了一个Topology,代码如下所示:

    001 package org.shirdrn.storm.examples;
    002  
    003 import java.text.DateFormat;
    004 import java.text.SimpleDateFormat;
    005 import java.util.Date;
    006 import java.util.Map;
    007 import java.util.Random;
    008  
    009 import org.apache.commons.logging.Log;
    010 import org.apache.commons.logging.LogFactory;
    011 import org.apache.storm.hdfs.bolt.HdfsBolt;
    012 import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
    013 import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
    014 import org.apache.storm.hdfs.bolt.format.FileNameFormat;
    015 import org.apache.storm.hdfs.bolt.format.RecordFormat;
    016 import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
    017 import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
    018 import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
    019 import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
    020 import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
    021  
    022 import backtype.storm.Config;
    023 import backtype.storm.LocalCluster;
    024 import backtype.storm.StormSubmitter;
    025 import backtype.storm.generated.AlreadyAliveException;
    026 import backtype.storm.generated.InvalidTopologyException;
    027 import backtype.storm.spout.SpoutOutputCollector;
    028 import backtype.storm.task.TopologyContext;
    029 import backtype.storm.topology.OutputFieldsDeclarer;
    030 import backtype.storm.topology.TopologyBuilder;
    031 import backtype.storm.topology.base.BaseRichSpout;
    032 import backtype.storm.tuple.Fields;
    033 import backtype.storm.tuple.Values;
    034 import backtype.storm.utils.Utils;
    035  
    036 public class StormToHDFSTopology {
    037  
    038      public static class EventSpout extends BaseRichSpout {
    039  
    040           private static final Log LOG = LogFactory.getLog(EventSpout.class);
    041           private static final long serialVersionUID = 886149197481637894L;
    042           private SpoutOutputCollector collector;
    043           private Random rand;
    044           private String[] records;
    045           
    046           @Override
    047           public void open(Map conf, TopologyContext context,
    048                     SpoutOutputCollector collector) {
    049                this.collector = collector;    
    050                rand = new Random();
    051                records = new String[] {
    052                          "10001     ef2da82d4c8b49c44199655dc14f39f6     4.2.1     HUAWEI G610-U00     HUAWEI     2     70:72:3c:73:8b:22     2014-10-13 12:36:35",
    053                          "10001     ffb52739a29348a67952e47c12da54ef     4.3     GT-I9300     samsung     2     50:CC:F8:E4:22:E2     2014-10-13 12:36:02",
    054                          "10001     ef2da82d4c8b49c44199655dc14f39f6     4.2.1     HUAWEI G610-U00     HUAWEI     2     70:72:3c:73:8b:22     2014-10-13 12:36:35"
    055                };
    056           }
    057  
    058  
    059           @Override
    060           public void nextTuple() {
    061                Utils.sleep(1000);
    062                DateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
    063                Date d = new Date(System.currentTimeMillis());
    064                String minute = df.format(d);
    065                String record = records[rand.nextInt(records.length)];
    066                LOG.info("EMIT[spout -> hdfs] " + minute + " : " + record);
    067                collector.emit(new Values(minute, record));
    068           }
    069  
    070           @Override
    071           public void declareOutputFields(OutputFieldsDeclarer declarer) {
    072                declarer.declare(new Fields("minute""record"));         
    073           }
    074  
    075  
    076      }
    077      
    078      public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
    079           // use "|" instead of "," for field delimiter
    080           RecordFormat format = new DelimitedRecordFormat()
    081                   .withFieldDelimiter(" : ");
    082  
    083           // sync the filesystem after every 1k tuples
    084           SyncPolicy syncPolicy = new CountSyncPolicy(1000);
    085  
    086           // rotate files 
    087           FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);
    088  
    089           FileNameFormat fileNameFormat = new DefaultFileNameFormat()
    090                   .withPath("/storm/").withPrefix("app_").withExtension(".log");
    091  
    092           HdfsBolt hdfsBolt = new HdfsBolt()
    093                   .withFsUrl("hdfs://h1:8020")
    094                   .withFileNameFormat(fileNameFormat)
    095                   .withRecordFormat(format)
    096                   .withRotationPolicy(rotationPolicy)
    097                   .withSyncPolicy(syncPolicy);
    098           
    099           TopologyBuilder builder = new TopologyBuilder();
    100           builder.setSpout("event-spout"new EventSpout(), 3);
    101           builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout"new Fields("minute"));
    102           
    103           Config conf = new Config();
    104           
    105           String name = StormToHDFSTopology.class.getSimpleName();
    106           if (args != null && args.length > 0) {
    107                conf.put(Config.NIMBUS_HOST, args[0]);
    108                conf.setNumWorkers(3);
    109                StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
    110           else {
    111                conf.setMaxTaskParallelism(3);
    112                LocalCluster cluster = new LocalCluster();
    113                cluster.submitTopology(name, conf, builder.createTopology());
    114                Thread.sleep(60000);
    115                cluster.shutdown();
    116           }
    117      }
    118  
    119 }

    上面的处理逻辑,可以对HdfsBolt进行更加详细的配置,如FileNameFormat、SyncPolicy、FileRotationPolicy(可以设置在满足什么条件下,切出一个新的日志,如可以指定多长时间切出一个新的日志文件,可以指定一个日志文件大小达到设置值后,再写一个新日志文件),更多设置可以参考storm-hdfs,。
    上面代码在打包的时候,需要注意,使用storm-starter自带的Maven打包配置,可能在将Topology部署运行的时候,会报错,可以使用maven-shade-plugin这个插件,如下配置所示:

    01 <plugin>
    02     <groupId>org.apache.maven.plugins</groupId>
    03     <artifactId>maven-shade-plugin</artifactId>
    04     <version>1.4</version>
    05     <configuration>
    06         <createDependencyReducedPom>true</createDependencyReducedPom>
    07     </configuration>
    08     <executions>
    09         <execution>
    10             <phase>package</phase>
    11             <goals>
    12                 <goal>shade</goal>
    13             </goals>
    14             <configuration>
    15                 <transformers>
    16                     <transformer
    17                             implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    18                     <transformer
    19                             implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
    20                         <mainClass></mainClass>
    21                     </transformer>
    22                 </transformers>
    23             </configuration>
    24         </execution>
    25     </executions>
    26 </plugin>

    整合Kafka+Storm+HDFS

    上面分别对整合Kafka+Storm和Storm+HDFS做了实践,可以将后者的Spout改成前者的Spout,从Kafka中消费消息,在Storm中可以做简单处理,然后将数据写入HDFS,最后可以在Hadoop平台上对数据进行离线分析处理。下面,写了一个简单的例子,从Kafka消费消息,然后经由Storm处理,写入到HDFS存储,代码如下所示:

    001 package org.shirdrn.storm.examples;
    002  
    003 import java.util.Arrays;
    004 import java.util.Map;
    005  
    006 import org.apache.commons.logging.Log;
    007 import org.apache.commons.logging.LogFactory;
    008 import org.apache.storm.hdfs.bolt.HdfsBolt;
    009 import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
    010 import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
    011 import org.apache.storm.hdfs.bolt.format.FileNameFormat;
    012 import org.apache.storm.hdfs.bolt.format.RecordFormat;
    013 import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
    014 import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
    015 import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
    016 import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
    017 import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
    018  
    019 import storm.kafka.BrokerHosts;
    020 import storm.kafka.KafkaSpout;
    021 import storm.kafka.SpoutConfig;
    022 import storm.kafka.StringScheme;
    023 import storm.kafka.ZkHosts;
    024 import backtype.storm.Config;
    025 import backtype.storm.LocalCluster;
    026 import backtype.storm.StormSubmitter;
    027 import backtype.storm.generated.AlreadyAliveException;
    028 import backtype.storm.generated.InvalidTopologyException;
    029 import backtype.storm.spout.SchemeAsMultiScheme;
    030 import backtype.storm.task.OutputCollector;
    031 import backtype.storm.task.TopologyContext;
    032 import backtype.storm.topology.OutputFieldsDeclarer;
    033 import backtype.storm.topology.TopologyBuilder;
    034 import backtype.storm.topology.base.BaseRichBolt;
    035 import backtype.storm.tuple.Fields;
    036 import backtype.storm.tuple.Tuple;
    037 import backtype.storm.tuple.Values;
    038  
    039 public class DistributeWordTopology {
    040      
    041      public static class KafkaWordToUpperCase extends BaseRichBolt {
    042  
    043           private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
    044           private static final long serialVersionUID = -5207232012035109026L;
    045           private OutputCollector collector;
    046           
    047           @Override
    048           public void prepare(Map stormConf, TopologyContext context,
    049                     OutputCollector collector) {
    050                this.collector = collector;              
    051           }
    052  
    053           @Override
    054           public void execute(Tuple input) {
    055                String line = input.getString(0).trim();
    056                LOG.info("RECV[kafka -> splitter] " + line);
    057                if(!line.isEmpty()) {
    058                     String upperLine = line.toUpperCase();
    059                     LOG.info("EMIT[splitter -> counter] " + upperLine);
    060                     collector.emit(input, new Values(upperLine, upperLine.length()));
    061                }
    062                collector.ack(input);
    063           }
    064  
    065           @Override
    066           public void declareOutputFields(OutputFieldsDeclarer declarer) {
    067                declarer.declare(new Fields("line""len"));         
    068           }
    069           
    070      }
    071      
    072      public static class RealtimeBolt extends BaseRichBolt {
    073  
    074           private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
    075           private static final long serialVersionUID = -4115132557403913367L;
    076           private OutputCollector collector;
    077           
    078           @Override
    079           public void prepare(Map stormConf, TopologyContext context,
    080                     OutputCollector collector) {
    081                this.collector = collector;              
    082           }
    083  
    084           @Override
    085           public void execute(Tuple input) {
    086                String line = input.getString(0).trim();
    087                LOG.info("REALTIME: " + line);
    088                collector.ack(input);
    089           }
    090  
    091           @Override
    092           public void declareOutputFields(OutputFieldsDeclarer declarer) {
    093                
    094           }
    095  
    096      }
    097  
    098      public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
    099  
    100           // Configure Kafka
    101           String zks = "h1:2181,h2:2181,h3:2181";
    102           String topic = "my-replicated-topic5";
    103           String zkRoot = "/storm"// default zookeeper root configuration for storm
    104           String id = "word";
    105           BrokerHosts brokerHosts = new ZkHosts(zks);
    106           SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
    107           spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    108           spoutConf.forceFromStart = false;
    109           spoutConf.zkServers = Arrays.asList(new String[] {"h1""h2""h3"});
    110           spoutConf.zkPort = 2181;
    111           
    112           // Configure HDFS bolt
    113           RecordFormat format = new DelimitedRecordFormat()
    114                   .withFieldDelimiter(" "); // use " " instead of "," for field delimiter
    115           SyncPolicy syncPolicy = new CountSyncPolicy(1000); // sync the filesystem after every 1k tuples
    116           FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); // rotate files
    117           FileNameFormat fileNameFormat = new DefaultFileNameFormat()
    118                   .withPath("/storm/").withPrefix("app_").withExtension(".log"); // set file name format
    119           HdfsBolt hdfsBolt = new HdfsBolt()
    120                   .withFsUrl("hdfs://h1:8020")
    121                   .withFileNameFormat(fileNameFormat)
    122                   .withRecordFormat(format)
    123                   .withRotationPolicy(rotationPolicy)
    124                   .withSyncPolicy(syncPolicy);
    125           
    126           // configure & build topology
    127           TopologyBuilder builder = new TopologyBuilder();
    128           builder.setSpout("kafka-reader"new KafkaSpout(spoutConf), 5);
    129           builder.setBolt("to-upper"new KafkaWordToUpperCase(), 3).shuffleGrouping("kafka-reader");
    130           builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping("to-upper");
    131           builder.setBolt("realtime"new RealtimeBolt(), 2).shuffleGrouping("to-upper");
    132           
    133           // submit topology
    134           Config conf = new Config();
    135           String name = DistributeWordTopology.class.getSimpleName();
    136           if (args != null && args.length > 0) {
    137                String nimbus = args[0];
    138                conf.put(Config.NIMBUS_HOST, nimbus);
    139                conf.setNumWorkers(3);
    140                StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
    141           else {
    142                conf.setMaxTaskParallelism(3);
    143                LocalCluster cluster = new LocalCluster();
    144                cluster.submitTopology(name, conf, builder.createTopology());
    145                Thread.sleep(60000);
    146                cluster.shutdown();
    147           }
    148      }
    149  
    150 }

    上面代码中,名称为to-upper的Bolt将接收到的字符串行转换成大写以后,会将处理过的数据向后面的hdfs-bolt、realtime这两个Bolt各发一份拷贝,然后由这两个Bolt分别根据实际需要(实时/离线)单独处理。
    打包后,在Storm集群上部署并运行这个Topology:

    1 bin/storm jar ~/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.DistributeWordTopology h1

    可以通过Storm UI查看Topology运行情况,可以查看HDFS上生成的数据。

  • 相关阅读:
    软件测试原则
    java知识点
    jquery取值
    Javaweb命名规则
    @ResponseBody
    jquery ajax 方法及各参数详解
    @RequestMapping用法详解
    eclipse+android+opencv环境搭建的步骤
    Java中的内部类(回调)
    OpenCV直方图(直方图、直方图均衡,直方图匹配,原理、实现)
  • 原文地址:https://www.cnblogs.com/wanghuaijun/p/5603363.html
Copyright © 2011-2022 走看看