  • Integrating Trident with Kafka

    First, write a print function, KafkaPrintFunction:

    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;
    
    public class KafkaPrintFunction extends BaseFunction {
    
        @Override
        public void execute(TridentTuple input, TridentCollector collector) {
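            // "str" is the field name that StringScheme assigns to each raw Kafka message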
            String msg = input.getStringByField("str");
            System.out.println(this.getClass().getSimpleName() + ": " + msg);
            collector.emit(new Values(msg));
        }
    
    }

    Then write the topology that wires Trident to Kafka:

    import net.baiqu.shop.report.utils.Constants;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.StringScheme;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.kafka.trident.TransactionalTridentKafkaSpout;
    import org.apache.storm.kafka.trident.TridentKafkaConfig;
    import org.apache.storm.spout.SchemeAsMultiScheme;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.tuple.Fields;
    
    /**
     * Connects Kafka to Trident via a transactional Trident Kafka spout.
     */
    public class KafkaTrident {
    
        public static void main(String[] args) {
            TridentTopology topology = new TridentTopology();
    
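            // Kafka spout configuration: Zookeeper hosts, the topic to consume, and a client id for this spout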
            BrokerHosts hosts = new ZkHosts(Constants.ZK_HOSTS);
            String topic = "tridentTestTopic";
            String id = "kafka.queue.tridentTestTopic";
            TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(hosts, topic, id);
            kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
            TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);
    
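            // Wire the spout into a Trident stream; each message arrives in the "str" field and is printed by KafkaPrintFunction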
            Stream stream = topology.newStream("kafkaSpout", kafkaSpout);
            stream.shuffle().each(new Fields("str"), new KafkaPrintFunction(), new Fields("result"));
    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafkaTridentDemo", new Config(), topology.build());
        }
    
    }

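    The Constants class imported above (net.baiqu.shop.report.utils.Constants) is not shown in the original post. A minimal sketch, assuming ZK_HOSTS simply holds the Zookeeper connection string (the address below is a placeholder):

    package net.baiqu.shop.report.utils;
    
    public class Constants {
    
        // Placeholder Zookeeper address; point this at your own ensemble
        public static final String ZK_HOSTS = "localhost:2181";
    
    }
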
    A separate Java project sends a test message to Kafka every 3 seconds; see the self-contained sketch after the snippet below:

        @Scheduled(fixedRate = 3000)
        public void shopDataTestJob9() {
            for (int i = 0; i < 1; i++) {
                kafkaTemplate.send("tridentTestTopic", "test kafka trident");
                System.out.println("test kafka trident");
            }
        }
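
    The snippet above is a fragment from a Spring project that uses KafkaTemplate and @Scheduled. A minimal, self-contained sketch of such a sender component (the class name is hypothetical, and it assumes spring-kafka is on the classpath with a KafkaTemplate bean configured, e.g. via spring.kafka.bootstrap-servers):

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    @Component
    public class TridentTestSender {
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        // Publish one test message to tridentTestTopic every 3 seconds;
        // scheduling must be enabled with @EnableScheduling on a configuration class
        @Scheduled(fixedRate = 3000)
        public void shopDataTestJob9() {
            kafkaTemplate.send("tridentTestTopic", "test kafka trident");
            System.out.println("test kafka trident");
        }
    
    }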

    Finally, run the Storm project and the Java project. (Run the Java project first so that it publishes messages to Kafka and the topic gets created; only then can Storm consume that topic.)

    Observe the result: the Storm project's console prints

    KafkaPrintFunction: test kafka trident
    KafkaPrintFunction: test kafka trident
    KafkaPrintFunction: test kafka trident

    which shows that Storm Trident is successfully consuming the Kafka data.

  • Original post: https://www.cnblogs.com/tangzhe/p/9618495.html