flume+kafka+storm


    CentOS 6.6 + JDK 1.7

    flume 1.4 + kafka 2.10-0.8.1.1 + storm 0.9.3

    zookeeper3.4.6


    Cluster:

    192.168.80.133 x01

    192.168.80.134 x02


    1. Set the hostname and hosts file on both machines

    ...
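
    The hosts entries mirror the cluster list above; a minimal sketch, run on both machines (set the hostname to x01 or x02 on the respective machine, e.g. via /etc/sysconfig/network on CentOS 6):

    echo "192.168.80.133 x01" >> /etc/hosts
    echo "192.168.80.134 x02" >> /etc/hosts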

    2. Install the JDK on both machines and set environment variables
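
    A minimal sketch of the environment variables, assuming the JDK is unpacked under /usr/java/jdk1.7.0_79 (an assumed path; adjust to the actual install):

    # append to /etc/profile on both machines (assumed JDK path)
    echo 'export JAVA_HOME=/usr/java/jdk1.7.0_79' >> /etc/profile
    echo 'export PATH=$JAVA_HOME/bin:$PATH' >> /etc/profile
    source /etc/profile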

    3. Download and install zookeeper and set environment variables

    # the directory where the snapshot is stored; do not use /tmp, that is only for example's sake
    dataDir=/data/zookeeper/data
    # the port at which the clients will connect
    clientPort=2181
    # the maximum number of client connections.
    # increase this if you need to handle more clients
    #maxClientCnxns=60
    #
    # Be sure to read the maintenance section of the
    # administrator guide before turning on autopurge.
    #
    # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1
    
    server.1=x01:2888:3888
    server.2=x02:2888:3888
    zkServer.sh start
    zkServer.sh status
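
    Each server.N entry needs a matching myid file under dataDir before the nodes are started; a minimal sketch using the dataDir configured above:

    mkdir -p /data/zookeeper/data
    echo 1 > /data/zookeeper/data/myid   # write 2 on x02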

    4. Download and install flume

     http://www.cnblogs.com/admln/p/flume.html

    5. Download and install kafka

    http://www.cnblogs.com/admln/p/kafka-install.html

    6. Integrate flume and kafka

    Download the integration plugin flumeng-kafka-plugin: https://github.com/beyondj2ee/flumeng-kafka-plugin

    Extract flume-conf.properties from the plugin, modify it, and place it in flume's conf directory (saved here as kafka.conf):

    ############################################
    #  producer config
    ###########################################
    
    #agent section
    producer.sources = s
    producer.channels = c
    producer.sinks = r
    
    #source section
    producer.sources.s.type = spooldir
    producer.sources.s.spoolDir=/home/hadoop/testFlume
    producer.sources.s.fileHeader=false
    producer.sources.s.channels = c
    
    # Each sink's type must be defined
    producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
    producer.sinks.r.metadata.broker.list=x01:9092
    producer.sinks.r.partition.key=0
    producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
    producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
    producer.sinks.r.request.required.acks=0
    producer.sinks.r.max.message.size=1000000
    producer.sinks.r.producer.type=sync
    producer.sinks.r.custom.encoding=UTF-8
    producer.sinks.r.custom.topic.name=test
    
    #Specify the channel the sink should use
    producer.sinks.r.channel = c
    
    # Each channel's type is defined.
    producer.channels.c.type = memory
    producer.channels.c.capacity = 1000
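
    The sink publishes to the topic named by custom.topic.name. Depending on the broker's auto.create.topics.enable setting the topic may be created automatically on first write; a minimal sketch for creating it explicitly from the kafka directory:

    bin/kafka-topics.sh --create --zookeeper x01:2181 --replication-factor 1 --partitions 1 --topic test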

    Copy the jar packages from the plugin into flume's lib directory.

    Drop files into /home/hadoop/testFlume and start a console consumer in kafka to test:

    bin/flume-ng agent -n producer -c conf -f conf/kafka.conf -Dflume.root.logger=DEBUG,console
    bin/kafka-console-consumer.sh --zookeeper x01:2181 --topic test --from-beginning
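
    To generate traffic, drop a UTF-8 text file into the spooling directory; flume renames delivered files (with a .COMPLETED suffix by default) and the lines should appear in the console consumer. A minimal sketch with an arbitrary file name:

    echo "hello,kafka,storm" > /home/hadoop/testFlume/test1.log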

    The test succeeds.

    Note: if flume is going to transfer Chinese text, the files should preferably be UTF-8 encoded; otherwise garbled characters can cause flume to die.

    7. Install storm

    http://www.cnblogs.com/admln/p/storm-install.html

    8. Integrate storm and kafka

    Copy the following kafka jar packages into storm's lib directory:

    cp kafka_2.10-0.8.1.1/libs/kafka_2.10-0.8.1.1.jar apache-storm-0.9.3/lib/
    cp kafka_2.10-0.8.1.1/libs/scala-library-2.10.1.jar apache-storm-0.9.3/lib/
    cp kafka_2.10-0.8.1.1/libs/metrics-core-2.2.0.jar apache-storm-0.9.3/lib/
    cp kafka_2.10-0.8.1.1/libs/snappy-java-1.0.5.jar apache-storm-0.9.3/lib/
    cp kafka_2.10-0.8.1.1/libs/zkclient-0.3.jar apache-storm-0.9.3/lib/
    cp kafka_2.10-0.8.1.1/libs/log4j-1.2.15.jar apache-storm-0.9.3/lib/
    cp kafka_2.10-0.8.1.1/libs/slf4j-api-1.7.2.jar apache-storm-0.9.3/lib/
    cp kafka_2.10-0.8.1.1/libs/jopt-simple-3.2.jar apache-storm-0.9.3/lib/

    Also copy zookeeper's zookeeper-3.4.6.jar into storm's lib directory:

    cp zookeeper-3.4.6/zookeeper-3.4.6.jar apache-storm-0.9.3/lib/

    Write a storm program to test.

    pom.xml

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
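
    With these dependencies, the topology jar can be built with Maven and copied to a storm node for submission. A minimal sketch, assuming the build produces a jar named flume-kafka-storm.jar (the actual name depends on the pom's artifactId and version):

    mvn clean package
    cp target/flume-kafka-storm.jar apache-storm-0.9.3/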

    spout

    package org.admln.flume_kafka_storm;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    
    public class KafkaSpout extends BaseRichSpout {
        
        private static final long serialVersionUID = -9174998944310422274L;
        private SpoutOutputCollector collector;
        private ConsumerConnector consumer;
        private String topic;
     
        public KafkaSpout() {}
         
        public KafkaSpout(String topic) {
            this.topic = topic;
        }
     
        // In this example the kafka stream is consumed in activate() below, so nextTuple() is a no-op.
        public void nextTuple() {    }
     
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }
     
        public void ack(Object msgId) {    }
     
        public void activate() {         
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, 1);

            System.out.println("*********Results********topic:" + topic);

            Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                String value = new String(it.next().message());
                System.out.println("storm received a message from kafka -------> " + value);
                collector.emit(new Values(value), value);
            }
        }
         
        private static ConsumerConfig createConsumerConfig() {  
            Properties props = new Properties();  
            // Zookeeper connection string
            props.put("zookeeper.connect", "x01:2181,x02:2181");
            // Consumer group id
            props.put("group.id", "1");
            // The group's consumed offsets are kept in zookeeper but are only committed
            // periodically rather than in real time; this sets the commit interval.
            props.put("auto.commit.interval.ms", "1000");
            props.put("zookeeper.session.timeout.ms", "10000");
            return new ConsumerConfig(props);  
        }  
     
        public void close() {    }
     
        public void deactivate() {    }
     
        public void fail(Object msgId) {    }
     
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
     
        public Map<String, Object> getComponentConfiguration() {
            System.out.println("getComponentConfiguration called");
            topic = "test";
            return null;
        }
    }

    bolt(wordsplitter)

    package org.admln.flume_kafka_storm;
    
    import java.util.Map;
    
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    public class KafkaWordSplitterBolt extends BaseRichBolt {
    
        private static final long serialVersionUID = 886149197481637894L;
        private OutputCollector collector;
       
        public void prepare(Map stormConf, TopologyContext context,
                  OutputCollector collector) {
             this.collector = collector;              
        }
    
        public void execute(Tuple input) {
            String line = input.getString(0);
            String[] words = line.split(",");
            for (String word : words) {
                // Emit a (word, 1) pair anchored to the input tuple (this emit overload also
                // passes the tuple itself) so that the ack/fail mechanism can replay on failure.
                collector.emit(input, new Values(word, 1));
            }
            collector.ack(input);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    bolt(wordcount)

    package org.admln.flume_kafka_storm;
    
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.concurrent.atomic.AtomicInteger;
    
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    
    public class KafkaWordCounterBolt extends BaseRichBolt {
        private static final long serialVersionUID = 886149197481637894L;
        private OutputCollector collector;
        private Map<String, AtomicInteger> counterMap;
    
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.collector = collector;
            this.counterMap = new HashMap<String, AtomicInteger>();
        }
    
        public void execute(Tuple input) {
            String word = input.getString(0);
            int count = input.getInteger(1);
            AtomicInteger ai = this.counterMap.get(word);
            if (ai == null) {
                ai = new AtomicInteger();
                this.counterMap.put(word, ai);
            }
            ai.addAndGet(count);
            collector.ack(input);
        }
    
        public void cleanup() {
            Iterator<Entry<String, AtomicInteger>> iter = this.counterMap
                    .entrySet().iterator();
            while (iter.hasNext()) {
                Entry<String, AtomicInteger> entry = iter.next();
            System.out.println(entry.getKey() + "\t:\t" + entry.getValue().get());
            }
    
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    topology

    package org.admln.flume_kafka_storm;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    
    public class KafkaTopology {
    
        public static void main(String[] args) throws AlreadyAliveException,
                InvalidTopologyException {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new KafkaSpout(""), 1);
            // "bolt1" is this bolt's ID in the topology graph.
            // The 2 is the parallelism hint (number of executors); if omitted, one is used by default.
            builder.setBolt("bolt1", new KafkaWordSplitterBolt(), 2)
                    .shuffleGrouping("spout");
            builder.setBolt("bolt2", new KafkaWordCounterBolt(), 2)
                    .fieldsGrouping("bolt1", new Fields("word"));
            String name = KafkaTopology.class.getSimpleName();
            if (args != null && args.length > 0) {
                Config conf = new Config();
                // Point the client at the nimbus host passed on the command line
                conf.put(Config.NIMBUS_HOST, args[0]);
                conf.setNumWorkers(2);
                StormSubmitter.submitTopologyWithProgressBar(name, conf,
                        builder.createTopology());
            } else {
                Map conf = new HashMap();
                conf.put(Config.TOPOLOGY_WORKERS, 1);
                conf.put(Config.TOPOLOGY_DEBUG, true);
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("my-flume-kafka-storm-topology-integration",
                        conf, builder.createTopology());
            }
        }
    }

    The topology can be run locally in eclipse or submitted to the cluster.

    On the cluster:

    bin/storm jar flume-kafka-storm.jar org.admln.flume_kafka_storm.KafkaTopology x01
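
    When finished, the running topology can be stopped by name; the name submitted above is the class's simple name:

    bin/storm kill KafkaTopology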
