  • Integrating Trident with Kafka

    First, write a print function, KafkaPrintFunction:

    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;
    
    public class KafkaPrintFunction extends BaseFunction {
    
        @Override
        public void execute(TridentTuple input, TridentCollector collector) {
            // Read the "str" field produced by the spout's StringScheme
            String msg = input.getStringByField("str");
            System.out.println(this.getClass().getSimpleName() + ": " + msg);
            // Pass the message downstream as the declared output field
            collector.emit(new Values(msg));
        }
    
    }
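
    The contract here is simple: read one named field from the input tuple and emit it onward. Outside a Storm cluster, that behaviour can be sketched in plain Java, using a Map and a List as hypothetical stand-ins for Storm's TridentTuple and TridentCollector:

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Plain-Java sketch of the KafkaPrintFunction contract. Map and List are
    // hypothetical stand-ins for Storm's TridentTuple and TridentCollector.
    public class FunctionContractSketch {

        // Mirrors execute(TridentTuple, TridentCollector): read the "str" field,
        // print it, and pass it downstream as the emitted value.
        static List<Object> execute(Map<String, Object> input) {
            List<Object> emitted = new ArrayList<>();   // stand-in collector
            String msg = (String) input.get("str");     // input.getStringByField("str")
            System.out.println("KafkaPrintFunction: " + msg);
            emitted.add(msg);                           // collector.emit(new Values(msg))
            return emitted;
        }

        public static void main(String[] args) {
            execute(Map.of("str", "test kafka trident"));
        }
    }
    ```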

    Then write the topology that wires Trident to Kafka:

    import net.baiqu.shop.report.utils.Constants;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.StringScheme;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.kafka.trident.TransactionalTridentKafkaSpout;
    import org.apache.storm.kafka.trident.TridentKafkaConfig;
    import org.apache.storm.spout.SchemeAsMultiScheme;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.tuple.Fields;
    
    /**
     * Connects a Kafka spout to a Trident topology
     */
    public class KafkaTrident {
    
        public static void main(String[] args) {
            TridentTopology topology = new TridentTopology();
    
            // ZooKeeper hosts the (older) storm-kafka spout uses to discover brokers
            BrokerHosts hosts = new ZkHosts(Constants.ZK_HOSTS);
            String topic = "tridentTestTopic";
            // Client id under which consumer offsets are stored in ZooKeeper
            String id = "kafka.queue.tridentTestTopic";
            TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(hosts, topic, id);
            // Decode each raw Kafka message into a single "str" field
            kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
            TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);
    
            Stream stream = topology.newStream("kafkaSpout", kafkaSpout);
            stream.shuffle().each(new Fields("str"), new KafkaPrintFunction(), new Fields("result"));
    
            // Run locally for testing; a real deployment would use StormSubmitter instead
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafkaTridentDemo", new Config(), topology.build());
        }
    
    }
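
    This example relies on the older ZooKeeper-based storm-kafka module. A minimal sketch of the Maven dependencies it needs follows; the version numbers are assumptions and should match the Storm and Kafka versions on your cluster:

    ```xml
    <!-- Versions are illustrative; align them with your cluster.
         Add <scope>provided</scope> to storm-core when deploying to a real cluster. -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.2.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>1.2.3</version>
    </dependency>
    <dependency>
        <!-- Broker client used by the spout; the Scala suffix must match the broker build -->
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.2.2</version>
    </dependency>
    ```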

    In another Java project, send data to Kafka on a schedule (this snippet assumes Spring's KafkaTemplate):

        @Scheduled(fixedRate = 3000)  // publish one message every 3 seconds
        public void shopDataTestJob9() {
            kafkaTemplate.send("tridentTestTopic", "test kafka trident");
            System.out.println("test kafka trident");
        }

    Finally, run the Storm project and the Java project. (Run the Java project first so it publishes to Kafka and creates the topic; Storm can only consume the topic once it exists.)
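
    Alternatively, the topic can be created up front with Kafka's CLI tools. A sketch for a local single-broker setup; the localhost addresses are assumptions:

    ```shell
    # Create the topic the spout subscribes to (old ZooKeeper-based CLI form)
    kafka-topics.sh --create --zookeeper localhost:2181 \
        --replication-factor 1 --partitions 1 --topic tridentTestTopic

    # Optionally push a test message by hand instead of running the Java producer
    echo "test kafka trident" | kafka-console-producer.sh \
        --broker-list localhost:9092 --topic tridentTestTopic
    ```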

    Observe the result: the Storm project's console prints

    KafkaPrintFunction: test kafka trident
    KafkaPrintFunction: test kafka trident
    KafkaPrintFunction: test kafka trident

    which shows that Storm Trident consumed the Kafka data successfully.

  • Original article: https://www.cnblogs.com/tangzhe/p/9618495.html