Disclaimer

Данный блог является моей личной точкой зрения и не обязательно отражает точку зрения Oracle.

The views expressed on this blog are my own and do not necessarily reflect the views of Oracle

Поиск

Подписка

Oracle Streams для репликации: расширенная настройка (часть 8)

14.04.2009 от aryndin99

UPDATE1: добавлен набор правил strmadmin.transform_apply_rules

UPDATE2: нельзя менять employee_id при трансформации, т.к. это первичный ключ. Заменил на department_id

Продолжение. Начало смотрите в предыдущих поста: часть 1, часть 2, часть 3, часть 4, часть 5, часть 6, часть 7.

В предыдущих частям мы рассматривали репликацию, что называется, один в один, т.е. есть табличка – мы ее без изменений переносим на удаленный сервер. Очевидно, что область применения репликации гораздо шире. Например, при репликации бывает необходимо передавать только часть изменений (для определенного региона), а бывает необходимо произвести трансформацию данных: изменить имя пользователя, кому принадлежит таблица, изменить название столбца и т.д.

Прежде чем переходить к последующему изучению я настоятельно рекомендую освежить в памяти концепции Oracle Streams.

Трансформация.

Для того, чтобы осуществлять трансформацию необходимо понимание того, как Oracle Streams передает изменений на удаленный сервер. Сбор изменений в Oracle Streams осуществляется с помощью LCR-сообщений. По сути LCR-сообщение – это обычно Advanced Queue(AQ) сообщение и обработка его при передаче с одной базы данных на другую осуществляется как с обычным сообщение AQ.

Отличие заключается в том, что сообщения LCR генерируются автоматически (процессами сбора изменений) и применяются на удаленной базе также автоматически (процессами применения). При этом процессы применения изменений ничего не "знают" о структуре таблиц к которым они применяют изменения. О структуре таблиц "знает" само LCR сообщение и это сообщение "умеет" применять себя само с помощью метода EXECUTE. У LCR сообщения также есть методы позволяющие трансформировать изменения в тот момент, когда эти изменения инкапсулированы в LCR объект. В Oracle® Database PL/SQL Packages and Types Reference можно найти детальное описание методов, позволяющих изменить состояние LCR записи.

Преобразования бывают двух типов и применяются в следующем порядке:

  1. Декларативные (ADD_COLUMN, DELETE_COLUMN, RENAME_COLUMN, RENAME_SCHEMA, RENAME_TABLE)
  2. Пользовательские (более гибкий, но более медленный способ – позволяет написать PL/SQL обработчик)
Декларативная трансформация

Для тестового примера нас будут интересовать следующие методы:

  • SET_BASE_TABLE_NAME
  • SET_BASE_TABLE_OWNER
  • SET_VALUE

Зададимся задачей создание следующей репликации:

  • реплицируем таблицу repl_srctrans_user.employees
  • целевой базой при репликации является та же самая база (т.е. база-источник и целевая база – src).
  • изменений необходимо применять к таблице repl_desttrans_user.dup_employees
  • к значениям в поле department_id должны прибавляться значения 1000
clip_image001[1]

0. Как обычно для настройки репликации создадим нового пользователя.

CREATE USER repl_srctrans_user IDENTIFIED BY oracle DEFAULT TABLESPACE users QUOTA UNLIMITED ON users;
CREATE TABLE repl_srctrans_user.employees TABLESPACE users
       AS SELECT * FROM hr.employees WHERE department_id in (10,20,30,40);
ALTER TABLE repl_srctrans_user.employees ADD CONSTRAINT employee_id_pk PRIMARY KEY (employee_id);
-- Создадим также целевого пользователя:
CREATE USER repl_desttrans_user IDENTIFIED BY oracle DEFAULT TABLESPACE users QUOTA UNLIMITED ON users;

1. Создаем очередь, в которую будем помещать сообщения LCR

BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE( queue_table => 'c_trans_queue_t',
queue_name => 'c_trans_queue_q', queue_user => 'strmadmin');
END;
/

