zoukankan      html  css  js  c++  java
  • Trident整合Kafka

    首先编写一个打印函数KafkaPrintFunction

    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;
    
    /**
     * Trident function that reads the {@code "str"} field of each incoming tuple,
     * prints it to standard output prefixed with this class's simple name, and
     * re-emits the same string as a single-value tuple.
     */
    public class KafkaPrintFunction extends BaseFunction {

        /**
         * Prints the message carried by the tuple and forwards it downstream.
         *
         * @param input     tuple whose {@code "str"} field holds the message text
         * @param collector collector that receives the message as a one-value tuple
         */
        @Override
        public void execute(TridentTuple input, TridentCollector collector) {
            final String payload = input.getStringByField("str");
            final String prefix = getClass().getSimpleName();

            System.out.println(prefix + ": " + payload);
            collector.emit(new Values(payload));
        }

    }

    然后编写trident整合kafka的topology

    import net.baiqu.shop.report.utils.Constants;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.StringScheme;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.kafka.trident.TransactionalTridentKafkaSpout;
    import org.apache.storm.kafka.trident.TridentKafkaConfig;
    import org.apache.storm.spout.SchemeAsMultiScheme;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.tuple.Fields;
    
    /**
     * Demo entry point that connects a Kafka topic to a Storm Trident topology.
     *
     * <p>The topology reads from a transactional Trident Kafka spout, shuffles the
     * stream, and passes the {@code "str"} field of every tuple to
     * {@link KafkaPrintFunction}, whose output is appended as the {@code "result"} field.
     */
    public class KafkaTrident {

        /**
         * Builds the topology and submits it to a {@link LocalCluster} under the
         * name {@code "kafkaTridentDemo"} with a default {@link Config}.
         *
         * @param args command-line arguments (not used)
         */
        public static void main(String[] args) {
            TridentTopology tridentTopology = new TridentTopology();

            Stream kafkaStream = tridentTopology.newStream("kafkaSpout", buildKafkaSpout());
            kafkaStream
                    .shuffle()
                    .each(new Fields("str"), new KafkaPrintFunction(), new Fields("result"));

            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("kafkaTridentDemo", new Config(), tridentTopology.build());
        }

        /**
         * Creates the transactional Kafka spout for topic {@code "tridentTestTopic"},
         * using the hosts from {@code Constants.ZK_HOSTS} and a {@link StringScheme}
         * wrapped in a {@link SchemeAsMultiScheme} as the scheme.
         */
        private static TransactionalTridentKafkaSpout buildKafkaSpout() {
            BrokerHosts zkHosts = new ZkHosts(Constants.ZK_HOSTS);
            String topicName = "tridentTestTopic";
            String spoutId = "kafka.queue.tridentTestTopic";

            TridentKafkaConfig spoutConfig = new TridentKafkaConfig(zkHosts, topicName, spoutId);
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
            return new TransactionalTridentKafkaSpout(spoutConfig);
        }

    }

    在另一个 Java 项目中向 Kafka 发送数据

        /**
         * Scheduled job (fixedRate = 3000, i.e. milliseconds under Spring's default
         * time unit) that sends a fixed test message to the {@code "tridentTestTopic"}
         * Kafka topic and echoes the same text to standard output.
         */
        @Scheduled(fixedRate = 3000)
        public void shopDataTestJob9() {
            final String payload = "test kafka trident";
            final int messagesPerRun = 1;

            int sent = 0;
            while (sent < messagesPerRun) {
                kafkaTemplate.send("tridentTestTopic", payload);
                System.out.println(payload);
                sent++;
            }
        }

    最后运行 Storm 项目和 Java 项目(需要先运行 Java 项目向 Kafka 发送数据以创建该 topic,之后 Storm 才能消费这个 topic)

    观察结果,storm项目控制台输出

    KafkaPrintFunction: test kafka trident
    KafkaPrintFunction: test kafka trident
    KafkaPrintFunction: test kafka trident

    表示storm trident消费kafka数据成功

  • 相关阅读:
    Matlab高级教程_第一篇:Matlab基础知识提炼_01
    量化投资_量化投资系统框架的DIY_02_01
    计量经济与时间序列_时间序列之物理含义
    计量经济与时间序列_协整和误差修正模型
    数学之美_正态分布(Python代码)
    [转载] ./configure,make,make install的作用
    [转载]Deep Learning(深度学习)学习笔记整理
    AJAX XML 实例
    百度搜索插件源码
    apache 服务器在ubuntu上图片无法显示解决
  • 原文地址:https://www.cnblogs.com/tangzhe/p/9618495.html
Copyright © 2011-2022 走看看