3.6. Routers
眼下的实现中提供的Route实现包括:
1. Default Router:这个Router发送全部的数据到Router中定义的目标节点所属的组中的全部的节点。
2. Column Match Router:这个Router能够将一个列的旧值(数据源表中此列的值)或者新值(将要在目标节点设置的值)与一个常量值或者节点的external_id和node_id的值比較。
3. Lookup Router:这个Router能够被配置。在路由数据时,基于已经存在的表或者从属表。确定是否运行路由操作。
4. Subselect Router:这个Router对数据库运行一个SQL表达式来选择要路由到的节点。这个SQL表达式能够被传入某个列的旧值(数据变化之前的值)或者新值(数据变化之后的值)。
5. Scripted Router:这个Router运行一个Bean脚本表达式以选出要路由到的节点。这个脚本能够使用某个列的旧值或者新值。
6. XML Publishing Router:这个Router直接公布数据的变化到一个消息解决方式。而不是传送这些变化给一个已经注冊的节点。
这个Router必须被手工配置为以XML格式作为扩展点。(能够跟kafka结合,然后数据就能够进入hadoop生态系统了)
7. Audit Table Router:这个Router将数据插入到一个自己主动创建的审计表中。
这个Router记录捕获到的数据变化到与其关联的表中。
Trigger和Router是多对多的关系。
这意味着,一个触发器能够捕获数据变化然后路由这些变化到多个位置。也意味着一个Router能够与过个不同的触发器相联系。
3.6.1. Default Router
最简单的Router。这是一个发送全部相关的触发器捕获到的数据到全部router中定义的目标节点所属的组中全部节点的Router。一个Router以ROUTER表中的一行数据代表。然后连接到TRIGGER_ROUTER表。
以下这个SQL语句定义了一个Router,这个Router将从“corp”组发送数据到“store“组。
insert into SYM_ROUTER (router_id, source_node_group_id, target_node_group_id, create_time, last_update_time) values ('corp-2-store','corp', 'store', current_timestamp, current_timestamp);
以下的SQL语句建立上边的Router与item触发器的映射。
insert into SYM_TRIGGER_ROUTER (trigger_id, router_id, initial_load_order, create_time, last_update_time) values ('item', 'corp-2-store', 1, current_timestamp, current_timestamp);
3.6.2. Column Match Router
有时可能会有这种需求。数据须要依据数据的当前值(数据变化发生之后)或者某个列的先前的值(数据变化发生之前)来决定是否须要被路由。
在插入ROUTER系统表时设置router_type为column能够配置一个Column Router。然后在设置router_expression列为一个等式。这个等式代表了某个列的期望值。
表达式的第一部分总是一个列名。
这个列名应该总是用大写字母定义。大写的列名以OLD_作为前缀能够用来比較这个列的旧值。
表达式的第二个部分能够使一个常量,一个能够代表还有一个列的符号或者能够代表其它的SymmetricDS概念的符号。
符号的值总是以冒号(:)開始。
当一个表的状态列被设置为“READY TO SEND”的时候,数据变化才被路由到目标节点组的全部节点。以下的SQL语句将在ROUTER表中插入一个column router来完毕这个需求。
insert into SYM_ROUTER (router_id, source_node_group_id, target_node_group_id,router_type, router_expression,create_time, last_update_time) values ('corp-2-store-ok','corp','store', 'column', 'STATUS=READY TO SEND', current_timestamp,current_timestamp);
当一个表的状态列的值被改动时。变化的数据才被路由到目标组的全部节点。以下的SQL语句将在ROUTER表中插入一个column router来完毕这个需求。注意OLD_STATUS的使用,OLD_前缀能够訪问这个列的旧值。
insert into SYM_ROUTER (router_id, source_node_group_id,target_node_group_id, router_type, router_expression,create_time, last_update_time) values ('corp-2-store-status','corp','store', 'column', 'STATUS!=:OLD_STATUS', current_timestamp,current_timestamp);
一个表的数据变化发送到目标节点组中一个节点的external id与此表的STORE_ID列的值匹配的节点。
以下的SQL语句将插入ROUTER表中一个column router来实现这个需求。
insert into SYM_ROUTER (router_id, source_node_group_id,target_node_group_id, router_type, router_expression,create_time, last_update_time) values ('corp-2-store-id','corp','store', 'column', 'STORE_ID=:EXTERNAL_ID', current_timestamp,current_timestamp);
一个节点的以下3个属性能够被引用:
1. :NODE_ID
2. :EXTERNAL_ID
3. :NODE_GROUP_ID
在route过程中,总是能够捕获EXTERNAL_DATA的值作为一个虚拟列。
一个表的数据变化须要被路由到一个重定向节点,这个节点的external id在REGISTRATION_REDIRECT表中定义。以下的SQL语句将在ROUTER表中插入一个column router来实现这个需求。
insert into SYM_ROUTER (router_id, source_node_group_id,target_node_group_id, router_type, router_expression, create_time,last_update_time) values ('corp-2-store-redirect','corp','store', 'column', 'STORE_ID=:REDIRECT_NODE',current_timestamp, current_timestamp);
在router_expression中能够配置多个列。当配置超过一个列时,全部的匹配成功的节点都会被加到要被路由的节点列表中。
以下是一个简单的样例,一个表中,STORE_ID列的值须要等于一个节点的EXTERNAL_ID或者是ALL常量,等于ALL常量意味着全部的节点都会收到数据更新。
insert into SYM_ROUTER (router_id, source_node_group_id,target_node_group_id, router_type, router_expression,create_time, last_update_time) values ('corp-2-store-multiple-matches','corp','store', 'column', 'STORE_ID=ALL orSTORE_ID=:EXTERNAL_ID', current_timestamp, current_timestamp);
NULLkeyword能够被用来检查一个列是否为空。假设列为空,数据将被路由到全部的节点。
以下的样例中。当STORE_ID列被用来路由到节点EXTERNAL_ID等于STORE_ID的节点,假设STORE_ID是空,就路由的全部的节点。
insert into SYM_ROUTER (router_id, source_node_group_id,target_node_group_id, router_type, router_expression,create_time, last_update_time) values ('corp-2-store-multiple-matches','corp','store', 'column', 'STORE_ID=NULL orSTORE_ID=:EXTERNAL_ID', current_timestamp, current_timestamp);
3.6.3. Lookup Table Router
一个lookup表可能包括数据要被路由的节点的ID。这可能是个已经存在的表或者一个特地为了路由数据被添加的附属的表。Lookup Table Router通过在往ROUTER插入一个router时指定router_type的值为lookuptable来配置,能够再router_expression列中设置多个配置參数。
以下的每个配置參数都是必须的。
LOOKUP_TABLE
Lookup table的名字。
KEY_COLUMN
将要被router的表中的列的名字。
这将作为主键插入到lookup表中。
LOOKUP_KEY_COLUMN
Lookup表中的主键的列的名字。
EXTERNAL_ID_COLUMN
这是一个列的名字,这个列包括了lookup表中要被路由的节点的external_id。
注意,lookup表将被读到内存中缓存起来,以备数据在route到一个channel期间使用。
考虑到一个表可能被路由到某个特定的store。可是变化的表的数据仅仅包括品牌信息。在这个样例中,STORE表能够被用作lookup表。
insert into SYM_ROUTER (router_id, source_node_group_id,target_node_group_id, router_type, router_expression,create_time, last_update_time) values ('corp-2-store-ok','corp','store', 'lookuptable', 'LOOKUP_TABLE=STORE KEY_COLUMN=BRAND_IDLOOKUP_KEY_COLUMN=BRAND_ID EXTERNAL_ID_COLUMN=STORE_ID',current_timestamp, current_timestamp);
3.6.4. Subselect Router
有时候,须要基于当前没有正在同步的数据来推断是否须要路由当前的数据。Subselect Router能够在这样的情况下使用。Subselect Router的router_expression列要配置一个SQL查询语句,这个SQL语句返回须要被路由的节点的ID的列表。
Column符号能够用在SQL表达式中,这些符号能够被一行数据中的指定的列替换。使用这样的Router有非常高的开销,由于Subselect语句将会为每个要路由的行执行一次。这不应该用在有非常多行须要更新的表中。这样的Router也有一个缺点,假设用来决定要路由的节点的ID的数据已经被删除,SQL语句将不会返回结果,路由操作将不会发生。
你指定的Router_expression能够加到以下的SQL语句中,以选择节点的ID:
select c.node_id from sym_node c where c.node_group_id=:NODE_GROUP_IDand c.sync_enabled=1 and ...
正如你看到的,你能够用别名“c”来訪问当前正在处理的节点的信息。比如c.external_id。有两个节点相关的符号你能够用在你的表达式中:
1. :NODE_GROUP_ID
2. :EXTERNAL_DATA
代表你的数据的列明须要有冒号前缀,比如”:EMPLOY_ID”,或者”:OLD_EMPLOYEE_ID”。
在这里,“OLD_”前缀表明这个值是发生数据变化之前的值。
比如,考虑以下的情况,一个“Order”表和一个“OrderLineItem”表须要被路由到某个特定的store。“Order“表有一个名叫”order_id“和”STORE_ID“的表。一个store节点有一个与“Order“表的STORE_ID相等的external_id。可是,”OrderLineItem“表。仅仅有一个外键到”Order“表的”order_id“。为了路由”OrderLineItem“的数据到”Order“表要路由到的节点。我们须要引用“Order“表的记录。
在SymmetricDS中。有两种可能的方式来解决问题。
一个是配置一个“Subselect“类型的Router。例如以下(还有一种方式是使用external_select通过触发器捕获数据。然后在column router中使用这个数据匹配。具体的描写叙述在Section3.6.7 ,”Utilizing External Select when Routing“)。
我们的解决方式是利用Subselect比較当前节点的external id和Order表中的STORE_ID列的值,当order_id等于当前行的order id时就路由这个数据:
insert into SYM_ROUTER (router_id, source_node_group_id,target_node_group_id, router_type, router_expression,create_time, last_update_time) values ('corp-2-store','corp','store', 'subselect', 'c.external_id in (select STORE_ID from order whereorder_id=:ORDER_ID)', current_timestamp, current_timestamp);
最后。请注意在这个样例中,Order表中行在路由OrderLineItem表中相关联的行时必须存在,由于当路由发生的时候。select语句正在执行,select语句的执行不是在数据变化第一次被捕获的时候执行的。
3.6.5. Scripted Router
当你须要更灵活的选择要路由的节点的逻辑时。scripted router可能会用到。
眼下可用的脚本语言是Bean Shell。
BeanShell是一种类Java的脚本语言。
Bean Shell脚本语言的文档能够在http://www.beanshell.org中查看。
Bean Shell类型的Router的router_type是”bsh“。Route_expression是一个有效的BeanShell 脚本:
1. 添加一个节点的ID到目标节点的集合中
2. 返回一个包括多个节点ID的新的集合
3. 返回一个节点的ID
4. 假设目标组中全部的节点都要被路由就返回true,相反返回false。
脚本返回的是节点的列表。这些节点都是合法的org.jumpmind.symmetric.model.Node对象。数据列当前值和旧值能够再脚本中使用,使用一个Java对象代表一个列的数据。列明须要使用大写。旧的值的引用须要使用”OLD_“前缀。
假设你须要訪问SymmetricDS服务。能够使用engine变量訪问一个org.jumpmind.symmetric.ISymmetricEngine接口的实例。
在以下的样例中,node_id是STORE_ID和WORKSTATION_NUMBER的组合。这两个都是要被路由的表中的列:
insert into SYM_ROUTER (router_id, source_node_group_id,target_node_group_id, router_type, router_expression,create_time, last_update_time) values ('corp-2-store-bsh','corp','store', 'bsh', 'targetNodes.add(STORE_ID + "-" +WORKSTATION_NUMBER);', current_timestamp, current_timestamp);
相同的功能能够简单地返回Node ID来完毕。Bah脚本的最后一行总司返回这个值。
insert into SYM_ROUTER (router_id, source_node_group_id,target_node_group_id, router_type, router_expression,create_time, last_update_time) values ('corp-2-store-bsh','corp','store', 'bsh', 'STORE_ID + "-" + WORKSTATION_NUMBER',current_timestamp, current_timestamp);
以下的样例中。假设FLAG列被改变的化。数据会被同步到全部的节点,FLAG没有被改变的话。将不会同步到不论什么节点。注意,这里我们使用”OLD_“前缀,来訪问列的旧值。
insert into SYM_ROUTER (router_id, source_node_group_id,target_node_group_id, router_type, router_expression,create_time, last_update_time) values ('corp-2-store-flag-changed','corp','store', 'bsh', 'FLAG != null &&!FLAG.equals(OLD_FLAG)', current_timestamp, current_timestamp);
以下的样例中,这个脚本遍历每个合法的节点,检查是否名为STATION的列去掉两端的空格后是否等于external_id的值。
insert into SYM_ROUTER (router_id, source_node_group_id,target_node_group_id, router_type, router_expression,create_time, last_update_time) values ('corp-2-store-trimmed-station','corp','store', 'bsh', 'for (org.jumpmind.symmetric.model.Nodenode : nodes) { if (STATION != null &&node.getExternalId().equals(STATION.trim())) { targetNodes.add(node.getNodeId());} }', current_timestamp, current_timestamp);
3.6.6. Audit Table Router
Audit Router通过记录一个router创建和更新的审计表中数据变化捕获数据变化(仅仅要auto.config.database被设置为true)。
Router创建一个表,表的名字是要捕获的数据的表名加上”_AUDIT“后缀。这个表将拥有与原始表同样的结构,还有3个附加的列。
表中添加了三个额外的审计列:
1. AUDIT_ID:表的主键
2. AUDIT_TIME:数据变化发生的时间
3. AUDIT_EVENT:发生在这行数据的DML类型
以下是创建一个audit router的样例:
insert into SYM_ROUTER (router_id, source_node_group_id,target_node_group_id, router_type, create_time, last_update_time)values ('audit_at_corp','corp', 'local', 'audit', current_timestamp,current_timestamp);
Audit Router为一个组连接捕获数据。
为了使上面的AuditRoute可以工作,它必须使用”R“action类型,联系到一个node_group_link上。”R“代表”Only Route To“。在上面的样例中,我们关联到一个local 组。
这里,local组是为audit router新创建的。没有节点数据local节点组。假设corp节点上。一个联系到audit Router触发,一个新的audit表将会在corp节点创建,新变化的数据也会被插入到audit表中。