Oracle Cdc Change Data Capture

Oracle 10g CDC
http://www.blogjava.net/decode360/archive/2009/06/15/287733.html

从Oracle9i开始,Oracle引入了CDC技术来实现对变化数据的捕获。在Oracle9i中CDC只支持同步的数据捕获(synchronous change capture),源数据的变化被实时的捕获,捕获的过程和源数据是同一个事务。它的实现需要源数据支持trigger,所以这种同步的技术会给数据源带来性能的问题。这是CDC在Oracle9i的一个缺陷(在Oracle10g中已经改进)。

在CDC中变化的数据被保存在一个变化表(change table)中,使用者通过订阅的方式(subscribe)生成一个视图(subscribe view)并且通过这个视图来得到这些变化的数据,一个change table可以有多个订阅者,也就可以有多个subscribe view。每个subscribe view可以从change table提取自己所关心的不同的columns 和rows。

在Oracle10g中,CDC开始支持异步方式来捕获变化的数据。在Oracle10g中利用redo log来实现CDC。Redo log是一个记录所有数据库变化的独立的文件。CDC的异步方式是非入侵方式(noninvasive)的实现,对于变化数据的捕获就不需要数据库加入trigger了。

Oracle的redo log有两种类型:

● On-line redo log
● Archive redo log

这种从redo log中读取数据变化的方式叫做“日志挖掘”(mining the logs),这种方式使捕获数据的操作从源数据的事务中分离出来,减轻了trigger技术给数据源带来的性能问题(是减轻不是消除!)

在Oracle10g中,异步CDC有两种实现方式:

● HOTLOG

这种方式利用On-line redo log。变化的数据从on-line redo log中获取,然后通过(Oracle Streams技术)把获取的变化保存在本地的变化表中(变化表还是在数据源上)。然后还需要其他方式把这些数据移到数据仓库中。

● AUTOLOG

这种方式与HOTLOG不同的是数据获取的位置不同。AUTOLOG方式利用Log Transport Services技术在不同数据库之间转换数据时进行变化数据获取操作的。如果目标数据库正好是数据仓库的话,这种方式就比HOTLOG方式减少了一次数据迁移的环节。

Log Transport Services技术是标准的日志迁移技术,比如Oracle的Oracle Data Guard。

Latency of change:

从发生变化的数据源事务提交到利用CDC发现变化的数据,并且把变化的数据移植到变化表之间的时间间隔。HOTLOG方式的时间滞后要比AUTOLOG方式小。

Oracle Streams

Oracle Streams技术可以在多个数据库之间进行数据备份,它通过在数据队列里面记录变化的数据。Oracle Streams技术也是利用数据库的redo log来解析变化的数据的。Oracle Streams除了进行数据备份外,还可以用来作为数据仓库的增量抽取。

Oracle Streams与CDC技术比较,可以用一个形象的比喻来形容:

把Oracle Streams比作砖、瓦;
把CDC看作是用砖、瓦盖成的大厦;

Oracle Streams可以用来实现CDC,它们是辅助的关系,不是真正竞争的关系。

上面是CDC的简单介绍,下面转一篇应用(但是怎么看都不太防舒服)
*
Oracle 10g CDC Test

怎样使用oracle 10G CDC(Capturing Changed Data)
 
1)create the user on the source DB and target DB.
 
sqlplus "/  as sysdba"
connect system/manager
 
