zoukankan      html  css  js  c++  java
  • 每日一题 为了工作 2020 0425 第五十四题

    // Spark Streaming + Kafka data-transfer demo: producing test data into Kafka

    package com.swust.streaming;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Properties;
    import java.util.Random;
    
    /**
     * kafka作为主流的消息队列 与spark整合是实际操作中较为常用的一种数据传导模式
     * @author 雪瞳
     * @Slogan 时钟尚且前行,人怎能再此止步!
     * @Function 向kafka中生产数据
     *
     */
    /**
     * Generates synthetic user-activity log lines and continuously produces them
     * to the Kafka topic {@code wdxll}, pausing 2 seconds after every 200 records.
     * Intended as a data source for a Spark Streaming + Kafka integration demo.
     *
     * <p>Each log line is tab-separated: date, epoch-millis timestamp, userId,
     * pageId, channel, action.
     */
    public class ProduceToKafka {

        private static final Random RANDOM = new Random();
        // Channel / action vocabularies sampled uniformly for each log line.
        private static final String[] CHANNELS = new String[]{"Spark","Scala","Kafka","Flink","Hadoop","Storm","Hive","Impala","HBase","ML"};
        private static final String[] ACTIONS = new String[]{"View", "Register"};
        // NOTE: SimpleDateFormat is NOT thread-safe; safe here because it is only
        // ever used from the single main thread.
        private static final SimpleDateFormat SDF = new SimpleDateFormat("yyyy-MM-dd");

        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers","data001:9092,data003:9092,data004:9092");
            properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
            properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

            // try-with-resources: KafkaProducer is Closeable; the original leaked it.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
                int count = 0;
                // long, not int: the original int silently overflowed negative in
                // the endless loop, producing keys like "key:-2147483648".
                long keyFlag = 0L;

                // Loop until interrupted so the producer can shut down cleanly.
                while (!Thread.currentThread().isInterrupted()) {
                    count++;
                    keyFlag++;
                    String userLogs = getUserLogs();
                    producer.send(new ProducerRecord<>("wdxll", "key:" + keyFlag, userLogs));
                    System.out.println(userLogs);
                    // Throttle: sleep 2s after every batch of 200 records.
                    if (count % 200 == 0) {
                        count = 0;
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag (the original swallowed it),
                            // which terminates the loop on the next check.
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }
        }

        /**
         * Builds one tab-separated synthetic log line.
         *
         * @return {@code date \t epochMillis \t userId \t pageId \t channel \t action},
         *         where userId and pageId are random ints in [0, 2000).
         */
        public static String getUserLogs() {
            long time = System.currentTimeMillis();
            int userId = RANDOM.nextInt(2000);
            int pageId = RANDOM.nextInt(2000);

            String channel = CHANNELS[RANDOM.nextInt(CHANNELS.length)];
            String action = ACTIONS[RANDOM.nextInt(ACTIONS.length)];
            String date = SDF.format(new Date());

            return String.join("\t",
                    date,
                    String.valueOf(time),
                    String.valueOf(userId),
                    String.valueOf(pageId),
                    channel,
                    action);
        }
    }
    

      

    // 执行结果

    ./kafka-console-consumer.sh --zookeeper data003:2181,data004:2181,data005:2181 --topic wdxll

     

     

  • 相关阅读:
    总结7.13 tp5模板布局
    总结7.13 tp5图像处理
    Flask数据库
    java学习day72-JT项目10(Nginx服务器/tomcat部署/数据库高可用)
    java学习day71-Linux学习(基本指令)
    java学习day71-JT项目09(Linux/JDK/Mariadb/tomcat部署)
    java学习day70-JT项目08(图片回显/Nginx)
    java学习day69-JT项目07-(商品/详情一对一操作//文件上传)
    java学习day68-JT项目06(商品curd)
    java学习day67-JT项目05(商品分类树结构显示)
  • 原文地址:https://www.cnblogs.com/walxt/p/12772591.html
Copyright © 2011-2022 走看看