storm-kafka: a simple integration

    Official documentation: http://storm.apache.org/releases/1.2.2/storm-kafka-client.html


    How it works:

    KafkaSpout wraps a Kafka consumer: once the bootstrap servers and topic are configured, it polls records from Kafka automatically and emits them, under user-defined output fields, to downstream bolts for processing.

    pom.xml

    Relatively recent versions of both Kafka and Storm are used here.

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.3</version>
            <!-- Uncomment when submitting to a cluster; otherwise this jar conflicts
                 with the storm-core already on the cluster's classpath -->
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.1.0</version>
            <exclusions>
               <!-- <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>-->
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.14</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- Specify the class containing the main method here -->
                            <mainClass>com.kafkatest.TopologyTest</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <!-- the old `assembly` goal is deprecated; `single` is the supported goal -->
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
    
        </plugins>
    </build>
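
    Once the provided scope above is uncommented, the fat jar can be built and submitted roughly like this (a sketch: the jar name depends on your own artifactId/version, and any trailing command-line argument, e.g. remote, makes the main() method below take the cluster-submission branch):

```shell
# build the jar-with-dependencies
mvn clean package
# submit to the cluster; the trailing argument switches main() into cluster mode
storm jar target/kafka-test-1.0-SNAPSHOT.jar com.kafkatest.TopologyTest remote
```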
    

    Topology:

    import org.apache.commons.lang.ArrayUtils;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    /**
     * Created by wxg on 2018/11/14 14:37
     */
    public class TopologyTest {
        public static void main(String[] args) {
            TopologyTest topologyTest = new TopologyTest();
            if (!ArrayUtils.isEmpty(args)) {
                try {
                    StormSubmitter.submitTopology("kafka-test", topologyTest.stormConfig(), topologyTest.builder().createTopology());
                } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                    e.printStackTrace();
                }
            } else {
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("kafka-test", topologyTest.stormConfig(), topologyTest.builder().createTopology());
            }
        }
    
        private Config stormConfig() {
            Config config = new Config();
            config.setDebug(true);
            config.setNumAckers(1);
            return config;
        }
    
        private TopologyBuilder builder() {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", kafkaSpout(), 1);
            builder.setBolt("bolt1", new Bolt1Test(), 1).shuffleGrouping("spout");
            return builder;
        }
    
        private KafkaSpout<String, String> kafkaSpout() {
            final Fields outputFields = new Fields("topic", "partition", "offset", "timestamp", "key", "msg_from_kafka");
            final String host = "ip_address:9092";
            final String topic = "topic_test";
            KafkaSpoutConfig<String, String> kafkaSpoutConfig;
            kafkaSpoutConfig = KafkaSpoutConfig
                    .builder(host, topic)
                    .setRecordTranslator((r) -> new Values(r.topic(), r.partition(), r.offset(), r.timestamp(), r.key(), r.value()), outputFields)
                    .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
                    .build();
            return new KafkaSpout<>(kafkaSpoutConfig);
        }
    }
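
    The lambda given to setRecordTranslator must produce values in exactly the order declared in outputFields, since bolts later fetch them by field name. Stripped of the Storm and Kafka types, the mapping reduces to the following stand-alone sketch (TranslatorSketch and its signature are illustrative, not part of any Storm API):

```java
import java.util.Arrays;
import java.util.List;

public class TranslatorSketch {
    // Mirrors what the record translator emits for one ConsumerRecord<String, String>;
    // order must match "topic", "partition", "offset", "timestamp", "key", "msg_from_kafka".
    static List<Object> translate(String topic, int partition, long offset,
                                  long timestamp, String key, String value) {
        return Arrays.asList(topic, partition, offset, timestamp, key, value);
    }

    public static void main(String[] args) {
        // e.g. a record from partition 0 at offset 42 with a null key
        System.out.println(translate("topic_test", 0, 42L, 1542177420000L, null, "hello"));
    }
}
```

    If the declared Fields and the emitted Values disagree in length, Storm rejects the tuple at emit time; an order mismatch is not detected and silently delivers values under the wrong names, so it pays to keep the two definitions side by side.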
    

    Bolt:

    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /**
     * Created by wxg on 2018/11/14 14:46
     */
    public class Bolt1Test extends BaseBasicBolt {
        private static final Logger logger = LoggerFactory.getLogger(Bolt1Test.class);

        @Override
        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            String msg = tuple.getStringByField("msg_from_kafka");
            logger.info("bolt1 processing message: {}", msg);
            basicOutputCollector.emit(new Values(msg));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("msg"));
        }
    }
    

    Output

    In the log you can see that every emitted tuple is followed by an ack; this is Storm's reliability mechanism, which guards against data loss.
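
    Because every tuple is tracked and acked, throughput can be tuned from the topology config. A possible variant of the stormConfig() method above (the values are illustrative, not recommendations):

```java
Config config = new Config();
config.setDebug(false);          // per-tuple debug logging is very verbose
config.setNumAckers(1);          // acker tasks track each tuple tree
config.setMaxSpoutPending(1000); // cap un-acked tuples in flight per spout task
```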

    Original post: https://www.cnblogs.com/cnsec/p/13286641.html