代码改变世界
[登录 · 注册]
  • Flume+Kafka+Storm整合
  • Flume+Kafka+Storm整合

    1. 需求:

    有一个客户端Client可以产生日志信息,我们需要通过Flume获取日志信息,再把该日志信息放入到Kafka的一个Topic:flume-to-kafka

    再由Storm读取该topic:flume-to-kafka,进行日志分析处理(这里我们做的逻辑处理为filter,即过滤日志信息),处理完日志信息后,再由Storm把处理好的日志信息放入到Kafka的另一个topic:storm-to-kafka

    2.组件分布情况

    我总共搭建了3个节点node1,node2,node3

    Zookeeper安装在node1,node2,nod3

    Flume安装在node2

    Kafka安装在node1,node2,node3

    Storm安装在node1,node2,node3

    3.JDK安装

    --在node1, node2, node3上面安装jdk
    --install JDK   -- http://blog.51cto.com/vvxyz/1642258(LInux安装jdk的三种方法)
    --解压安装
    rpm -ivh your-package.rpm
    
    --修改环境变量
    vi /etc/profile
    
    JAVA_HOME=/usr/java/jdk1.7.0_67
    JRE_HOME=/usr/java/jdk1.7.0_67/jre
    CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
    PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
    export JAVA_HOME JRE_HOME CLASS_PATH PATH
    
    :wq
    
    --使配置有效
    source /etc/profile

    4.Zookeeper的安装

    ---============================
    --解压zookeeper压缩包并安装
    tar -zxvf zookeeper-3.4.6.tar.gz 
    
    --创建zookeeper的软链
    ln -sf /root/zookeeper-3.4.6 /home/zk
    
    --配置zookeeper
    cd /home/zk/conf/
    
    --把下面的zoo_sample.cfg文件重新命名
    cp zoo_sample.cfg zoo.cfg
    
    --修改zoo.cfg配置文件
    vi zoo.cfg
    
    --设置zookeeper的文件存放目录
    --找到dataDir=/tmp/zookeeper,并设置为下面值
    dataDir=/opt/zookeeper
    
    --设置zookeeper集群
    server.1=node1:2888:3888
    server.2=node2:2888:3888
    server.3=node3:2888:3888
    
    :wq
    
    
    --创建/opt/zookeeper目录
    mkdir /opt/zookeeper
    
    --进入/opt/zookeeper目录
    cd /opt/zookeeper
    
    --创建一个文件myid
    vi myid
    
    --输入1
    1
    :wq
    --以此类推,在node2,node3,值分别是2, 3
    
    --拷贝zookeeper目录到node2, node3的/opt/目录下面
    cd ..
    scp -r zookeeper/ root@node2:/opt/
    scp -r zookeeper/ root@node3:/opt/
    
    --分别进入到node2, node3里面,修改/opt/zookeeper/myid,值分别是2, 3
    
    
    --作为以上配置,把node1里面的zookeeper拷贝到node2, node3上面。
    scp -r zookeeper-3.4.6 root@node2:~/
    scp -r zookeeper-3.4.6 root@node3:~/
    
    --分别进入到node2, node3里面,创建软链
    ln -sf /root/zookeeper-3.4.6/ /home/zk
    
    
    --配置zookeeper环境变量
    cd /home/zk/bin
    
    --修改/etc/profile文件,把zookeeper的bin目录路径添加进去
    vi /etc/profile
    PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:/home/zk/bin
    
    --让配置文件生效
    source /etc/profile
    
    --分别进入到node2, node3里面,修改/etc/profile文件,把zookeeper的bin目录路径添加进去
    
    
    --作为环境变量配置,就可以启动zookeeper了。
    --分别在node1, node2, node3上面启动zookeeper
    zkServer.sh start
    
    
    --测试是否启动成功
    jps
    --观察是否有QuorumPeerMain进程

    5.Flume的安装

    ---------------------------------------------------
    --安装Flume
    --把安装包上传到node2上面
    cd
    tar -zxvf apache-flume-1.6.0-bin.tar.gz 
    
    --创建软链
    ln -s /root/apache-flume-1.6.0-bin /home/flume
    
    --配置flume
    cd /root/apache-flume-1.6.0-bin/conf
    
    cp flume-env.sh.template flume-env.sh
    
    vi flume-env.sh
    
    --配置JDK
    export JAVA_HOME=/usr/java/jdk1.7.0_67
    
    :wq
    
    
    --加入系统变量
    vi /etc/profile
    
    export FLUME_HOME=/root/apache-flume-1.6.0-bin
    export PATH=$PATH:$FLUME_HOME/bin
    
    :wq
    
    source /etc/profile
    
    --验证是否安装成功
    flume-ng version
    
    flume-ng version
    Flume 1.6.0
    Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
    Revision: 2561a23240a71ba20bf288c7c2cda88f443c2080
    Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015
    From source with checksum b29e416802ce9ece3269d34233baf43f

    6.Kafka的安装

    ---------------------------------------------------
    --kafka安装
    --在node1, node2, node3上面搭建kafka
    --先进入node1
    mkdir /root/kafka
    
    --解压
    tar -zxvf kafka_2.10-0.8.2.2.tgz
    
    --创建软链
    ln -s /root/kafka/kafka_2.10-0.8.2.2 /home/kafka-2.10
    
    --配置
    cd /root/kafka/kafka_2.10-0.8.2.2/config
    
    vi server.properties
    
    --node1=0, node2=1,node2=2
    broker.id=0
    log.dirs=/opt/kafka-log
    zookeeper.connect=node1:2181,node2:2181,node3:2181
    
    :wq
    
    
    --为了启动方便
    cd /root/kafka/kafka_2.10-0.8.2.2
    vi start-kafka.sh
    
    nohup bin/kafka-server-start.sh  config/server.properties > kafka.log 2>&1 &
    
    :wq
    
    chmod 755 start-kafka.sh
    
    
    
    --配置好以后
    --分发到node2,node3
    cd /root/kafka/
    scp -r kafka_2.10-0.8.2.2/ root@node2:/root/kafka
    scp -r kafka_2.10-0.8.2.2/ root@node3:/root/kafka
    
    --进入到node2
    cd /root/kafka/kafka_2.10-0.8.2.2/config
    
    vi server.properties
    
    --node1=0, node2=1,node2=2
    broker.id=1
    
    :wq
    
    
    --进入到node3
    cd /root/kafka/kafka_2.10-0.8.2.2/config
    
    vi server.properties
    
    --node1=0, node2=1,node2=2
    broker.id=2
    
    :wq
    
    --启动kafka
    ./zkServer.sh start
    
    --分别进入node1,node2,node3
    cd /root/kafka/kafka_2.10-0.8.2.2
    ./start-kafka.sh
    
    
    --检查是否启动
    jps
    查看是否有Kafka进程

    7.Storm的安装

    ------------
    --Storm分布式安装
    --部署到node1,node2,node3节点上
    --进入node1
    cd /root/apache-storm-0.10.0/conf
    
    vi storm.yaml
    
    
    --配置如下
    # storm.zookeeper.servers:
         - "node1"
         - "node2"
         - "node3"
    #  
    nimbus.host: "node1"
    storm.local.dir: "/opt/storm"
    supervisor.slots.ports:
         - 6700
         - 6701
         - 6702
         - 6703
    
    :wq
    
    --从node1分发到node2,node3
    scp -r apache-storm-0.10.0 root@node2:/
    scp -r apache-storm-0.10.0 root@node3:/
    
    
    --分别进入node2,node3创建软链
    ln -r /root/apache-storm-0.10.0 /home/storm-0.10
    
    
    --分别进入node1,node2,node3快捷启动
    cd /root/apache-storm-0.10.0
    
    vi start-storm.sh
    
    nohup bin/storm nimbus >> logs/numbus.out 2>&1 &
    nohup bin/storm supervisor >> logs/supervisor.out 2>&1 &
    
    
    --node1上面配置,node2,node3上面不需要UI
    nohup bin/storm ui >> logs/ui.out 2>&1 &
    nohup bin/storm drpc >> logs/drpc.out 2>&1 &
    
    
    :wq
    
    --分别进入node1,node2,node3快捷stop-storm
    
    vi stop-storm.sh
    
    --node1上面配置,node2,node3上面不需要UI
    kill -9 `jps | grep core | awk '{print $1}'`
    
    
    kill -9 `jps | grep supervisor | awk '{print $1}'`
    kill -9 `jps | grep nimbus | awk '{print $1}'`
    kill -9 `jps | grep worker | awk '{print $1}'`
    kill -9 `jps | grep LogWriter | awk '{print $1}'`
    
    :wq
    
    
    chmod 755 start-storm.sh
    chmod 755 stop-storm.sh
    
    
    
    --启动Zookeeper服务
    --在node1,node2,node3上面启动
    zkServer.sh start
    
    --在node1,node2,node3上面启动Storm
    cd /root/apache-storm-0.10.0
    ./start-storm.sh
    
    
    --上传storm_wc.jar 文件
    ./storm jar /root/storm_wc.jar storm.wordcount.Test wordcount
    
    
    ------------
    Storm DRPC 配置
    --进入node1
    cd /root/apache-storm-0.10.0/conf
    
    vi storm.yaml
    
    drpc.servers:     
         - "node1"
    
    :wq
    
    --从node1,分发到node2,node3
    cd /root/apache-storm-0.10.0/conf/
    scp -r root@node2:/root/apache-storm-0.10.0/conf
    scp -r root@node3:/root/apache-storm-0.10.0/conf
    
    
    --配置完,进入node1,node2,node3
    cd /root/apache-storm-0.10.0
    ./start-storm.sh &

    8.Flume+Kafka+Storm整合

    8.1.配置Flume

    --从node2
    cd flumedir
    
    vi flume_to_kafka
    
    --node2配置如下
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = node2
    a1.sources.r1.port = 41414
    
    # Describe the sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = flume-to-kafka
    a1.sinks.k1.brokerList = node1:9092,node2:9092,node3:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 20
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.transactionCapacity = 10000
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    :wq

    8.2.启动Zookeeper

    --启动Zookeeper,在node1,node2,node3
    --关闭防火墙
    service iptables stop
    
    --启动Zookeeper
    zkServer.sh start

    8.3.启动Kfaka

    --启动kafka
    --分别进入node1,node2,node3
    cd /root/kafka/kafka_2.10-0.8.2.2
    ./start-kafka.sh

    8.4.启动Flume

    --进入node2,启动
    cd /root/flumedir
    
    flume-ng agent -n a1 -c conf -f flume_to_kafka -Dflume.root.logger=DEBUG,console

    8.4.启动客户端Client

    启动客户端产生日志信息。

    大家可以参考RPC clients - Avro and Thrift的代码

    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    import java.nio.charset.Charset;
    
    public class MyApp {
        public static void main(String[] args) {
            MyRpcClientFacade1 client = new MyRpcClientFacade1();
            // Initialize client with the remote Flume agent's host and port
            client.init("node2", 41414);
    
            // Send 10 events to the remote Flume agent. That agent should be
            // configured to listen with an AvroSource.
            String sampleData = "Hello ERROR ! ------ Test";
            for (int i = 500; i < 505; i++) {
                client.sendDataToFlume(sampleData + " " + i);
                System.out.println(sampleData + " " + i);
            }
    
            client.cleanUp();
        }
    }
    
    class MyRpcClientFacade1 {
        private RpcClient client;
        private String hostname;
        private int port;
    
        public void init(String hostname, int port) {
            // Setup the RPC connection
            this.hostname = hostname;
            this.port = port;
            this.client = RpcClientFactory.getDefaultInstance(hostname, port);
            // Use the following method to create a thrift client (instead of the
            // above line):
            // this.client = RpcClientFactory.getThriftInstance(hostname, port);
        }
    
        public void sendDataToFlume(String data) {
            // Create a Flume Event object that encapsulates the sample data
            Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
    
            // Send the event
            try {
                client.append(event);
            } catch (EventDeliveryException e) {
                // clean up and recreate the client
                client.close();
                client = null;
                client = RpcClientFactory.getDefaultInstance(hostname, port);
                // Use the following method to create a thrift client (instead of
                // the above line):
                // this.client = RpcClientFactory.getThriftInstance(hostname, port);
            }
        }
    
        public void cleanUp() {
            // Close the RPC connection
            client.close();
        }
    }

    在eclipse控制台输出的结果是:

    [ WARN ] - [ org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:505) ] Invalid value for batchSize: 0; Using default value.
    [ WARN ] - [ org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:634) ] Using default maxIOWorkers
    Hello ERROR ! ------ Test 500
    Hello ERROR ! ------ Test 501
    Hello ERROR ! ------ Test 502
    Hello ERROR ! ------ Test 503
    Hello ERROR ! ------ Test 504

    8.5.查看Kafka的Topic

    --进入node3,查看kafka的topic
    cd /home/kafka-2.10/bin
    ./kafka-topics.sh --zookeeper node1,node2,node3 --list

    可以看到,由于客户端代码的执行,Kafka里面的topic:flume-to-kafka被自动创建

    8.6.启动Kafka Consumer:flume-to-kafka

    我们在这里是查看topic: flume-to-kafka的消费信息

    --进入node3,启动kafka消费者
    cd /home/kafka-2.10/bin
    ./kafka-console-consumer.sh --zookeeper node1,node2,node3 --from-beginning --topic flume-to-kafka

    控制台输出:

    Hello ERROR ! ------ Test 500
    Hello ERROR ! ------ Test 501
    Hello ERROR ! ------ Test 502
    Hello ERROR ! ------ Test 503
    Hello ERROR ! ------ Test 504

    8.7.创建Topic:storm-to-kafka

    在Kafka里面创建另一个topic:

    --进入node1,创建一个topic:storm-to-kafka
    --设置3个partitions
    --replication-factor=3
    ./kafka-topics.sh --zookeeper node1,node2,node3 --create --topic storm-to-kafka --partitions 3 --replication-factor 3

    8.8.运行Storm代码

    package storm.logfilter;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;
    import storm.kafka.bolt.KafkaBolt;
    import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
    import storm.kafka.bolt.selector.DefaultTopicSelector;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    public class LogFilterTopology {
    
        public static class FilterBolt extends BaseBasicBolt {
            private static final long serialVersionUID = 1L;
    
            @Override
            public void execute(Tuple tuple, BasicOutputCollector collector) {
                String line = tuple.getString(0);
                // 包含ERROR的行留下
                if (line.contains("ERROR")) {
                    System.err.println("Filter:  " + line + " ~ filtered.");
                    collector.emit(new Values(line + " ~ filtered."));
                }
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                // 定义message提供给后面FieldNameBasedTupleToKafkaMapper使用
                declarer.declare(new Fields("message"));
            }
        }
    
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
    
            // https://github.com/apache/storm/tree/master/external/storm-kafka
            // config kafka spout,话题
            String topic = "flume-to-kafka";
            ZkHosts zkHosts = new ZkHosts("node1:2181,node2:2181,node3:2181");
            // /MyKafka,偏移量offset的根目录,记录队列取到了哪里
            SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "/MyKafka", "MyTrack");
            List<String> zkServers = new ArrayList<String>();
            for (String host : zkHosts.brokerZkStr.split(",")) {
                zkServers.add(host.split(":")[0]);
            }
    
            spoutConfig.zkServers = zkServers;
            spoutConfig.zkPort = 2181;
            // 是否从头开始消费
            spoutConfig.forceFromStart = true;
            spoutConfig.socketTimeoutMs = 60 * 1000;
            // StringScheme将字节流转解码成某种编码的字符串
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    
            KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
    
            // set kafka spout
            builder.setSpout("kafkaSpout", kafkaSpout, 3);
    
            // set bolt
            builder.setBolt("filterBolt", new FilterBolt(), 8).shuffleGrouping("kafkaSpout");
    
            // 数据写出
            // set kafka bolt
            // withTopicSelector使用缺省的选择器指定写入的topic: storm-to-kafka
            // withTupleToKafkaMapper tuple==>kafka的key和message
            KafkaBolt kafka_bolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector("storm-to-kafka"))
                    .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
    
            builder.setBolt("kafkaBolt", kafka_bolt, 2).shuffleGrouping("filterBolt");
    
            Config conf = new Config();
            // set producer properties.
            Properties props = new Properties();
            props.put("metadata.broker.list", "node1:9092,node2:9092,node3:9092");
            /**
             * Kafka生产者ACK机制 0 : 生产者不等待Kafka broker完成确认,继续发送下一条数据 1 :
             * 生产者等待消息在leader接收成功确认之后,继续发送下一条数据 -1 :
             * 生产者等待消息在follower副本接收到数据确认之后,继续发送下一条数据
             */
            props.put("request.required.acks", "1");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            conf.put("kafka.broker.properties", props);
    
            conf.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);
            
            if (args == null || args.length == 0) {
                // 本地方式运行
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("storm-kafka-topology", conf, builder.createTopology());
            } else {
                // 集群方式运行
                conf.setNumWorkers(3);
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            }
            
        }
    }

    8.9.启动Kafka Consumer:storm-to-kafka

    我们在这里是查看topic: storm-to-kafka的消费信息

    --进入node1,启动kafka消费者
    cd /home/kafka-2.10/bin
    ./kafka-console-consumer.sh --zookeeper node1,node2,node3 --from-beginning --topic storm-to-kafka

    控制台输出:

    Hello ERROR ! ------ Test 504 ~ filtered.
    Hello ERROR ! ------ Test 500 ~ filtered.
    Hello ERROR ! ------ Test 501 ~ filtered.
    Hello ERROR ! ------ Test 503 ~ filtered.
    Hello ERROR ! ------ Test 502 ~ filtered.

    ========================================================

    More reading,and english is important.

    I'm Hongten

     

    大哥哥大姐姐,觉得有用打赏点哦!你的支持是我最大的动力。谢谢。
    Hongten博客排名在100名以内。粉丝过千。
    Hongten出品,必是精品。

    E | hongtenzone@foxmail.com  B | http://www.cnblogs.com/hongten

    ========================================================

  • 上一篇:Flume+Kafka+Storm+Hbase+HDSF+Poi整合
    下一篇:hadoop2elasticsearch的安装
  • 【推广】 阿里云小站-上云优惠聚集地(新老客户同享)更有每天限时秒杀!
    【推广】 云服务器低至0.95折 1核2G ECS云服务器8.1元/月
    【推广】 阿里云老用户升级四重礼遇享6.5折限时折扣!
  • 原文:https://www.cnblogs.com/hongten/p/hongten_flume_kafka_storm.html
走看看 - 开发者的网上家园