  • Real-time data delivery to big data platforms with GoldenGate (2): Cassandra

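    Introduction

    GoldenGate can deliver data to big data platforms in real time. For Apache Cassandra, a small amount of configuration is enough to stream incremental changes from a relational database into Cassandra as they happen. The configuration steps are described below.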

    Installing Cassandra

    Extract apache-cassandra-3.11.1-bin.tar.gz to /opt/cassandra, then create the data and log directories and make the hadoop user their owner:

    sudo mkdir /var/lib/cassandra

    sudo mkdir /var/log/cassandra

    sudo chown hadoop /var/log/cassandra

    sudo chown hadoop /var/lib/cassandra

    Start Cassandra

    /opt/cassandra/bin/cassandra

    Check the status

    $ ./nodetool status

    Datacenter: datacenter1

    =======================

    Status=Up/Down

    |/ State=Normal/Leaving/Joining/Moving

    -- Address Load Tokens Owns (effective) Host ID Rack

    UN 127.0.0.1 92.1 KiB 256 100.0% e4f1431e-85ec-483a-a1b4-fe7bc8f7c9d2 rack1
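Cassandra takes a few seconds to come up, so a script that continues straight on to cqlsh can fail. The following is a minimal sketch of how one might wait for the node, assuming the `nodetool status` layout shown above, where a healthy node's line starts with UN. The function names, the `NODETOOL` variable and the retry defaults are hypothetical and not part of Cassandra.

```shell
#!/usr/bin/env bash
# Count the nodes that `nodetool status` reports as UN (Up/Normal).
# Reads the command output on stdin.
count_up_nodes() {
    awk '$1 == "UN" { n++ } END { print n + 0 }'
}

# Poll until at least one node is UN, or give up after the given number
# of attempts (default 30, two seconds apart; both values are assumptions).
wait_for_cassandra() {
    local nodetool tries i
    nodetool="${NODETOOL:-/opt/cassandra/bin/nodetool}"
    tries="${1:-30}"
    i=0
    while [ "$i" -lt "$tries" ]; do
        if [ "$("$nodetool" status 2>/dev/null | count_up_nodes)" -ge 1 ]; then
            return 0
        fi
        i=$((i + 1))
        sleep 2
    done
    return 1
}
```

A typical call would be `wait_for_cassandra 30 || echo "Cassandra did not come up" >&2`.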

    Open the Cassandra shell

    /opt/cassandra/bin/cqlsh

    Connected to Test Cluster at 127.0.0.1:9042.

    [cqlsh 5.0.1 | Cassandra 3.11.1 | CQL spec 3.4.4 | Native protocol v4]

    Use HELP for help.

    cqlsh>  SELECT cluster_name, listen_address FROM system.local;
     cluster_name | listen_address
    --------------+----------------
     Test Cluster |      127.0.0.1
     
    (1 rows)

    Inspect the system.local table in the system keyspace

    cqlsh> desc system.local;
     
    CREATE TABLE system.local (
        key text PRIMARY KEY,
        bootstrapped text,
        broadcast_address inet,
        cluster_name text,
        cql_version text,
        data_center text,
        gossip_generation int,
        host_id uuid,
        listen_address inet,
        native_protocol_version text,
        partitioner text,
        rack text,
        release_version text,
        rpc_address inet,
        schema_version uuid,
        thrift_version text,
        tokens set<text>,
        truncated_at map<uuid, blob>
    ) WITH bloom_filter_fp_chance = 0.01
        AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
        AND comment = 'information about the local node'
        AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
        AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
        AND crc_check_chance = 1.0
        AND dclocal_read_repair_chance = 0.0
        AND default_time_to_live = 0
        AND gc_grace_seconds = 0
        AND max_index_interval = 2048
        AND memtable_flush_period_in_ms = 3600000
        AND min_index_interval = 128
        AND read_repair_chance = 0.0
        AND speculative_retry = '99PERCENTILE';

    List the existing keyspaces

    cqlsh> describe keyspaces;

    system_schema system_auth system system_distributed system_traces

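    Create the keyspace that matches the schema OGG will deliver to later. Unquoted CQL identifiers are folded to lowercase, so the keyspace is stored as qasource.

    cqlsh> CREATE KEYSPACE QASOURCE WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};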

    Create a test table to verify that Cassandra works

    cqlsh> use qasource;

    cqlsh:qasource> create table test (
        id int,
        name varchar,
        primary key (id)
    );

    Insert a few rows

    cqlsh:qasource> insert into test(id,name) values(1,'bck');

    cqlsh:qasource> insert into test(id,name) values(2,'test');

    cqlsh:qasource> select * from test;

    id | name

    ----+------

    1 | bck

    2 | test

    Querying by name fails, because the name column is neither part of the primary key nor indexed

    cqlsh:qasource> select * from test where name='test';

    InvalidRequest: Error from server: code=2200 [Invalid query] message="Cannot execute this query as it might involve data filtering and thus may have unpredictable performance. If you want to execute this query despite the performance unpredictability, use ALLOW FILTERING"

    Queries on the primary key work normally

    cqlsh:qasource> select * from test where id=2;

    id | name

    ----+------

    2 | test

    (1 rows)

    After an index is created on name, the same query succeeds

    cqlsh:qasource> create index idx_test on test(name);

    cqlsh:qasource> select * from test where name='test';

    id | name

    ----+------

    2 | test

    (1 rows)

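    An update must identify the row by its primary key in the WHERE clause

    cqlsh:qasource> update test set name='newly' where id=2;

    cqlsh:qasource> select * from test where id=2;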

    id | name

    ----+-------

    2 | newly

    (1 rows)

    Delete a row

    cqlsh:qasource> delete from test where id=2;

    cqlsh:qasource> select * from test ;

    id | name

    ----+------

    1 | bck

    (1 rows)
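To repeat the smoke test without typing each statement, the statements can be kept in a file and passed to cqlsh with its `-f` option. The sketch below is one way to do that; the function name and the output path are hypothetical, and the `IF NOT EXISTS` clauses are an addition so the file can be run more than once.

```shell
#!/usr/bin/env bash
# Write the smoke-test statements from this section to the CQL file
# whose path is given as the first argument.
write_smoke_test_cql() {
    cat > "$1" <<'EOF'
CREATE KEYSPACE IF NOT EXISTS qasource
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
USE qasource;
CREATE TABLE IF NOT EXISTS test (id int, name varchar, PRIMARY KEY (id));
INSERT INTO test (id, name) VALUES (1, 'bck');
INSERT INTO test (id, name) VALUES (2, 'test');
SELECT * FROM test WHERE id = 2;
UPDATE test SET name = 'newly' WHERE id = 2;
DELETE FROM test WHERE id = 2;
SELECT * FROM test;
EOF
}
```

For example: `write_smoke_test_cql /tmp/smoke.cql && /opt/cassandra/bin/cqlsh -f /tmp/smoke.cql`.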


    OGG configuration

    Extract the Cassandra Java driver 3.1.2 to /u01/drivers/cassandra-java-driver-3.1.2

    Set the environment variable that OGG for Big Data needs in order to load the JVM

    export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server
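The replicat loads the JVM through libjvm.so, so it fails to start if no directory on LD_LIBRARY_PATH contains that library. A small check along the following lines can catch this early; the function name `has_libjvm` is hypothetical.

```shell
#!/usr/bin/env bash
# Return 0 if any directory in a colon-separated path list contains libjvm.so.
has_libjvm() {
    local dir
    while IFS= read -r dir; do
        if [ -n "$dir" ] && [ -f "$dir/libjvm.so" ]; then
            return 0
        fi
    done <<EOF
$(printf '%s\n' "$1" | tr ':' '\n')
EOF
    return 1
}
```

Usage: `has_libjvm "$LD_LIBRARY_PATH" || echo "libjvm.so not found on LD_LIBRARY_PATH" >&2`.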

    Configure the replicat process

    REPLICAT rcass

    -- Trail file for this example is located in "AdapterExamples/trail" directory

    -- Command to add REPLICAT

    -- add replicat rcass, exttrail AdapterExamples/trail/tr

    TARGETDB LIBFILE libggjava.so SET property=dirprm/cass.props

    REPORTCOUNT EVERY 1 MINUTES, RATE

    GROUPTRANSOPS 1000

    MAP QASOURCE.*, TARGET QASOURCE.*; 


    Add the replicat process, using the sample trail file that ships with OGG

    GGSCI>add replicat rcass, exttrail AdapterExamples/trail/tr

    The properties file needed for delivery to Cassandra, dirprm/cass.props

    gg.handlerlist=cassandra

    #The handler properties

    gg.handler.cassandra.type=cassandra

    gg.handler.cassandra.mode=op

    gg.handler.cassandra.contactPoints=localhost

    gg.handler.cassandra.ddlHandling=CREATE,ADD,DROP

    gg.handler.cassandra.compressedUpdates=true

    gg.handler.cassandra.cassandraMode=async

    gg.handler.cassandra.consistencyLevel=ONE

    goldengate.userexit.timestamp=utc

    goldengate.userexit.writers=javawriter

    javawriter.stats.display=TRUE

    javawriter.stats.full=TRUE

    gg.log=log4j

    gg.log.level=INFO

    gg.report.time=30sec

    #Set the classpath here to the Datastax Cassandra Java Driver (3.1 latest)

    #Link to the Cassandra drivers website

    #http://cassandra.apache.org/doc/latest/getting_started/drivers.html#java

    #Link to the Datastax Cassandra Java Driver

    #https://github.com/datastax/java-driver

    #gg.classpath must point at the directory the driver was extracted to

    gg.classpath=/u01/drivers/cassandra-java-driver-3.1.2/*:/u01/drivers/cassandra-java-driver-3.1.2/lib/*

    javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=.:ggjava/ggjava.jar:./dirprm
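A gg.classpath entry that points at a directory that does not exist is an easy mistake to make, and it only shows up when the replicat tries to load the driver. The sketch below lists such entries before the process is started. It assumes gg.classpath is a single colon-separated line whose entries are paths or `dir/*` wildcards, as in the file above; the function name is hypothetical.

```shell
#!/usr/bin/env bash
# Print every gg.classpath entry of a properties file whose path does not exist.
# A trailing "/*" wildcard is stripped before the check.
missing_classpath_entries() {
    local value entry path
    value=$(sed -n 's/^[[:space:]]*gg\.classpath=//p' "$1" | tail -n 1)
    while IFS= read -r entry; do
        [ -n "$entry" ] || continue
        path="${entry%/\*}"
        [ -e "$path" ] || printf '%s\n' "$path"
    done <<EOF
$(printf '%s\n' "$value" | tr ':' '\n')
EOF
}
```

Running `missing_classpath_entries dirprm/cass.props` prints nothing when every entry resolves.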


    Testing

    Start the OGG replicat process

    GGSCI (ol73) 12> start rcass

    Sending START request to MANAGER ...

    REPLICAT RCASS starting

    Check the status (it moves from STARTING to RUNNING)

    GGSCI (ol73) 13> info rcass

    REPLICAT RCASS Last Started 2017-12-25 11:29 Status STARTING

    Checkpoint Lag 00:00:00 (updated 02:56:41 ago)

    Process ID 100794

    Log Read Checkpoint File /u01/ogg4bd_12.3/AdapterExamples/trail/tr000000000

    First Record RBA 0

    GGSCI (ol73) 14> info rcass

    REPLICAT RCASS Last Started 2017-12-25 11:30 Status RUNNING

    Checkpoint Lag 00:00:00 (updated 00:00:00 ago)

    Process ID 100794

    Log Read Checkpoint File /u01/ogg4bd_12.3/AdapterExamples/trail/tr000000000

    2015-11-06 02:45:39.000000 RBA 5660

    The replicat has started normally and is writing data.
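When the start is scripted, the status word can be pulled out of the `info` output instead of being read by eye. This is a rough sketch that assumes the output layout shown above, with the status following the word "Status" on the line that begins with REPLICAT; the function name is hypothetical.

```shell
#!/usr/bin/env bash
# Print the status word (for example STARTING or RUNNING) found in
# `info <replicat>` output read on stdin.
replicat_status() {
    awk '$1 == "REPLICAT" {
        for (i = 1; i < NF; i++) {
            if ($i == "Status") { print $(i + 1); exit }
        }
    }'
}
```

Assuming ggsci accepts commands on standard input, it could be used as `echo "info rcass" | ./ggsci | replicat_status`.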

    Show statistics for the data written

    GGSCI (ol73) 15> stats rcass, total

    Sending STATS request to REPLICAT RCASS ...

    Start of Statistics at 2017-12-25 11:31:12.

    Replicating from QASOURCE.TCUSTMER to QASOURCE.TCUSTMER:

    *** Total statistics since 2017-12-25 11:30:43 ***

    Total inserts 5.00

    Total updates 1.00

    Total deletes 0.00

    Total discards 0.00

    Total operations 6.00

    Replicating from QASOURCE.TCUSTORD to QASOURCE.TCUSTORD:

    *** Total statistics since 2017-12-25 11:30:43 ***

    Total inserts 5.00

    Total updates 3.00

    Total deletes 2.00

    Total discards 0.00

    Total operations 10.00

    End of Statistics. 


    Replication succeeded.

    Open the Cassandra shell to verify

    ./cqlsh

    cqlsh> desc qasource;

    CREATE KEYSPACE qasource WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true;

    CREATE TABLE qasource.tcustord (

    cust_code text,

    order_date text,

    product_code text,

    order_id double,

    product_amount bigint,

    product_price double,

    transaction_id double,

    PRIMARY KEY (cust_code, order_date, product_code, order_id)

    ) WITH CLUSTERING ORDER BY (order_date ASC, product_code ASC, order_id ASC)

    AND bloom_filter_fp_chance = 0.01

    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}

    AND comment = ''

    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}

    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}

    AND crc_check_chance = 1.0

    AND dclocal_read_repair_chance = 0.1

    AND default_time_to_live = 0

    AND gc_grace_seconds = 864000

    AND max_index_interval = 2048

    AND memtable_flush_period_in_ms = 0

    AND min_index_interval = 128

    AND read_repair_chance = 0.0

    AND speculative_retry = '99PERCENTILE';

    CREATE TABLE qasource.tcustmer (

    cust_code text PRIMARY KEY,

    city text,

    name text,

    state text

    ) WITH bloom_filter_fp_chance = 0.01

    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}

    AND comment = ''

    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}

    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}

    AND crc_check_chance = 1.0

    AND dclocal_read_repair_chance = 0.1

    AND default_time_to_live = 0

    AND gc_grace_seconds = 864000

    AND max_index_interval = 2048

    AND memtable_flush_period_in_ms = 0

    AND min_index_interval = 128

    AND read_repair_chance = 0.0

    AND speculative_retry = '99PERCENTILE';


    As shown, two tables have been added to the qasource keyspace.

    Query the data written by OGG in Cassandra

    cqlsh> use QASOURCE;

    cqlsh:qasource> desc keyspace QASOURCE;

    View the table definition

    cqlsh:qasource> desc table qasource.tcustmer;

    CREATE TABLE qasource.tcustmer (

    cust_code text PRIMARY KEY,

    city text,

    name text,

    state text

    ) WITH bloom_filter_fp_chance = 0.01

    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}

    AND comment = ''

    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}

    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}

    AND crc_check_chance = 1.0

    AND dclocal_read_repair_chance = 0.1

    AND default_time_to_live = 0

    AND gc_grace_seconds = 864000

    AND max_index_interval = 2048

    AND memtable_flush_period_in_ms = 0

    AND min_index_interval = 128

    AND read_repair_chance = 0.0

    AND speculative_retry = '99PERCENTILE'; 

    Query the data

    cqlsh:qasource> select * from qasource.tcustmer;

    cust_code | city        | name               | state
    ----------+-------------+--------------------+-------
    WILL      | SEATTLE     | BG SOFTWARE CO.    | WA
    JANE      | DENVER      | ROCKY FLYER INC.   | CO
    ANN       | NEW YORK    | ANN'S BOATS        | NY
    DAVE      | TALLAHASSEE | DAVE'S PLANES INC. | FL
    BILL      | DENVER      | BILL'S USED CARS   | CO

    (5 rows)


    cqlsh:qasource> select * from qasource.tcustmer where cust_code='WILL';

    cust_code | city    | name            | state
    ----------+---------+-----------------+-------
    WILL      | SEATTLE | BG SOFTWARE CO. | WA

    (1 rows)

    cqlsh:qasource> select * from qasource.tcustord;

    cust_code | order_date          | product_code | order_id | product_amount | product_price | transaction_id
    ----------+---------------------+--------------+----------+----------------+---------------+---------------
    WILL      | 1994-09-30 15:33:00 | CAR          | 144      | 3              | 16520         | 100
    BILL      | 1995-12-31 15:00:00 | CAR          | 765      | 3              | 14000         | 100
    BILL      | 1996-01-01 00:00:00 | TRUCK        | 333      | 15             | 25000         | 100

    (3 rows)

    The data can be queried normally, so the test is complete.
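The row counts can also be cross-checked against the replicat statistics: TCUSTMER had 5 inserts and no deletes (5 rows), and TCUSTORD had 5 inserts and 2 deletes (3 rows). The sketch below does that arithmetic from `stats rcass, total` output. It assumes that every insert creates a distinct row and every delete removes an existing one, which holds for this sample trail but not in general; the function name is hypothetical.

```shell
#!/usr/bin/env bash
# Read `stats <replicat>, total` output on stdin and print, per source
# table, inserts minus deletes as the expected number of rows.
expected_rows() {
    awk '
        $1 == "Replicating" { table = $NF; sub(/:$/, "", table) }
        $1 == "Total" && $2 == "inserts" { rows[table] += $3 }
        $1 == "Total" && $2 == "deletes" { rows[table] -= $3 }
        END { for (t in rows) printf "%s %d\n", t, rows[t] }
    ' | sort
}
```

Each output line has the form `<table> <expected rows>`, which can be compared with the `(N rows)` footer that cqlsh prints for the matching table.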

  • Original article: https://www.cnblogs.com/margiex/p/8124862.html