2. Настройка правил, выделяющих необходимые изменения из всего множества изменений в базе

Первое, что необходимо сделать – это определить список таблиц и характеров изменений (DML, DDL), который будем передавать на удаленный сервер. Для этого мы будем создавать набор правил (RULE SET). В этот набор правил мы добавим правила (RULES).

Создаем набор правил:

BEGIN
DBMS_RULE_ADM.CREATE_RULE_SET(
rule_set_name => 'strmadmin.transform_capture_rules',
evaluation_context => 'SYS.STREAMS$_EVALUATION_CONTEXT');
END;
/

Создаем правила:

BEGIN
DBMS_RULE_ADM.CREATE_RULE( rule_name => 'strmadmin.tranform_employees_dml',
condition => ':dml.get_object_owner() = ''REPL_SRCTRANS_USER'' AND :dml.get_object_name()=''EMPLOYEES''');
END;
/

Основное в этих правилах – условия. Функции, которые мы вызываем для проверки условий (get_object_owner, get_object_name) по сути являются методами объектного типа данных LCR. Полный список методов можно посмотреть по ссылке.

Добавляем правила в набор правил:

BEGIN
DBMS_RULE_ADM.ADD_RULE
( rule_name => 'strmadmin.tranform_employees_dml',
rule_set_name => 'strmadmin.transform_capture_rules');
END;
/

Полность аналогично создаем набор правил strmadmin.transform_apply_rules. Можно было один и тот же набор правил повесить и на операцию применения, и на операцию сбора изменений, но тогда бы у нас возникли проблемы при настройке трансформации: трансформация прикручивается к правилу, т.е. получилось бы, что трансформация применяется 2 раза.

BEGIN
DBMS_RULE_ADM.CREATE_RULE_SET(
rule_set_name => 'strmadmin.transform_apply_rules',
evaluation_context => 'SYS.STREAMS$_EVALUATION_CONTEXT');
END;
/

BEGIN
DBMS_RULE_ADM.CREATE_RULE( rule_name => 'strmadmin.tranform_apply_employees_dml',
condition => ':dml.get_object_owner() = ''REPL_SRCTRANS_USER'' AND :dml.get_object_name()=''EMPLOYEES''');
END;

BEGIN
DBMS_RULE_ADM.ADD_RULE
( rule_name => 'strmadmin.tranform_apply_employees_dml',
rule_set_name => 'strmadmin.transform_apply_rules');
END;
/

3. Создаем процесс применения

begin
DBMS_APPLY_ADM.CREATE_APPLY(QUEUE_NAME=>'c_trans_queue_q',APPLY_NAME=>'APPLY_TRANSFORM',
RULE_SET_NAME=>'strmadmin.transform_apply_rules',APPLY_CAPTURED=>TRUE,SOURCE_DATABASE=>'src.local');
end;

4. Создаем процесс сбора изменений

begin
DBMS_CAPTURE_ADM.CREATE_CAPTURE(
QUEUE_NAME=>'c_trans_queue_q',
CAPTURE_NAME=>'CAPTURE_TRANSFORM',
RULE_SET_NAME=>'strmadmin.transform_capture_rules');
end;
/

Если мы сейчас запустим процесс сбора и применения, то произойдет ошибка, т.к. Oracle Streams попытается изменения применить к той таблице, на которой они произошли. Поэтому нужно будет ввести правила трансформации.

5. Задаем трансформацию для соответствующего правила. Обратите внимание – у меня трансформация задается для правила tranform_apply_employees_dml, т.е. трансформация будет происходить в момент применения изменений. Если бы мы хотели добавить преобразование в момент сбора изменений или распространения, т.е. нужно было бы вешать трансформацию на соответствующее правило.

