  • Building a Flink big data processing platform on Docker

    https://www.cnblogs.com/1ssqq1lxr/p/10417005.html

    Our business needed a real-time data processing platform; after evaluating several options, we chose Flink.

    • Initialize the Swarm environment (Kubernetes is also an option)
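
    A minimal sketch of bringing up the Swarm itself (the manager address and join token are placeholders; the real token is printed by the init command):

    # on the manager node
    docker swarm init --advertise-addr <MANAGER-IP>
    # on each worker node
    docker swarm join --token <TOKEN> <MANAGER-IP>:2377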

      Deploy the ZooKeeper cluster from a docker-compose file, running it in containers via docker stack. Since ZooKeeper persists data, a shared-storage solution is worth considering later.

    version: "3"

    services:
      zoo1:
        image: zookeeper
        restart: always
        hostname: zoo1
        ports:
          - 2181:2181
        environment:
          ZOO_MY_ID: 1
          ZOO_SERVERS: server.1=0.0.0.0:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
    
      zoo2:
        image: zookeeper
        restart: always
        hostname: zoo2
        ports:
          - 2182:2181
        environment:
          ZOO_MY_ID: 2
          ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=0.0.0.0:2888:3888 server.3=zoo3:2888:3888
    
      zoo3:
        image: zookeeper
        restart: always
        hostname: zoo3
        ports:
          - 2183:2181
        environment:
          ZOO_MY_ID: 3
          ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=0.0.0.0:2888:3888
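
    To deploy this stack onto the Swarm, something like the following works (the compose file name and the stack name zk are placeholders):

    docker stack deploy -c zookeeper.yml zk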
    • Deploy the Flink images
    version: "3"
    
    services:
      jobmanager:
        image: flink:1.7.2-scala_2.12-alpine
        ports:
          - "8081:8081"
        command: jobmanager
        environment:
          - JOB_MANAGER_RPC_ADDRESS=jobmanager
    
      taskmanager:
        image: flink:1.7.2-scala_2.12-alpine
        command: taskmanager
        environment:
          - JOB_MANAGER_RPC_ADDRESS=jobmanager
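
    Deploying the Flink stack works the same way (stack and file names are placeholders; the web UI listens on port 8081):

    docker stack deploy -c flink.yml flink
    # the dashboard is then reachable at http://<any-swarm-node>:8081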

    At this point there is only a single JobManager, which is a single point of failure. Consider mounting the container's flink-conf.yaml out of the image and configuring ZooKeeper HA, as sketched below.
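
    A sketch of the relevant flink-conf.yaml entries for ZooKeeper HA, assuming the Flink services share a network with the zoo* services; the storage directory and cluster id below are placeholders, and storageDir must point at storage reachable from every node:

    high-availability: zookeeper
    high-availability.zookeeper.quorum: zoo1:2181,zoo2:2181,zoo3:2181
    high-availability.storageDir: hdfs:///flink/ha/
    high-availability.cluster-id: /flink-swarm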

    • To scale out TaskManagers, simply run docker service scale TaskManager-NAME=3

    Flink demo: read data from Kafka, process it in real time, and write the results to InfluxDB for display.

    // Entry point for the real-time streaming job; JSON is assumed to be fastjson,
    // matching the parseObject(String, Class) call below
    import com.alibaba.fastjson.JSON;
    import com.google.common.collect.Lists;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class SportRealTimeJob {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
            // Kafka bootstrap servers and consumer group id
            KafkaConnector connector = new KafkaConnector("192.168.30.60:9092", "big-data");
            env
                    // Consume JSON strings from the "test0" topic
                    .addSource(connector.getConsumerConnector(Lists.newArrayList("test0")))
                    .<MessageBody>flatMap((sentence, out) -> {
                        MessageBody body = JSON.parseObject(sentence, MessageBody.class);
                        out.collect(body);
                    })
                    // Lambdas lose generic type information to erasure, so declare the output type explicitly
                    .returns(MessageBody.class)
                    // keyBy() already repartitions by key, so no extra shuffle() is needed
                    .keyBy(messageBody -> messageBody.getPhone() + messageBody.getUserId())
                    // Sum values per key over 10-second processing-time windows
                    .timeWindow(Time.seconds(10))
                    .reduce((t0, t1) -> new MessageBody(t0.getUserId(), t0.getPhone(), t0.getValue() + t1.getValue()))
                    .addSink(new InfluxWriter())
                    .setParallelism(1);
            env.execute("SportRealTimeJob");
        }
    
    
    }
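
    The KafkaConnector helper above is not shown in the post; here is a minimal sketch of what it presumably wraps, assuming Flink's universal Kafka connector (flink-connector-kafka) and plain string messages:

    import java.util.List;
    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    // Hypothetical helper (an assumption; the original class is not shown)
    public class KafkaConnector {

        private final String bootstrapServers;
        private final String groupId;

        public KafkaConnector(String bootstrapServers, String groupId) {
            this.bootstrapServers = bootstrapServers;
            this.groupId = groupId;
        }

        // Builds a consumer that emits each record's value as a String
        public FlinkKafkaConsumer<String> getConsumerConnector(List<String> topics) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", bootstrapServers);
            props.setProperty("group.id", groupId);
            return new FlinkKafkaConsumer<>(topics, new SimpleStringSchema(), props);
        }
    }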
    // Demo entity class for the processed data; the influxdb-java annotations
    // map it onto an InfluxDB measurement for reads via InfluxDBResultMapper
    import lombok.Data;
    import org.influxdb.annotation.Column;
    import org.influxdb.annotation.Measurement;

    @Data
    @Measurement(name = "sport")
    public class MessageBody {
    
        @Column(name = "userId",tag = true)
        private String userId;
    
        @Column(name = "phone",tag = true)
        private String phone;
    
        @Column(name = "value")
        private int value;
    
    
        public MessageBody() {
        }
    
        public MessageBody(String userId, String phone, int value) {
            this.userId = userId;
            this.phone = phone;
            this.value = value;
        }
    }
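
    A message like the following on the test0 topic would map onto this class (the field names are an assumption based on the POJO above):

    {"userId":"u-001","phone":"13800000000","value":5}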
    // Custom sink: opens one InfluxDB connection per parallel task and writes each record as a point
    import java.util.concurrent.TimeUnit;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.influxdb.dto.Point;

    public class InfluxWriter extends RichSinkFunction<MessageBody> {

        private InfluxTemplate template;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            InfluxBean bean= InfluxBean.builder().dbName("game")
                    .url("http://localhost:8086")
                    .username("admin")
                    .password("admin")
                    .build();
            template = new SimpleInfluxTemplate(bean);
        }
    
        @Override
        public void close() throws Exception {
            template.close();
        }
    
        @Override
        public void invoke(MessageBody value, Context context) throws Exception {
            // One point per record, stamped with the current processing time
            template.write(Point.measurement("sport")
                    .addField("value", value.getValue())
                    .tag("userId", value.getUserId())
                    .tag("phone", value.getPhone())
                    .time(context.currentProcessingTime(), TimeUnit.MILLISECONDS)
                    .build());
        }
    }
    // InfluxDB helper built on the influxdb-java client
    import java.util.List;
    import org.influxdb.BatchOptions;
    import org.influxdb.InfluxDB;
    import org.influxdb.InfluxDBFactory;
    import org.influxdb.dto.BatchPoints;
    import org.influxdb.dto.Point;
    import org.influxdb.dto.Query;
    import org.influxdb.dto.QueryResult;
    import org.influxdb.impl.InfluxDBResultMapper;

    public class SimpleInfluxTemplate implements InfluxTemplate {

        private final InfluxDB db;
    
        public SimpleInfluxTemplate(InfluxBean bean) {
            this.db = InfluxDBFactory.connect(bean.getUrl(), bean.getUsername(), bean.getPassword());
            db.setDatabase(bean.getDbName());
            // Buffer writes client-side and flush in batches; failed points are passed to the handler
            db.enableBatch(BatchOptions.DEFAULTS.exceptionHandler(
                    (failedPoints, throwable) -> {
                        /* custom error handling here */ })
                    .consistency(InfluxDB.ConsistencyLevel.ALL)
                    .bufferLimit(100)
            );
        }
    
        @Override
        public void write(Point point) {
            db.write(point);
        }
    
        @Override
        public void batchWrite(BatchPoints points) {
            db.write(points);
        }
    
        @Override
        public <T> List<T> query(Query query, Class<T> tClass) {
            QueryResult result=db.query(query);
            InfluxDBResultMapper resultMapper = new InfluxDBResultMapper(); // thread-safe - can be reused
            return resultMapper.toPOJO(result, tClass);
        }
    
        @Override
        public void close() {
            db.close();
        }
    }

    // Template interface abstracting the InfluxDB operations used above
    import java.util.List;
    import org.influxdb.dto.BatchPoints;
    import org.influxdb.dto.Point;
    import org.influxdb.dto.Query;

    public interface InfluxTemplate {

        void write(Point point);

        void batchWrite(BatchPoints points);

        <T> List<T> query(Query query, Class<T> tClass);

        void close();
    }
    
    
    // Connection-settings bean; lombok generates the builder and accessors
    import lombok.Builder;
    import lombok.Getter;
    import lombok.Setter;
    import lombok.ToString;

    @ToString
    @Getter
    @Setter
    @Builder
    public class InfluxBean {
    
        private String url;
    
        private String username;
    
        private String password;
    
        private String dbName;
    }
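
    For completeness, a usage sketch of the template's read path, assuming the game database and sport measurement written above (the time filter is arbitrary):

    // Query aggregated points back out and map them onto the annotated POJO
    SimpleInfluxTemplate template = new SimpleInfluxTemplate(InfluxBean.builder()
            .dbName("game").url("http://localhost:8086")
            .username("admin").password("admin").build());
    List<MessageBody> rows = template.query(
            new Query("SELECT * FROM sport WHERE time > now() - 1h", "game"),
            MessageBody.class);
    rows.forEach(System.out::println);
    template.close();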