消息可以通过各种方式进入Kafka消息中间件,比如使用Flume收集日志数据,先在Kafka中路由暂存,再由实时计算程序Storm做实时分析,最后将结果保存在HDFS中。这时我们就需要在Storm的Spout中读取Kafka中的消息,然后交由具体的Bolt组件去分析处理。下面开发一个简单的WordCount示例程序:从Kafka读取订阅的消息行,通过空格拆分出单个单词,然后做词频统计计算,最后将结果保存至HDFS。
1. kafka程序
package com.dxss.storm;

import kafka.producer.KeyedMessage;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

import java.util.Properties;

/**
 * Created by hadoop on 2017/7/29.
 *
 * <p>Demo Kafka producer: publishes the fixed line "I am a chinese" to the
 * topic "sunfei" in an endless loop so that the Storm topology always has
 * input to consume.
 */
public class KakfaProceduer {

    /**
     * Builds a String/String producer and sends the same message forever.
     *
     * @param args unused
     * @throws InterruptedException declared for backward compatibility; nothing in the
     *                              current body throws it
     */
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        // NOTE(review): the producer is pointed at the broker through metadata.broker.list;
        // zookeeper.connect is kept from the original but may be unnecessary here - confirm.
        properties.put("zookeeper.connect", "12.13.41.41:2182");
        properties.put("metadata.broker.list", "12.13.41.41:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");

        ProducerConfig producerConfig = new ProducerConfig(properties);
        // Parameterized instead of the raw Producer type, so send() is type-checked.
        Producer<String, String> producer = new Producer<String, String>(producerConfig);
        try {
            KeyedMessage<String, String> keyedMessage =
                    new KeyedMessage<String, String>("sunfei", "I am a chinese");
            while (true) {
                producer.send(keyedMessage);
            }
        } finally {
            // The loop only ends when send() throws; release the producer in that case.
            producer.close();
        }
    }
}
2. Storm--分词Bolt(PrinterBolt;Spout直接使用storm-kafka提供的KafkaSpout,见第4节的Topology)
package com.dxss.stormkafka; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import scala.collection.mutable.ArrayBuilder.ofBoolean; public class PrinterBolt extends BaseRichBolt{ private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple input) { // System.out.println("kafak所发布的消息为"+input.getString(0)); String value = input.getString(0); String[] strings = value.split(" "); for(String var : strings){ collector.emit(new Values(var,1)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "num")); } }
3. Storm-bolt
package com.dxss.stormkafka; import java.util.HashMap; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class CountBolt extends BaseRichBolt{ private OutputCollector collector; private Map<String,Integer> map = new HashMap(); @Override public void execute(Tuple arg0) { // TODO Auto-generated method stub String word = arg0.getString(0); Integer num = arg0.getInteger(1); if (map.containsKey(word)){ Integer number = map.get(word); map.put(word,number+1); }else{ map.put(word, num); } String result = word + ":" + map.get(word); this.collector.emit(new Values(result)); } @Override public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) { this.collector = arg2; } @Override public void declareOutputFields(OutputFieldsDeclarer arg0) { arg0.declare(new Fields("result")); } }
4. Storm--Topology
package com.dxss.stormkafka; import org.apache.storm.hdfs.bolt.HdfsBolt; import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat; import org.apache.storm.hdfs.bolt.format.FileNameFormat; import org.apache.storm.hdfs.bolt.format.RecordFormat; import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy; import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; import org.apache.storm.hdfs.bolt.sync.SyncPolicy; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.generated.StormTopology; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import storm.kafka.BrokerHosts; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.StringScheme; import storm.kafka.ZkHosts; public class StormKafka { public static StormTopology createTopology(){ BrokerHosts boBrokerHosts = new ZkHosts("12.13.41.41:2182"); String topic = "sunfei"; String zkRoot = "/sunfei"; String spoutId = "sunfei_storm"; SpoutConfig spoutConfig = new SpoutConfig(boBrokerHosts, topic, zkRoot, spoutId); spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // 输出字段分隔符 RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(":"); // 每1000个tuple同步到hdfs一次 SyncPolicy syncPolicy = new CountSyncPolicy(1000); // 每个文件的大小为100M FileRotationPolicy policy = new FileSizeRotationPolicy(100.0f,FileSizeRotationPolicy.Units.MB); // 设置输出目录 FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/test/storm") .withPrefix("storm_").withExtension(".txt"); // 执行HDFS地址 HdfsBolt hdfsBolt = new HdfsBolt().withFsUrl("hdfs://12.13.41.43:8020") .withFileNameFormat(fileNameFormat) .withRecordFormat(format) .withRotationPolicy(policy) .withSyncPolicy(syncPolicy); TopologyBuilder builder = new TopologyBuilder(); 
builder.setSpout("words", new KafkaSpout(spoutConfig)); builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words"); builder.setBolt("totalBolt", new CountBolt()).fieldsGrouping("print", new Fields("word")); builder.setBolt("hdfsBolt", hdfsBolt).shuffleGrouping("totalBolt"); return builder.createTopology(); } public static void main(String[] args) throws InterruptedException { Config config = new Config(); config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000); StormTopology topology = StormKafka.createTopology(); config.setNumWorkers(1); config.setMaxTaskParallelism(2); //提交集群模式 // try { // StormSubmitter.submitTopology("mywordcount", config, topology); // } catch (AlreadyAliveException e) { // // TODO Auto-generated catch block // e.printStackTrace(); // } catch (InvalidTopologyException e) { // // TODO Auto-generated catch block // e.printStackTrace(); // } //本地运行模式 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("kafkastorm", config, topology); Thread.sleep(1000); } }
5. 项目pom.xml文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.kafka</groupId>
  <artifactId>StormDemo</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>0.9.3</version>
      <!-- Scope is left at the default; the original had this disabled: -->
      <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>0.9.3</version>
      <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>2.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.7.1</version>
      <exclusions>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.7.1</version>
      <exclusions>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- NOTE(review): storm-hdfs is 0.10.0 while storm-core/storm-kafka are 0.9.3; confirm the mix is intended. -->
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-hdfs</artifactId>
      <version>0.10.0</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-hdfs</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.8</version>
      <scope>system</scope>
      <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <!-- Dependency required by the storm-kafka module -->
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>2.5.0</version>
      <exclusions>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- NOTE(review): second commons-io artifact next to commons-io:commons-io:2.5 above; confirm both are needed. -->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-io</artifactId>
      <version>1.3.2</version>
    </dependency>
    <!-- kafka -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.1.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>

  <repositories>
    <repository>
      <id>central</id>
      <!-- Maven Central only serves HTTPS; the plain http URL is rejected. -->
      <url>https://repo1.maven.org/maven2/</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
    <repository>
      <id>clojars</id>
      <url>https://clojars.org/repo/</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
    <!-- NOTE(review): the two repositories below still use plain http; verify they are reachable and still required. -->
    <repository>
      <id>scala-tools</id>
      <url>http://scala-tools.org/repo-releases</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
    <repository>
      <id>conjars</id>
      <url>http://conjars.org/repo/</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
  </repositories>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
          <encoding>UTF-8</encoding>
          <showDeprecation>true</showDeprecation>
          <showWarnings>true</showWarnings>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <!-- Entry point of the topology; was empty in the original. -->
              <mainClass>com.dxss.stormkafka.StormKafka</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>