BEGIN
DBMS_STREAMS_ADM.RENAME_TABLE(
rule_name       => 'strmadmin.tranform_apply_employees_dml',
from_table_name => 'REPL_SRCTRANS_USER.EMPLOYEES',
to_table_name   => 'REPL_DESTTRANS_USER.DUP_EMPLOYEES',
step_number     => 0,
operation       => 'ADD');
END;
/

6. Производим первоначальную синхронизация

6.1. Фиксируем SCN с которого мы будем применяться изменения

На исходной базе выполняем команду:

STRMADMIN@db11r2:1521/src.local SQL>SELECT DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER() FROM DUAL;

DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER()
—————————————–
1674178

6.2. Создаем таблицу с изменениями на зафиксированный момент

CREATE TABLE repl_desttrans_user.dup_employees
TABLESPACE users AS SELECT * FROM repl_srctrans_user.employees AS OF SCN 1674178;

6.3. Задаем SCN для нашей таблицы с помощью процедур DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN

DECLARE
iscn  NUMBER;
BEGIN
iscn := 1674178;
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
source_object_name    => 'repl_srctrans_user.employees ',
source_database_name  => 'src.local',
instantiation_scn     => iscn);
END;
/

6.4. Запуск процедуры подготовки к INSTANTIATION

begin
DBMS_CAPTURE_ADM.PREPARE_TABLE_INSTANTIATION('repl_srctrans_user.employees');
end;
/

7. Запускаем процессы сбора и применения

BEGIN
DBMS_APPLY_ADM.START_APPLY( apply_name => 'APPLY_TRANSFORM');
DBMS_CAPTURE_ADM.START_CAPTURE( capture_name  => 'CAPTURE_TRANSFORM');
END;
/

8. Протестируем добавление новых данных

INSERT INTO repl_srctrans_user.employees SELECT * FROM hr.employees WHERE department_id in (50);
COMMIT;

Добавление должно отрабатывать.

Пользовательская трансформация

Перейдем теперь к немного более сложной ситуации: у нас есть преобразование, но это преобразование не вписывается в рамки декларативных. Например, как в нашем случае, нужно менять значения в полях LCR сообщений. Для создания пользовательской трансформации необходимо определить функцию с одной из следующих сигнатур (второй вариант подходит, если мы хотим преобразовать одно сообщение в несколько):

FUNCTION user_function (
parameter_name   IN  ANYDATA)
RETURN ANYDATA;

FUNCTION user_function (
parameter_name   IN  ANYDATA)
RETURN STREAMS$_ANYDATA_ARRAY;

9. В нашем случае, когда необходимо преобразование department_id=department_id+1000 функция будет выглядеть вот так:

CREATE OR REPLACE FUNCTION strmadmin.add_100000(in_any IN ANYDATA)
RETURN ANYDATA
IS
lcr SYS.LCR$_ROW_RECORD;
rc  NUMBER;
deptid_value_anydata ANYDATA;
deptid_value_number NUMBER;
BEGIN
-- Get the type of object
-- Check if the object type is SYS.LCR$_ROW_RECORD
IF in_any.GETTYPENAME='SYS.LCR$_ROW_RECORD' THEN
-- Put the row LCR into lcr
rc := in_any.GETOBJECT(lcr);
-- Get old DEPARTMENT_ID
deptid_value_anydata := lcr.GET_VALUE('new','DEPARTMENT_ID');
-- Check that DEPARTMENT_ID is not null
IF deptid_value_anydata IS NOT NULL THEN
-- Put the column value into deptid_value_number
rc := deptid_value_anydata.GETNUMBER(deptid_value_number);
-- Change a value of DEPARTMENT_ID=DEPARTMENT_ID+1000
RECORD_ERROR (to_char(deptid_value_number));
lcr.SET_VALUE('new','DEPARTMENT_ID',ANYDATA.CONVERTNUMBER(deptid_value_number+1000));
END IF;
RETURN ANYDATA.CONVERTOBJECT(lcr);
END IF;
RETURN in_any;
EXCEPTION
WHEN OTHERS THEN
RECORD_ERROR (SQLERRM);
END;
/

