  • Integrating Trident with Kafka

    First, write a print function, KafkaPrintFunction, that logs each message it receives:

    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;
    
    public class KafkaPrintFunction extends BaseFunction {
    
        @Override
        public void execute(TridentTuple input, TridentCollector collector) {
            // "str" is the field name emitted by the StringScheme configured on the Kafka spout
            String msg = input.getStringByField("str");
            System.out.println(this.getClass().getSimpleName() + ": " + msg);
            collector.emit(new Values(msg));
        }
    
    }
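
    If you want to check KafkaPrintFunction before wiring in Kafka, it can be driven locally with Trident's FixedBatchSpout (shipped with storm-core in Storm 1.x). The following is only a minimal sketch; the class name, stream name, and sample values are illustrative:

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.testing.FixedBatchSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    public class PrintFunctionLocalTest {
    
        public static void main(String[] args) {
            // a few in-memory batches with a single "str" field, matching what the function reads
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("str"), 3,
                    new Values("hello"), new Values("trident"), new Values("kafka"));
            spout.setCycle(false);
    
            TridentTopology topology = new TridentTopology();
            topology.newStream("testSpout", spout)
                    .each(new Fields("str"), new KafkaPrintFunction(), new Fields("result"));
    
            new LocalCluster().submitTopology("printFunctionTest", new Config(), topology.build());
        }
    
    }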

    Next, write the topology that connects the Kafka spout to Trident:

    import net.baiqu.shop.report.utils.Constants;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.StringScheme;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.kafka.trident.TransactionalTridentKafkaSpout;
    import org.apache.storm.kafka.trident.TridentKafkaConfig;
    import org.apache.storm.spout.SchemeAsMultiScheme;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.tuple.Fields;
    
    /**
     * Connects a Kafka spout to a Trident topology
     */
    public class KafkaTrident {
    
        public static void main(String[] args) {
            TridentTopology topology = new TridentTopology();
    
        // locate the Kafka brokers through ZooKeeper and configure the transactional spout
        BrokerHosts hosts = new ZkHosts(Constants.ZK_HOSTS);
        String topic = "tridentTestTopic";
        String id = "kafka.queue.tridentTestTopic"; // client id for this spout
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(hosts, topic, id);
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // deserialize each message into a single "str" field
        TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);
    
        // read from Kafka and print every message via KafkaPrintFunction
        Stream stream = topology.newStream("kafkaSpout", kafkaSpout);
        stream.shuffle().each(new Fields("str"), new KafkaPrintFunction(), new Fields("result"));

        // run in-process for local testing
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("kafkaTridentDemo", new Config(), topology.build());
        }
    
    }
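
    The demo above submits to an in-process LocalCluster. To deploy the same topology on a real Storm cluster you would package it as a jar and use StormSubmitter instead. A minimal sketch of how the end of main could change, assuming main declares throws Exception; the worker count is only illustrative:

    import org.apache.storm.Config;
    import org.apache.storm.StormSubmitter;
    
    // replaces the LocalCluster lines at the end of main
    Config conf = new Config();
    conf.setNumWorkers(2); // illustrative number of worker processes
    StormSubmitter.submitTopology("kafkaTridentDemo", conf, topology.build());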

    A separate Java (Spring) project publishes test data to Kafka every 3 seconds; a fuller sketch of this sender follows the snippet below.

        @Scheduled(fixedRate = 3000)
        public void shopDataTestJob9() {
            // the loop bound is 1, so one message is sent per 3-second tick
            for (int i = 0; i < 1; i++) {
                kafkaTemplate.send("tridentTestTopic", "test kafka trident");
                System.out.println("test kafka trident");
            }
        }
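
    For context, here is a minimal sketch of the Spring component that could host this scheduled method. It assumes a Spring Boot project with spring-kafka on the classpath and @EnableScheduling turned on; the class name TridentTestSender is hypothetical:

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    @Component
    public class TridentTestSender { // hypothetical class name
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        // publish one test message to the topic every 3 seconds
        @Scheduled(fixedRate = 3000)
        public void shopDataTestJob9() {
            kafkaTemplate.send("tridentTestTopic", "test kafka trident");
            System.out.println("test kafka trident");
        }
    
    }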

    Finally, run both the Storm project and the Java project. Start the Java project first so that it sends data to Kafka and creates the topic; only then can Storm consume from it.

    Observe the result: the Storm project's console prints

    KafkaPrintFunction: test kafka trident
    KafkaPrintFunction: test kafka trident
    KafkaPrintFunction: test kafka trident

    This shows that Storm Trident consumed the Kafka data successfully.
