Oracle Streams для репликации: расширенная настройка (часть 8)
14.04.2009 от aryndin99UPDATE1: добавлен набор правил strmadmin.transform_apply_rules
UPDATE2: нельзя менять employee_id при трансформации, т.к. это первичный ключ. Заменил на department_id
Продолжение. Начало смотрите в предыдущих поста: часть 1, часть 2, часть 3, часть 4, часть 5, часть 6, часть 7.
В предыдущих частям мы рассматривали репликацию, что называется, один в один, т.е. есть табличка – мы ее без изменений переносим на удаленный сервер. Очевидно, что область применения репликации гораздо шире. Например, при репликации бывает необходимо передавать только часть изменений (для определенного региона), а бывает необходимо произвести трансформацию данных: изменить имя пользователя, кому принадлежит таблица, изменить название столбца и т.д.
Прежде чем переходить к последующему изучению я настоятельно рекомендую освежить в памяти концепции Oracle Streams.
Трансформация.
Для того, чтобы осуществлять трансформацию необходимо понимание того, как Oracle Streams передает изменений на удаленный сервер. Сбор изменений в Oracle Streams осуществляется с помощью LCR-сообщений. По сути LCR-сообщение – это обычно Advanced Queue(AQ) сообщение и обработка его при передаче с одной базы данных на другую осуществляется как с обычным сообщение AQ.
Отличие заключается в том, что сообщения LCR генерируются автоматически (процессами сбора изменений) и применяются на удаленной базе также автоматически (процессами применения). При этом процессы применения изменений ничего не "знают" о структуре таблиц к которым они применяют изменения. О структуре таблиц "знает" само LCR сообщение и это сообщение "умеет" применять себя само с помощью метода EXECUTE. У LCR сообщения также есть методы позволяющие трансформировать изменения в тот момент, когда эти изменения инкапсулированы в LCR объект. В Oracle® Database PL/SQL Packages and Types Reference можно найти детальное описание методов, позволяющих изменить состояние LCR записи.
Преобразования бывают двух типов и применяются в следующем порядке:
- Декларативные (ADD_COLUMN, DELETE_COLUMN, RENAME_COLUMN, RENAME_SCHEMA, RENAME_TABLE)
- Пользовательские (более гибкий, но более медленный способ – позволяет написать PL/SQL обработчик)
Декларативная трансформация
Для тестового примера нас будут интересовать следующие методы:
- SET_BASE_TABLE_NAME
- SET_BASE_TABLE_OWNER
- SET_VALUE
Зададимся задачей создание следующей репликации:
- реплицируем таблицу repl_srctrans_user.employees
- целевой базой при репликации является та же самая база (т.е. база-источник и целевая база – src).
- изменений необходимо применять к таблице repl_desttrans_user.dup_employees
- к значениям в поле department_id должны прибавляться значения 1000
0. Как обычно для настройки репликации создадим нового пользователя.
CREATE USER repl_srctrans_user IDENTIFIED BY oracle DEFAULT TABLESPACE users QUOTA UNLIMITED ON users;
CREATE TABLE repl_srctrans_user.employees TABLESPACE users
AS SELECT * FROM hr.employees WHERE department_id in (10,20,30,40);
ALTER TABLE repl_srctrans_user.employees ADD CONSTRAINT employee_id_pk PRIMARY KEY (employee_id);
-- Создадим также целевого пользователя:
CREATE USER repl_desttrans_user IDENTIFIED BY oracle DEFAULT TABLESPACE users QUOTA UNLIMITED ON users;
1. Создаем очередь, в которую будем помещать сообщения LCR
BEGIN DBMS_STREAMS_ADM.SET_UP_QUEUE( queue_table => 'c_trans_queue_t', queue_name => 'c_trans_queue_q', queue_user => 'strmadmin'); END; /
2. Настройка правил, выделяющих необходимые изменения из всего множества изменений в базе
Первое, что необходимо сделать – это определить список таблиц и характеров изменений (DML, DDL), который будем передавать на удаленный сервер. Для этого мы будем создавать набор правил (RULE SET). В этот набор правил мы добавим правила (RULES).
Создаем набор правил:
BEGIN DBMS_RULE_ADM.CREATE_RULE_SET( rule_set_name => 'strmadmin.transform_capture_rules', evaluation_context => 'SYS.STREAMS$_EVALUATION_CONTEXT'); END; /
Создаем правила:
BEGIN DBMS_RULE_ADM.CREATE_RULE( rule_name => 'strmadmin.tranform_employees_dml', condition => ':dml.get_object_owner() = ''REPL_SRCTRANS_USER'' AND :dml.get_object_name()=''EMPLOYEES'''); END; /
Основное в этих правилах – условия. Функции, которые мы вызываем для проверки условий (get_object_owner, get_object_name) по сути являются методами объектного типа данных LCR. Полный список методов можно посмотреть по ссылке.
Добавляем правила в набор правил:
BEGIN DBMS_RULE_ADM.ADD_RULE ( rule_name => 'strmadmin.tranform_employees_dml', rule_set_name => 'strmadmin.transform_capture_rules'); END; /
Полность аналогично создаем набор правил strmadmin.transform_apply_rules. Можно было один и тот же набор правил повесить и на операцию применения, и на операцию сбора изменений, но тогда бы у нас возникли проблемы при настройке трансформации: трансформация прикручивается к правилу, т.е. получилось бы, что трансформация применяется 2 раза.
BEGIN DBMS_RULE_ADM.CREATE_RULE_SET( rule_set_name => 'strmadmin.transform_apply_rules', evaluation_context => 'SYS.STREAMS$_EVALUATION_CONTEXT'); END; / BEGIN DBMS_RULE_ADM.CREATE_RULE( rule_name => 'strmadmin.tranform_apply_employees_dml', condition => ':dml.get_object_owner() = ''REPL_SRCTRANS_USER'' AND :dml.get_object_name()=''EMPLOYEES'''); END; BEGIN DBMS_RULE_ADM.ADD_RULE ( rule_name => 'strmadmin.tranform_apply_employees_dml', rule_set_name => 'strmadmin.transform_apply_rules'); END; /
3. Создаем процесс применения
begin DBMS_APPLY_ADM.CREATE_APPLY(QUEUE_NAME=>'c_trans_queue_q',APPLY_NAME=>'APPLY_TRANSFORM', RULE_SET_NAME=>'strmadmin.transform_apply_rules',APPLY_CAPTURED=>TRUE,SOURCE_DATABASE=>'src.local'); end;
4. Создаем процесс сбора изменений
begin DBMS_CAPTURE_ADM.CREATE_CAPTURE( QUEUE_NAME=>'c_trans_queue_q', CAPTURE_NAME=>'CAPTURE_TRANSFORM', RULE_SET_NAME=>'strmadmin.transform_capture_rules'); end; /
Если мы сейчас запустим процесс сбора и применения, то произойдет ошибка, т.к. Oracle Streams попытается изменения применить к той таблице, на которой они произошли. Поэтому нужно будет ввести правила трансформации.
5. Задаем трансформацию для соответствующего правила. Обратите внимание – у меня трансформация задается для правила tranform_apply_employees_dml, т.е. трансформация будет происходить в момент применения изменений. Если бы мы хотели добавить преобразование в момент сбора изменений или распространения, т.е. нужно было бы вешать трансформацию на соответствующее правило.
BEGIN DBMS_STREAMS_ADM.RENAME_TABLE( rule_name => 'strmadmin.tranform_apply_employees_dml', from_table_name => 'REPL_SRCTRANS_USER.EMPLOYEES', to_table_name => 'REPL_DESTTRANS_USER.DUP_EMPLOYEES', step_number => 0, operation => 'ADD'); END; /
6. Производим первоначальную синхронизация
6.1. Фиксируем SCN с которого мы будем применяться изменения
На исходной базе выполняем команду:
STRMADMIN@db11r2:1521/src.local SQL>SELECT DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER() FROM DUAL; DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER() —————————————– 1674178
6.2. Создаем таблицу с изменениями на зафиксированный момент
CREATE TABLE repl_desttrans_user.dup_employees TABLESPACE users AS SELECT * FROM repl_srctrans_user.employees AS OF SCN 1674178;
6.3. Задаем SCN для нашей таблицы с помощью процедур DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN
DECLARE iscn NUMBER; BEGIN iscn := 1674178; DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN( source_object_name => 'repl_srctrans_user.employees ', source_database_name => 'src.local', instantiation_scn => iscn); END; /
6.4. Запуск процедуры подготовки к INSTANTIATION
begin
DBMS_CAPTURE_ADM.PREPARE_TABLE_INSTANTIATION('repl_srctrans_user.employees');
end;
/
7. Запускаем процессы сбора и применения
BEGIN DBMS_APPLY_ADM.START_APPLY( apply_name => 'APPLY_TRANSFORM'); DBMS_CAPTURE_ADM.START_CAPTURE( capture_name => 'CAPTURE_TRANSFORM'); END; /
8. Протестируем добавление новых данных
INSERT INTO repl_srctrans_user.employees SELECT * FROM hr.employees WHERE department_id in (50); COMMIT;
Добавление должно отрабатывать.
Пользовательская трансформация
Перейдем теперь к немного более сложной ситуации: у нас есть преобразование, но это преобразование не вписывается в рамки декларативных. Например, как в нашем случае, нужно менять значения в полях LCR сообщений. Для создания пользовательской трансформации необходимо определить функцию с одной из следующих сигнатур (второй вариант подходит, если мы хотим преобразовать одно сообщение в несколько):
FUNCTION user_function ( parameter_name IN ANYDATA) RETURN ANYDATA; FUNCTION user_function ( parameter_name IN ANYDATA) RETURN STREAMS$_ANYDATA_ARRAY;
9. В нашем случае, когда необходимо преобразование department_id=department_id+1000 функция будет выглядеть вот так:
CREATE OR REPLACE FUNCTION strmadmin.add_100000(in_any IN ANYDATA)
RETURN ANYDATA
IS
lcr SYS.LCR$_ROW_RECORD;
rc NUMBER;
deptid_value_anydata ANYDATA;
deptid_value_number NUMBER;
BEGIN
-- Get the type of object
-- Check if the object type is SYS.LCR$_ROW_RECORD
IF in_any.GETTYPENAME='SYS.LCR$_ROW_RECORD' THEN
-- Put the row LCR into lcr
rc := in_any.GETOBJECT(lcr);
-- Get old DEPARTMENT_ID
deptid_value_anydata := lcr.GET_VALUE('new','DEPARTMENT_ID');
-- Check that DEPARTMENT_ID is not null
IF deptid_value_anydata IS NOT NULL THEN
-- Put the column value into deptid_value_number
rc := deptid_value_anydata.GETNUMBER(deptid_value_number);
-- Change a value of DEPARTMENT_ID=DEPARTMENT_ID+1000
RECORD_ERROR (to_char(deptid_value_number));
lcr.SET_VALUE('new','DEPARTMENT_ID',ANYDATA.CONVERTNUMBER(deptid_value_number+1000));
END IF;
RETURN ANYDATA.CONVERTOBJECT(lcr);
END IF;
RETURN in_any;
EXCEPTION
WHEN OTHERS THEN
RECORD_ERROR (SQLERRM);
END;
/
10. Функцию необходимо прикрутить к соответствующему правилу. В нашем случае без разницы на какое правило вешать (применять при сборе изменений или при их применении), но для простоты повесим применение на то же правило, которое мы использовали при декларативной трансформации.
BEGIN DBMS_STREAMS_ADM.SET_RULE_TRANSFORM_FUNCTION( rule_name => 'strmadmin.tranform_apply_employees_dml', transform_function => 'strmadmin.add_100000'); END;
Порядок применения, как я уже говорил, будет следующий:
- Декларативные (ADD_COLUMN, DELETE_COLUMN, RENAME_COLUMN, RENAME_SCHEMA, RENAME_TABLE)
- Пользовательские (более гибкий, но более медленный способ – позволяет написать PL/SQL обработчик)
11. Список трансформаторов можно посмотреть в представлении DBA_RULES
COLUMN ACTION_CONTEXT_NAME HEADING 'Action Context Name' FORMAT A30 COLUMN ACTION_CONTEXT_VALUE HEADING 'Action Context Value' FORMAT A30 SELECT RULE_NAME,AC.NVN_NAME ACTION_CONTEXT_NAME, AC.NVN_VALUE.ACCESSVARCHAR2() ACTION_CONTEXT_VALUE FROM DBA_RULES R, TABLE(R.RULE_ACTION_CONTEXT.ACTX_LIST) AC
11. Проверим работу. Как видно – при передаче в целевую для репликации таблицу значение увеличилось на 1000.
STRMADMIN@db11r2:1521/src.local SQL>update repl_srctrans_user.employees set department_id=999 where employee_id=100; 1 row updated. STRMADMIN@db11r2:1521/src.local SQL>commit; Commit complete. STRMADMIN@db11r2:1521/src.local SQL>select department_id from repl_desttrans_user.dup_employees where employee_id=100; DEPARTMENT_ID ------------- 1999
Продолжение следует…
- Вам также могут быть интересны следующие статьи:
- Oracle Streams для репликации: расширенная настройка (часть 7)
- Oracle Streams для репликации: расширенная настройка (часть 9)
- Использование Oracle Streams для репликации (часть 4)
- Использование Oracle Streams для репликации (часть 3)
- Использование Oracle Streams для репликации (часть 5)
Рубрики: Data Warehouse, Data Warehousing, Database, Streams | Комментарии (8) »

