最近开发有个小需求,就是如何比较两个数据库里指定表的数据是否一致。
自己就写了个简单函数,供大家参考。
-- f_compare_tabledata: compare the rows of a local table with the rows of a
-- table in a remote database (reached through dblink).
--
-- Every row is rendered as one text value (all compared columns cast to text
-- and joined with the separator ##$$##); the rendered rows of both sides are
-- then matched against each other.
--
-- Parameters
--   pi_localtablename             local table name (anything accepted by ::regclass)
--   pi_localtable_excludecolumn   comma separated local column names to ignore (may be NULL)
--   pi_remotetablename            remote table name (resolved by ::regclass on the remote side)
--   pi_remotetable_excludecolumn  comma separated remote column names to ignore (may be NULL)
--   po_result (OUT)               a status text listing follow-up queries to inspect the result
--
-- Side effects
--   * truncates and refills tmp_compare_tabledata_repeat, tmp_compare_tabledata_result
--     and tmp_compare_tabledata_result2
--   * drops and recreates public.seq_compare_localdata / public.seq_compare_remotedata
--
-- Prerequisites: the dblink extension and the following work tables:
--   create table tmp_compare_tabledata_repeat(
--       local_tablerepeat  varchar(100),
--       remote_tablerepeat varchar(100)
--   );
--   create table tmp_compare_tabledata_result(
--       id               int8,
--       local_tablename  varchar(100),
--       local_tabledata  text,
--       remote_tablename varchar(100),
--       remote_tabledata text,
--       eq_flag          varchar(10)
--   );
--   create table tmp_compare_tabledata_result2(
--       local_id         text,
--       local_tablename  varchar(100),
--       local_tabledata  text,
--       remote_id        text,
--       remote_tablename varchar(100),
--       remote_tabledata text,
--       eq_flag          varchar(10)
--   );
--
-- Known limitations (unchanged from the original design):
--   * NULL and '' are rendered identically for character/numeric columns, and a NULL
--     timestamp is rendered as the sentinel 1990-01-01 00:00:01.
--   * Columns are compared in attnum order on each side, so both tables must have the
--     same column order after exclusions.
CREATE OR REPLACE FUNCTION public.f_compare_tabledata(
    pi_localtablename character varying,
    pi_localtable_excludecolumn character varying,
    pi_remotetablename character varying,
    pi_remotetable_excludecolumn character varying,
    OUT po_result character varying
)
RETURNS character varying
LANGUAGE plpgsql
AS $function$
DECLARE
    -- NOTE(review): credentials are hard-coded; consider a dblink named connection
    -- or a foreign server + user mapping instead.
    lc_remote_conn constant text := 'dbname=peiybdb host=127.0.0.1 port=5432 user=peiyb password=peiybpeiyb';

    -- Catalog query that builds (1) the row-rendering expression and (2) the ORDER BY
    -- column list for one table. It is a format() template with two %L placeholders:
    -- the table name and the comma separated exclude list. The same text is executed
    -- locally and, through dblink, remotely, so both sides are rendered identically.
    lc_colsql constant text := $colsql$
        select string_agg(
                   case
                       when t0.typname in ('bpchar', 'varchar')
                           then 'coalesce(' || quote_ident(t0.attname) || ','''')'
                       when t0.typname in ('int4', 'int8', 'numeric')
                           then 'coalesce(' || quote_ident(t0.attname) || '::varchar,'''')'
                       when t0.typname in ('timestamp')
                           then 'coalesce(' || quote_ident(t0.attname) || ',''1990-01-01 00:00:01''::timestamp)'
                       else 'coalesce(' || quote_ident(t0.attname) || '::text,'''')'
                   end,
                   '||''##$$##''||' order by t0.attnum) as colstr_select,
               string_agg(quote_ident(t0.attname), ',' order by t0.attnum) as colstr_orderby
        from ( select pc.attname, pc.attnum, pt.typname
               from pg_attribute pc
               left outer join pg_type pt
                 on pc.atttypid = pt.oid
               where pc.attrelid = %L::regclass
                 and pc.attnum >= 1
                 and not pc.attisdropped
                 and pc.attname not in ( select trim(both ' ' from regexp_split_to_table(%L, ',')) )
             ) t0
    $colsql$;

    lv_colstr_orderby_local  varchar;
    lv_colstr_orderby_remote varchar;
    lv_colstr_select_local   varchar;
    lv_colstr_select_remote  varchar;
    lv_sql_remote            text;
    lv_sql_insert            text;
BEGIN
    -- Start from empty work tables; this function owns their whole content.
    truncate table tmp_compare_tabledata_repeat;
    truncate table tmp_compare_tabledata_result;
    truncate table tmp_compare_tabledata_result2;

    -- Fresh sequences so that row numbering restarts at 1 on every run.
    drop sequence if exists public.seq_compare_localdata;
    create sequence public.seq_compare_localdata
        increment by 1
        minvalue 1
        maxvalue 9223372036854775807
        start 1;

    drop sequence if exists public.seq_compare_remotedata;
    create sequence public.seq_compare_remotedata
        increment by 1
        minvalue 1
        maxvalue 9223372036854775807
        start 1;

    -- Column expressions of the local table.
    execute format(lc_colsql,
                   pi_localtablename,
                   coalesce(pi_localtable_excludecolumn, ''))
       into lv_colstr_select_local,
            lv_colstr_orderby_local;

    -- Column expressions of the remote table (the catalog query runs remotely;
    -- the ::regclass cast there also validates the remote table name).
    select t.colstr_select,
           t.colstr_orderby
      into lv_colstr_select_remote,
           lv_colstr_orderby_remote
      from dblink(lc_remote_conn,
                  format(lc_colsql,
                         pi_remotetablename,
                         coalesce(pi_remotetable_excludecolumn, '')))
           as t (colstr_select varchar, colstr_orderby varchar);

    -- Without this guard a NULL column list would surface later as an opaque
    -- "query string argument of EXECUTE is null" error.
    if lv_colstr_select_local is null then
        raise exception 'f_compare_tabledata: no comparable columns found for local table %', pi_localtablename;
    end if;
    if lv_colstr_select_remote is null then
        raise exception 'f_compare_tabledata: no comparable columns found for remote table %', pi_remotetablename;
    end if;

    -- Query executed on the remote side; handed to dblink as a bound parameter,
    -- so no manual quote doubling is required.
    lv_sql_remote := format('select %s as tabdata from %s order by %s',
                            lv_colstr_select_remote,
                            pi_remotetablename,
                            lv_colstr_orderby_remote);

    -- Number the rendered rows of each side (in sorted order) and store local row n
    -- next to remote row n. $1 = connection string, $2 = remote query.
    lv_sql_insert := format($ins$
insert into tmp_compare_tabledata_result
(
    id,
    local_tablename,
    local_tabledata,
    remote_tablename,
    remote_tabledata
)
with tmp_local as (
    select nextval('public.seq_compare_localdata'::regclass) as id,
           %L::varchar as local_tablename,
           t0.tabdata::text as local_tabledata
    from ( select %s as tabdata
           from %s
           order by %s
         ) t0
), tmp_remote as (
    select nextval('public.seq_compare_remotedata'::regclass) as id,
           %L::varchar as remote_tablename,
           t1.tabdata::text as remote_tabledata
    from dblink($1, $2) as t1 (tabdata text)
)
select p0.id,
       max(p0.local_tablename)  as local_tablename,
       max(p0.local_tabledata)  as local_tabledata,
       max(p0.remote_tablename) as remote_tablename,
       max(p0.remote_tabledata) as remote_tabledata
from (
    select tc.id,
           tc.local_tablename,
           tc.local_tabledata,
           ''::varchar as remote_tablename,
           ''::text    as remote_tabledata
    from tmp_local tc
    union all
    select tr.id,
           ''::varchar as local_tablename,
           ''::text    as local_tabledata,
           tr.remote_tablename,
           tr.remote_tabledata
    from tmp_remote tr
) p0
group by p0.id
order by p0.id
$ins$,
        pi_localtablename,
        lv_colstr_select_local,
        pi_localtablename::regclass,   -- validated, properly quoted relation name
        lv_colstr_orderby_local,
        pi_remotetablename);

    -- The connection string is bound via USING, so the password is not logged here.
    raise notice 'f_compare_tabledata,%', lv_sql_insert;
    raise notice 'f_compare_tabledata remote query,%', lv_sql_remote;
    execute lv_sql_insert using lc_remote_conn, lv_sql_remote;

    -- Summary: how many distinct row values occur more than once on each side.
    insert into tmp_compare_tabledata_repeat (
        local_tablerepeat,
        remote_tablerepeat
    )
    with local_dups as (
        select r.local_tabledata,
               count(1) as cnt
        from tmp_compare_tabledata_result r
        where r.local_tabledata is not null
          and r.local_tabledata <> ''
        group by r.local_tabledata
        having count(1) > 1
    ), remote_dups as (
        select r.remote_tabledata,
               count(1) as cnt
        from tmp_compare_tabledata_result r
        where r.remote_tabledata is not null
          and r.remote_tabledata <> ''
        group by r.remote_tabledata
        having count(1) > 1
    )
    select coalesce(pi_localtablename,'')||' :重复值有 '||coalesce((select count(cnt)::varchar from local_dups),'0'),
           coalesce(pi_remotetablename,'')||' :重复值有 '||coalesce((select count(cnt)::varchar from remote_dups),'0')
    ;

    -- Match rows by content. Rows present on one side only come out of the full
    -- outer join with the other side NULL and are flagged 'neq'.
    -- Duplicate rows fan out in the join, hence DISTINCT in the id lists;
    -- nullif keeps the id list NULL (not '') when that side has no row.
    insert into tmp_compare_tabledata_result2 (
        local_tabledata,
        remote_tabledata,
        local_id,
        local_tablename,
        remote_id,
        remote_tablename,
        eq_flag
    )
    select t0.local_tabledata,
           t1.remote_tabledata,
           nullif(array_to_string(array_agg(distinct t0.id order by t0.id), ','), '') as local_id,
           max(t0.local_tablename) as local_tablename,
           nullif(array_to_string(array_agg(distinct t1.id order by t1.id), ','), '') as remote_id,
           max(t1.remote_tablename) as remote_tablename,
           case when t0.local_tabledata = t1.remote_tabledata then 'eq'
                else 'neq'
           end as eq_flag
    from ( select r.id, r.local_tablename, r.local_tabledata
           from tmp_compare_tabledata_result r
           where r.local_tablename is not null
             and r.local_tablename <> ''
         ) t0
    full outer join
         ( select r.id, r.remote_tablename, r.remote_tabledata
           from tmp_compare_tabledata_result r
           where r.remote_tablename is not null
             and r.remote_tablename <> ''
         ) t1
      on t0.local_tabledata = t1.remote_tabledata
    group by t0.local_tabledata,
             t1.remote_tabledata
    ;

    -- Status text with ready-to-run follow-up queries (quotes doubled twice so the
    -- emitted queries contain a valid '' literal).
    po_result='sucess:
select * from tmp_compare_tabledata_repeat; --查看重复值情况
select local_tabledata,count(1) from tmp_compare_tabledata_result where local_tablename is not null and local_tablename <> '''' group by local_tabledata having count(1) >1 --查看本地表重复值
select remote_tabledata,count(1) from tmp_compare_tabledata_result where remote_tablename is not null and remote_tablename <> '''' group by remote_tabledata having count(1) >1 --查看远程表重复值
select * from tmp_compare_tabledata_result2 where eq_flag = ''neq'' order by local_tabledata,remote_tabledata -- 查看不相等的数据
';
    return;
END;
$function$