Oracle Streams для репликации: расширенная настройка (часть 7)
12.04.2009 от aryndin99Продолжение. Начало смотрите в предыдущих поста: часть 1, часть 2, часть 3, часть 4, часть 5, часть 6.
В предыдущих частям мы рассмотрели настройку репликации с использованием макропроцедур MAINTAIN_*. Использование их значительно упростило жищнь при настройке репликации. Более того большинству не нужно лазить в низкоуровневые настройки очередей и процессов распространения – все можно настроить высокоуровневыми командами.
Однако, более тонкая настройка репликации требуется понимания того, что лежит в основе этих макропроцедур. Более того, при решении проблем в репликации приходится зачастую разбираться в том, что сгенерировали команды MAINTAIN_*.
Прежде чем переходить к последующему изучению я настоятельно рекомендую освежить в памяти концепции Oracle Streams.
Итак, после повторения концепций можно переходить к настройке простейшего потока репликации.
Будем осуществлять репликацию таблицы. При этом сбор изменений будем проводить на стороне источника данных, собранные изменения будем складывать на стороне источника. Их будем передавать процессом распространения на целевую базу. На целевой базе данных будем применяться изменения.
Все до безобразия просто. Но в жизни все оказывается не так просто. Итак приступим.
0. Как обычно для настройки репликации создадим нового пользователя.
CREATE USER repl_new_user IDENTIFIED BY oracle DEFAULT TABLESPACE users QUOTA UNLIMITED ON users;
CREATE TABLE repl_new_user.employees TABLESPACE users AS SELECT * FROM hr.employees WHERE department_id in (10,20,30,40);
CREATE TABLE repl_new_user.departments TABLESPACE users AS SELECT * FROM hr.departments WHERE department_id in (10,20,30,40);
ALTER TABLE repl_new_user.employees ADD CONSTRAINT employee_id_pk PRIMARY KEY (employee_id);
ALTER TABLE repl_new_user.departments ADD CONSTRAINT department_id_pk PRIMARY KEY (department_id);
ALTER TABLE repl_new_user.employees ADD CONSTRAINT department_fk FOREIGN KEY (department_id) REFERENCES repl_new_user.departments(department_id);
1. Настройка очередей сообщений strmadmin.streams_queue на базах src и dest. Для этого подключимся к исходной базе данных под пользователем strmadmin и выполним следующие команды:
begin
DBMS_STREAMS_ADM.SET_UP_QUEUE();
DBMS_STREAMS_ADM.SET_UP_QUEUE@DEST();
end;
Эти команды на двух серверах выполняют следующие действия:
- Создают таблицу для очереди streams_queue_table
- Создают очередь streams_queue, которая хранит сообщения типа ANYDATA в таблице streams_queue_table
- Стартует очередь.
Результаты работы можно увидеть, выполнив запросы
select owner,name,QUEUE_TABLE from dba_queues where name='STREAMS_QUEUE';
select owner,name,QUEUE_TABLE from dba_queues@DEST where name='STREAMS_QUEUE';
2. Настройка правил, выделяющих необходимые изменения из всего множества изменений в базе
Первое, что необходимо сделать – это определить список таблиц и характеров изменений (DML, DDL), который будем передавать на удаленный сервер. Для этого мы будем создавать набор правил (RULE SET). В этот набор правил мы добавим правила (RULES).
Создаем набор правил:
BEGIN
DBMS_RULE_ADM.CREATE_RULE_SET(
rule_set_name => 'strmadmin.hr_capture_rules',
evaluation_context => 'SYS.STREAMS$_EVALUATION_CONTEXT');
END;
/
Создаем правила:
BEGIN
DBMS_RULE_ADM.CREATE_RULE( rule_name => 'strmadmin.repl_new_user_employees_dml',
condition => ' :dml.get_object_owner() = ''REPL_NEW_USER'' AND :dml.get_object_name()=''EMPLOYEES'' ');
END;
/
BEGIN
DBMS_RULE_ADM.CREATE_RULE( rule_name => 'strmadmin.repl_new_user_departments_dml',
condition => ' :dml.get_object_owner() = ''REPL_NEW_USER'' AND :dml.get_object_name()=''DEPARTMENTS'' ');
END;
/
Основное в этих правилах – условия. Функции, которые мы вызываем для проверки условий (get_object_owner, get_object_name) по сути являются методами объектного типа данных LCR. Полный список методов можно посмотреть по ссылке.
Добавляем правила в набор правил:
BEGIN
DBMS_RULE_ADM.ADD_RULE
( rule_name => 'strmadmin.repl_new_user_employees_dml',
rule_set_name => 'strmadmin.hr_capture_rules');
DBMS_RULE_ADM.ADD_RULE
( rule_name => 'strmadmin.repl_new_user_departments_dml',
rule_set_name => 'strmadmin.hr_capture_rules');
END;
/
3. Настройка процесса распространения.
Для настройки передачи изменений можно использовать процедуры
- DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES
- DBMS_PROPAGATION_ADM.CREATE_PROPAGATION
DBMS_PROPAGATION_ADM.START_PROPAGATION
Мы будем использовать второй вариант – самый сложный, но наиболее гибкий. Следующие команды настроят передачу:
begin
dbms_propagation_adm.CREATE_PROPAGATION('SRC_TO_DEST',
'STREAMS_QUEUE','STREAMS_QUEUE','DEST','strmadmin.hr_capture_rules');
dbms_propagation_adm.START_PROPAGATION('SRC_TO_DEST');
end;
/
Это процесс будет передавать только те изменения, которые мы задали в правилах, т.е. DML изменения в таблицах HR.EMPLOYEES и HR.DEPARTMENTS
4. Настройка процесса сбора изменений
Для сбора изменений будет использоваться процесс сбора и этот процесс также должен знать о списке изменений, которые он должен собираться в очередь сообщений.
Опять таки можно использовать 2 способа:
- DBMS_STREAMS_ADM.ADD_TABLE_RULES
- DBMS_CAPTURE_ADM.CREATE_CAPTURE
Будем использовать второй вариант. При этом очевидно, что для сбора можно использовать тот же самый набор правил, что мы использовали и для распространения изменений. Это не всегда так (если на один сервер мы хотели бы передавать изменения только по первому региона, а на другой сервер изменения только по второму региону), но в нашем случае собираемые и передаваемые изменения одинаковы.
begin
DBMS_CAPTURE_ADM.CREATE_CAPTURE(
QUEUE_NAME=>'STREAMS_QUEUE',
CAPTURE_NAME=>'CAPTURE_REPL_NEW_USER',
RULE_SET_NAME=>'strmadmin.hr_capture_rules');
end;
/
5. Необходимо произвести первоначальную синхронизацию таблиц.
Самый простой способ использовать database link для первоначальной передачи таблицы. Можно также использовать DATAPUMP. Давайте воспользуемся вариантом с DATAPUMP. Нам необходимо гарантировать, что ни одно изменение не будет потеряно, но также необходимо гарантировать, что какие-то изменения не будут проделаны два раза.
Делается это следующим образом:
- фиксируем SCN с которого мы будем применяться изменения
- экспортируем таблицу на момент SCN, который мы зафиксировали
- импортируем таблицу в целевую базу
- задаем SCN для нашей таблицы с помощью процедур DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN
В это и есть смысл процедуры INSTANTIATION. В случае применения процедур MAINTAIN_* от нас это процесс скрыт. Здесь же нам приходится выполнять это самим.
По шагам:
5.1. Фиксируем SCN с которого мы будем применяться изменения
На исходной базе выполняем команду:
STRMADMIN@db11r2:1521/src.local SQL>SELECT DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER() FROM DUAL;
DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER()
-----------------------------------------
1631172
5.2. Экспортируем таблицу на момент SCN, который мы зафиксировали
expdp system/oracle@src tables=repl_new_user.EMPLOYEES,repl_new_user.DEPARTMENTS FLASHBACK_SCN=1631172 DUMPFILE=repl_new_user.dmp DIRECTORY=src_dir
Обратите внимание на фразу FLASHBACK_SCN=1631172 – она здесь ключевая.
5.3. Импортируем данные в целевую базу данных.
Создаем пользователя на целевой базе из sqlplus
CREATE USER repl_new_user IDENTIFIED BY oracle DEFAULT TABLESPACE users QUOTA UNLIMITED ON users;
Копируем дамп в каталог, доступный целевому серверу (для этого каталога должен быть создан объект DIRECTORY – в моем случае dest_dir):
cp /home/oracle/src/repl_new_user.dmp /home/oracle/dest/
Запускаем из командной строки импорт данных в целевую таблицу:
impdp system/oracle@dest.local full=y DUMPFILE=repl_new_user.dmp DIRECTORY=dest_dir
5.4. Задаем SCN для нашей таблицы с помощью процедур DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN
С исходной базы данных (src.local) запускаем команду задания номера изменения с которого необходимо начинаться применять изменения:
DECLARE
iscn NUMBER;
BEGIN
iscn := 1631172;
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@DEST.LOCAL(
source_object_name => 'repl_new_user.EMPLOYEES',
source_database_name => 'src.local',
instantiation_scn => iscn);
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@DEST.LOCAL(
source_object_name => 'repl_new_user.DEPARTMENTS',
source_database_name => 'src.local',
instantiation_scn => iscn);
END;
/
5.5. Запуск процедуры подготовки к INSTANTIATION
Запускаем на исходной базе данных
begin
DBMS_CAPTURE_ADM.PREPARE_TABLE_INSTANTIATION('REPL_NEW_USER.employees');
DBMS_CAPTURE_ADM.PREPARE_TABLE_INSTANTIATION('REPL_NEW_USER.departments');
end;
/
Эта процедура сбрасывает в журналы информацию необходимую для старта репликации.
6. Конфигурация применения изменений на целевой базе.
Эти операции мы делаем на целевой базе.
Давайте создадим набор правил для применения, аналогичный тому что мы создали на исходной базе (в нашем случае это можно было бы и не делать, но в общем случае это может понадобиться):
BEGIN
DBMS_RULE_ADM.CREATE_RULE_SET(
rule_set_name => 'strmadmin.hr_apply_rules',
evaluation_context => 'SYS.STREAMS$_EVALUATION_CONTEXT');
DBMS_RULE_ADM.CREATE_RULE( rule_name => 'strmadmin.repl_new_user_employees_dml',
condition => ' :dml.get_object_owner() = ''REPL_NEW_USER'' AND :dml.get_object_name()=''EMPLOYEES'' ');
DBMS_RULE_ADM.CREATE_RULE( rule_name => 'strmadmin.repl_new_user_departments_dml',
condition => ' :dml.get_object_owner() = ''REPL_NEW_USER'' AND :dml.get_object_name()=''DEPARTMENTS'' ');
DBMS_RULE_ADM.ADD_RULE
( rule_name => 'strmadmin.repl_new_user_employees_dml',
rule_set_name => 'strmadmin.hr_apply_rules');
DBMS_RULE_ADM.ADD_RULE
( rule_name => 'strmadmin.repl_new_user_departments_dml',
rule_set_name => 'strmadmin.hr_apply_rules');
END;
/
Теперь создадим процесс применения, который будет применять изменения, указанный в правилах:
begin
DBMS_APPLY_ADM.CREATE_APPLY(QUEUE_NAME=>'streams_queue',APPLY_NAME=>'APPLY_REPL_NEW_USER',
RULE_SET_NAME=>'strmadmin.hr_apply_rules',APPLY_CAPTURED=>TRUE,SOURCE_DATABASE=>'src.local');
end;
7. Запускаем процессы по очереди:
На целевой базе запускаем процесс применения:
BEGIN
DBMS_APPLY_ADM.START_APPLY( apply_name => 'APPLY_REPL_NEW_USER');
END;
/
На исходной базе запускаем процесс сбора изменений:
BEGIN
DBMS_CAPTURE_ADM.START_CAPTURE(
capture_name => 'CAPTURE_REPL_NEW_USER');
END;
Можно еще слегка подкрутить параметры применения изменений. Например, вот так (не дает процессу применения останавливать при возникновении ошибки):
BEGIN
DBMS_APPLY_ADM.SET_PARAMETER(
apply_name => 'APPLY_REPL_NEW_USER',
parameter => 'disable_on_error',
value => 'N');
END;
/
8. Все. Мы сделали для репликации все, что могли. Давайте проверим работу вот так:
Запускаем добавление новых данных на исходной базе (src.local):
INSERT INTO repl_new_user.departments SELECT * FROM hr.departments WHERE department_id in (50);
COMMIT;
Проверим появление изменений на целевой базе:
select count(*) from REPL_NEW_USER.departments;
Все должно работать. Оставайтесь на связи. Дальше пойдем еще глубже и попробуем применять правила трансформации, синхронный сбор изменений и другие интересные вещи. Возникают вопросы – пишите – буду стараться ответить.
- Вам также могут быть интересны следующие статьи:
- Oracle Streams для репликации: расширенная настройка (часть 9)
- Oracle Streams для репликации: расширенная настройка (часть 8)
- Использование Oracle Streams для репликации (часть 4)
- Использование Oracle Streams для репликации (часть 5)
- Использование Oracle Streams для репликации (часть 3)
Рубрики: Data Warehouse, Data Warehousing, Database, Streams | Комментарии (2) »

14.04.2009 в 23:34
[...] настройка (часть 8)Лицензирование Spatial, Locator, MapviewerOracle Streams для репликации: расширенная настройка (часть 7)Веб-семинар “Обработка пространственных данных [...]
16.04.2009 в 16:57
[...] настройка (часть 8)Лицензирование Spatial, Locator, MapviewerOracle Streams для репликации: расширенная настройка (часть 7)Веб-семинар “Обработка пространственных данных [...]