kafka+storm+hbase实现计算WordCount。
(1)表名:wc
(2)列族:result
(3)RowKey:word
(4)Field:count
1、解决:
(1)第一步:首先准备kafka、storm和hbase相关jar包。依赖如下:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com</groupId>
  <artifactId>kafkaSpout</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <dependencies>
    <!-- NOTE(review): when the jar-with-dependencies is submitted to a real cluster,
         storm-core is usually given <scope>provided</scope> so that it is not bundled
         a second time; left at the default scope here so local mode keeps working. -->
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>0.9.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>0.9.3</version>
    </dependency>
    <!-- Provides HBaseBolt and SimpleHBaseMapper, which the topology class uses.
         Kept on the same version as storm-core / storm-kafka. -->
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-hbase</artifactId>
      <version>0.9.3</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.1.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>0.99.2</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>2.5.0</version>
      <exclusions>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- tools.jar of the JDK that runs Maven. The original hard-coded Windows path
         (C:\Program Files\Java\jdk1.7.0_51\lib\tools.jar) had lost its backslashes;
         resolving it relative to java.home works on any machine with a JDK 7 layout. -->
    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.7</version>
      <scope>system</scope>
      <systemPath>${java.home}/../lib/tools.jar</systemPath>
    </dependency>
  </dependencies>

  <repositories>
    <repository>
      <id>central</id>
      <!-- Maven Central no longer serves plain HTTP. -->
      <url>https://repo1.maven.org/maven2/</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
    <repository>
      <id>clojars</id>
      <url>https://clojars.org/repo/</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
    <repository>
      <id>scala-tools</id>
      <url>http://scala-tools.org/repo-releases</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
    <repository>
      <id>conjars</id>
      <url>http://conjars.org/repo/</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
  </repositories>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
          <encoding>UTF-8</encoding>
          <showDeprecation>true</showDeprecation>
          <showWarnings>true</showWarnings>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <!-- Entry point of the topology (was left empty). -->
              <mainClass>com.kafka.spout.StormKafkaTopo</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
(2)将kafka发来的数据通过levelSplit的bolt进行分割处理,然后再发送到下一个Bolt中。代码如下:
package com.kafka.spout;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Bolt that splits an incoming sentence into words.
 *
 * <p>Reads the first field of each tuple as a string, splits it on single
 * spaces and emits one tuple per non-empty word on the output field
 * {@code "word"}.
 */
public class LevelSplit extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;

    /**
     * Splits the sentence carried in field 0 of {@code tuple} and emits each word.
     *
     * @param tuple     input tuple; field 0 holds the sentence,
     *                  e.g. {@code "the cow jumped over the moon"}
     * @param collector collector that receives one {@code (word)} tuple per word
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        if (sentence == null) {
            // Nothing to split; skip instead of failing with a NullPointerException.
            return;
        }
        for (String word : sentence.split(" ")) {
            // Leading or repeated spaces make split() return empty tokens. They are not
            // words, and an empty word would be used as an (invalid, zero-length)
            // HBase row key further down the topology.
            if (!word.isEmpty()) {
                collector.emit(new Values(word));
            }
        }
    }

    /**
     * Declares the single output field {@code "word"}.
     *
     * @param declarer declarer used to register the output schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
(3)将LevelSplit的Bolt发来的数据发送到LevelCount的Bolt中进行计数处理,然后再发送到hbase(Bolt)中。代码如下:
package com.kafka.spout;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Bolt that keeps a running count per word.
 *
 * <p>For every incoming tuple the count of the word in field 0 is incremented,
 * the whole count table is printed to standard output, and a
 * {@code (word, count)} tuple is emitted.
 */
public class LevelCount extends BaseBasicBolt {

    /** Running totals, keyed by word; held in memory by this bolt instance. */
    Map<String, Integer> counts = new HashMap<String, Integer>();

