Data Transmission Service

Prerequisites for DDL Operations for Synchronizing Incremental Data from a PostgreSQL Database

2023-10-25 08:45:17

This section describes how to create triggers and functions for incremental DDL synchronization if the source database is a PostgreSQL database.

To perform incremental DDL synchronization, you can create triggers and functions in the source database to obtain the information about DDL operations and write the information into a specific table. During incremental data synchronization, DTS performs DML operations on the data in the table to implement DDL synchronization.

Precautions

● The following DDL operations are supported:

CREATE TABLE, ALTER TABLE, DROP TABLE, CREATE SEQUENCE, ALTER SEQUENCE, DROP SEQUENCE, CREATE VIEW, ALTER VIEW, DROP VIEW, CREATE INDEX, ALTER INDEX, and DROP INDEX.

● If you rename tables in the source database and then perform DML operations to update data in the tables in the source database, DTS does not synchronize the updated data to the destination database.

● DTS does not synchronize DDL statements starting with comments.

● Before you perform incremental DDL synchronization, check whether a table named dts_ddl_info, a function named dts_capture_ddl(), and a trigger named dts_ddl_event exist in the public schema of the source database. If they exist, delete them.

● If the source database contains a table without a primary key, run the following command to set the replica identity of the table to FULL before you perform full data synchronization:

alter table tablename replica identity full;
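
The two checks described in the precautions above can be sketched as catalog queries. This is a minimal sketch, not part of the DTS procedure: it assumes PostgreSQL 9.4 or later (for pg_class.relreplident), and the output column aliases are illustrative.

```sql
-- Check 1: look for leftover DTS objects. Zero rows means nothing has to be deleted.
-- Note: event triggers are database-wide objects, so they are looked up by name only.
SELECT 'table' AS kind, c.relname AS name
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'public' AND c.relkind = 'r' AND c.relname = 'dts_ddl_info'
UNION ALL
SELECT 'function', p.proname
FROM pg_proc p
JOIN pg_namespace n ON n.oid = p.pronamespace
WHERE n.nspname = 'public' AND p.proname = 'dts_capture_ddl'
UNION ALL
SELECT 'event trigger', e.evtname
FROM pg_event_trigger e
WHERE e.evtname = 'dts_ddl_event';

-- Check 2: list ordinary user tables that have no primary key and whose
-- replica identity is not yet FULL (relreplident = 'f' means FULL).
SELECT n.nspname AS schema_name, c.relname AS table_name
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
  AND n.nspname NOT IN ('pg_catalog', 'information_schema')
  AND c.relreplident <> 'f'
  AND NOT EXISTS (
        SELECT 1
        FROM pg_constraint k
        WHERE k.conrelid = c.oid AND k.contype = 'p'
      );
```

Each table returned by the second query is a candidate for the alter table ... replica identity full command shown above.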

Procedure

  1. Connect to the source database. Make sure that your account has the permissions to create event triggers.

  2. Run the following statements to create a table for storing DDL information.

    DROP TABLE IF EXISTS public.dts_ddl_info;
    DROP SEQUENCE IF EXISTS public.dts_ddl_info_id_seq;
    CREATE TABLE public.dts_ddl_info(
      id                             bigserial primary key,
      ddl                            text,
      database                       varchar(64) default current_database(),
      schema                         varchar(64) default current_schema,
      username                       varchar(64) default current_user,
      client_host                    varchar(64) default inet_client_addr(),
      client_port                    integer default inet_client_port(),
      event_time                     timestamp default current_timestamp,
      txid                           varchar(16) default txid_current()::varchar(16),
      tag                            varchar(64)
    );
  3. Run the following statements to create a function for obtaining the DDL information.

    CREATE OR REPLACE FUNCTION public.dts_capture_ddl()
        RETURNS event_trigger
        LANGUAGE plpgsql
        SECURITY INVOKER
    AS $BODY$
        declare ddl text;
        declare real_rows int;
        declare max_rows int := 10000;
    begin
      if (tg_tag in ('CREATE TABLE','ALTER TABLE','DROP TABLE','CREATE SEQUENCE','ALTER SEQUENCE','DROP SEQUENCE','CREATE VIEW','ALTER VIEW','DROP VIEW','CREATE INDEX','ALTER INDEX','DROP INDEX')) then
          select current_query() into ddl; 
          insert into public.dts_ddl_info(ddl, username, txid, tag, database, schema, client_host, client_port, event_time)
          values (ddl, current_user, cast(txid_current() as varchar(16)), tg_tag, current_database(), current_schema, inet_client_addr(), inet_client_port(), current_timestamp);
          select count(id) into real_rows from public.dts_ddl_info;
          if real_rows > max_rows then
            delete from public.dts_ddl_info where id in (select min(id) from public.dts_ddl_info);
          end if;
      end if;
    end;
    $BODY$;
  4. Run the following statement to change the owner of the function to the account of the source database, such as postgres.

    ALTER FUNCTION public.dts_capture_ddl() OWNER TO postgres;
  5. Run the following statements to create an event trigger.

    CREATE EVENT TRIGGER dts_ddl_event ON ddl_command_end EXECUTE PROCEDURE public.dts_capture_ddl();
  6. Run the following statements to enable the created event trigger.

    ALTER EVENT TRIGGER dts_ddl_event ENABLE ALWAYS;
  7. Go to the DTS console to create an incremental data synchronization task to synchronize data from the PostgreSQL source database.

  8. After the synchronization task is complete, run the following statements to delete the created table, function, and trigger from the source database.

    drop EVENT trigger dts_ddl_event;
    drop function public.dts_capture_ddl();
    drop table public.dts_ddl_info;
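
Before creating the task in step 7, it may help to confirm that the account can create event triggers (step 1) and that DDL capture works once the trigger is enabled (step 6). The following is a minimal sketch under stated assumptions: stock PostgreSQL normally requires a superuser to create event triggers, managed services may grant this through a different role, and dts_ddl_smoke_test is a hypothetical throwaway table name that is not part of the DTS procedure.

```sql
-- Step 1 check: is the current role a superuser?
SELECT rolname, rolsuper
FROM pg_roles
WHERE rolname = current_user;

-- After step 6: confirm that the event trigger exists and how it is enabled.
-- evtenabled = 'A' corresponds to ENABLE ALWAYS.
SELECT evtname, evtevent, evtenabled
FROM pg_event_trigger
WHERE evtname = 'dts_ddl_event';

-- Smoke test: run two supported DDL statements on a throwaway table
-- (hypothetical name), then inspect the most recently captured rows.
CREATE TABLE public.dts_ddl_smoke_test (id integer PRIMARY KEY);
DROP TABLE public.dts_ddl_smoke_test;

SELECT id, tag, ddl, event_time
FROM public.dts_ddl_info
ORDER BY id DESC
LIMIT 2;
```

If capture works, the last query should return one row for the CREATE TABLE statement and one for the DROP TABLE statement. Run the smoke test only before the synchronization task starts, so that the throwaway DDL is not picked up by the task.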

