  • Kafka data source generator

    1. Preparation

    • Data source: the public data set from Alibaba Cloud Tianchi, or download it from GitHub

    • Create the topic user_behavior:

      $ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic user_behavior
      WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
      
      $ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
      test
      user_behavior
      

    2. Generator code

    Reference: SourceGenerator

    Java code: MockSourceGenerator

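    package cn.rumoss.study.flink; // matches the class name used on the command line

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    public class MockSourceGenerator {
        private static final long SPEED = 10; // default: 10 records per second

        public static void main(String[] args) {
            long speed = SPEED;
            if (args.length > 0) {
                speed = Long.parseLong(args[0]);
            }
            long delay = 1000_000 / speed; // microseconds per record

            // Read the data set above from the classpath, one line at a time
            try (InputStream inputStream = MockSourceGenerator.class.getClassLoader().getResourceAsStream("user_behavior.log")) {
                if (inputStream == null) {
                    throw new IllegalStateException("user_behavior.log not found on the classpath");
                }
                BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
                long start = System.nanoTime();
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);

                    // Wait until at least `delay` microseconds have passed since the previous record
                    long end = System.nanoTime();
                    long diff = end - start;
                    while (diff < (delay * 1000)) {
                        Thread.sleep(1);
                        end = System.nanoTime();
                        diff = end - start;
                    }
                    start = end;
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag and exit
            }
        }

    }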
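
    The generator above resets start after every record, so any overshoot from Thread.sleep(1) is never made up and the real rate ends up a little below the requested one. One possible alternative, shown here only as a sketch, is to compute an absolute deadline for each record from a fixed start time. The class PacedEmitter and its methods intervalNanos and deadlineNanos are hypothetical names, not part of the original project.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical helper, not part of the original project: paces output against
// absolute deadlines so that sleep overshoot does not accumulate.
public class PacedEmitter {

    /** Nanoseconds between two records for the given rate. */
    public static long intervalNanos(long recordsPerSecond) {
        if (recordsPerSecond <= 0) {
            throw new IllegalArgumentException("rate must be positive");
        }
        return TimeUnit.SECONDS.toNanos(1) / recordsPerSecond;
    }

    /** Absolute deadline of the record with the given 0-based index. */
    public static long deadlineNanos(long startNanos, long index, long intervalNanos) {
        return startNanos + index * intervalNanos;
    }

    public static void main(String[] args) {
        long interval = intervalNanos(10); // 10 records per second
        long start = System.nanoTime();
        for (long i = 0; i < 5; i++) {
            long deadline = deadlineNanos(start, i, interval);
            long wait;
            // parkNanos may return early, so re-check until the deadline has passed
            while ((wait = deadline - System.nanoTime()) > 0) {
                LockSupport.parkNanos(wait);
            }
            System.out.println("record " + i);
        }
    }
}
```

    Because every deadline is derived from the same start time, a late record shortens the wait for the next one instead of shifting all later records.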
    

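    Compile and package the project, then test it from the command line. The trailing argument is the number of records to emit per second: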

    $ java -cp target/java-flink-1.0-SNAPSHOT.jar cn.rumoss.study.flink.MockSourceGenerator 1
    {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
    {"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
    {"user_id": "561558", "item_id":"3611281", "category_id": "965809", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
    ...
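
    To sanity-check the generator output before sending it to Kafka, a record can be split into its fields. The following is a minimal sketch that assumes every record is a flat JSON object with string values and no escaped quotes, as in the sample lines above; it is not a general JSON parser, and the class name RecordFields is hypothetical.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts "key": "value" pairs from one generator record.
public class RecordFields {

    // Assumes flat JSON with string values only and no escaped quotes.
    private static final Pattern PAIR =
            Pattern.compile("\"([^\"]+)\"\\s*:\\s*\"([^\"]*)\"");

    public static Map<String, String> parse(String line) {
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher m = PAIR.matcher(line);
        while (m.find()) {
            fields.put(m.group(1), m.group(2));
        }
        return fields;
    }

    public static void main(String[] args) {
        String line = "{\"user_id\": \"543462\", \"item_id\":\"1715\", "
                + "\"category_id\": \"1464116\", \"behavior\": \"pv\", "
                + "\"ts\": \"2017-11-26T01:00:00Z\"}";
        Map<String, String> fields = parse(line);
        // The ts field is an ISO-8601 instant, so Instant.parse accepts it directly
        Instant ts = Instant.parse(fields.get("ts"));
        System.out.println(fields.get("user_id") + " " + fields.get("behavior") + " " + ts);
    }
}
```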
    

    3. Feed data into Kafka through a pipe

    Copy the JAR built above into the Kafka root directory:

    • Produce data to the topic:

      $ java -cp java-flink-1.0-SNAPSHOT.jar cn.rumoss.study.flink.MockSourceGenerator 1 | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic user_behavior
      >>>...
      
    • Subscribe to the topic and consume the data that was written; records can be seen arriving one after another:

      $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic user_behavior --from-beginning
      {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
      {"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
      {"user_id": "561558", "item_id":"3611281", "category_id": "965809", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
      ...
      
  • Original article: https://www.cnblogs.com/HeCG95/p/12218650.html