    /**
     * Increments the count of the received word and emits the new total.
     *
     * @param tuple     input tuple; field 0 holds the word
     * @param collector collector that receives the {@code (word, count)} tuple
     */
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        final String word = tuple.getString(0);

        final Integer previous = counts.get(word);
        final int updated = (previous == null) ? 1 : previous + 1;
        counts.put(word, updated);

        // Print the complete table after every update.
        for (String key : counts.keySet()) {
            System.out.println(key + "----------->" + counts.get(key));
        }

        collector.emit(new Values(word, updated));
    }

    /**
     * Declares the output fields {@code "word"} and {@code "count"}.
     *
     * @param declarer declarer used to register the output schema
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
(4)准备好连接kafka和hbase所需的配置,设置整个拓扑结构,并提交拓扑。代码如下:
package com.kafka.spout;

import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.Maps;
//import org.apache.storm.guava.collect.Maps;

import org.apache.storm.hbase.bolt.HBaseBolt;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

/**
 * Builds and submits the WordCount topology:
 * Kafka spout -> {@link LevelSplit} -> {@link LevelCount} -> HBase bolt.
 *
 * <p>Counts are written to the HBase table {@code wc}, column family
 * {@code result}, with the word as row key and {@code count} as the column.
 */
public class StormKafkaTopo {

    /**
     * Wires the topology and runs it.
     *
     * @param args if at least one argument is given, {@code args[0]} is used as the
     *             topology name and the topology is submitted to the cluster;
     *             otherwise it runs in an in-process local cluster
     */
    public static void main(String[] args) {
        // Kafka spout: ZooKeeper hosts of the Kafka cluster, topic "yjd",
        // ZooKeeper root "/storm" and consumer id "kafkaspout".
        BrokerHosts brokerHosts = new ZkHosts("zeb,yjd,ylh");
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "yjd", "/storm", "kafkaspout");
        // MessageScheme is defined elsewhere in this package.
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        // Tuple -> HBase mapping: row key = "word", column family "result", column "count".
        SimpleHBaseMapper mapper = new SimpleHBaseMapper();
        mapper.withColumnFamily("result");
        mapper.withColumnFields(new Fields("count"));
        mapper.withRowKeyField("word");

        // HBase client settings; the bolt looks them up in the topology
        // configuration under the key "hbase.conf".
        Map<String, Object> map = Maps.newTreeMap();
        map.put("hbase.rootdir", "hdfs://zeb:9000/hbase");
        map.put("hbase.zookeeper.quorum", "zeb:2181,yjd:2181,ylh:2181");

        // hbase-bolt writing to table "wc"
        HBaseBolt hBaseBolt = new HBaseBolt("wc", mapper).withConfigKey("hbase.conf");

        Config conf = new Config();
        conf.setDebug(true);
        conf.put("hbase.conf", map);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConfig));
        builder.setBolt("split", new LevelSplit(), 1).shuffleGrouping("spout");
        // Group by word so that one word is always counted by the same task.
        builder.setBolt("count", new LevelCount(), 1).fieldsGrouping("split", new Fields("word"));
        builder.setBolt("hbase", hBaseBolt, 1).shuffleGrouping("count");

        if (args != null && args.length > 0) {
            // Submit to the cluster.
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            // Run in local mode, then kill the topology and shut the cluster down.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Topotest1121", conf, builder.createTopology());
            Utils.sleep(1000000);
            cluster.killTopology("Topotest1121");
            cluster.shutdown();
        }
    }
}
(5)在kafka端用控制台生产数据,如下:
2、运行结果截图:
3、遇到的问题:
(1)把所有的工作做好后,提交了拓扑,运行代码。发生了错误1,如下:
解决:原来是各依赖的版本不一致导致的,将相关依赖的版本修改一致后,问题成功解决。
(2)发生了错误2,如下:
解决:原来是忘记启动HBase的HMaster和HRegionServer进程。启动之后,问题成功解决。