  • How to Get Started with PipelineDB the Easy Way

    PipelineDB source: https://github.com/pipelinedb/pipelinedb

    Installing PipelineDB

     ./configure CFLAGS="-g -O0" --enable-cassert --prefix=/usr/local/pipelinedb_0.9.7

    Besides the dependency packages that PostgreSQL itself needs, you also have to install ZeroMQ.

    make

    make install 

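    The routine is exactly the same as with Postgres: after installation, initialize the database, start it, and log in. One difference: PipelineDB's default user is pipeline rather than postgres.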

    [pipeline@bogon ~]$ /usr/local/pipelinedb_0.9.7/bin/psql

    psql (9.5.3)

    Type "help" for help.

     

    pipeline=# \c

    You are now connected to database "pipeline" as user "pipeline".

    pipeline=#

    Creating a Stream

    In PipelineDB, a stream is implemented on top of an FDW (foreign data wrapper) and does not actually store any data.

     

    pipeline=# create stream my_stream(name text,age int,birth timestamp);

     

    Querying a stream directly is not allowed:

    pipeline=# select * from my_stream;

    ERROR:  "my_stream" is a stream

    HINT:  Streams can only be read by a continuous view's FROM clause.

    In other words, a stream can only be read by a continuous view.

     

    Once the stream is created, you can see an extra column, arrival_timestamp. It records when each event arrived, and sliding windows rely on it.

    pipeline=# \d my_stream

                Stream "public.my_stream"

          Column       |            Type

    -------------------+-----------------------------

     name              | text

     age               | integer

     birth             | timestamp without time zone

     arrival_timestamp | timestamp with time zone
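    To make these stream semantics concrete, here is a toy Python model. It is not PipelineDB code: the names Stream, StreamError, and readers are hypothetical. The model keeps no rows, stamps each event with an arrival time in the manner of the arrival_timestamp column, and hands it to its readers (stand-ins for continuous views). Querying it directly raises an error.

```python
from datetime import datetime, timezone


class StreamError(Exception):
    """Raised when a stream is queried directly instead of through a reader."""


class Stream:
    """Toy model of a stream: it stamps events and forwards them, but stores nothing."""

    def __init__(self, name):
        self.name = name
        self.readers = []  # callables standing in for continuous views

    def insert(self, **row):
        # Attach an arrival time, playing the role of the arrival_timestamp column.
        event = dict(row, arrival_timestamp=datetime.now(timezone.utc))
        for reader in self.readers:
            reader(event)
        # The event is not kept anywhere once the readers have seen it.

    def select(self):
        # Mirrors the error: "my_stream" is a stream
        raise StreamError(f'"{self.name}" is a stream')


# Usage: a plain list subscribes as a reader, then one event is inserted.
seen = []
my_stream = Stream("my_stream")
my_stream.readers.append(seen.append)
my_stream.insert(name="Li.Sang", age=28, birth=datetime(1989, 3, 1))
```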

     

    Creating Continuous Views

    pipeline=# create continuous view cv as select name,age,birth from my_stream;

    CREATE CONTINUOUS VIEW

    pipeline=# \d cv

               Continuous view "public.cv"

     Column |            Type             | Modifiers

    --------+-----------------------------+-----------

     name   | text                        |

     age    | integer                     |

     birth  | timestamp without time zone |

     

    pipeline=#

    Creating a continuous view also creates a few other objects:

    pipeline=# \d

                    List of relations

     Schema |   Name    |      Type       |  Owner

    --------+-----------+-----------------+----------

     public | cv        | continuous view | pipeline

     public | cv_mrel   | table           | pipeline

     public | cv_osrel  | stream          | pipeline

     public | cv_seq    | sequence        | pipeline

     public | my_stream | stream          | pipeline

    (5 rows)

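    1. cv: like an ordinary view, it stores nothing itself. You can think of it as a materialized view that is kept up to date in real time and sustains very high throughput; the materialized rows live in cv_mrel.
    2. cv_mrel: this is where the data is actually stored, and it is an ordinary PostgreSQL table. cv is just a shell over this table, although its contents may be stored in a format such as HLL.
    3. cv_seq: a sequence that generates the primary key for that table; querying cv_mrel shows a $pk column by default.
    4. cv_osrel: an internal relation representing the view's output stream.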
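    The relationship between cv, cv_mrel, and cv_seq can be pictured with a small Python model. This is a sketch, not how PipelineDB is implemented. The names ContinuousView, on_event, and select are hypothetical, and the model ignores aggregation, the output stream (cv_osrel), and concurrency.

```python
import itertools


class ContinuousView:
    """Toy model of a non-aggregating continuous view and its helper objects."""

    def __init__(self, columns):
        self.columns = columns
        self.seq = itertools.count(1)  # stands in for cv_seq
        self.mrel = []                 # stands in for cv_mrel, the table holding the rows

    def on_event(self, event):
        # Project the view's columns from the stream event and store them with a $pk.
        row = {c: event[c] for c in self.columns}
        row["$pk"] = next(self.seq)
        self.mrel.append(row)

    def select(self):
        # cv is a shell over cv_mrel: the same rows without the internal $pk column.
        return [{c: r[c] for c in self.columns} for r in self.mrel]


# Usage: feed two stream events into the view.
cv = ContinuousView(["name", "age", "birth"])
cv.on_event({"name": "Li.Sang", "age": 28, "birth": "1989-03-01 00:00:00"})
cv.on_event({"name": "Zhang.San", "age": 30, "birth": "2017-05-15 11:20:37.614901"})
```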

    Inserting Data into a Stream

    pipeline=# insert into my_stream(name,age,birth) values('Li.Sang',28,'1989-03-01'::timestamp);

    INSERT 0 1

    pipeline=# select * from cv;

      name   | age |        birth

    ---------+-----+---------------------

     Li.Sang |  28 | 1989-03-01 00:00:00

    (1 row)

    Now let's look at the data in the underlying table:

    pipeline=# select * from cv_mrel;

      name   | age |        birth        | $pk

    ---------+-----+---------------------+-----

     Li.Sang |  28 | 1989-03-01 00:00:00 |   1

    (1 row)

     

    pipeline=# insert into my_stream(name,age,birth) values('Zhang.San',30,now());

    INSERT 0 1

    pipeline=# select * from cv;

       name    | age |           birth

    -----------+-----+----------------------------

     Li.Sang   |  28 | 1989-03-01 00:00:00

     Zhang.San |  30 | 2017-05-15 11:20:37.614901

    (2 rows)

     

    pipeline=# select * from cv_mrel;

       name    | age |           birth            | $pk

    -----------+-----+----------------------------+-----

     Li.Sang   |  28 | 1989-03-01 00:00:00        |   1

     Zhang.San |  30 | 2017-05-15 11:20:37.614901 |   2

    (2 rows)

     

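    Compared with cv, cv_mrel only has the extra $pk column. That is what the data looks like in the simple case; when the view computes aggregates, the stored data may be in HLL format instead.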

    If you are interested in HyperLogLog (HLL), see https://stefanheule.com/papers/edbt13-hyperloglog.pdf
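    For a feel of what HLL state looks like, here is a toy HyperLogLog in Python. It uses the basic estimator from the original Flajolet et al. algorithm with a linear-counting fallback for small cardinalities. It does not include the bias-correction improvements from the paper linked above, and it is not PipelineDB's implementation. The class name, the use of BLAKE2b as the 64-bit hash, and the default p = 14 are assumptions for illustration.

```python
import hashlib
import math


class HyperLogLog:
    """Toy HyperLogLog: estimates the number of distinct items using 2**p small registers."""

    def __init__(self, p=14):
        self.p = p
        self.m = 1 << p
        self.registers = [0] * self.m
        # Bias-correction constant alpha_m from the original HLL paper (valid for m >= 128).
        self.alpha = 0.7213 / (1 + 1.079 / self.m)

    @staticmethod
    def _hash64(item):
        digest = hashlib.blake2b(str(item).encode("utf-8"), digest_size=8).digest()
        return int.from_bytes(digest, "big")

    def add(self, item):
        x = self._hash64(item)
        idx = x >> (64 - self.p)            # the first p bits choose a register
        w = x & ((1 << (64 - self.p)) - 1)  # the remaining 64 - p bits
        # Rank = 1-based position of the leftmost 1-bit in w (64 - p + 1 if w == 0).
        rank = (64 - self.p) - w.bit_length() + 1
        if rank > self.registers[idx]:
            self.registers[idx] = rank

    def count(self):
        z = sum(2.0 ** -r for r in self.registers)
        estimate = self.alpha * self.m * self.m / z
        zeros = self.registers.count(0)
        # Small-range correction: switch to linear counting while registers are still empty.
        if estimate <= 2.5 * self.m and zeros:
            estimate = self.m * math.log(self.m / zeros)
        return estimate
```

    Adding the same item again never changes any register, which is why duplicates do not inflate the estimate. The whole state is the fixed-size register array, regardless of how many items were added.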

    Sliding Windows

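    Now for sliding windows. Windows are a core concept in stream processing: for example, aggregates over the last 5 minutes, the last hour, or the last day. The examples below use a stream my_sliding_stream with a single timestamp column named time. Its creation is not shown, but it can be created the same way as my_stream, e.g. create stream my_sliding_stream(time timestamp);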

    pipeline=# create continuous view cv_sliding_1_minute with(sw = '1 minute')  as select time  from my_sliding_stream ;

    CREATE CONTINUOUS VIEW

    pipeline=# \d cv_sliding_1_minute

       Continuous view "public.cv_sliding_1_minute"

     Column |            Type             | Modifiers

    --------+-----------------------------+-----------

     time   | timestamp without time zone |

     

     

    The statement above is equivalent to:

    create continuous view cv_sliding_1_minute  as select time from my_sliding_stream where (arrival_timestamp > clock_timestamp() - interval '1 minute');

    The window is evaluated against each event's arrival_timestamp in the stream.

     

    This continuous view returns only the data from the last minute.

    Let's test it:

    pipeline=# insert into my_sliding_stream(time) values(now());

    INSERT 0 1

    pipeline=# select * from cv_sliding_1_minute;

                time

    ----------------------------

     2017-05-15 11:42:33.141251

    (1 row)

     

    pipeline=# insert into my_sliding_stream(time) values(now());

    INSERT 0 1

    pipeline=# select * from cv_sliding_1_minute;

                time

    ----------------------------

     2017-05-15 11:42:33.141251

     2017-05-15 11:43:21.256779

    (2 rows)

     

    pipeline=# insert into my_sliding_stream(time) values(now());

    INSERT 0 1

    pipeline=# select * from cv_sliding_1_minute;

                time

    ----------------------------

     2017-05-15 11:43:21.256779

     2017-05-15 11:43:59.362918

    (2 rows)

     

    pipeline=# select now();

                  now

    -------------------------------

     2017-05-15 11:44:04.015165+08

    (1 row)

     

    The first row, 2017-05-15 11:42:33.141251, has already dropped out of the window. Query again a little later:

    pipeline=# select * from cv_sliding_1_minute;

     time

    ------

    (0 rows)

     

    pipeline=# select now();

                 now

    ------------------------------

     2017-05-15 11:46:39.50591+08

    (1 row)

     

    By now the continuous view is empty.
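    The expiry observed in this session can be replayed with a small Python sketch of the window predicate arrival_timestamp > clock_timestamp() - interval '1 minute'. This is a model, not PipelineDB's implementation. sliding_window is a hypothetical helper. The arrival times are approximated by the inserted time values, which are close to, but not identical with, the real arrival_timestamp values.

```python
from datetime import datetime, timedelta


def sliding_window(arrivals, now, width=timedelta(minutes=1)):
    """Keep arrivals newer than now - width, like the view's WHERE clause."""
    cutoff = now - width
    return [t for t in arrivals if t > cutoff]


# Time values inserted in the session above, used as approximate arrival times.
arrivals = [
    datetime(2017, 5, 15, 11, 42, 33, 141251),
    datetime(2017, 5, 15, 11, 43, 21, 256779),
    datetime(2017, 5, 15, 11, 43, 59, 362918),
]

# Evaluate the window at the two now() values printed in the session.
at_11_44 = sliding_window(arrivals, datetime(2017, 5, 15, 11, 44, 4, 15165))
at_11_46 = sliding_window(arrivals, datetime(2017, 5, 15, 11, 46, 39, 505910))
```

    At 11:44:04 the cutoff is 11:43:04, so only the first row falls out. At 11:46:39 all three rows are older than one minute and the result is empty, matching the two queries above.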

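    A Very Handy Feature: Per-Row TTL (time-to-live)

    With ttl = '10 minute' and ttl_column = 'minute', rows whose minute value is more than 10 minutes old are expired from the view. The session below spans only a few minutes, so no rows have expired yet.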

    pipeline=# CREATE CONTINUOUS VIEW v_ttl WITH (ttl = '10 minute', ttl_column = 'minute') AS

    pipeline-#   SELECT minute(arrival_timestamp), COUNT(*) FROM my_sliding_stream GROUP BY minute;

    CREATE CONTINUOUS VIEW

    pipeline=# insert into my_sliding_stream values(now());

    INSERT 0 1

    pipeline=# insert into my_sliding_stream values(now());

    INSERT 0 1

    pipeline=# insert into my_sliding_stream values(now());

    INSERT 0 1

    pipeline=# select * from v_ttl;

             minute         | count

    ------------------------+-------

     2017-05-15 13:48:00+08 |     3

    (1 row)

     

    pipeline=# select now();

                 now

    ------------------------------

     2017-05-15 13:49:07.11884+08

    (1 row)

     

    pipeline=# insert into my_sliding_stream values(now());

    INSERT 0 1

    pipeline=# select * from v_ttl;

             minute         | count

    ------------------------+-------

     2017-05-15 13:48:00+08 |     3

     2017-05-15 13:49:00+08 |     1

    (2 rows)

     

    pipeline=# select now();

                  now

    -------------------------------

     2017-05-15 13:50:05.236968+08

    (1 row)

     

    pipeline=# insert into my_sliding_stream values(now());

    INSERT 0 1

    pipeline=# select * from v_ttl;

             minute         | count

    ------------------------+-------

     2017-05-15 13:48:00+08 |     3

     2017-05-15 13:49:00+08 |     1

     2017-05-15 13:50:00+08 |     1

    (3 rows)

     

    pipeline=# insert into my_sliding_stream values(now());

    INSERT 0 1

    pipeline=# select * from v_ttl;

             minute         | count

    ------------------------+-------

     2017-05-15 13:48:00+08 |     3

     2017-05-15 13:49:00+08 |     1

     2017-05-15 13:50:00+08 |     2

    (3 rows)

     

    pipeline=#
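    The grouping and expiry of v_ttl can be sketched in Python. This assumes a row counts as expired once its ttl_column value is older than now - ttl. It ignores when PipelineDB physically deletes expired rows, and ttl_view and minute_bucket are hypothetical helpers. The seconds in the arrival times are made up. They are chosen only so that the per-minute counts match the session above (3, 1, and 2).

```python
from collections import Counter
from datetime import datetime, timedelta


def minute_bucket(ts):
    """Truncate a timestamp to the minute, like minute(arrival_timestamp)."""
    return ts.replace(second=0, microsecond=0)


def ttl_view(arrivals, now, ttl=timedelta(minutes=10)):
    """COUNT(*) per minute, keeping only groups whose minute is newer than now - ttl."""
    counts = Counter(minute_bucket(t) for t in arrivals)
    return {m: c for m, c in sorted(counts.items()) if m > now - ttl}


# Hypothetical arrival times: three in 13:48, one in 13:49, two in 13:50.
arrivals = [
    datetime(2017, 5, 15, 13, 48, 10),
    datetime(2017, 5, 15, 13, 48, 20),
    datetime(2017, 5, 15, 13, 48, 30),
    datetime(2017, 5, 15, 13, 49, 10),
    datetime(2017, 5, 15, 13, 50, 10),
    datetime(2017, 5, 15, 13, 50, 40),
]

now_view = ttl_view(arrivals, datetime(2017, 5, 15, 13, 50, 30))
later_view = ttl_view(arrivals, datetime(2017, 5, 15, 13, 59, 30))  # hypothetical later check
```

    Unlike the sliding window, which recomputes over raw events, the TTL view keeps one aggregated row per minute and only drops whole rows as they age out.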

    Continuous Transforms (TRANSFORM)

    pipeline=# create stream str1(x bigint,y text,z timestamp);

    CREATE STREAM

    pipeline=# create stream str2(x bigint,y text,z timestamp);

    CREATE STREAM

    Create a continuous view on each stream:

    pipeline=# create continuous view cv_1 as select x,y,z from str1;

    CREATE CONTINUOUS VIEW

    pipeline=# create continuous view cv_2 as select x,y,z from str2;

    CREATE CONTINUOUS VIEW

    Create the transform:

    pipeline=# create continuous transform tran_1 as select x,y,z from str1 then  execute procedure pipeline_stream_insert('str2');

    CREATE CONTINUOUS TRANSFORM

     

    pipeline=# insert into str1(x,y,z) values(1,'Hi,I from str1 msg',now());

    INSERT 0 1

    pipeline=# select * from cv_1;

     x |         y          |             z

    ---+--------------------+----------------------------

     1 | Hi,I from str1 msg | 2017-05-15 13:56:22.760362

    (1 row)

     

    pipeline=# select * from cv_2;

     x |         y          |             z

    ---+--------------------+----------------------------

     1 | Hi,I from str1 msg | 2017-05-15 13:56:22.760362

    (1 row)

     

    pipeline=#
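    The data flow of tran_1 can be modeled in a few lines of Python. str1 pushes each row to cv_1 and to the transform. The transform projects x, y, z and re-inserts the row into str2, which feeds cv_2. This is a conceptual sketch, not PipelineDB's executor: the Stream class and the list-based views are hypothetical stand-ins.

```python
class Stream:
    """Toy stream: pushes each row to its subscribers and keeps nothing."""

    def __init__(self):
        self.subscribers = []

    def insert(self, row):
        for fn in self.subscribers:
            fn(row)


str1, str2 = Stream(), Stream()
cv_1, cv_2 = [], []  # continuous views modeled as plain lists
str1.subscribers.append(cv_1.append)
str2.subscribers.append(cv_2.append)


def tran_1(row):
    # SELECT x, y, z FROM str1, then forward each row into str2
    # (the role pipeline_stream_insert('str2') plays in the session above).
    str2.insert({k: row[k] for k in ("x", "y", "z")})


str1.subscribers.append(tran_1)
str1.insert({"x": 1, "y": "Hi,I from str1 msg", "z": "2017-05-15 13:56:22.760362"})
```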

     

    The pipeline_stream_insert function used when creating the transform is provided by PipelineDB itself, but we can also define our own function.

    pipeline=# create table t(x bigint,y text,z timestamp);

    CREATE TABLE

     

    pipeline=# CREATE OR REPLACE FUNCTION insert_into_t()

    pipeline-#   RETURNS trigger AS

    pipeline-#   $$

    pipeline$#   BEGIN

    pipeline$#     INSERT INTO t (x, y,z) VALUES (NEW.x, NEW.y,NEW.z);

    pipeline$#     RETURN NEW;

    pipeline$#   END;

    pipeline$#   $$

    pipeline-#   LANGUAGE plpgsql;

    CREATE FUNCTION

     

    pipeline=# CREATE CONTINUOUS TRANSFORM tran_t AS

    pipeline-#   SELECT x,y,z FROM str1

    pipeline-#   THEN EXECUTE PROCEDURE insert_into_t();

    CREATE CONTINUOUS TRANSFORM

    pipeline=# insert into str1(x,y,z) values(10,'I want insert table t',now());

    INSERT 0 1

    pipeline=# select * from t;

     x  |           y           |             z

    ----+-----------------------+---------------------------

     10 | I want insert table t | 2017-05-15 14:01:48.17516

    (1 row)

    Here we wrote our own trigger function, which inserts each row into table t.
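    The custom function follows the same pattern as pipeline_stream_insert. The transform projects x, y, z from each str1 row and passes the result to the function as NEW, which copies it into table t. The Python analogue below models t as a list of tuples. It is a hypothetical stand-in, not how PipelineDB actually invokes trigger functions.

```python
t = []  # stands in for table t(x bigint, y text, z timestamp)


def insert_into_t(new):
    # Mirrors the plpgsql body: INSERT INTO t (x, y, z) VALUES (NEW.x, NEW.y, NEW.z); RETURN NEW;
    t.append((new["x"], new["y"], new["z"]))
    return new


def tran_t(row):
    # SELECT x, y, z FROM str1 THEN EXECUTE PROCEDURE insert_into_t()
    new = {c: row[c] for c in ("x", "y", "z")}  # arrival_timestamp is not selected
    insert_into_t(new)


tran_t({
    "x": 10,
    "y": "I want insert table t",
    "z": "2017-05-15 14:01:48.17516",
    "arrival_timestamp": "2017-05-15 14:01:48.17516+08",
})
```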

  • Original article: https://www.cnblogs.com/sangli/p/6856175.html