  • Storm + Kafka Integration: Java API Source Code

    1. The pom.xml for the Maven project is as follows:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.yg</groupId>
        <artifactId>storm</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>storm</name>
        <url>http://maven.apache.org</url>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
    
        <dependencies>
    
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-core</artifactId>
                <version>1.1.3</version>
                <scope>provided</scope>
            </dependency>
    
            
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.12</artifactId>
                <version>0.10.2.1</version>
            </dependency>
            
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-kafka</artifactId>
                <version>1.1.3</version>
            </dependency>
            
    
        </dependencies>
    
    
        <build>
            <plugins>
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <archive>
                            <manifest>
                                <!-- replace with the topology's main class, e.g. com.yg.storm.kafka.topologies.KafkaHelloWorldTopology -->
                                <mainClass>com.path.to.main.Class</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
    
            </plugins>
        </build>
    
    </project>
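    With the assembly plugin configured as above, running mvn clean package assembly:single should produce a storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar under target/ that can be submitted to the cluster with storm jar. Note that storm-core is scoped provided: the cluster supplies it at runtime, so it is deliberately left out of the fat jar.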

    2. KafkaSpout.java is as follows:

    package com.yg.storm.kafka.spouts;
    
    import java.util.Arrays;
    import java.util.Map;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    public class KafkaSpout extends BaseRichSpout {
    
    
        private static final long serialVersionUID = 7582771881226024741L;
        private KafkaConsumer<String, String> consumer;
        private final String TOPIC = "myTopic";
        private SpoutOutputCollector collector;
    
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
    
            Properties props = new Properties();
            props.put("bootstrap.servers", "hadoop211:9092,hadoop212:9092,hadoop213:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList(TOPIC));
    
        }
    
        @Override
        public void nextTuple() {
            //nextTuple() must not block or loop forever: Storm calls it repeatedly,
            //so poll once per call, waiting at most 100 ms for new records
            //(the poll argument is a timeout in milliseconds, not a record count)
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                String key = record.key();
                String value = record.value();
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), key, value);
                collector.emit(new Values(value));//emit the message value downstream
            }
        }
    
        @Override
        public void close() {
            //called when the spout shuts down; release the Kafka consumer here
            consumer.close();
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
    
        }
    
    }
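    For local testing you need messages on myTopic. Below is a minimal producer sketch that sends a few "Hello World" strings, assuming the same brokers as above; the class name and package are hypothetical.

    package com.yg.storm.kafka.producers;
    
    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class HelloWorldProducer {
    
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "hadoop211:9092,hadoop212:9092,hadoop213:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            for (int i = 0; i < 10; i++) {
                //send a few "Hello World" messages so the bolt's counter has something to match
                producer.send(new ProducerRecord<String, String>("myTopic", Integer.toString(i), "Hello World"));
            }
            producer.close();
        }
    }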

    3. HelloWorldBolt.java is as follows:

    package com.yg.storm.bolts;
    
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;
    
    public class HelloWorldBolt extends BaseRichBolt{
    
        /**
         * Function: receive the tuples emitted by the spout, print them, and count occurrences of "Hello World"
         * Implementation: print each tuple; keep a counter field for the "Hello World" tally
         */
        private static final long serialVersionUID = -5061906223048521415L;
        private int myCount = 0;//counter; must be a field, not a local variable reset inside execute()
        private TopologyContext context;//topology context
        private OutputCollector collector;
    
        //analogous to open() in a spout
        @Override
        public void prepare(Map stormConf, 
                TopologyContext context, 
                OutputCollector collector) {
            this.context = context;
            this.collector = collector;
        }
    
        //analogous to nextTuple() in a spout
        @Override
        public void execute(Tuple input) {        
            //pull the data out of the tuple by field name
            String text = input.getStringByField("sentence");
            System.out.println("One tuple comes in at task " + context.getThisTaskId() + ": " + text);
            if ("Hello World".equals(text)){
                myCount++;
                System.out.println("Found a Hello World! My count is now: " + myCount);            
            }
            collector.ack(input);//tell Storm the tuple was processed successfully
    //        collector.fail(input);//on failure, tell Storm so the tuple can be replayed
            
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            //this bolt is a sink: it emits nothing downstream, so no fields are declared
        }
    }
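    Because this bolt must ack every tuple by hand, an alternative worth knowing is BaseBasicBolt, which acks automatically when execute() returns and fails the tuple if execute() throws. A sketch of the same counting logic under that base class (the class name is hypothetical):

    package com.yg.storm.bolts;
    
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Tuple;
    
    public class HelloWorldBasicBolt extends BaseBasicBolt {
    
        private static final long serialVersionUID = 1L;
        private int myCount = 0;
    
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            //no explicit ack needed: BaseBasicBolt acks after execute() returns
            if ("Hello World".equals(input.getStringByField("sentence"))) {
                myCount++;
                System.out.println("Found a Hello World! My count is now: " + myCount);
            }
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            //no downstream output
        }
    }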

    4. KafkaHelloWorldTopology.java is as follows:

    package com.yg.storm.kafka.topologies;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.KafkaSpout;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.spout.SchemeAsMultiScheme;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.utils.Utils;
    
    import com.yg.storm.bolts.HelloWorldBolt;
    
    
    public class KafkaHelloWorldTopology {
        
        //pass a topology name as the first argument to run on a cluster; with no arguments the topology runs in local mode
        public static void main(String[] args) {
                
                final String brokerZkStr = "hadoop211:2181,hadoop212:2181,hadoop213:2181";
                final String topic  = "myTopic";
                
                BrokerHosts brokerHosts = new ZkHosts(brokerZkStr);
                
    //          //alternatively, use a plain KafkaConfig
    //          KafkaConfig kafkaConfig = new KafkaConfig(
    //                  brokerHosts, 
    //                  topic
    //                  );
                
                //use SpoutConfig, which extends KafkaConfig with several extra settings
                SpoutConfig spoutConfig = new SpoutConfig(
                        brokerHosts, //ZooKeeper list used by the Kafka cluster
                        topic, //Kafka topic to consume
                        "/HWTopo", //ZooKeeper root path where the spout keeps its working state (consumed offsets)
                        "kafkaspout");  //ID that identifies this spout's offset records in ZooKeeper
                
                //plug in the custom scheme
                spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
                  
                TopologyBuilder builder = new TopologyBuilder();
                builder.setSpout("spout", new KafkaSpout(spoutConfig));
                builder.setBolt("bolt1", new HelloWorldBolt()).shuffleGrouping("spout");
                
                Config conf = new Config();
    //          Map<String, String> map = new HashMap<String, String>();
    //          
    //          map.put("metadata.broker.list", "hadoop211:9092,hadoop212:9092,hadoop213:9092");
    //          map.put("serializer.class", "kafka.serializer.StringEncoder");
    //          conf.put("kafka.broker.properties", map);
    //          conf.put("topic", topic);
                  
                if(args != null && args.length > 0) {
                    //submit to the cluster
                    try {
                        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } else {
                    //run in local mode
                    LocalCluster cluster = new LocalCluster();
                    cluster.submitTopology("SchemeTopo", conf, builder.createTopology());
                    Utils.sleep(1000000);
                    cluster.killTopology("SchemeTopo");
                    cluster.shutdown();
                }
    
            }
    
    }
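    By default the storm-kafka spout resumes from the offsets it stored under /HWTopo in ZooKeeper. To control where a fresh topology starts reading, SpoutConfig exposes a couple of fields; the snippet below is a sketch against the storm-kafka 1.1.x API and would go right after the SpoutConfig construction above.

    //start from the earliest available offset when no committed offset exists yet
    spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
    //set to true to ignore offsets stored in ZooKeeper and always honor startOffsetTime
    spoutConfig.ignoreZkOffsets = false;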

    5. The custom scheme class MessageScheme.java is as follows:

    package com.yg.storm.kafka.topologies;
    
    import java.nio.ByteBuffer;
    import java.nio.CharBuffer;
    import java.nio.charset.Charset;
    import java.nio.charset.CharsetDecoder;
    import java.util.List;
    
    import org.apache.storm.spout.Scheme;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    //defines how Storm deserializes the messages it consumes from Kafka
    /*
     * MultiScheme is an interface that dictates how the ByteBuffer consumed 
     * from Kafka gets transformed into a storm tuple. 
     * It also controls the naming of your output field.
     * 
     * The default RawMultiScheme just takes the ByteBuffer and returns a tuple 
     * with the ByteBuffer converted to a byte[]. 
     * The name of the outputField is "bytes". 
     * There are alternative implementations like SchemeAsMultiScheme and 
     * KeyValueSchemeAsMultiScheme which can convert the ByteBuffer to String.
     * 
     */
    
    public class MessageScheme implements Scheme {
    
    
        private static final long serialVersionUID = 1033379821285531859L;
    
        @Override
        public List<Object> deserialize(ByteBuffer buffer) {
    
            try {
                Charset charset = Charset.forName("UTF-8");
                CharsetDecoder decoder = charset.newDecoder();
                CharBuffer charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
                
                String sentence = charBuffer.toString();
                return new Values(sentence);
                
            } catch (Exception e) {
                e.printStackTrace();
                return null;//a null return tells the spout the message could not be deserialized
            }
        }
    
        @Override
        public Fields getOutputFields() {
            return new Fields("sentence");
        }
    
    }
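    As an aside, storm-kafka already ships a built-in org.apache.storm.kafka.StringScheme that performs essentially this UTF-8 decoding; the custom class here mainly controls the output field name. StringScheme names its output field "str" (as far as I recall), whereas HelloWorldBolt looks up "sentence", so using the built-in scheme would require adjusting the bolt:

    //sketch: using the built-in scheme instead of MessageScheme
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    //HelloWorldBolt would then read input.getStringByField("str") instead of "sentence"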

    To run the example in local mode, simply execute the KafkaHelloWorldTopology class.