15.04.2009 в 11:20
Очень познавательно… Спасибо…
15.04.2009 в 11:38
15.04.2009 в 12:08
А в пункте 3 не опечатка?
’strmadmin.transform_apply_rules’ -> ’strmadmin.transform_capture_rules’
15.04.2009 в 12:25
Спасибо. Там была ошибка – я исправил. Правило должно остаться strmadmin.transform_apply_rules, но его нужно создать.
15.04.2009 в 13:47
Кстати, а я правильно понял, что для нормальной репликации необходимы первичные ключи?
15.04.2009 в 15:10
Необязательно, но крайне желательно. Oracle Streams нужно знать к какой строке относятся изменения.
Если первичного ключа нет, то можно использовать механизм Substitute Key Columns. С помощью него можно задать набор столбцов с уникальными значениями. Например вот так:
BEGIN
DBMS_APPLY_ADM.SET_KEY_COLUMNS(
object_name => ‘hr.employees’,
column_list => ‘first_name,last_name,hire_date’);
END;
/
23.04.2009 в 09:18
Допустим есть две схемы у которых одинаковая структура (набор таблиц и т. д. и т. п.), но разные данные в этих таблицах. Можно ли настроить репликацию так, чтобы устранить различия в данных?
23.04.2009 в 09:30
С помощью Streams этого не сделать, Streams не обращается к исходной таблице, он не видит данных из нее, а только поток изменений в Redo Log (в асинхронном режиме).
А для Ваших целей вполне подойдет команда MERGE (http://download.oracle.com/docs/cd/B28359_01/server.111/b28286/statements_9016.htm).