set serveroutput on size 100000 
set linesize 2000 
create or replace package etl_util AUTHID CURRENT_USER 
IS 
  FUNCTION get_col_names(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2; 
  FUNCTION get_cols_definition(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2; 
  FUNCTION get_modified_col_names(schema IN VARCHAR2, table_name IN VARCHAR2, source_colmap$ IN RAW)  RETURN VARCHAR2; 
END etl_util; 
/ 
show errors
 
create or replace package body etl_util 
IS 
-- ****************************************************************************** 
-- PUBLIC FUNCTION get_col_names 
-- ****************************************************************************** 
  FUNCTION get_col_names(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2    
    IS 
  BEGIN 
    DECLARE 
      v_col_nm VARCHAR2(100); 
      v_ret VARCHAR2(32767); 
      v_work_str VARCHAR2(32767);
 
      CURSOR c_tab_col(p_tbl_nm VARCHAR2, p_schm VARCHAR2) IS        
            SELECT column_name FROM all_tab_columns 
            WHERE owner = UPPER(TRIM(p_schm)) 
            AND table_name = UPPER(TRIM(p_tbl_nm)) 
            ORDER BY column_name;
 
  BEGIN 
  OPEN c_tab_col(p_table_name, p_schema); 
  v_ret := ''; 
  v_work_str := ','||LOWER(REPLACE(p_skip_columns,' ','')) || ',';
 
  LOOP 
      FETCH c_tab_col INTO v_col_nm; 
      EXIT WHEN c_tab_col%NOTFOUND; 
    IF INSTR(v_work_str,','||LOWER(v_col_nm)) = 0 THEN 
        v_ret := v_ret || ',' || p_pref ||LOWER(v_col_nm); 
    END IF; 
  END LOOP; 
  CLOSE c_tab_col; 
  IF (v_ret IS NOT NULL) THEN v_ret := SUBSTR(v_ret, 2); 
  END IF; 
  IF (v_ret IS NULL) THEN 
      raise_application_error(-20201,'Table ' || p_schema||'.'||p_table_name 
        || ' Does not exist or has no columns'); 
  END IF;
 
  RETURN v_ret; 
  END; 
  END; 
-- ****************************************************************************** 
-- END OF PUBLIC FUNCTION get_col_names 
-- ******************************************************************************
 
-- ****************************************************************************** 
-- PUBLIC FUNCTION get_cols_definition 
-- ****************************************************************************** 
  FUNCTION get_cols_definition(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2    
    IS 
  BEGIN 
    DECLARE 
      v_col_nm VARCHAR2(100); 
      v_col_def VARCHAR2(300); 
      v_ret VARCHAR2(32767); 
      v_work_str VARCHAR2(32767); 
      CURSOR c_tab_col(p_tbl_nm VARCHAR2, p_schm VARCHAR2) IS        
        SELECT column_name, 
            RPAD(column_name,53) || 
        decode(data_type 
            ,'NUMBER','NUMBER('||TO_CHAR(data_precision)|| 
                decode(data_scale,0,'',','||data_scale)||')' 
            ,'CHAR','CHAR('||TO_CHAR(DATA_LENGTH)||')' 
            ,'VARCHAR2','VARCHAR2('||TO_CHAR(DATA_LENGTH)||')' 
            ,'DATE','DATE' 
            ,data_type) def 
        from all_tab_columns 
        where table_name = upper(trim(p_tbl_nm)) 
            AND owner = upper(trim(p_schm)) 
        order by owner, table_name, column_name 
        ; 
  BEGIN 
  OPEN c_tab_col(p_table_name, p_schema); 
  v_ret := ''; 
  v_work_str := ','||LOWER(REPLACE(p_skip_columns,' ','')) || ',';
 
  LOOP 
      FETCH c_tab_col INTO v_col_nm, v_col_def; 
      EXIT WHEN c_tab_col%NOTFOUND; 
    IF INSTR(v_work_str,','||LOWER(v_col_nm)) = 0 THEN 
        v_ret := v_ret || ', ' || p_pref ||LOWER(v_col_def); 
    END IF; 
  END LOOP; 
  CLOSE c_tab_col; 
  IF (v_ret IS NOT NULL) THEN v_ret := SUBSTR(v_ret, 2); END IF; 
  IF (v_ret IS NULL) THEN 
      raise_application_error(-20201,'Table ' || p_schema||'.'||p_table_name 
        || ' Does not exist or has no columns'); 
  END IF;
 
  RETURN v_ret; 
  exception 
          when others 
            then dbms_output.put_line(SUBSTR('Error in get_cols_definition ' || sqlerrm,1,220));
  return 'error ' || sqlerrm ; 
  END; 
  END; 
-- ****************************************************************************** 
-- END OF PUBLIC FUNCTION get_cols_definition 
-- ******************************************************************************
 
-- ****************************************************************************** 
-- PUBLIC FUNCTION get_modified_col_names 
-- ****************************************************************************** 
  FUNCTION get_modified_col_names(schema IN VARCHAR2, table_name IN VARCHAR2, source_colmap$ IN RAW) RETURN VARCHAR2 
    IS 
  BEGIN 
    DECLARE 
      v_col_nm VARCHAR2(100); 
      v_col_pos number; 
      v_ret VARCHAR2(32767); 
      v_work_str VARCHAR2(32767); 
    v_byte_nr INTEGER; 
    v_col_map VARCHAR2(32767); 
    i INTEGER; j INTEGER;k INTEGER; 
    v_val VARCHAR2(5); 
      CURSOR c_tab_col(p_tbl_nm VARCHAR2, p_schm VARCHAR2) IS        
        SELECT column_name, POWER(2,MOD(column_id,8)) col_pos, 1 + floor((column_id)/8) byte_nr 
            FROM all_tab_cols 
            WHERE table_name=UPPER(p_tbl_nm) 
                AND OWNER=UPPER(p_schm) 
            ORDER BY column_id;        
  BEGIN 
      SELECT dump(source_colmap$,10) INTO v_col_map FROM DUAL; 
      i:=INSTR(v_col_map,':'); 
      v_col_map:=','||SUBSTR(v_col_map,i + 2)||','; 
--    dbms_output.put_line('v_col_map='||v_col_map); 
      OPEN c_tab_col(table_name, schema); 
      v_ret := ''; 
      LOOP 
--    dbms_output.put_line('--1'); 
          FETCH c_tab_col INTO v_col_nm, v_col_pos, v_byte_nr; 
          EXIT WHEN c_tab_col%NOTFOUND; 
--    dbms_output.put_line('byte_nr='||to_char(v_byte_nr)); 
          i:=INSTR(v_col_map,',',1,v_byte_nr); 
          j:=INSTR(v_col_map,',',1,v_byte_nr + 1); 
          IF ((i=0) OR (j = 0)) THEN 
              EXIT; 
          END IF; 
          v_val := SUBSTR(v_col_map, i + 1, j - i - 1 ); 
          k := BITAND(TO_NUMBER(v_val),v_col_pos); 
          IF (k>0) THEN 
              v_ret := v_ret || ',' || v_col_nm; 
          END IF; 
--    DBMS_OUTPUT.put_line('col='||v_col_nm ||' byte='||TO_CHAR(v_byte_nr) 
--          || ' i=' || TO_CHAR(i) || ' j='||TO_CHAR(j) 
--          ||' val='||v_val||' col_pos='||TO_CHAR(v_col_pos)||'res='||TO_CHAR(k)); 
      END LOOP; 
      CLOSE c_tab_col; 
      IF (v_ret IS NOT NULL) THEN 
          v_ret := SUBSTR(v_ret,2); 
      END IF; 
      RETURN v_ret; 
  EXCEPTION 
        WHEN OTHERS 
            THEN dbms_output.put_line(SUBSTR('Error in get_modifed_col_names: ',1,220)); 
                dbms_output.put_line(SUBSTR(sqlerrm,1,240)); 
      return ''; 
  END; 
  END; 
-- ****************************************************************************** 
-- END OF PUBLIC FUNCTION get_modified_col_names 
-- ******************************************************************************
 
END etl_util; 
/ 
show errors
 
grant execute on etl_util to public; 
DROP PUBLIC SYNONYM etl_util; 
CREATE PUBLIC SYNONYM etl_util FOR etl_util; 
--select etl_util.get_modified_col_names(schema1 => 'SCOTT' 
--,table_name => 'EMP' 
--,source_colmap$ => HEXTORAW('C000')) FROM dual; 
-- 
BEGIN 
declare v_res VARCHAR2(1000); 
BEGIN 
    v_res := etl_util.get_modified_col_names('test', 'philip_test', HEXTORAW('0601')); 
    dbms_output.put_line('res='||v_res); 
END; 
END; 
/
 
2)create the user
 
connect system/manager 
DROP USER cdc_publisher CASCADE; 
CREATE USER cdc_publisher IDENTIFIED BY pass; 
GRANT EXECUTE_CATALOG_ROLE to cdc_publisher; 
GRANT SELECT_CATALOG_ROLE to cdc_publisher;
 
3)grant the priviledge
 
connect test/pass 
GRANT SELECT on PHILIP_TEST to cdc_publisher; 
GRANT SELECT on PHILIP_TEST2 to cdc_publisher; 
--the PHILIP_TEST2  is under  user   "test"
 
4)Create Change Tables 
--connect cdc_publisher/pass@sde223_dvweb 
set serveroutput on size 1000000 
set linesize 120
 
/* Login as a publisher to run this */ 
BEGIN 
    DECLARE work_sql VARCHAR2(32767); 
        v_publisher_id VARCHAR2(20)  := 'cdc_publisher'; 
        v_source_schema VARCHAR2(20) := 'test'; 
        v_source_table VARCHAR2(100); 
        v_cdc_table VARCHAR2(100); 
        CURSOR c_tables IS 
            SELECT 'PHILIP_TEST' table_name FROM dual 
                UNION 
            SELECT 'PHILIP_TEST2' table_name FROM dual 
        ; 
BEGIN 
FOR tables IN c_tables LOOP 
  v_source_table := tables.table_name; 
  v_cdc_table := 'rep_'||v_source_table; 
  work_sql := etl_util.get_cols_definition( 
        p_schema=>v_source_schema, 
        p_table_name =>v_source_table, 
        p_skip_columns => NULL, 
        p_pref => NULL); 
    DBMS_output.put_line('work_sql'||work_sql); 
  DBMS_LOGMNR_CDC_PUBLISH.CREATE_CHANGE_TABLE ( 
    OWNER => v_publisher_id, SOURCE_SCHEMA => v_source_schema 
    ,CHANGE_SET_NAME => 'SYNC_SET', CAPTURE_VALUES => 'new'--only Captures the changed values from the source table 
    ,RS_ID => 'n', ROW_ID => 'n', USER_ID => 'y', TIMESTAMP => 'y' 
    ,OBJECT_ID => 'n' -- leave it as 'N' or you will have "table has no columns" error 
    ,SOURCE_COLMAP => 'n', TARGET_COLMAP => 'n', OPTIONS_STRING => null 
    ,SOURCE_TABLE => v_source_table, CHANGE_TABLE_NAME => v_cdc_table 
    ,COLUMN_TYPE_LIST => work_sql); 
  DBMS_output.put_line('Change table '||v_cdc_table ||' was created successfully'); 
END LOOP;
 
EXCEPTION 
    WHEN OTHERS THEN 
    DBMS_output.put_line('Error start ********************************************'); 
    DBMS_output.put_line('Error during change table ' || v_cdc_table || ' creation:'); 
    DBMS_output.put_line(to_char(sqlcode)|| ' ' || sqlerrm); 
    DBMS_output.put_line('Error end  ********************************************'); 
END; 
END; 
/
 
5)when we change the "test.PHILIP_TEST" and "test.PHILIP_TEST2" ,under cdc_publisher , the table "cdc_philip_test will recorde this operation and the value at once.

*

另附一篇比较详细的CDC原理研究:
*
oracle学习—CDC 研究(1)
http://hi.baidu.com/ybgba/blog/item/898d93366f79c8d5a2cc2b01.html

oracle学习—CDC 研究(2)
http://hi.baidu.com/ybgba/blog/item/988d113d7c18ddcf9e3d6201.html