  • Flume, Kafka, and Storm Integration

    Todo:

    Rework the Flume sink to call a Kafka producer and publish each event as a message;

    In the Storm spout, implement the IRichSpout interface and use a Kafka consumer to receive the messages, then run them through a couple of custom bolts and print the transformed output.

    Flume -- Kafka 

    Writing KafkaSink

    Copy the following from $KAFKA_HOME/lib:

    kafka_2.10-0.8.2.1.jar

    kafka-clients-0.8.2.1.jar

    scala-library-2.10.4.jar

    to $FLUME_HOME/lib.


    Create a new project in Eclipse and import the following from $FLUME_HOME/lib:

    commons-logging-1.1.1.jar

    flume-ng-configuration-1.6.0.jar

    flume-ng-core-1.6.0.jar

    flume-ng-sdk-1.6.0.jar

    zkclient-0.3.jar

    kafka_2.10-0.8.2.1.jar

    kafka-clients-0.8.2.1.jar

    scala-library-2.10.4.jar

    into the project.

    Create a new file KafkaSink.java:

    import java.util.Properties;
     
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
     
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.flume.Channel;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
     
     
    public class KafkaSink extends AbstractSink implements Configurable {
        private static final Log logger = LogFactory.getLog(KafkaSink.class);
     
        private String topic;
        private Producer<String, String> producer;
     
        public void configure(Context context) {
            // Hard-coded for the demo; a production sink would read these from the Flume context
            topic = "flume_test";
            Properties props = new Properties();
            props.setProperty("metadata.broker.list", "localhost:9092");
            props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
            props.put("zookeeper.connect", "localhost:2181");
            props.setProperty("num.partitions", "4"); // broker-side setting; has no effect on the producer
            props.put("request.required.acks", "1");
            ProducerConfig config = new ProducerConfig(props);
            producer = new Producer<String, String>(config);
            logger.info("KafkaSink initialized.");
        }
     
        public Status process() throws EventDeliveryException {
            Channel channel = getChannel();
            Transaction tx = channel.getTransaction();
            try {
                tx.begin();
                Event e = channel.take();
                if (e == null) {
                    tx.rollback();
                    return Status.BACKOFF;
                }
                KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, new String(e.getBody()));
                producer.send(data);
                logger.info("Flume sent a message to Kafka: " + new String(e.getBody()));
                tx.commit();
                return Status.READY;
            } catch (Exception e) {
                logger.error("Flume KafkaSinkException:", e);
                tx.rollback();
                return Status.BACKOFF;
            } finally {
                tx.close();
            }
        }
    }
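
    Note that process() takes a single event per transaction; a higher-throughput sink would take and send a batch of events within each transaction.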

    Export a jar and place it in $FLUME_HOME/lib

    (File -> Export -> Jar File, all default options)

    Create kafka.conf:

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
     
    # Describe/configure the source
    a1.sources.r1.type = syslogtcp
    a1.sources.r1.port = 5140
    a1.sources.r1.host = localhost
     
    # Describe the sink
    a1.sinks.k1.type = KafkaSink
     
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
     
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
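
    As an aside, Flume 1.6.0 already ships its own Kafka sink, so the custom class above is mainly an exercise. A sink section using the bundled sink would look roughly like this (property names per the Flume 1.6 user guide):

    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = flume_test
    a1.sinks.k1.brokerList = localhost:9092
    a1.sinks.k1.requiredAcks = 1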

    Testing

    Start Kafka:

    cd ~/app/kafka
    ./bin/zookeeper-server-start.sh ./config/zookeeper.properties > /dev/null &
    ./bin/kafka-server-start.sh ./config/server.properties > /dev/null &

    Create the topic:

    ~/app/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4  --topic flume_test

    Start a console consumer:

    ~/app/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic flume_test  --from-beginning

    Start the Flume agent:

    flume-ng agent -c conf  -f ~/test/kafka.conf --name a1 -Dflume.root.logger=INFO,console

    Send some messages:

    echo "hey manhua" |nc localhost 5140
    echo "nice shot" |nc localhost 5140
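
    The lines sent by nc are not valid syslog, so the syslogtcp source will log a warning about invalid syslog data for each one, but the raw body is still wrapped in an event and reaches the sink.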

    A ready-made plugin that combines Flume and Kafka:

    https://github.com/kevinjmh/flumeng-kafka-plugin/tree/master/flumeng-kafka-plugin/src/main/java/org/apache/flume/plugins


    Kafka -- Storm

    http://storm.apache.org/index.html

    Download it, unpack it, and add it to your PATH in /etc/profile.

    Create a new Maven project in Eclipse with the following pom.xml:

    <?xml version="1.0" encoding="utf-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>
            <groupId>manhua</groupId>
            <artifactId>kafka-storm-test</artifactId>
            <version>0.0.1-SNAPSHOT</version>
            <packaging>jar</packaging>
            <name>kafka-storm</name>
            <url>http://maven.apache.org</url>
            <properties>
                    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            </properties>
            <repositories>
                    <repository>
                            <id>github-releases</id>
                            <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
                    </repository>
                    <repository>
                            <id>clojars.org</id>
                            <url>http://clojars.org/repo</url>
                    </repository>
            </repositories>
            <dependencies>
                    <dependency>
                            <groupId>junit</groupId>
                            <artifactId>junit</artifactId>
                            <version>4.11</version>
                            <scope>test</scope>
                    </dependency>
                    <dependency>
                            <groupId>org.apache.kafka</groupId>
                            <artifactId>kafka_2.10</artifactId>
                            <version>0.8.2.1</version>
                    </dependency>
                    <dependency>
                            <groupId>log4j</groupId>
                            <artifactId>log4j</artifactId>
                            <version>1.2.14</version>
                    </dependency>
                    <dependency>
                            <groupId>org.apache.storm</groupId>
                            <artifactId>storm-core</artifactId>
                            <version>0.10.0</version>
                            <!-- keep storm out of the jar-with-dependencies -->
                            <scope>provided</scope>
                    </dependency>
                    <dependency>
                            <groupId>commons-collections</groupId>
                            <artifactId>commons-collections</artifactId>
                            <version>3.2.1</version>
                    </dependency>
            </dependencies>
    </project>
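
    One pitfall with this dependency set: storm-core bundles log4j-over-slf4j, while kafka_2.10 can pull in slf4j-log4j12 through its transitive dependencies, and SLF4J aborts when both are on the classpath ("Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar"). If a local-mode run fails with that error, one fix is to exclude the conflicting binding from the kafka dependency, for example:

    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
            <exclusions>
                    <exclusion>
                            <groupId>org.slf4j</groupId>
                            <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                    <exclusion>
                            <groupId>log4j</groupId>
                            <artifactId>log4j</artifactId>
                    </exclusion>
            </exclusions>
    </dependency>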

    Create two Java files under src/main/java.

    KafkaSpouttest.java

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichSpout;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    
    public class KafkaSpouttest implements IRichSpout {
    
            private SpoutOutputCollector collector;
            private ConsumerConnector consumer;
            private String topic;
    
            public KafkaSpouttest() {
            }
    
            public KafkaSpouttest(String topic) {
                    this.topic = topic;
            }
    
            public void nextTuple() {
            }
    
            public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                    this.collector = collector;
            }
    
            public void ack(Object msgId) {
            }
    
            public void activate() {
    
                    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
    
                    Map<String, Integer> topicMap = new HashMap<String, Integer>();
                    topicMap.put(topic, 1);

                    System.out.println("*********Results********topic:" + topic);

                    Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topicMap);
                    KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                            String value = new String(it.next().message());
                            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
                            Date curDate = new Date(System.currentTimeMillis()); // current time
                            String str = formatter.format(curDate);

                            System.out.println("storm received a message from kafka -------> " + value);

                            // Emit (word, id, time); the message body doubles as the tuple's msgId
                            collector.emit(new Values(value, 1, str), value);
                    }
            }
    
            private static ConsumerConfig createConsumerConfig() {
                    Properties props = new Properties();
                    // ZooKeeper connection string
                    props.put("zookeeper.connect", "localhost:2181");
                    // Consumer group id
                    props.put("group.id", "1");
                    // Offsets live in ZooKeeper and are only flushed periodically,
                    // so set the commit interval explicitly
                    props.put("auto.commit.interval.ms", "1000");
                    props.put("zookeeper.session.timeout.ms", "10000");
                    return new ConsumerConfig(props);
            }
    
            public void close() {
            }
    
            public void deactivate() {
            }
    
            public void fail(Object msgId) {
            }
    
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                    declarer.declare(new Fields("word", "id", "time"));
            }
    
            public Map<String, Object> getComponentConfiguration() {
                    System.out.println("getComponentConfiguration called");
                    // Hack: this demo relies on this callback to set the topic
                    topic = "flume_test";
                    return null;
            }
    }
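
    Note that KafkaSpouttest does all of its consuming inside activate(), which blocks the spout's executor thread; it works for this demo, but it is not how spouts are normally written. A more conventional shape (a sketch of ours, not part of the original tutorial) drains Kafka on a daemon thread into a queue and emits from nextTuple(), which Storm expects to be non-blocking:

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.LinkedBlockingQueue;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    public class QueueBackedKafkaSpout extends BaseRichSpout {

        private final String topic;
        private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(1000);
        private SpoutOutputCollector collector;

        public QueueBackedKafkaSpout(String topic) {
            this.topic = topic;
        }

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            // Consume on a daemon thread: only enqueue here, never touch the collector
            Thread reader = new Thread(new Runnable() {
                public void run() {
                    Properties props = new Properties();
                    props.put("zookeeper.connect", "localhost:2181");
                    props.put("group.id", "1");
                    ConsumerConnector consumer =
                            kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
                    Map<String, Integer> topicMap = new HashMap<String, Integer>();
                    topicMap.put(topic, 1);
                    Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicMap);
                    ConsumerIterator<byte[], byte[]> it = streams.get(topic).get(0).iterator();
                    while (it.hasNext()) {
                        queue.offer(new String(it.next().message()));
                    }
                }
            });
            reader.setDaemon(true);
            reader.start();
        }

        public void nextTuple() {
            String value = queue.poll(); // non-blocking; nextTuple must return quickly
            if (value == null) {
                return;
            }
            String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS").format(new Date());
            // Same output fields as KafkaSpouttest, so the bolts below work unchanged
            collector.emit(new Values(value, 1, time), value);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "id", "time"));
        }
    }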

    KafkaTopologytest.java

    import java.util.HashMap;
    import java.util.Map;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import backtype.storm.utils.Utils;
     
    public class KafkaTopologytest {
     
        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();
     
            builder.setSpout("spout", new KafkaSpouttest(""), 1); // topic is filled in later by the getComponentConfiguration() hack
            builder.setBolt("bolt1", new Bolt1(), 2).shuffleGrouping("spout");
            builder.setBolt("bolt2", new Bolt2(), 2).fieldsGrouping("bolt1",new Fields("word"));
     
            Map<String, Object> conf = new HashMap<String, Object>();
            conf.put(Config.TOPOLOGY_WORKERS, 1);
            conf.put(Config.TOPOLOGY_DEBUG, true);
     
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("my-flume-kafka-storm-topology-integration", conf, builder.createTopology());
             
            Utils.sleep(1000*60*5); // local cluster test ...
            cluster.shutdown();
        }
         
        public static class Bolt1 extends BaseBasicBolt {

            public void execute(Tuple input, BasicOutputCollector collector) {
                try {
                    String msg = input.getString(0);
                    int id = input.getInteger(1);
                    String time = input.getString(2);
                    if (msg != null) {
                        msg = msg + "bolt1";
                        System.out.println("first pass over the message ------- [word]: " + msg + " --- [id]: " + id + " --- [time]: " + time);
                        collector.emit(new Values(msg));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word"));
            }
        }
         
        public static class Bolt2 extends BaseBasicBolt {
            Map<String, Integer> counts = new HashMap<String, Integer>();

            public void execute(Tuple tuple, BasicOutputCollector collector) {
                String msg = tuple.getString(0) + "bolt2";
                // fieldsGrouping sends the same word to the same task, so a local count is consistent
                Integer count = counts.get(msg);
                count = (count == null) ? 1 : count + 1;
                counts.put(msg, count);
                System.out.println("second pass over the message ----------> " + msg);
                collector.emit(new Values(msg, count));
            }

            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word", "count"));
            }
        }
    }
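
    The fieldsGrouping on "word" is what makes a per-word count in Bolt2 meaningful: tuples carrying the same word always go to the same Bolt2 task, so each task's local map sees every occurrence of the words routed to it.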

    Testing

    Continuing from the Flume-Kafka test above: make sure Kafka is running and the topic has been created.

    # ZooKeeper must already be running before starting Storm
    
    # Start Storm
    storm nimbus &
    storm supervisor &
    storm ui &
    # Open http://localhost:8080 in a browser; if the UI loads, Storm is up

    Test 1

    Start a console producer and a console consumer:

    ~/app/kafka_2.10-0.8.2.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flume_test
    
    ~/app/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic flume_test  --from-beginning

    Right-click KafkaTopologytest.java in the project and run it to start the Storm topology.

    Type values into the producer console; they appear both in the consumer console and in the Eclipse console.

    Test 2

    Copy the following from $KAFKA_HOME/lib:

    kafka_2.10-0.8.2.1.jar

    kafka-clients-0.8.2.1.jar

    scala-library-2.10.4.jar

    metrics-core-2.2.0.jar

    zkclient-0.3.jar

    zookeeper-3.4.6.jar

    to $STORM_HOME/lib.

    Export a jar as before (File -> Export -> Jar File, all default options) and place it in any directory.

    Run the jar with Storm:

    storm jar  kafkaSpout.jar KafkaTopologytest
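
    Because main() uses LocalCluster, storm jar still runs the topology inside a single local JVM. To submit it to the nimbus started above, main() would call StormSubmitter instead; a minimal sketch (the class name KafkaTopologySubmitter is ours, and here the topic is passed to the spout directly):

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;

    public class KafkaTopologySubmitter {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new KafkaSpouttest("flume_test"), 1);
            builder.setBolt("bolt1", new KafkaTopologytest.Bolt1(), 2).shuffleGrouping("spout");
            builder.setBolt("bolt2", new KafkaTopologytest.Bolt2(), 2).fieldsGrouping("bolt1", new Fields("word"));

            Config conf = new Config();
            conf.setNumWorkers(1);
            // Submit to the running cluster rather than spinning up an in-process one
            StormSubmitter.submitTopology("flume-kafka-storm", conf, builder.createTopology());
        }
    }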

    Startup order: zookeeper - kafka - storm - flume

    Ref: http://www.aboutyun.com/thread-8915-1-1.html
