  • Storm writing to HBase

    When testing from Windows, edit the Windows hosts file and add an entry mapping the HBase server's IP address to its hostname.
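
    A sample entry might look like this (the hostname hadoop001 is a placeholder; the IP should be the address of your own HBase node):

        192.168.0.108    hadoop001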

    When a standalone HBase instance uses its bundled ZooKeeper, add the following to ${HBASE_HOME}/conf/hbase-site.xml:

    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>192.168.X.X</value>
    </property>
    Project structure:

    java
        com
            heibaiying
                component
                    CountBolt.java
                    DataSourceSpout.java
                    SplitBolt.java
                WordCountToHBaseApp.java
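
    The topology relies on the storm-core and storm-hbase artifacts. A minimal sketch of the Maven dependencies (the version number 1.2.2 is an assumption; match it to the Storm version of your cluster):

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.2.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-hbase</artifactId>
        <version>1.2.2</version>
    </dependency>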

    CountBolt

    package com.heibaiying.component;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Performs the word-frequency count.
     */
    public class CountBolt extends BaseRichBolt {
    
        private Map<String, Integer> counts = new HashMap<>();
    
        private OutputCollector collector;
    
    
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
    
        @Override
        public void execute(Tuple input) {
            String word = input.getStringByField("word");
            Integer count = counts.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            counts.put(word, count);
            // emit the updated count as a string so HBase stores readable bytes
            collector.emit(new Values(word, String.valueOf(count)));
    
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    DataSourceSpout

    package com.heibaiying.component;
    
    import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    import java.util.*;
    
    
    /**
     * Spout that generates sample lines for the word count.
     */
    public class DataSourceSpout extends BaseRichSpout {
    
        private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");
    
        private SpoutOutputCollector spoutOutputCollector;
    
        @Override
        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.spoutOutputCollector = spoutOutputCollector;
        }
    
        @Override
        public void nextTuple() {
            // generate a line of mock data
            String lineData = produceData();
            spoutOutputCollector.emit(new Values(lineData));
            Utils.sleep(10000);
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("line"));
        }
    
    
        /**
         * Produces a mock line: shuffles the word list and joins a random
         * prefix of it with tab separators.
         */
        private String produceData() {
            Collections.shuffle(list);
            Random random = new Random();
            // pick a prefix length in [1, list.size()]
            int endIndex = random.nextInt(list.size()) + 1;
            return StringUtils.join(list.toArray(), "\t", 0, endIndex);
        }
    
    }

    SplitBolt

    package com.heibaiying.component;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    
    import java.util.Map;
    
    import static org.apache.storm.utils.Utils.tuple;
    
    /**
     * Splits each line into words on the tab separator.
     */
    public class SplitBolt extends BaseRichBolt {
    
        private OutputCollector collector;
    
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
    
        @Override
        public void execute(Tuple input) {
            String line = input.getStringByField("line");
            String[] words = line.split("\t");
            for (String word : words) {
                collector.emit(tuple(word, 1));
            }
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

      

    WordCountToHBaseApp

    package com.heibaiying;
    
    import com.heibaiying.component.CountBolt;
    import com.heibaiying.component.DataSourceSpout;
    import com.heibaiying.component.SplitBolt;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.hbase.bolt.HBaseBolt;
    import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Runs the word count and stores the results in HBase.
     */
    public class WordCountToHBaseApp {
    
        private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
        private static final String SPLIT_BOLT = "splitBolt";
        private static final String COUNT_BOLT = "countBolt";
        private static final String HBASE_BOLT = "hbaseBolt";
    
        public static void main(String[] args) {
    
            // Storm configuration
            Config config = new Config();
    
            // HBase configuration
            Map<String, Object> hbConf = new HashMap<>();
            hbConf.put("hbase.rootdir", "hdfs://192.168.0.108:8020/hbase");
            hbConf.put("hbase.zookeeper.quorum", "192.168.0.108:2181");
            hbConf.put("zookeeper.znode.parent", "/hbase");
    
            // pass the HBase configuration into the Storm configuration
            config.put("hbase.conf", hbConf);
    
            // define how stream tuples map to rows in HBase
            SimpleHBaseMapper mapper = new SimpleHBaseMapper()
                    .withRowKeyField("word")
                    .withColumnFields(new Fields("word","count"))
                    .withColumnFamily("info");
    
            /*
             * Pass the table name, the field mapping, and the HBase configuration key to the HBaseBolt.
             * The table must be created in advance: create 'WordCount','info'
             */
            HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
                    .withConfigKey("hbase.conf");
    
            // build the topology
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout(),1);
            // split
            builder.setBolt(SPLIT_BOLT, new SplitBolt(), 1).shuffleGrouping(DATA_SOURCE_SPOUT);
            // count
            builder.setBolt(COUNT_BOLT, new CountBolt(),1).shuffleGrouping(SPLIT_BOLT);
            // save to HBase
            builder.setBolt(HBASE_BOLT, hbase, 1).shuffleGrouping(COUNT_BOLT);
    
    
            // if "cluster" is passed as a command-line argument, submit to a remote cluster; otherwise start locally
            if (args.length > 0 && args[0].equals("cluster")) {
                try {
                    StormSubmitter.submitTopology("ClusterWordCountToHBaseApp", config, builder.createTopology());
                } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                    e.printStackTrace();
                }
            } else {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("LocalWordCountToHBaseApp",
                        config, builder.createTopology());
            }
        }
    }
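
    Once the topology is running, the counts can be inspected directly in HBase. Below is a minimal verification sketch using the plain HBase client API; it is not part of the original project, the class name WordCountScan is made up for illustration, the quorum address mirrors the configuration above, and the table is assumed to exist (create 'WordCount','info'):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class WordCountScan {

        public static void main(String[] args) throws Exception {
            // point the client at the same ZooKeeper quorum the topology uses
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "192.168.0.108:2181");

            try (Connection connection = ConnectionFactory.createConnection(conf);
                 Table table = connection.getTable(TableName.valueOf("WordCount"));
                 ResultScanner scanner = table.getScanner(new Scan())) {
                // each row key is a word; the count lives in column info:count
                for (Result result : scanner) {
                    String word = Bytes.toString(result.getRow());
                    String count = Bytes.toString(
                            result.getValue(Bytes.toBytes("info"), Bytes.toBytes("count")));
                    System.out.println(word + " : " + count);
                }
            }
        }
    }
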
  • Original article: https://www.cnblogs.com/tangsonghuai/p/11150865.html