zoukankan      html  css  js  c++  java
  • kafka+storm+hbase

    kafka+storm+hbase实现计算WordCount。

    (1)表名:wc

    (2)列族:result

    (3)RowKey:word

    (4)Field:count

    1、解决:

    1)第一步:首先准备kafkastormhbase相关jar包。依赖如下

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com</groupId>
      <artifactId>kafkaSpout</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      
    	<dependencies>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-core</artifactId>
                <version>0.9.3</version>
            </dependency>
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-kafka</artifactId>
                <version>0.9.3</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.10</artifactId>
                <version>0.8.1.1</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.zookeeper</groupId>
                        <artifactId>zookeeper</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>0.99.2</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.zookeeper</groupId>
                        <artifactId>zookeeper</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            
           <dependency>
    
             <groupId>com.google.protobuf</groupId>
    
             <artifactId>protobuf-java</artifactId>
    
             <version>2.5.0</version>
    
            </dependency>
    
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>2.5.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
    
    
    			
    			
    			
    			
    			 
            
               <dependency>
    			<groupId>jdk.tools</groupId>
    			<artifactId>jdk.tools</artifactId>
    			<version>1.7</version>
    			<scope>system</scope>
    			<systemPath>C:Program FilesJavajdk1.7.0_51lib	ools.jar</systemPath>
    		</dependency>     
            
        </dependencies>
    
    
        <repositories>
            <repository>
                <id>central</id>
                <url>http://repo1.maven.org/maven2/</url>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
                <releases>
                    <enabled>true</enabled>
                </releases>
            </repository>
            <repository>
                <id>clojars</id>
                <url>https://clojars.org/repo/</url>
                <snapshots>
                    <enabled>true</enabled>
                </snapshots>
                <releases>
                    <enabled>true</enabled>
                </releases>
            </repository>
            <repository>
                <id>scala-tools</id>
                <url>http://scala-tools.org/repo-releases</url>
                <snapshots>
                    <enabled>true</enabled>
                </snapshots>
                <releases>
                    <enabled>true</enabled>
                </releases>
            </repository>
            <repository>
                <id>conjars</id>
                <url>http://conjars.org/repo/</url>
                <snapshots>
                    <enabled>true</enabled>
                </snapshots>
                <releases>
                    <enabled>true</enabled>
                </releases>
            </repository>
        </repositories>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>1.6</source>
                        <target>1.6</target>
                        <encoding>UTF-8</encoding>
                        <showDeprecation>true</showDeprecation>
                        <showWarnings>true</showWarnings>
                    </configuration>
                </plugin>
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <archive>
                            <manifest>
                                <mainClass></mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>

     

    (2)kafka发来的数据通过levelSplitbolt进行分割处理,然后再发送到下一个Bolt中。代码如下:

    package com.kafka.spout;
    
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    
    public class LevelSplit extends BaseBasicBolt {
    
    
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String words = tuple.getString(0).toString();//the cow jumped over the moon
            String []va=words.split(" ");
            for(String word : va)
            {
            	collector.emit(new Values(word));
            }
    		
        }
    
      
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    
    }
    

      

    (3)将levelSplit的Bolt发来的数据到levelCount的Bolt中进行计数处理,然后发送到hbase(Bolt)中。代码如下:

    package com.kafka.spout;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Map.Entry;
    
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    
    public class LevelCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();
    
    	public void execute(Tuple tuple, BasicOutputCollector collector) {
    		// TODO Auto-generated method stub
    		String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            count++;
            counts.put(word, count);
    
    		for (Entry<String, Integer> e : counts.entrySet()) {
    			//sum += e.getValue();
    			System.out.println(e.getKey()
    								+ "----------->" +e.getValue());
            }
            collector.emit(new Values(word, count));      
    	}
    
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		// TODO Auto-generated method stub
    		 declarer.declare(new Fields("word", "count"));
    	}
    }
    

      

    (4)准备连接kafkahbase条件以及设置整个拓扑结构并且提交拓扑。代码如下:

    package com.kafka.spout;
    
    
    import java.util.HashMap;
    import java.util.Map;
    
    import com.google.common.collect.Maps;
    
    //import org.apache.storm.guava.collect.Maps;
    
    
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    import backtype.storm.utils.Utils;
    import storm.kafka.BrokerHosts;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.ZkHosts;
    
    
    
    public class StormKafkaTopo {
        public static void main(String[] args) {
        	
        	
            BrokerHosts brokerHosts = new ZkHosts("zeb,yjd,ylh");
            SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "yjd", "/storm", "kafkaspout");
            Config conf = new Config();   
            spoutConfig.scheme =  new SchemeAsMultiScheme(new MessageScheme());    
            
            SimpleHBaseMapper mapper = new SimpleHBaseMapper();
            mapper.withColumnFamily("result");
            mapper.withColumnFields(new Fields("count"));
            mapper.withRowKeyField("word");
            
            Map<String, Object> map = Maps.newTreeMap();
            map.put("hbase.rootdir", "hdfs://zeb:9000/hbase");
            map.put("hbase.zookeeper.quorum", "zeb:2181,yjd:2181,ylh:2181");
            
            // hbase-bolt
            HBaseBolt hBaseBolt = new HBaseBolt("wc", mapper).withConfigKey("hbase.conf");
    
            conf.setDebug(true);
            conf.put("hbase.conf", map);
    
            
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new KafkaSpout(spoutConfig));
            builder.setBolt("split", new LevelSplit(), 1).shuffleGrouping("spout");
            builder.setBolt("count", new LevelCount(), 1).fieldsGrouping("split", new Fields("word"));
            builder.setBolt("hbase", hBaseBolt, 1).shuffleGrouping("count");
            
            if(args != null && args.length > 0) {
                //提交到集群运行
                try {
                    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
                } catch (AlreadyAliveException e) {
                    e.printStackTrace();
                } catch (InvalidTopologyException e) {
                    e.printStackTrace();
                }
            } else {
                //本地模式运行
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("Topotest1121", conf, builder.createTopology());
                Utils.sleep(1000000);
                cluster.killTopology("Topotest1121");
                cluster.shutdown();
            }           
        }
    }
    

      

    (5)在kafka端用控制台生产数据,如下:

    2、运行结果截图:

     

    3、遇到的问题:

    (1)把所有的工作做好后,提交了拓扑,运行代码。发生了错误1,如下:

     

    解决:原来是因为依赖版本要统一的问题,最后将版本修改一致后,成功解决。

    (2)发生了错误2,如下:

     

    解决:原来是忘记开hbase中的HMaster和HRegionServer。启动后问题成功解决。

  • 相关阅读:
    Objective-C中不同方式实现锁(二)-11-多线程
    共享资源加锁的操作方法-10-多线程
    ios 下锁使用- 09-多线程
    iOS开发-线程安全-09-多线程
    线程同步-iOS多线程编程指南(四)-08-多线程
    《GCD 实现同步锁》-07-多线程
    死锁-06-多线程
    生产者消费者问题-05-多线程
    递归锁+条件锁+互斥锁-04-多线程
    Android开发技术周报 Issue#62
  • 原文地址:https://www.cnblogs.com/yjd_hycf_space/p/6918769.html
Copyright © 2011-2022 走看看