zoukankan      html  css  js  c++  java
  • Flink使用POJO实现分组和汇总

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class FraudDetection {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStream<MyTransaction> myTransactions = env.socketTextStream("localhost", 9999).map(new MapFunction<String, MyTransaction>() {
    
                MyTransaction myTransaction = null;
    
                @Override
                public MyTransaction map(String value) throws Exception {
                    String[] split = value.split(",");
                    myTransaction = new MyTransaction(split[0].trim(), Long.valueOf(split[1].trim()));
                    return myTransaction;
                }
            });
    
            myTransactions
                    //使用新的API
                    .keyBy(new KeySelector<MyTransaction, String>() {
                @Override
                public String getKey(MyTransaction value) throws Exception {
                    return value.getAccounId();
                }
            },TypeInformation.of(String.class))
                    //或者也可以使用旧API
    //                .keyBy("accounId")
    
                    .sum("amount").print();
    
            env.execute();
    //        DataStream<MyTransaction> transactions = env.addSource(new TransactionSource()).name("transaction");
        }
    
        public static class MyTransaction {
    
            private String accounId;
            private long amount;
    
            public MyTransaction() {
            }
    
            public MyTransaction(String accounId, long amount) {
                this.accounId = accounId;
                this.amount = amount;
            }
    
            public String getAccounId() {
                return accounId;
            }
    
            public void setAccounId(String accounId) {
                this.accounId = accounId;
            }
    
            public long getAmount() {
                return amount;
            }
    
            public void setAmount(long amount) {
                this.amount = amount;
            }
    
            @Override
            public String toString() {
                return "{"accounId":"" + accounId + "","  +
                        ""amount":" + amount + "}";
            }
        }
    
    
    }

    官网介绍的使用可转化为流的类型

  • 相关阅读:
    JavaScript脚本语言特色时钟
    这个周末安排,
    市场营销书籍推荐
    比较好的管理类书籍推荐
    如何培养自己的领导力?或许你该看看这些书
    十本最畅销的市场营销书籍,你看过几本?
    如何提高情商?答案可能在《情商必读12篇》这本书里
    如何管理好员工?
    做销售要看哪些书?《销售管理必读12篇》了解下
    管理书籍推荐,你看过哪些呢?
  • 原文地址:https://www.cnblogs.com/yoyowin/p/14800178.html
Copyright © 2011-2022 走看看