zoukankan      html  css  js  c++  java
  • Trident继承kafka

    1.Kafka涉及的类

      上一个类是不透明事务

      后一个是完全事务

      

    2.启动服务

      

    3..驱动类

      重要的地方是修改了两个部分:

      1.数据的来源是kafka

      2.第二个是字段的Fields是str

     1 package com.jun.tridentWithKafka;
     2 
     3 import backtype.storm.Config;
     4 import backtype.storm.LocalCluster;
     5 import backtype.storm.StormSubmitter;
     6 import backtype.storm.generated.AlreadyAliveException;
     7 import backtype.storm.generated.InvalidTopologyException;
     8 import backtype.storm.spout.SchemeAsMultiScheme;
     9 import backtype.storm.tuple.Fields;
    10 import backtype.storm.tuple.Values;
    11 import storm.kafka.BrokerHosts;
    12 import storm.kafka.StringScheme;
    13 import storm.kafka.ZkHosts;
    14 import storm.kafka.trident.OpaqueTridentKafkaSpout;
    15 import storm.kafka.trident.TridentKafkaConfig;
    16 import storm.trident.Stream;
    17 import storm.trident.TridentState;
    18 import storm.trident.TridentTopology;
    19 import storm.trident.operation.builtin.Count;
    20 import storm.trident.operation.builtin.Sum;
    21 import storm.trident.testing.FixedBatchSpout;
    22 import storm.trident.testing.MemoryMapState;
    23 
    24 public class TridentWithKafka {
    25     public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
    26         TridentTopology tridentTopology=new TridentTopology();
    27         //使用Kafka中的数据
    28         BrokerHosts hosts = new ZkHosts("linux-hadoop01.ibeifeng.com:2181");
    29         String topic = "nginxlog";
    30         TridentKafkaConfig conf = new TridentKafkaConfig(hosts, topic);
    31 
    32         conf.scheme = new SchemeAsMultiScheme(new StringScheme());
    33         conf.forceFromStart = true;
    34 
    35         OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(conf);
    36 
    37         //流处理
    38         Stream stream=tridentTopology.newStream("orderAnalyse",spout)
    39                 //过滤
    40             .each(new Fields("str"),new ValidLogFilter())
    41                 //解析
    42             .each(new Fields("str"), new LogParserFunction(),new Fields("orderId","orderTime","orderAmtStr","memberId"))
    43                 //投影
    44             .project(new Fields("orderId","orderTime","orderAmtStr","memberId"))
    45                 //时间解析
    46             .each(new Fields("orderTime"),new DateTransFormerFunction(),new Fields("day","hour","minter"))
    47          ;
    48         //分流
    49         //1.基于minter统计订单数量,分组统计
    50         TridentState state=stream.groupBy(new Fields("minter"))
    51                 //全局聚合,使用内存存储状态信息
    52                 .persistentAggregate(new MemoryMapState.Factory(),new Count(),new Fields("orderNumByMinter"));
    53 //        state.newValuesStream().each(new Fields("minter","orderNumByMinter"),new PrintFilter());
    54 
    55         //2.另一个流,基于分钟的订单金额,局部聚合
    56         Stream partitionStream=stream.each(new Fields("orderAmtStr"),new TransforAmtToDoubleFunction(),new Fields("orderAmt"))
    57             .groupBy(new Fields("minter"))
    58                     //局部聚合
    59                 .chainedAgg()    //聚合链
    60             .partitionAggregate(new Fields("orderAmt"),new LocalSum(),new Fields("orderAmtSumOfLocal"))
    61                 .chainEnd();      //聚合链
    62 //        partitionStream.each(new Fields("minter","orderAmtSumOfLocal"),new PrintFilter());
    63         //做一次全局聚合
    64         TridentState partitionState=partitionStream.groupBy(new Fields("minter"))
    65                 //全局聚合
    66                 .persistentAggregate(new MemoryMapState.Factory(),new Fields("orderAmtSumOfLocal"),new Sum(),new Fields("totalOrderAmt"));
    67         partitionState.newValuesStream().each(new Fields("minter","totalOrderAmt"),new PrintFilter());
    68 
    69         //提交
    70         Config config=new Config();
    71         if(args==null || args.length<=0){
    72             LocalCluster localCluster=new LocalCluster();
    73             localCluster.submitTopology("tridentDemo",config,tridentTopology.build());
    74         }else {
    75             config.setNumWorkers(2);
    76             StormSubmitter.submitTopology(args[0],config,tridentTopology.build());
    77         }
    78     }
    79 }

    4.输入数据

      

    5.控制台

      

      

  • 相关阅读:
    SpringCloud分布式配置中心
    SpringCloud服务降级案列
    SpringCloud断路器(Hystrix)
    SpringCloud服务过滤filter
    SpringCloud路由网关Zuul
    SpringCloud微服务实现生产者消费者+ribbon负载均衡
    SpringCloud微服务的Eureka
    忘记MySQL密码以及无法登陆等解决办法
    MySQL备份
    实现两个MySQL数据库之间的主从同步
  • 原文地址:https://www.cnblogs.com/juncaoit/p/9169420.html
Copyright © 2011-2022 走看看