10. Функцию необходимо прикрутить к соответствующему правилу. В нашем случае без разницы на какое правило вешать (применять при сборе изменений или при их применении), но для простоты повесим применение на то же правило, которое мы использовали при декларативной трансформации.

BEGIN
DBMS_STREAMS_ADM.SET_RULE_TRANSFORM_FUNCTION(
rule_name           => 'strmadmin.tranform_apply_employees_dml',
transform_function  => 'strmadmin.add_100000');
END;

Порядок применения, как я уже говорил, будет следующий:

  1. Декларативные (ADD_COLUMN, DELETE_COLUMN, RENAME_COLUMN, RENAME_SCHEMA, RENAME_TABLE)
  2. Пользовательские (более гибкий, но более медленный способ – позволяет написать PL/SQL обработчик)

11. Список трансформаторов можно посмотреть в представлении DBA_RULES

COLUMN ACTION_CONTEXT_NAME HEADING 'Action Context Name' FORMAT A30
COLUMN ACTION_CONTEXT_VALUE HEADING 'Action Context Value' FORMAT A30

SELECT
RULE_NAME,AC.NVN_NAME ACTION_CONTEXT_NAME,
AC.NVN_VALUE.ACCESSVARCHAR2() ACTION_CONTEXT_VALUE
FROM DBA_RULES R, TABLE(R.RULE_ACTION_CONTEXT.ACTX_LIST) AC

11. Проверим работу. Как видно – при передаче в целевую для репликации таблицу значение увеличилось на 1000.

STRMADMIN@db11r2:1521/src.local SQL>update repl_srctrans_user.employees
set department_id=999 where
employee_id=100;

1 row updated.

STRMADMIN@db11r2:1521/src.local SQL>commit;
Commit complete.

STRMADMIN@db11r2:1521/src.local SQL>select department_id from repl_desttrans_user.dup_employees where employee_id=100;

DEPARTMENT_ID
-------------
1999

Продолжение следует…

Рубрики: Data Warehouse, Data Warehousing, Database, Streams | Комментарии (8) »

Комментарии (8)

  1. Alexey Pirogov пишет:

    Очень познавательно… Спасибо…

  2. aryndin99 пишет:

    :) не за что

  3. Alexey Pirogov пишет:

    А в пункте 3 не опечатка?
    ’strmadmin.transform_apply_rules’ -> ’strmadmin.transform_capture_rules’

  4. aryndin99 пишет:

    Спасибо. Там была ошибка – я исправил. Правило должно остаться strmadmin.transform_apply_rules, но его нужно создать.

  5. Alexey Pirogov пишет:

    Кстати, а я правильно понял, что для нормальной репликации необходимы первичные ключи?

  6. aryndin99 пишет:

    Необязательно, но крайне желательно. Oracle Streams нужно знать к какой строке относятся изменения.
    Если первичного ключа нет, то можно использовать механизм Substitute Key Columns. С помощью него можно задать набор столбцов с уникальными значениями. Например вот так:
    BEGIN
    DBMS_APPLY_ADM.SET_KEY_COLUMNS(
    object_name => ‘hr.employees’,
    column_list => ‘first_name,last_name,hire_date’);
    END;
    /

  7. Alexey Pirogov пишет:

    Допустим есть две схемы у которых одинаковая структура (набор таблиц и т. д. и т. п.), но разные данные в этих таблицах. Можно ли настроить репликацию так, чтобы устранить различия в данных?

  8. aryndin99 пишет:

    С помощью Streams этого не сделать, Streams не обращается к исходной таблице, он не видит данных из нее, а только поток изменений в Redo Log (в асинхронном режиме).
    А для Ваших целей вполне подойдет команда MERGE (http://download.oracle.com/docs/cd/B28359_01/server.111/b28286/statements_9016.htm).

Оставить комментарий

Заметьте: Включена проверка комментариев. Нет смысла повторно отправлять комментарий.