  • Big Data Processing Frameworks - Storm: Flume + Kafka + Storm Integration

    Environment
      VM: VMware 10
      Linux: CentOS-6.5-x86_64
      SSH client: Xshell 4
      FTP client: Xftp 4
      JDK 1.8
      storm-0.9
      apache-flume-1.6.0

    1. Flume + Kafka + Storm Architecture Design

    Collection layer: gathers the logs, using a load-balancing strategy (see the Flume sink-group sketch below)
    Message queue: decouples the layers and buffers between systems that run at different speeds
    Real-time processing unit: Storm processes the data; the results ultimately flow into a DB
    Presentation unit: data visualization, rendered with a web framework
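
    For the collection layer, here is a minimal sketch of a client-side Flume agent that load-balances across two collector agents, using a Flume 1.6 sink group with the load_balance processor. The agent name, collector hostnames, and the tailed log path are assumptions for illustration and are not part of the original example:

    # client-agent.properties (hypothetical client-side agent)
    client.sources = r1
    client.channels = c1
    client.sinks = k1 k2
    client.sinkgroups = g1

    # tail an application log (path is an assumption)
    client.sources.r1.type = exec
    client.sources.r1.command = tail -F /var/log/app.log
    client.sources.r1.channels = c1

    client.channels.c1.type = memory
    client.channels.c1.capacity = 1000

    # two Avro sinks, one per collector (hostnames are assumptions)
    client.sinks.k1.type = avro
    client.sinks.k1.hostname = collector1
    client.sinks.k1.port = 41414
    client.sinks.k1.channel = c1
    client.sinks.k2.type = avro
    client.sinks.k2.hostname = collector2
    client.sinks.k2.port = 41414
    client.sinks.k2.channel = c1

    # the sink group spreads events across the two sinks round-robin
    client.sinkgroups.g1.sinks = k1 k2
    client.sinkgroups.g1.processor.type = load_balance
    client.sinkgroups.g1.processor.backoff = true
    client.sinkgroups.g1.processor.selector = round_robin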

    2. Example:
    A Flume client sends log lines to a Flume collector agent; the collector forwards them to the Kafka topic testflume; the Storm cluster consumes testflume, filters the stream, and writes the messages that pass the filter to the Kafka topic LogError, performing the data-cleaning step.
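
    A minimal sketch of the collector-side Flume agent for this example, assuming it runs on node1: an Avro source on port 41414 (the host and port the RpcClient below targets), a memory channel, and the Flume 1.6 KafkaSink publishing to the topic testflume. The agent name and the channel sizing are assumptions:

    # collector-agent.properties (hypothetical collector agent on node1)
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    # Avro source the RpcClient connects to
    a1.sources.r1.type = avro
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 41414
    a1.sources.r1.channels = c1

    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 100

    # Kafka sink writing every event to the testflume topic
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = testflume
    a1.sinks.k1.brokerList = node1:9092,node2:9092,node3:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 20
    a1.sinks.k1.channel = c1

    It can be started with something like: bin/flume-ng agent -n a1 -c conf -f conf/collector-agent.properties -Dflume.root.logger=INFO,console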

    Client:

    package com.sxt.flume;
    
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    import java.nio.charset.Charset;
    
    /**
     * Based on the example in the Flume Developer Guide:
     * http://flume.apache.org/FlumeDeveloperGuide.html
     * @author root
     */
    public class RpcClientDemo {
        
        public static void main(String[] args) {
            MyRpcClientFacade client = new MyRpcClientFacade();
            // Initialize client with the remote Flume agent's host and port
            client.init("node1", 41414);
    
            // Send 10 events to the remote Flume agent. That agent should be
            // configured to listen with an AvroSource.
            for (int i = 10; i < 20; i++) {
                String sampleData = "Hello Flume!ERROR" + i;
                client.sendDataToFlume(sampleData);
                System.out.println("发送数据:" + sampleData);
            }
    
            client.cleanUp();
        }
    }
    
    class MyRpcClientFacade {
        private RpcClient client;
        private String hostname;
        private int port;
    
        public void init(String hostname, int port) {
            // Setup the RPC connection
            this.hostname = hostname;
            this.port = port;
            this.client = RpcClientFactory.getDefaultInstance(hostname, port);
            // Use the following method to create a thrift client (instead of the
            // above line):
            // this.client = RpcClientFactory.getThriftInstance(hostname, port);
        }
    
        public void sendDataToFlume(String data) {
            // Create a Flume Event object that encapsulates the sample data
            Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
    
            // Send the event
            try {
                client.append(event);
            } catch (EventDeliveryException e) {
                // clean up and recreate the client
                client.close();
                client = null;
                client = RpcClientFactory.getDefaultInstance(hostname, port);
                // Use the following method to create a thrift client (instead of
                // the above line):
                // this.client = RpcClientFactory.getThriftInstance(hostname, port);
            }
        }
    
        public void cleanUp() {
            // Close the RPC connection
            client.close();
        }
    }
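
    To compile the client, only flume-ng-sdk 1.6.0 is needed on the classpath. Once the collector agent and the Kafka cluster are up, you can confirm that the events reached the testflume topic with the Kafka 0.8-era console tools; the partition and replication counts below are assumptions, only the topic names come from the example:

    # create the two topics used in the example
    bin/kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 \
        --replication-factor 2 --partitions 3 --topic testflume
    bin/kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 \
        --replication-factor 2 --partitions 3 --topic LogError

    # watch what the Flume client delivered into testflume
    bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic testflume --from-beginning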

    Storm processing:
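
    The topology below uses the storm-kafka external module on top of storm-core, plus the old Kafka client library. A hedged dependency sketch; the exact versions are assumptions chosen to match the storm-0.9 / Kafka 0.8-era environment above:

    <!-- pom.xml fragment; versions are assumptions -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>0.9.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>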

    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.sxt.storm.logfileter;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;
    import storm.kafka.bolt.KafkaBolt;
    import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
    import storm.kafka.bolt.selector.DefaultTopicSelector;
    
    /**
     * Consumes the Kafka topic testflume, keeps only the lines that contain
     * "ERROR", and writes them to the Kafka topic LogError.
     */
    public class LogFilterTopology {
    
        public static class FilterBolt extends BaseBasicBolt {
            @Override
            public void execute(Tuple tuple, BasicOutputCollector collector) {
                String line = tuple.getString(0);
                System.err.println("Accept:  " + line);
                // keep only the lines that contain ERROR
                if (line.contains("ERROR")) {
                    System.err.println("Filter:  " + line);
                    collector.emit(new Values(line));
                }
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                // declare the "message" field consumed downstream by FieldNameBasedTupleToKafkaMapper
                declarer.declare(new Fields("message"));
            }
        }
    
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
    
            // https://github.com/apache/storm/tree/master/external/storm-kafka
            // configure the Kafka spout and the topic it reads
            String topic = "testflume";
            ZkHosts zkHosts = new ZkHosts("node1:2181,node2:2181,node3:2181");
            // /MyKafka is the ZooKeeper root path under which the spout records consumed offsets
            SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "/MyKafka", "MyTrack");// consumer id, one per application
            List<String> zkServers = new ArrayList<String>();
            System.out.println(zkHosts.brokerZkStr);
            for (String host : zkHosts.brokerZkStr.split(",")) {
                zkServers.add(host.split(":")[0]);
            }
    
            spoutConfig.zkServers = zkServers;
            spoutConfig.zkPort = 2181;
            // whether to start consuming from the earliest offset (ignore offsets stored in ZK)
            spoutConfig.forceFromStart = true;
            spoutConfig.socketTimeoutMs = 60 * 1000;
            // StringScheme decodes the raw byte payload into a UTF-8 string
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    
            KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
    
            // set kafka spout
            builder.setSpout("kafka_spout", kafkaSpout, 3);
    
            // set bolt
            builder.setBolt("filter", new FilterBolt(), 8).shuffleGrouping("kafka_spout");
    
            // write the filtered data back out to Kafka
            // set kafka bolt
            // withTopicSelector: the default selector writes every tuple to the topic LogError
            // withTupleToKafkaMapper: maps tuple fields to the Kafka key and message
            KafkaBolt kafka_bolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector("LogError"))
                    .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
    
            builder.setBolt("kafka_bolt", kafka_bolt, 2).shuffleGrouping("filter");
    
            Config conf = new Config();
            // set producer properties.
            Properties props = new Properties();
            props.put("metadata.broker.list", "node1:9092,node2:9092,node3:9092");
            /**
             * Kafka producer ACK levels:
             *  0 : the producer does not wait for the broker to acknowledge before sending the next message
             *  1 : the producer waits until the leader has acknowledged the message
             * -1 : the producer waits until the follower replicas have also acknowledged the message
             */
            props.put("request.required.acks", "1");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            conf.put("kafka.broker.properties", props);
    
            conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(new String[] { "node1", "node2", "node3" }));
    
            // run in local mode
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mytopology", conf, builder.createTopology());
    
        }
    }
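
    Once the topology has been running for a moment, the filtered lines (those containing "ERROR") should appear in the LogError topic; one way to check, using the same console consumer as above:

    bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic LogError --from-beginning

    Note that forceFromStart = true makes the spout ignore any offsets already stored in ZooKeeper, so every submission re-reads the topic from the start; that is convenient for testing but usually not what you want in production.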


    References:
    Meituan's log collection system
    Apache Flume
    Apache Flume load balancing

  • Original post: https://www.cnblogs.com/cac2020/p/10791843.html