zoukankan      html  css  js  c++  java
  • kafka ksql && docker 安装试用

    备注: 使用docker 模式进行安装
     
    1. 准备docker 环境(需要docker-compose)
        
    docker 的安装不需要进行多描述了,直接yum 或者源码编译也可以
     
     
    2. 安装 kafa 以及ksql
     
    git clone git@github.com:confluentinc/ksql.git
    
    cd ksql/docs/quickstart/
        
    docker-compose up -d
     
    3. 启动使用
     
    docker-compose exec ksql-cli ksql-cli local --bootstrap-server kafka:29092
     
      参考界面:
      
     
    4. 使用
      创建示例
     
      a. 非持久化数据查询
    创建测试使用的表
    
    CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');
    DESCRIBE pageviews_original;
    
    CREATE TABLE users_original (registertime bigint, gender varchar, regionid varchar, userid varchar) WITH (kafka_topic='users', value_format='JSON');
    DESCRIBE users_original;
    
    显示系统的stream
    SHOW STREAMS;
    
     Stream Name        | Kafka Topic | Format    
    ----------------------------------------------
     PAGEVIEWS_ORIGINAL | pageviews   | DELIMITED 
    
    显示系统的表
    SHOW TABLES;
    
     Table Name     | Kafka Topic | Format | Windowed 
    --------------------------------------------------
     USERS_ORIGINAL | users       | JSON   | false    
    
    查询数据
    SELECT pageid FROM pageviews_original LIMIT 3;
    Page_66
    Page_17
    Page_67
    LIMIT reached for the partition.
    Query terminated
    
     
     b. 持久化数据查询
     
    创建数据
    CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE';
    
    DESCRIBE pageviews_female;
    
     Field    | Type            
    ----------------------------
     ROWTIME  | BIGINT          
     ROWKEY   | VARCHAR(STRING) 
     USERID   | VARCHAR(STRING) 
     PAGEID   | VARCHAR(STRING) 
     REGIONID | VARCHAR(STRING) 
     GENDER   | VARCHAR(STRING) 
    
    查询数据
    SELECT * FROM pageviews_female;
    1504252783201 | User_5 | User_5 | Page_49 | Region_8 | FEMALE
    1504252783525 | User_6 | User_6 | Page_39 | Region_6 | FEMALE
    1504252783813 | User_5 | User_5 | Page_15 | Region_8 | FEMALE
    1504252789309 | User_6 | User_6 | Page_90 | Region_5 | FEMALE
    1504252792424 | User_8 | User_8 | Page_40 | Region_1 | FEMALE
    1504252796605 | User_4 | User_4 | Page_12 | Region_8 | FEMALE
    1504252797405 | User_3 | User_3 | Page_22 | Region_3 | FEMALE
    1504252802099 | User_6 | User_6 | Page_43 | Region_7 | FEMALE
     
    5. 支持的模式
      
    stream、table、like 、 join、limit、tumbling window、简单聚合函数,目前来说还是比较强大的
     
     
    6. 参考资料
    https://github.com/confluentinc/ksql/tree/0.1.x/docs/quickstart
    https://github.com/confluentinc/ksql/blob/0.1.x/docs/quickstart/quickstart-docker.md#docker-setup-for-ksql
  • 相关阅读:
    python csv例子
    【LR11】Error -27796: Failed to connect to server"server:port": [10060] Connection timed out错误解决办法
    LR11 scan correlation 卡死解决方案
    EC笔记:第三部分:13、以对象管理资源
    EC笔记:第二部分:12、复制对象时勿忘其每一个成分
    EC笔记:第二部分:11:在operator=中处理“自我赋值”
    EC笔记,第二部分:10.让=返回指向*this的引用
    EC笔记,第二部分:9.不在构造、析构函数中调用虚函数
    EC笔记,第二部分:8.别让异常逃离析构函数
    EC笔记,第二部分:7.为多态基类声明虚析构函数
  • 原文地址:https://www.cnblogs.com/rongfengliang/p/7463695.html
Copyright © 2011-2022 走看看