Flume+Kafka+Storm+HBase+HDFS+POI Integration

    Requirement:

    For a website, we want to log users' actions and analyze the data that is useful to us.

    Example: on the site www.hongten.com (a fictional e-commerce site, of course), users can perform many familiar actions, such as register, log in, view, click, double-click, buy, add to the shopping cart, add a record, edit a record, delete a record, comment, and log out. All of these actions are recorded in the log, and it is this log data that we want to analyze.

    In this article we analyze two of these actions, buying and adding to the shopping cart, and generate a report from them. The report shows at what times users tend to buy and at what times they tend to add items to the cart, so that we can act at the right moments: encourage users to buy, and recommend products for them to add to the cart (adding to the cart marks a potential buyer).

    After all, a profitable website is what we are really after, right?

    1. Abstracting the user actions

        // user actions
        public static final String[] USER_ACTION = { "Register", "Login", "View", "Click", "Double_Click", "Buy", "Shopping_Car", "Add", "Edit", "Delete", "Comment", "Logout" };

    2. Log format definition

    115.19.62.102    海南    2018-12-20    1545286960749    1735787074662918890    www.hongten.com    Edit
    27.177.45.84    新疆    2018-12-20    1545286962255    6667636903937987930    www.hongten.com    Delete
    176.54.120.96    宁夏    2018-12-20    1545286962256    6988408478348165495    www.hongten.com    Comment
    175.117.33.187    辽宁    2018-12-20    1545286962257    8411202446705338969    www.hongten.com    Shopping_Car
    17.67.62.213    天津    2018-12-20    1545286962258    7787584752786413943    www.hongten.com    Add
    137.81.41.9    海南    2018-12-20    1545286962259    6218367085234099455    www.hongten.com    Shopping_Car
    125.187.107.57    山东    2018-12-20    1545286962260    3358658811146151155    www.hongten.com    Double_Click
    104.167.205.87    内蒙    2018-12-20    1545286962261    2303468282544965471    www.hongten.com    Shopping_Car
    64.106.149.83    河南    2018-12-20    1545286962262    8422202443986582525    www.hongten.com    Delete
    138.22.156.183    浙江    2018-12-20    1545286962263    7649154147863130337    www.hongten.com    Shopping_Car
    41.216.103.31    河北    2018-12-20    1545286962264    6785302169446728008    www.hongten.com    Shopping_Car
    132.144.93.20    广东    2018-12-20    1545286962265    6444575166009004406    www.hongten.com    Add

    Log format:

    // log format
    String log = ip + "\t" + address + "\t" + d + "\t" + timestamp + "\t" + userid + "\t" + Common.WEB_SITE + "\t" + action;
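
    For reference, a log line can be split back into its fields on the tab separator. The class below is a minimal, hypothetical parser for this format (LogRecord and its field names are mine and are not part of the project source):

    // minimal sketch: parse one tab-separated log line back into its fields
    // (LogRecord is illustrative only, not part of the original code)
    public class LogRecord {
        String ip;
        String address;
        String date;      // yyyy-MM-dd
        long timestamp;   // epoch millis
        long userId;
        String site;      // www.hongten.com
        String action;    // one of USER_ACTION

        public static LogRecord parse(String line) {
            String[] f = line.split("\t");
            LogRecord r = new LogRecord();
            r.ip = f[0];
            r.address = f[1];
            r.date = f[2];
            r.timestamp = Long.parseLong(f[3]);
            r.userId = Long.parseLong(f[4]);
            r.site = f[5];
            r.action = f[6];
            return r;
        }
    }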

    3. System architecture
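
    The overall data flow is:

    Java log generator -> Flume avro source (node2:41414) -> Kafka topic all_my_log -> Storm LogFilterTopology (keep Buy / Shopping_Car) -> Kafka topic filtered_log -> Storm LogProcessTopology (count) -> HBase table t_user_actions (stored on HDFS) -> POI Excel report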

    4. Report format

    Since the data is generated randomly, the results we see grow roughly linearly.

    Here I only implemented an hourly report; you could just as well build daily, quarterly, yearly, three-year, or five-year reports, depending on the actual requirements.

     

    5. Component distribution

    I set up 4 nodes in total: node1, node2, node3, node4 (note: all 4 nodes need a JDK).

    Zookeeper is installed on node1, node2, node3

    The Hadoop cluster runs on node1, node2, node3, node4

    The HBase cluster runs on node1, node2, node3, node4

    Flume is installed on node2

    Kafka is installed on node1, node2, node3

    Storm is installed on node1, node2, node3

    6. Implementation

    6.1. Configure Flume

    -- On node2
    cd flumedir
    
    vi flume2kafka
    
    -- node2 configuration:
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = node2
    a1.sources.r1.port = 41414
    
    # Describe the sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = all_my_log
    a1.sinks.k1.brokerList = node1:9092,node2:9092,node3:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 20
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.transactionCapacity = 10000
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    :wq
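
    In this configuration the avro source listens on node2:41414, which matches FLUME_HOST_NAME and FLUME_PORT in the Java client of section 6.5, and the Kafka sink publishes every event it receives to the all_my_log topic on the brokers node1, node2 and node3.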

    6.2. Start Zookeeper

    -- Stop the firewall on node1, node2, node3, node4
    service iptables stop
    
    -- Start Zookeeper on node1, node2, node3
    zkServer.sh start

    6.3. Start Kafka

    -- Start Kafka
    -- On each of node1, node2, node3
    cd /root/kafka/kafka_2.10-0.8.2.2
    ./start-kafka.sh

    6.4. Start the Flume agent

    -- On node2, start the agent
    cd /root/flumedir
    flume-ng agent -n a1 -c conf -f flume2kafka -Dflume.root.logger=DEBUG,console

    6.5. Generate log data and send it to Flume

    Run the Java code below to generate log data and send it to the Flume server.

    package com.b510.big.data.flume.client;
    
    import java.nio.charset.Charset;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    
    /**
     * @author Hongten
     * 
     *         Purpose: simulate user log data and send it to Flume
     */
    public class FlumeClient {
    
        public static void main(String[] args) {
            ExecutorService exec = Executors.newCachedThreadPool();
            exec.execute(new GenerateDataAndSend2Flume());
    
            exec.shutdown();
        }
    
    }
    
    class GenerateDataAndSend2Flume implements Runnable {
    
        FlumeRPCClient flumeRPCClient;
        static Random random = new Random();
    
        GenerateDataAndSend2Flume() {
            // initialize the RPC client
            flumeRPCClient = new FlumeRPCClient();
            flumeRPCClient.init(Common.FLUME_HOST_NAME, Common.FLUME_PORT);
        }
    
        @Override
        public void run() {
            while (true) {
                Date date = new Date();
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat(Common.DATE_FORMAT_YYYYDDMM);
                String d = simpleDateFormat.format(date);
                Long timestamp = new Date().getTime();
                // generate a random IP address
                String ip = random.nextInt(Common.MAX_IP_NUMBER) + "." + random.nextInt(Common.MAX_IP_NUMBER) + "." + random.nextInt(Common.MAX_IP_NUMBER) + "." + random.nextInt(Common.MAX_IP_NUMBER);
                // address for the IP (synthetic data only; we do not look up the real location of the IP)
                String address = Common.ADDRESS[random.nextInt(Common.ADDRESS.length)];
    
                Long userid = Math.abs(random.nextLong());
                String action = Common.USER_ACTION[random.nextInt(Common.USER_ACTION.length)];
                // build the log record
                // example : 199.80.45.117 云南 2018-12-20 1545285957720 3086250439781555145 www.hongten.com Buy
                String data = ip + "\t" + address + "\t" + d + "\t" + timestamp + "\t" + userid + "\t" + Common.WEB_SITE + "\t" + action;
                //System.out.println(data);
    
                // send the data to Flume
                flumeRPCClient.sendData2Flume(data);
    
                try {
                    TimeUnit.MICROSECONDS.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                    flumeRPCClient.cleanUp();
                    System.out.println("interrupted exception : " + e);
                }
            }
        }
    }
    
    class FlumeRPCClient {
    
        private RpcClient client;
        private String hostname;
        private int port;
    
        public void init(String hostname, int port) {
            this.hostname = hostname;
            this.port = port;
            this.client = getRpcClient(hostname, port);
        }
    
        public void sendData2Flume(String data) {
            Event event = EventBuilder.withBody(data, Charset.forName(Common.CHAR_FORMAT));
    
            try {
                client.append(event);
            } catch (EventDeliveryException e) {
                cleanUp();
                client = null;
                client = getRpcClient(hostname, port);
            }
        }
    
        public RpcClient getRpcClient(String hostname, int port) {
            return RpcClientFactory.getDefaultInstance(hostname, port);
        }
    
        public void cleanUp() {
            // Close the RPC connection
            client.close();
        }
    }
    
    // all constant definitions
    class Common {
        public static final String CHAR_FORMAT = "UTF-8";
    
        public static final String DATE_FORMAT_YYYYDDMM = "yyyy-MM-dd";
    
        // this is a test web site
        public static final String WEB_SITE = "www.hongten.com";
    
        // user actions
        public static final String[] USER_ACTION = { "Register", "Login", "View", "Click", "Double_Click", "Buy", "Shopping_Car", "Add", "Edit", "Delete", "Comment", "Logout" };
    
        public static final int MAX_IP_NUMBER = 224;
        // addresses used for the generated IPs
        public static String[] ADDRESS = { "北京", "天津", "上海", "广东", "重庆", "河北", "山东", "河南", "云南", "山西", "甘肃", "安徽", "福建", "黑龙江", "海南", "四川", "贵州", "宁夏", "新疆", "湖北", "湖南", "山西", "辽宁", "吉林", "江苏", "浙江", "青海", "江西", "西藏", "内蒙", "广西", "香港", "澳门", "台湾", };
    
        // Flume conf
        public static final String FLUME_HOST_NAME = "node2";
        public static final int FLUME_PORT = 41414;
    
    }
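
    A note on dependencies for the client above: as far as I know, it only needs the Flume client SDK (flume-ng-sdk) on the classpath, since RpcClientFactory.getDefaultInstance returns Flume's default Avro RPC client, which is exactly what the avro source configured in section 6.1 expects.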

     

    6.6. Monitor Kafka

    -- On node3, start a Kafka console consumer
    cd /home/kafka-2.10/bin
    ./kafka-console-consumer.sh --zookeeper node1,node2,node3 --from-beginning --topic all_my_log

    Sample output:

    168.208.193.207    安徽    2018-12-20    1545287646527    5462770148222682599    www.hongten.com    Login
    103.143.79.127    新疆    2018-12-20    1545287646529    3389475301916412717    www.hongten.com    Login
    111.208.80.39    山东    2018-12-20    1545287646531    535601622597096753    www.hongten.com    Shopping_Car
    105.30.86.46    四川    2018-12-20    1545287646532    7825340079790811845    www.hongten.com    Login
    205.55.33.74    新疆    2018-12-20    1545287646533    4228838365367235561    www.hongten.com    Logout
    34.44.60.134    安徽    2018-12-20    1545287646536    702584874247456732    www.hongten.com    Double_Click
    154.169.15.145    广东    2018-12-20    1545287646537    1683351753576425036    www.hongten.com    View
    126.28.192.28    湖南    2018-12-20    1545287646538    8319814684518483148    www.hongten.com    Edit
    5.140.156.73    台湾    2018-12-20    1545287646539    7432409906375230025    www.hongten.com    Logout
    72.175.210.95    西藏    2018-12-20    1545287646540    5233707593244910849    www.hongten.com    View
    121.25.190.25    广西    2018-12-20    1545287646541    268200251881841673    www.hongten.com    Buy

    6.7. Create a Kafka topic

    -- On node1, create a topic named filtered_log
    -- with 3 partitions
    -- and replication-factor = 3
    ./kafka-topics.sh --zookeeper node1,node2,node3 --create --topic filtered_log --partitions 3 --replication-factor 3

    6.8. Cleansing the data with Storm

    • Storm consumes data from Kafka
    • Storm filters the data (Buy = completed purchase, Shopping_Car = potential purchase)
    • Storm writes the filtered data back to Kafka
    package com.b510.big.data.storm.process;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;
    import storm.kafka.bolt.KafkaBolt;
    import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
    import storm.kafka.bolt.selector.DefaultTopicSelector;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    public class LogFilterTopology {
    
        public static void main(String[] args) {
    
            ZkHosts zkHosts = new ZkHosts(Common.ZOOKEEPER_QUORUM);
            // the spout consumes data from the 'all_my_log' topic
            SpoutConfig spoutConfig = new SpoutConfig(zkHosts, Common.ALL_MY_LOG_TOPIC, Common.ZOOKEEPER_ROOT, Common.ZOOKEEPER_ID);
            List<String> zkServers = new ArrayList<>();
            for (String host : zkHosts.brokerZkStr.split(",")) {
                zkServers.add(host.split(":")[0]);
            }
    
            spoutConfig.zkServers = zkServers;
            spoutConfig.zkPort = Common.ZOOKEEPER_PORT;
            spoutConfig.forceFromStart = true;
            spoutConfig.socketTimeoutMs = 60 * 60 * 1000;
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    
            // create the KafkaSpout
            KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
    
            TopologyBuilder builder = new TopologyBuilder();
            // Storm consumes data from Kafka
            builder.setSpout(Common.KAFKA_SPOUT, kafkaSpout, 3);
            // Storm filters the data (Buy = completed purchase, Shopping_Car = potential purchase)
            builder.setBolt(Common.FILTER_BOLT, new FilterBolt(), 8).shuffleGrouping(Common.KAFKA_SPOUT);
    
            // create the KafkaBolt
            @SuppressWarnings({ "unchecked", "rawtypes" })
            KafkaBolt kafkaBolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector(Common.FILTERED_LOG_TOPIC)).withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
    
            // Storm writes the filtered data back to Kafka
            builder.setBolt(Common.KAFKA_BOLT, kafkaBolt, 2).shuffleGrouping(Common.FILTER_BOLT);
    
            Properties props = new Properties();
            props.put("metadata.broker.list", Common.STORM_METADATA_BROKER_LIST);
            props.put("request.required.acks", Common.STORM_REQUEST_REQUIRED_ACKS);
            props.put("serializer.class", Common.STORM_SERILIZER_CLASS);
    
            Config conf = new Config();
            conf.put("kafka.broker.properties", props);
    
            conf.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);
    
            if (args == null || args.length == 0) {
                // run in local mode
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("storm-kafka-topology", conf, builder.createTopology());
            } else {
                // run on the cluster
                conf.setNumWorkers(3);
                try {
                    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
                } catch (AlreadyAliveException | InvalidTopologyException e) {
                    System.out.println("error : " + e);
                }
            }
        }
    }
    
    class FilterBolt extends BaseBasicBolt {
    
        private static final long serialVersionUID = 1L;
    
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String logStr = input.getString(0);
            // only keep log lines containing the keywords we care about
            // here we keep lines containing 'Buy' or 'Shopping_Car'
            if (logStr.contains(Common.KEY_WORD_BUY) || logStr.contains(Common.KEY_WORD_SHOPPING_CAR)) {
                collector.emit(new Values(logStr));
            }
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE));
        }
    }
    
    class Common {
        public static final String ALL_MY_LOG_TOPIC = "all_my_log";
        public static final String FILTERED_LOG_TOPIC = "filtered_log";
        
        public static final String DATE_FORMAT_YYYYDDMMHHMMSS = "yyyyMMddHHmmss";
        public static final String DATE_FORMAT_HHMMSS = "HHmmss";
        public static final String DATE_FORMAT_HHMMSS_DEFAULT_VALUE = "000001";
    
        public static final String HBASE_ZOOKEEPER_LIST = "node1:2888,node2:2888,node3:2888";
        public static final int ZOOKEEPER_PORT = 2181;
        public static final String ZOOKEEPER_QUORUM = "node1:" + ZOOKEEPER_PORT + ",node2:" + ZOOKEEPER_PORT + ",node3:" + ZOOKEEPER_PORT + "";
        public static final String ZOOKEEPER_ROOT = "/MyKafka";
        public static final String ZOOKEEPER_ID = "MyTrack";
    
        public static final String KAFKA_SPOUT = "kafkaSpout";
        public static final String FILTER_BOLT = "filterBolt";
        public static final String PROCESS_BOLT = "processBolt";
        public static final String HBASE_BOLT = "hbaseBolt";
        public static final String KAFKA_BOLT = "kafkaBolt";
    
        // Storm Conf
        public static final String STORM_METADATA_BROKER_LIST = "node1:9092,node2:9092,node3:9092";
        public static final String STORM_REQUEST_REQUIRED_ACKS = "1";
        public static final String STORM_SERILIZER_CLASS = "kafka.serializer.StringEncoder";
    
        // key word
        public static final String KEY_WORD_BUY = "Buy";
        public static final String KEY_WORD_SHOPPING_CAR = "Shopping_Car";
        
        //hbase
        public static final String TABLE_USER_ACTION = "t_user_actions";
        public static final String COLUMN_FAMILY = "cf";
        // how often (in seconds) to write a record to HBase
        public static final int WRITE_RECORD_TO_TABLE_PER_SECOND = 1;
        // one version per write interval for a full day: (60 / 1) * 60 * 24 = 86,400 versions
        public static final int TABLE_MAX_VERSION = (60/WRITE_RECORD_TO_TABLE_PER_SECOND) * 60 * 24;
    }
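
    One detail about the wiring above, based on my reading of this storm-kafka version: the KafkaBolt takes its producer settings from the "kafka.broker.properties" map placed into the topology Config, and the default FieldNameBasedTupleToKafkaMapper looks up the outgoing payload by field name, which is why FilterBolt declares its output field as FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE.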

    6.9. Monitor Kafka

    -- On node3, start a Kafka console consumer
    cd /home/kafka-2.10/bin
    ./kafka-console-consumer.sh --zookeeper node1,node2,node3 --from-beginning --topic filtered_log

    Sample output:

    87.26.135.185    黑龙江    2018-12-20    1545290594658    7290881731606227972    www.hongten.com    Shopping_Car
    60.96.96.38    青海    2018-12-20    1545290594687    6935901257286057015    www.hongten.com    Shopping_Car
    43.159.110.193    江苏    2018-12-20    1545290594727    7096698224110515553    www.hongten.com    Shopping_Car
    21.103.139.11    山西    2018-12-20    1545290594693    7805867078876194442    www.hongten.com    Shopping_Car
    139.51.213.184    广东    2018-12-20    1545290594729    8048796865619113514    www.hongten.com    Buy
    58.213.148.89    河北    2018-12-20    1545290594708    5176551342435592748    www.hongten.com    Buy
    36.205.221.116    湖南    2018-12-20    1545290594715    4484717918039766421    www.hongten.com    Shopping_Car
    135.194.103.53    北京    2018-12-20    1545290594769    4833011508087432349    www.hongten.com    Shopping_Car
    180.21.100.66    贵州    2018-12-20    1545290594752    5270357330431599426    www.hongten.com    Buy
    167.71.65.70    山西    2018-12-20    1545290594790    275898530145861990    www.hongten.com    Buy
    125.51.21.199    宁夏    2018-12-20    1545290594814    3613499600574777198    www.hongten.com    Buy

    6.10. Storm consumes the Kafka data again, aggregates it, and saves it to HBase

    • Storm consumes data from Kafka again
    • Storm aggregates the data (Buy = number of buyers, Shopping_Car = number of potential buyers)
    • Storm writes the data to HBase
    package com.b510.big.data.storm.process;
    
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.HConnectionManager;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.Put;
    
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.IBasicBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    public class LogProcessTopology {
    
        public static void main(String[] args) {
    
            ZkHosts zkHosts = new ZkHosts(Common.ZOOKEEPER_QUORUM);
            // the spout consumes data from the 'filtered_log' topic
            SpoutConfig spoutConfig = new SpoutConfig(zkHosts, Common.FILTERED_LOG_TOPIC, Common.ZOOKEEPER_ROOT, Common.ZOOKEEPER_ID);
            List<String> zkServers = new ArrayList<>();
            for (String host : zkHosts.brokerZkStr.split(",")) {
                zkServers.add(host.split(":")[0]);
            }
    
            spoutConfig.zkServers = zkServers;
            spoutConfig.zkPort = Common.ZOOKEEPER_PORT;
            spoutConfig.forceFromStart = true;
            spoutConfig.socketTimeoutMs = 60 * 60 * 1000;
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    
            // create the KafkaSpout
            KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
    
            TopologyBuilder builder = new TopologyBuilder();
            // Storm consumes data from Kafka again
            builder.setSpout(Common.KAFKA_SPOUT, kafkaSpout, 3);
            // Storm aggregates the data (Buy = number of buyers, Shopping_Car = number of potential buyers)
            builder.setBolt(Common.PROCESS_BOLT, new ProcessBolt(), 3).shuffleGrouping(Common.KAFKA_SPOUT);
            // Storm writes the data to HBase
            builder.setBolt(Common.HBASE_BOLT, new HbaseBolt(), 3).shuffleGrouping(Common.PROCESS_BOLT);
    
            Properties props = new Properties();
            props.put("metadata.broker.list", Common.STORM_METADATA_BROKER_LIST);
            props.put("request.required.acks", Common.STORM_REQUEST_REQUIRED_ACKS);
            props.put("serializer.class", Common.STORM_SERILIZER_CLASS);
    
            Config conf = new Config();
            conf.put("kafka.broker.properties", props);
    
            conf.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);
    
            if (args == null || args.length == 0) {
                // run in local mode
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("storm-kafka-topology", conf, builder.createTopology());
            } else {
                // run on the cluster
                conf.setNumWorkers(3);
                try {
                    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
                } catch (AlreadyAliveException | InvalidTopologyException e) {
                    System.out.println("error : " + e);
                }
            }
            
        }
    }
    
    class ProcessBolt extends BaseBasicBolt {
    
        private static final long serialVersionUID = 1L;
    
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String logStr = input.getString(0);
            if (logStr != null) {
                String infos[] = logStr.split("\t");
                //180.21.100.66    贵州    2018-12-20    1545290594752    5270357330431599426    www.hongten.com    Buy
                collector.emit(new Values(infos[2], infos[6]));
            }
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("date", "user_action"));
        }
    }
    
    class HbaseBolt implements IBasicBolt {
        private static final long serialVersionUID = 1L;
    
        HBaseDAO hBaseDAO = null;
        
        SimpleDateFormat simpleDateFormat = null;
        SimpleDateFormat simpleDateFormatHHMMSS = null;
        
        int userBuyCount = 0;
        int userShoopingCarCount = 0;
        
        // throttle how often we write to HBase
        int writeToHbaseMaxNum = Common.WRITE_RECORD_TO_TABLE_PER_SECOND * 1000;
        long begin = System.currentTimeMillis();
        long end = 0;
        
        @SuppressWarnings("rawtypes")
        @Override
        public void prepare(Map map, TopologyContext context) {
            hBaseDAO = new HBaseDAOImpl();
            simpleDateFormat = new SimpleDateFormat(Common.DATE_FORMAT_YYYYDDMMHHMMSS);
            simpleDateFormatHHMMSS = new SimpleDateFormat(Common.DATE_FORMAT_HHMMSS);
            hBaseDAO.createTable(Common.TABLE_USER_ACTION, new String[]{Common.COLUMN_FAMILY}, Common.TABLE_MAX_VERSION);
        }
    
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            // if the time is one second past midnight (the start of a new day),
            // reset the counters
            // note: this check is not very precise, because data from the previous day may still be in flight at that moment
            if (simpleDateFormatHHMMSS.format(new Date()).equals(Common.DATE_FORMAT_HHMMSS_DEFAULT_VALUE)) {
                userBuyCount = 0;
                userShoopingCarCount = 0;
            }
            
            if (input != null) {
                // based on ProcessBolt.declareOutputFields()
                String date = input.getString(0);
                String userAction = input.getString(1);
    
                if (userAction.equals(Common.KEY_WORD_BUY)) {
                    // the same user may perform 'Buy' multiple times in one day
                    userBuyCount++;
                }
    
                if (userAction.equals(Common.KEY_WORD_SHOPPING_CAR)) {
                    userShoopingCarCount++;
                }
    
                end = System.currentTimeMillis();
                if ((end - begin) > writeToHbaseMaxNum) {
                    System.out.println("hbase_key: " + Common.KEY_WORD_BUY + "_" + date + " , userBuyCount: " + userBuyCount + ", userShoopingCarCount :" + userShoopingCarCount);
                    
                    // write the counts to HBase
                    String quailifer = simpleDateFormat.format(new Date());
                    hBaseDAO.insert(Common.TABLE_USER_ACTION , 
                            Common.KEY_WORD_BUY + "_" + date, 
                            Common.COLUMN_FAMILY, 
                            new String[] { quailifer },
                            new String[] { "{user_buy_count:" + userBuyCount + "}" }
                            );
                    hBaseDAO.insert(Common.TABLE_USER_ACTION , 
                            Common.KEY_WORD_SHOPPING_CAR + "_" + date, 
                            Common.COLUMN_FAMILY, 
                            new String[] { quailifer },
                            new String[] { "{user_shopping_car_count:" + userShoopingCarCount + "}" }
                            );
                    begin = System.currentTimeMillis();
                }
            }
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    
        @Override
        public void cleanup() {
    
        }
    }
    
    interface HBaseDAO {
        public void createTable(String tableName, String[] columnFamilys, int maxVersion);
        public void insert(String tableName, String rowKey, String family, String quailifer[], String value[]);
    }
    
    class HBaseDAOImpl implements HBaseDAO {
    
        HConnection hConnection = null;
        static Configuration conf = null;
    
        public HBaseDAOImpl() {
            conf = new Configuration();
            conf.set("hbase.zookeeper.quorum", Common.HBASE_ZOOKEEPER_LIST);
            try {
                hConnection = HConnectionManager.createConnection(conf);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        
        public void createTable(String tableName, String[] columnFamilys, int maxVersion) {
            try {
                HBaseAdmin admin = new HBaseAdmin(conf);
                if (admin.tableExists(tableName)) {
                    System.err.println("table existing in hbase.");
                } else {
                    HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
                    for (String columnFamily : columnFamilys) {
                        HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(columnFamily);
                        hColumnDescriptor.setMaxVersions(maxVersion);
                        tableDesc.addFamily(hColumnDescriptor);
                    }
    
                    admin.createTable(tableDesc);
                    System.err.println("table is created.");
                }
                admin.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        
        @Override
        public void insert(String tableName, String rowKey, String family, String quailifer[], String value[]) {
            HTableInterface table = null;
            try {
                table = hConnection.getTable(tableName);
                Put put = new Put(rowKey.getBytes());
                for (int i = 0; i < quailifer.length; i++) {
                    String col = quailifer[i];
                    String val = value[i];
                    put.add(family.getBytes(), col.getBytes(), val.getBytes());
                }
                table.put(put);
                System.err.println("save record successfuly.");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    table.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
    }

    Storm processing logic:

    1. Write data to HBase every second

    2. Reset the counters at midnight of the next day

    If we keep the program above running, the system keeps writing data into HBase, which gives us the data we need to generate the report.
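
    To check that the counts are actually landing in the t_user_actions table, a small read-back sketch like the one below can help. It is my own example (not part of the project source); it reuses the same HBase client API and the Common constants from the code above, and the row key "Buy_2018-12-20" is only an illustration:

    package com.b510.big.data.storm.process;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.HConnectionManager;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.Result;
    
    // minimal verification sketch: print qualifier -> value for one row of t_user_actions
    public class CheckUserActions {
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("hbase.zookeeper.quorum", Common.HBASE_ZOOKEEPER_LIST);
            HConnection conn = HConnectionManager.createConnection(conf);
            HTableInterface table = conn.getTable(Common.TABLE_USER_ACTION);
            try {
                // row key format used above: <keyword>_<yyyy-MM-dd>, e.g. "Buy_2018-12-20"
                Get get = new Get("Buy_2018-12-20".getBytes());
                get.addFamily(Common.COLUMN_FAMILY.getBytes());
                Result result = table.get(get);
                for (Cell cell : result.rawCells()) {
                    System.out.println(new String(CellUtil.cloneQualifier(cell)) + " -> " + new String(CellUtil.cloneValue(cell)));
                }
            } finally {
                table.close();
                conn.close();
            }
        }
    }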

    Below is the report implementation.

    6.11. Read data from HBase and generate an Excel report with POI

    • Read the data from HBase
    • Generate the Excel report with POI
    package com.b510.big.data.poi;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.HConnectionManager;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.poi.xssf.usermodel.XSSFCell;
    import org.apache.poi.xssf.usermodel.XSSFSheet;
    import org.apache.poi.xssf.usermodel.XSSFWorkbook;
    
    public class ReportUtil {
    
        public static void main(String[] args) throws Exception {
    
            String year = "2018";
            String month = "12";
            String day = "21";
            String hour = "14";
    
            generateReport(year, month, day, hour);
        }
    
        private static void generateReport(String year, String month, String day, String hour) {
            HBaseDAO hBaseDAO = new HBaseDAOImpl();
            // format: yyyyMMddHH
            String begin = year + month + day + hour;
            String[] split = generateQuailifers(begin);
    
            List<Integer> userBuyCountList = getData(hBaseDAO, year, month, day, split, Common.KEY_WORD_BUY);
            List<Integer> userShoppingCarCountList = getData(hBaseDAO, year, month, day, split, Common.KEY_WORD_SHOPPING_CAR);
    
            //System.err.println(userBuyCountList.size());
            //System.err.println(userShoppingCarCountList.size());
    
            writeExcel(year, month, day, hour, userBuyCountList, userShoppingCarCountList);
        }
    
        private static void writeExcel(String year, String month, String day, String hour, List<Integer> userBuyCountList, List<Integer> userShoppingCarCountList) {
            try {
                File file = new File(Common.REPORT_TEMPLATE);
                InputStream in = new FileInputStream(file);
                XSSFWorkbook wb = new XSSFWorkbook(in);
                XSSFSheet sheet = wb.getSheetAt(0);
                if (sheet != null) {
                    XSSFCell cell = null;
    
                    cell = sheet.getRow(0).getCell(0);
                    cell.setCellValue("One Hour Report-" + year + "-" + month + "-" + day + " From " + hour + ":00 To " + hour + ":59");
    
                    putData(userBuyCountList, sheet, 3);
                    putData(userShoppingCarCountList, sheet, 7);
    
                    FileOutputStream out = new FileOutputStream(Common.REPORT_ONE_HOUR);
                    wb.write(out);
                    out.close();
                    System.err.println("done.");
                }
            } catch (Exception e) {
                System.err.println("Exception" + e);
            }
        }
    
        private static void putData(List<Integer> userBuyCountList, XSSFSheet sheet, int rowNum) {
            XSSFCell cell;
            if (userBuyCountList != null && userBuyCountList.size() > 0) {
                for (int i = 0; i < userBuyCountList.size(); i++) {
                    cell = sheet.getRow(rowNum).getCell(i + 1);
                    cell.setCellValue(userBuyCountList.get(i));
                }
            }
        }
    
        private static List<Integer> getData(HBaseDAO hBaseDAO, String year, String month, String day, String[] split, String preKey) {
            List<Integer> list = new ArrayList<Integer>();
            Result rs = hBaseDAO.getOneRowAndMultiColumn(Common.TABLE_USER_ACTION, preKey + "_" + year + "-" + month + "-" + day, split);
            for (Cell cell : rs.rawCells()) {
                String value = new String(CellUtil.cloneValue(cell)).split(":")[1].trim();
                value = value.substring(0, value.length() - 1);
                list.add(Integer.valueOf(value));
            }
            return list;
        }
    
        private static String[] generateQuailifers(String begin) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < 60;) {
    
                if (i == 0 || i == 5) {
                    sb.append(begin).append("0").append(i).append("00").append(",");
                } else {
                    sb.append(begin).append(i).append("00").append(",");
                }
                i = i + 5;
            }
            sb.append(begin).append("5959");
            String sbStr = sb.toString();
            String[] split = sbStr.split(",");
            return split;
        }
    }
    
    interface HBaseDAO {
        Result getOneRowAndMultiColumn(String tableName, String rowKey, String[] cols);
    }
    
    class HBaseDAOImpl implements HBaseDAO {
    
        HConnection hConnection = null;
        static Configuration conf = null;
    
        public HBaseDAOImpl() {
            conf = new Configuration();
            conf.set("hbase.zookeeper.quorum", Common.HBASE_ZOOKEEPER_LIST);
            try {
                hConnection = HConnectionManager.createConnection(conf);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public Result getOneRowAndMultiColumn(String tableName, String rowKey, String[] cols) {
            HTableInterface table = null;
            Result rsResult = null;
            try {
                table = hConnection.getTable(tableName);
                Get get = new Get(rowKey.getBytes());
                for (int i = 0; i < cols.length; i++) {
                    get.addColumn(Common.COLUMN_FAMILY.getBytes(), cols[i].getBytes());
                }
                rsResult = table.get(get);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    table.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            return rsResult;
        }
    
    }
    
    class Common {
    
        // report
        public static final String REPORT_TEMPLATE = "./resources/report.xlsx";
        public static final String REPORT_ONE_HOUR = "./resources/one_report.xlsx";
    
        public static final String DATE_FORMAT_YYYYDDMMHHMMSS = "yyyyMMddHHmmss";
    
        public static final String HBASE_ZOOKEEPER_LIST = "node1:2888,node2:2888,node3:2888";
    
        // key word
        public static final String KEY_WORD_BUY = "Buy";
        public static final String KEY_WORD_SHOPPING_CAR = "Shopping_Car";
    
        // hbase
        public static final String TABLE_USER_ACTION = "t_user_actions";
        public static final String COLUMN_FAMILY = "cf";
    
    }
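
    Two practical notes on the report code above: (1) for the hard-coded example date (2018-12-21, hour 14), generateQuailifers produces qualifiers at five-minute boundaries, 20181221140000, 20181221140500, ..., 20181221145500, plus 20181221145959, so each report row is filled with up to 13 data points for the hour; (2) the template ./resources/report.xlsx must already contain the title cell and the data rows being written to (putData writes into rows 3 and 7, zero-based), because sheet.getRow(...).getCell(...) returns null for cells that do not exist.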

    7. Source code download

    Source Code: Flume_Kafka_Storm_Hbase_Hdfs_Poi_src.zip

    The corresponding jar files are not included because they are too big; add them yourself based on the import statements.

    8. Summary

    I have been learning Big Data for a while now, and building the application I wanted through my own study and exploration is quite satisfying... Of course, stepping on a few landmines along the way is a worthwhile experience too. :)

