数据传输服务DTS

源库为PostgreSQL在增量阶段同步DDL数据的前置工作

2023-10-19 03:51:07

源库为PostgreSQL的增量同步任务,如需在增量阶段同步DDL数据,可通过本小结介绍的方法,在源库创建触发器和函数获取源库的DDL信息并写到特定表中,然后在DTS增量迁移阶段,通过对特定表的DML数据处理后实现DDL操作的同步。

注意事项

  • 仅支持如下DDL:

    CREATE TABLE、ALTER TABLE、DROP TABLE、CREATE SEQUENCE、ALTER SEQUENCE、DROP SEQUENCE、CREATE VIEW、ALTER VIEW、DROP VIEW、CREATE INDEX、ALTER INDEX和DROP INDEX。

  • 源库执行RENAME表名之后,后续对更改名称后的表的所有DML操作,DTS都将不会同步新的数据到目标库。

  • 暂不支持以注释开头的DDL语句的同步。

  • 在执行以下操作步骤之前,请确认您要同步的源数据库的public模式中是否存在名为dts_ddl_info的表、名为dts_capture_ddl()的函数以及名为dts_ddl_event的触发器。如果存在,请先将其删除。

  • 在进行整库同步时,如果创建了无主键表,请执行以下命令,将该无主键表的复制属性设置为full:

    alter table tablename replica identity full;

操作步骤

  1. 连接待同步的源数据库,确保该用户具有创建事件触发器权限。

  2. 执行下述语句,创建存储DDL信息的表。

    DROP TABLE IF EXISTS public.dts_ddl_info;
    DROP SEQUENCE IF EXISTS public.dts_ddl_info_id_seq;
    CREATE TABLE public.dts_ddl_info(
      id                             bigserial primary key,
      ddl                            text, 
      database                       varchar(64) default current_database(), 
      schema                         varchar(64) default current_schema,
      username                       varchar(64) default current_user, 
      client_host                    varchar(64) default inet_client_addr(),
      client_port                    integer default inet_client_port(),
      event_time                     timestamp default current_timestamp
      txid                           varchar(16) default txid_current()::varchar(16),
      tag                            varchar(64),
    );
  3. 执行如下语句,创建捕获DDL信息的函数。

    CREATE OR REPLACE FUNCTION public.dts_capture_ddl()
        RETURNS event_trigger
        LANGUAGE plpgsql
        SECURITY INVOKER
    AS $BODY$
        declare ddl text;
        declare real_rows int;
        declare max_rows int := 10000;
    begin
      if (tg_tag in ('CREATE TABLE','ALTER TABLE','DROP TABLE','CREATE SEQUENCE','ALTER SEQUENCE','DROP SEQUENCE','CREATE VIEW','ALTER VIEW','DROP VIEW','CREATE INDEX','ALTER INDEX','DROP INDEX')) then
          select current_query() into ddl; 
          insert into public.dts_ddl_info(ddl, username, txid, tag, database, schema, client_host, client_port, event_time)
          values (ddl, current_user, cast(txid_current() as varchar(16)), tg_tag, current_database(), current_schema,  inet_client_addr(), inet_client_port(), current_timestamp);
          select count(id) into real_rows from public.dts_ddl_info;
          if real_rows > max_rows then
          	delete from public.dts_ddl_info where id in (select min(id) from public.dts_ddl_info);
          end if;
      end if;
    end;
    $BODY$;
  4. 执行下述命令,将刚创建的函数的所有者修改为DTS连接源库的账号,以postgres为例。

    CREATE EVENT TRIGGER dts_ddl_event ON ddl_command_end EXECUTE PROCEDURE public.dts_capture_ddl();
  5. 执行下述命令,创建一个事件触发器。

    CREATE EVENT TRIGGER dts_ddl_event ON ddl_command_end EXECUTE PROCEDURE public.dts_capture_ddl();
  6. 执行以下语句,使能刚创建的事件触发器。

    ALTER EVENT TRIGGER dts_ddl_event ENABLE ALWAYS;
  7. 上述的前置操作完成后,返回数据传输服务DTS的控制台,创建源库为PostgreSQL的增量同步任务。

  8. 待同步任务结束后,为避免前置操作创建的对象对源库的影响,请执行下面语句,删除创建的表、函数、触发器。

    drop EVENT trigger dts_ddl_event;
    drop function public.dts_capture_ddl();
    drop table public.dts_ddl_info;


E5ByCL2.d6bs