zoukankan      html  css  js  c++  java
  • Trident整合Kafka

    首先编写一个打印函数KafkaPrintFunction

    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;
    
    public class KafkaPrintFunction extends BaseFunction {
    
        @Override
        public void execute(TridentTuple input, TridentCollector collector) {
            String msg = input.getStringByField("str");
            System.out.println(this.getClass().getSimpleName() + ": " + msg);
            collector.emit(new Values(msg));
        }
    
    }

    然后编写trident整合kafka的topology

    import net.baiqu.shop.report.utils.Constants;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.StringScheme;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.kafka.trident.TransactionalTridentKafkaSpout;
    import org.apache.storm.kafka.trident.TridentKafkaConfig;
    import org.apache.storm.spout.SchemeAsMultiScheme;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.tuple.Fields;
    
    /**
     * kafka连接trident
     */
    public class KafkaTrident {
    
        public static void main(String[] args) {
            TridentTopology topology = new TridentTopology();
    
            BrokerHosts hosts = new ZkHosts(Constants.ZK_HOSTS);
            String topic = "tridentTestTopic";
            String id = "kafka.queue.tridentTestTopic";
            TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(hosts, topic, id);
            kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
            TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);
    
            Stream stream = topology.newStream("kafkaSpout", kafkaSpout);
            stream.shuffle().each(new Fields("str"), new KafkaPrintFunction(), new Fields("result"));
    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafkaTridentDemo", new Config(), topology.build());
        }
    
    }

    另一个Java项目发送kafka数据

        @Scheduled(fixedRate = 3000)
        public void shopDataTestJob9() {
            for (int i = 0; i < 1; i++) {
                kafkaTemplate.send("tridentTestTopic", "test kafka trident");
                System.out.println("test kafka trident");
            }
        }

    最后运行storm项目以及java项目(需要先运行java项目往kafka发数据,建立此topic,storm才能消费这个topic)

    观察结果,storm项目控制台输出

    KafkaPrintFunction: test kafka trident
    KafkaPrintFunction: test kafka trident
    KafkaPrintFunction: test kafka trident

    表示storm trident消费kafka数据成功

  • 相关阅读:
    序列、元组、列表(基本的增、删、改、查)
    Python基础运算符(算数、比较、赋值、逻辑、成员)
    2015年9月14日记事
    2014年3月31日梦
    华为S5700系列交换机配置文件导出、导入
    C语言单链表简单实现(简单程序复杂化)
    北邮《大学英语2》第三次阶段作业带答案
    C++走向远洋——30(六周,项目一1.0)
    C++走向远洋——29(长方柱类)
    C++走向远洋——28(项目三,时间类,2)
  • 原文地址:https://www.cnblogs.com/tangzhe/p/9618495.html
Copyright © 2011-2022 走看看