源库为PostgreSQL的增量同步任务,如需在增量阶段同步DDL数据,可通过本小结介绍的方法,在源库创建触发器和函数获取源库的DDL信息并写到特定表中,然后在DTS增量迁移阶段,通过对特定表的DML数据处理后实现DDL操作的同步。
注意事项
仅支持如下DDL:
CREATE TABLE、ALTER TABLE、DROP TABLE、CREATE SEQUENCE、ALTER SEQUENCE、DROP SEQUENCE、CREATE VIEW、ALTER VIEW、DROP VIEW、CREATE INDEX、ALTER INDEX和DROP INDEX。
源库执行RENAME表名之后,后续对更改名称后的表的所有DML操作,DTS都将不会同步新的数据到目标库。
暂不支持以注释开头的DDL语句的同步。
在执行以下操作步骤之前,请确认您要同步的源数据库的public模式中是否存在名为dts_ddl_info的表、名为dts_capture_ddl()的函数以及名为dts_ddl_event的触发器。如果存在,请先将其删除。
在进行整库同步时,如果创建了无主键表,请执行以下命令,将该无主键表的复制属性设置为full:
alter table tablename replica identity full;
操作步骤
连接待同步的源数据库,确保该用户具有创建事件触发器权限。
执行下述语句,创建存储DDL信息的表。
DROP TABLE IF EXISTS public.dts_ddl_info; DROP SEQUENCE IF EXISTS public.dts_ddl_info_id_seq; CREATE TABLE public.dts_ddl_info( id bigserial primary key, ddl text, database varchar(64) default current_database(), schema varchar(64) default current_schema, username varchar(64) default current_user, client_host varchar(64) default inet_client_addr(), client_port integer default inet_client_port(), event_time timestamp default current_timestamp txid varchar(16) default txid_current()::varchar(16), tag varchar(64), );
执行如下语句,创建捕获DDL信息的函数。
CREATE OR REPLACE FUNCTION public.dts_capture_ddl() RETURNS event_trigger LANGUAGE plpgsql SECURITY INVOKER AS $BODY$ declare ddl text; declare real_rows int; declare max_rows int := 10000; begin if (tg_tag in ('CREATE TABLE','ALTER TABLE','DROP TABLE','CREATE SEQUENCE','ALTER SEQUENCE','DROP SEQUENCE','CREATE VIEW','ALTER VIEW','DROP VIEW','CREATE INDEX','ALTER INDEX','DROP INDEX')) then select current_query() into ddl; insert into public.dts_ddl_info(ddl, username, txid, tag, database, schema, client_host, client_port, event_time) values (ddl, current_user, cast(txid_current() as varchar(16)), tg_tag, current_database(), current_schema, inet_client_addr(), inet_client_port(), current_timestamp); select count(id) into real_rows from public.dts_ddl_info; if real_rows > max_rows then delete from public.dts_ddl_info where id in (select min(id) from public.dts_ddl_info); end if; end if; end; $BODY$;执行下述命令,将刚创建的函数的所有者修改为DTS连接源库的账号,以postgres为例。
CREATE EVENT TRIGGER dts_ddl_event ON ddl_command_end EXECUTE PROCEDURE public.dts_capture_ddl();
执行下述命令,创建一个事件触发器。
CREATE EVENT TRIGGER dts_ddl_event ON ddl_command_end EXECUTE PROCEDURE public.dts_capture_ddl();
执行以下语句,使能刚创建的事件触发器。
ALTER EVENT TRIGGER dts_ddl_event ENABLE ALWAYS;
上述的前置操作完成后,返回数据传输服务DTS的控制台,创建源库为PostgreSQL的增量同步任务。
待同步任务结束后,为避免前置操作创建的对象对源库的影响,请执行下面语句,删除创建的表、函数、触发器。
drop EVENT trigger dts_ddl_event; drop function public.dts_capture_ddl(); drop table public.dts_ddl_info;