【Oracle】Oracle logminer功能介绍

简述

logminer它包括DBMS_LOGMNR和DBMS_LOGMNR_D两个package,后边的D是字典的意思。它既能分析redo log file,也能分析归档后的archive log file。

在分析日志的过程中需要使用数据字典,一般先生成数据字典文件后使用,10g版本还可以使用在线数据字典

用于日志分析是:
Logminer也可以分析其它数据库的重做日志文件,但是必须使用重做日志所在数据库的数据字典,否则会出现无法识别的乱码。另外被分析数据库的操作系统平台最好和当前Logminer所在数据库的运行平台一样,且block size相同。

LogMiner的功能

  • 确定数据库的逻辑损坏时间

通过LogMiner可以准确定位该误操作的执行时间和SCN值,然后通过基于时间恢复或者基于SCN恢复可以完全恢复该表数据。

select scn_to_timestamp(10135546249879) from dual;
SELECT timestamp_to_scn(to_timestamp('2020-4-15 16:06:06','yyyy-mm-dd hh24:mi:ss')) FROM dual;
  • 确定事务级要执行的精细逻辑恢复操作

通过LogMiner可以取得任何用户的DML操作及相应的UNDO操作,通过执行UNDO操作可以取消用户的错误操作。

  • 执行后续审计

通过LogMiner可以跟踪Oracle数据库的所有DML、DDL和DCL操作,从而取得执行这些操作的时间顺序、执行这些操作的用户等信息。

logminer 简单使用

初始环境

--安装logminer 
@?/rdbms/admin/dbmslm.sql 
 @?/rdbms/admin/dbmslmd.sql 
--设置两端数据库
alter database archive;
alter database force logging;
alter database add supplemental  log data;--不开启的话只能看到以sys用户登陆下的数据更新
alter database add supplemental log data (primary key,unique,foreign key) columns;

分析归档/在线日志

--在线日志可通过v$log,v$logfile 结合查出,通常检查当前使用的在线日志
--归档日志可通过v$archived_log查出
--添加归档或在线日志
begin
sys.dbms_logmnr.add_logfile(logfilename=>'+DATAJIXIE/sypm/archvelog/2_29980_1007747066.dbf', options=>sys.dbms_logmnr.new);
end;
--启动分析
begin
sys.dbms_logmnr.start_logmnr(options=>sys.dbms_logmnr.dict_from_online_catalog);
end;
--or 也可以将字典存为本地,需如下设置,如抽取日志同步到其他数据库,建议该方式
alter system set utl_file_dir='/backup/logminer' scope=spfile;
execute dbms_logmnr_d.build ('dict2020.ora','/backup/logminer',dbms_logmnr_d.store_in_flat_file);
execute dbms_logmnr.start_logmnr(dictfilename=>'/backup/logminer/dict2020.ora',options=>dbms_logmnr.ddl_dict_tracking);
--通过视图分析
select * from  v$logmnr_contents  where sql_redo like '%INSERT%';

来自网络参考

以下测试脚本,默认使用用户mytest,表t2020,dblink:logdblink,每两秒钟刷新一次。通过事务id、scan等方式,捕捉事务id、scn变化,增量同步(主要使用三个视图:v$log,v$archived_log,v$logmnr_contents)。

本次测试脚本分为两部分,初始化环境及相关job和主要使用存储过程。

初始环境及相关job如下

alter session set nls_date_format='yyyy-mm-dd hh24:mi:ss';
--1 日志目录
create directory logmnr as '/backup/logmnr';
create directory SYNCDATACONFIG as '/backup/logmnr/config';
--2 DML队列目录
create directory SYNCDATA as '/backup/logmnr/syncdata';
--3 SCN标志位初始化
-- 实例化启动进程时的SCN到/backup/logmnr/scn.txt, 标志为全局 g_scn
declare
 fhandle utl_file.file_type;
 v_scn number(30);
begin
  v_scn:=dbms_flashback.get_system_change_number();
  fhandle := utl_file.fopen('SYNCDATACONFIG', 'scn_ck.txt', 'w');
  utl_file.put_line(fhandle , v_scn);
  utl_file.fclose(fhandle);
end;
/
-- 4 archive log 标志位初始化
declare
 fhandle utl_file.file_type;
 v_recid v$archived_log.RECID%type;
begin
    select max(recid) into v_recid  from v$archived_log order by 1;
    fhandle := utl_file.fopen ('SYNCDATACONFIG','archivelog_ck.txt', 'W');
    utl_file.put_line(fhandle ,to_char(v_recid));--初始化捕获进程前存在的归档不传送 
    utl_file.fclose(fhandle);
    exception 
     when others then return;
end;
/
-- 5 DML队列初始化
declare
 fhandle utl_file.file_type;
begin
  fhandle := utl_file.fopen('SYNCDATACONFIG', 'queue_ck.txt', 'w');
  utl_file.put_line(fhandle , 'q0000000000'); -- 初始化队列序列标志
  utl_file.fclose(fhandle);
end;
/
-- 7 发送队列检查点标志初始化
declare
 fhandle utl_file.file_type;
begin
  fhandle := utl_file.fopen('SYNCDATACONFIG', 'send_ck.txt', 'w');
  utl_file.fclose(fhandle);
end;
/
-- 8 xid事务标识集合初始化 
declare
 fhandle utl_file.file_type;
begin
  fhandle := utl_file.fopen('SYNCDATACONFIG', 'xid_array.txt', 'w');
  utl_file.fclose(fhandle);
end;
/
-- 9 所有队列序号集合初始化 
declare
 fhandle utl_file.file_type;
begin
  fhandle := utl_file.fopen('SYNCDATACONFIG', 'queue_array.txt', 'w');
  utl_file.fclose(fhandle);
end;
/
-- 10 捕获日志初始化
declare
 fhandle utl_file.file_type;
begin
  fhandle := utl_file.fopen('LOGMNR', 'capture_log.txt', 'w');
  utl_file.fclose(fhandle);
end;
/
-- 11 发送日志初始化
declare
 fhandle utl_file.file_type;
begin
  fhandle := utl_file.fopen('LOGMNR', 'send_log.txt', 'w');
  utl_file.fclose(fhandle);
end;
/
/*-- 9 数据库对照字典初始化
BEGIN
 dbms_logmnr_d.build(dictionary_filename=>'rac1emz_dict.ora',
 dictionary_location=>'/backup/logmnr/config');
END;
/*/
--10 初始化自定义类型
create or replace type myTableType as table of varchar2 (255);
/
-- 二、进程
--1 源数据库捕获进程
--2 源数据库传播进程 --检查点记录,支持断点续传
--三、创建源数据库捕获进程任务调度
/* 创建可执行程序 */
begin
    DBMS_SCHEDULER.CREATE_PROGRAM(
        program_name        => 'sys.proc_DataCapture_Archivelog',
        program_action      => 'sys.DataCapture_Archivelog',
        program_type        => 'STORED_PROCEDURE',
        number_of_arguments => 3,
        comments            => '数据同步程序',
        enabled             => false
    );
end;
/
/* 设置可执行程序的输入参数 */
begin
    DBMS_SCHEDULER.define_program_argument(
        program_name      => 'sys.proc_DataCapture_Archivelog',
        argument_position => 1,
        argument_type     => 'VARCHAR2',
        default_value     => ''
    );
    DBMS_SCHEDULER.define_program_argument(
        program_name      => 'sys.proc_DataCapture_Archivelog',
        argument_position => 2,
        argument_type     => 'VARCHAR2',
        default_value     => ''
    );
    DBMS_SCHEDULER.define_program_argument(
        program_name      => 'sys.proc_DataCapture_Archivelog',
        argument_position => 3,
        argument_type     => 'VARCHAR2',
        default_value     => ''
    );
END;
/
/* 创建调度表 */
begin
    DBMS_SCHEDULER.create_schedule(
        schedule_name   => 'sys.sch_DataCapture_Archivelog',
        repeat_interval => 'FREQ=SECONDLY;INTERVAL=5',
        start_date      => sysdate,
        comments        => '数据同步调度'
    );
end;
/
/* 创建作业 */
begin
    DBMS_SCHEDULER.create_job( 
        job_name      => 'sys.JOB_DEMOSYNC',     
        program_name  => 'sys.proc_DataCapture_Archivelog',
        schedule_name => 'sys.sch_DataCapture_Archivelog',
        job_class     => 'DEFAULT_JOB_CLASS',            
        comments      => '数据同步作业',
        auto_drop     => false,
        enabled       => false
    );
end;
/
/* 启动可执行程序 */
exec DBMS_SCHEDULER.enable('proc_DataCapture_Archivelog');
/* 启动作业 */
exec DBMS_SCHEDULER.enable('JOB_DEMOSYNC');
/* 设置不同的作业参数 */
begin
    DBMS_SCHEDULER.SET_JOB_ARGUMENT_VALUE(
        job_name          => 'sys.JOB_DEMOSYNC',
        argument_position => 1,
        argument_value    => 'MYTEST'
    );
    DBMS_SCHEDULER.SET_JOB_ARGUMENT_VALUE(
        job_name          => 'sys.JOB_DEMOSYNC',
        argument_position => 2,
        argument_value    => 'T2020'
    );
    DBMS_SCHEDULER.SET_JOB_ARGUMENT_VALUE(
        job_name          => 'sys.JOB_DEMOSYNC',
        argument_position => 3,
        argument_value    => 'LOGLINK'
    );
end;
/
----Job管理-----------------------------------------------------------------------
/* 禁用Job */
exec dbms_scheduler.disable('JOB_DEMOSYNC');
/* 执行Job */
exec dbms_scheduler.run_job('JOB_DEMOSYNC'); 
/* 停止Job */
exec dbms_scheduler.stop_job('JOB_DEMOSYNC');
/* 删除Job */
exec dbms_scheduler.drop_job('JOB_DEMOSYNC');
exec DBMS_SCHEDULER.drop_schedule('sys.sch_DataCapture_Archivelog');
exec DBMS_SCHEDULER.drop_PROGRAM('sys.proc_DataCapture_Archivelog');
------------------------------------------------
--三、创建源数据库发送进程任务调度
/* 创建可执行程序 */
begin
    DBMS_SCHEDULER.CREATE_PROGRAM(
        program_name        => 'sys.proc_DataReplicat',
        program_action      => 'sys.DataReplicat',
        program_type        => 'STORED_PROCEDURE',
        number_of_arguments => 0,
        comments            => '数据同步程序-发送进程',
        enabled             => false
    );
end;
/
/* 创建调度表 */
begin
    DBMS_SCHEDULER.create_schedule(
        schedule_name   => 'sys.sch_DataReplicat',
        repeat_interval => 'FREQ=SECONDLY;INTERVAL=2',
        start_date      => sysdate,
        comments        => '数据同步调度-发送进程'
    );
end;
/
/* 创建作业 */
begin
    DBMS_SCHEDULER.create_job( 
        job_name      => 'sys.JOB_DEMOSYNC2',     
        program_name  => 'sys.proc_DataReplicat',
        schedule_name => 'sys.sch_DataReplicat',
        job_class     => 'DEFAULT_JOB_CLASS',            
        comments      => '数据同步作业-发送进程',
        auto_drop     => false,
        enabled       => false
    );
end;
/
/* 启动可执行程序 */
exec DBMS_SCHEDULER.enable('proc_DataReplicat');
/* 启动作业 */
exec DBMS_SCHEDULER.enable('JOB_DEMOSYNC2');
----Job管理-----------------------------------------------------------------------
/* 禁用Job */
exec dbms_scheduler.disable('JOB_DEMOSYNC2');
/* 执行Job */
exec dbms_scheduler.run_job('JOB_DEMOSYNC2'); 
/* 停止Job */
exec dbms_scheduler.stop_job('JOB_DEMOSYNC2');
/* 删除Job */
exec dbms_scheduler.drop_job('JOB_DEMOSYNC2');
exec DBMS_SCHEDULER.drop_schedule('sys.sch_DataReplicat');
exec DBMS_SCHEDULER.drop_PROGRAM('sys.proc_DataReplicat');
--查看状态
set lines 200
set pages 99
col job_name for a15
col repeat_interval for a20
col state for a10
col start_date for a15
col last_start_date for a15
col next_run_date for a15
col comments for a15
select t.job_name,t.repeat_interval,t.state,t.start_date,t.last_start_date,t.next_run_date,t.comments
from dba_scheduler_jobs t
where t.job_name in('JOB_DEMOSYNC','JOB_DEMOSYNC2');

主要使用存储过程如下

抓取归档日志程序

create or replace procedure DataCapture_Archivelog(tableowner         varchar2,
                                                   tablename          varchar2,
                                                   database_link_name varchar2) is
  v_scn         number(30);
  fhandle       utl_file.file_type; --oracle文件类型
  fp_buffer     varchar2(4000); --文件输出缓存
  logfile        utl_file.file_type; --oracle文件类型
  logfile_buffer varchar2(4000); --文件输出缓存
  v_recid       number(10); --表archive log 序号
  v_recidck     number(10); --archivelog_ck 序号
  sql_txt       varchar2(4000); --待执行的 sql_redo
  v_dblink_name varchar2(20) := '@' || database_link_name; --目的数据库联接
  sqlwhere      varchar2(1000); --待执行的 sql_redo 条件语句
  v_tableowner  varchar2(20); --待同步的用户
  v_tablename   varchar2(20); --待同步的表
  v_queueck     number(10); --队列序列SCN
  v_queuename   varchar(20); --dml队列名称
  isCheckRedo   number(1) := 0; --是已经否抽取Redo日志
  v_sqlcommitnum number(5) := 10000; --已经提交的事务dml 分批提交给目的数据库
  v_sqlnum    number := 0; --已经提交的事务dml当前个数
  l_string long;
  l_data   myTableType := myTableType();
  n        number;
  isDDL      number(1) := 0; --是否是dll操作 0不是 1是
  isCommited number(1) := 0; --是否已经commit操作 0不是 1是
  XID_str varchar2(8000); --事务序号数组
  j       number := 0; --tb_dml_XIDSQN序列下标
  type tb_XID_type is table of v$logmnr_contents.XID%type index by binary_integer; --定义待排除DLL
  tb_ddl_XID   tb_xid_type; --定义待排除DLL表
  tb_dml_XID   tb_xid_type; --定义待排除DLL表
  duplicat_xid number(1) := 0; -- 0  不重复 1 重复
  cursor c_ddl is
    select distinct XID
      from v$logmnr_contents
     where seg_name = v_tablename
       and OPERATION = 'DDL';
begin
--打开捕获日志
  logfile := utl_file.fopen('LOGMNR', 'capture_log.txt', 'a');
  --初始化schema
  v_tableowner := tableowner;
  v_tablename  := tablename;
  --捕获归档日志
  --dbms_output.put_line();
  utl_file.put_line(logfile, '');
  utl_file.put_line(logfile, 'Analyze Archivelog...start:'||to_char(sysdate,'yyyy-MM-dd hh24:mi:ss'));
  fhandle := utl_file.fopen('SYNCDATACONFIG', 'scn_ck.txt', 'R');
  utl_file.get_line(fhandle, fp_buffer);
  v_scn := fp_buffer;
  utl_file.fclose(fhandle);
  --dbms_output.put_line();
  utl_file.put_line(logfile, '0====v_scn:' || v_scn);
  select max(recid) into v_recid from v$archived_log order by 1;
  /*exception
  when no_data_found then
    return;*/
  --dbms_output.put_line();
  utl_file.put_line(logfile, '1====1v_recid:' || v_recid);
  fhandle := utl_file.fopen('SYNCDATACONFIG', 'archivelog_ck.txt', 'R');
  utl_file.get_line(fhandle, fp_buffer);
  v_recidck := fp_buffer;
  utl_file.fclose(fhandle);
  --dbms_output.put_line();
  utl_file.put_line(logfile, '1====2v_recid_ck:' || v_recidck);
  if trim(v_recid) != trim(nvl(v_recidck, 0)) then
    --dbms_output.put_line();
    utl_file.put_line(logfile, '2====1存在未分析的Archive log');
  else
    if isCheckRedo = 0 then
      utl_file.put_line(logfile, '2====1 no New Archive log');
      utl_file.put_line(logfile, '2====Analyze Archivelog...End:'||to_char(sysdate,'yyyy-MM-dd hh24:mi:ss'));
      utl_file.fclose(logfile);
      sys.datacapture_redo(tableowner, tablename, database_link_name);
      isCheckRedo := 1;
      return;
    end if;
  end if;
  --一次加载从archive_ck归档检查点到最新归档,及 当前 Redo
  for rs in (select name, recid from v$archived_log where recid >= v_recidck) loop
    dbms_logmnr.add_logfile(options     => dbms_logmnr.addfile,
                            logfilename => rs.name);
    --dbms_output.put_line();
    utl_file.put_line(logfile, '2====2 Archive logfilename:' || rs.name);
  end loop;
  --dbms_output.put_line();
  utl_file.put_line(logfile, '2====3补充Redo log');
  for rs in (select f.MEMBER
               from v$logfile f, v$log l
              where f.group# = l.GROUP#
                and l.STATUS = 'CURRENT') loop
    dbms_logmnr.add_logfile(options     => dbms_logmnr.addfile,
                            logfilename => rs.MEMBER);
    --dbms_output.put_line();
    utl_file.put_line(logfile, '2====3 Redo logfilename:' || rs.MEMBER);
  end loop;
  dbms_logmnr.start_logmnr(options => dbms_logmnr.dict_from_online_catalog);
  --dbms_logmnr.start_logmnr(dictfilename => '/backup/logmnr/rac1emz_testdict.ora');
  --待排除DLL
  open c_ddl;
  fetch c_ddl bulk collect
    into tb_ddl_XID;
  close c_ddl;
  --待查询的DML
  for c_tranXID in (select distinct XID
                      from v$logmnr_contents
                     where seg_name = v_tablename) loop
    isDDL        := 0;
    duplicat_xid := 0;
    isCommited   := 0;
    --判断是否dll操作
    for i in 1 .. tb_ddl_XID.count loop
      if c_tranXID.XID = tb_ddl_XID(i) then
        isDDL := 1;
        --dbms_output.put_line();
        utl_file.put_line(logfile, '3==1 ddl xid:' || c_tranXID.XID);
      end if;
    end loop;
    --不是dll,添加dml xid
    if isDDl = 0 then
      --是否已经commit
      for c_commitXid in (select XID
                            from v$logmnr_contents
                           where xid = c_tranXID.XID
                             and operation = 'COMMIT') loop
        isCommited := 1;
        --dbms_output.put_line();
        utl_file.put_line(logfile, '3==1 COMMITED xid:' || c_commitXid.xid);
      end loop;
      if isCommited = 1 then
        --判断xid事务标识是否已经存在,如果存在不添加,防止重复分析xid
        fhandle := utl_file.fopen('SYNCDATACONFIG', 'xid_array.txt', 'R');
        LOOP
          BEGIN
            utl_file.get_line(fhandle, fp_buffer);
            if rtrim(ltrim(fp_buffer)) = c_tranXID.XID then
              duplicat_xid := 1;
              --dbms_output.put_line(); --重复xid
              utl_file.put_line(logfile, '3====2 已分析过xid :' || c_tranXID.XID);
            end if;
          exception
            when no_data_found then
              exit;
          END;
        END LOOP;
        --判断没有重复的插入表tb_dml_XID
        if duplicat_xid = 0 then
          tb_dml_XID(j) := c_tranXID.XID;
          j := j + 1;
        end if;
        utl_file.fclose(fhandle);
      end if;
    end if;
  end loop;
  --没有dml  tran 退出程序
  --dbms_output.put_line();
  utl_file.put_line(logfile, '3====3 dml  tran count :' || tb_dml_XID.count);
  if tb_dml_XID.count = 0 then
    dbms_logmnr.end_logmnr;
  else
    --打开xid事务标识集合,准备新增xid事务标识
    fhandle := utl_file.fopen('SYNCDATACONFIG', 'xid_array.txt', 'a');
    --dbms_output.put_line();
    utl_file.put_line(logfile, '5====打开xid事务标识集合');
    --创建XID_str字符串
    --dbms_output.put_line();
    utl_file.put_line(logfile, '5====创建XID_str字符串');
    for i in 1 .. tb_dml_XID.count loop
      if tb_dml_XID.count != 1 then
        if i = 1 then
          --新增入一行xid事务标识
          utl_file.put_line(fhandle, tb_dml_XID(i - 1));
          --dbms_output.put_line();
          utl_file.put_line(logfile, 'a1_' || tb_dml_XID(i - 1));
          XID_str := tb_dml_XID(i - 1);
        else
          --新增入一行xid事务标识
          utl_file.put_line(fhandle, tb_dml_XID(i - 1));
          utl_file.put_line(logfile, 'a2_' || tb_dml_XID(i - 1));
          XID_str := XID_str || ',' || tb_dml_XID(i - 1);
        end if;
      else
        --新增入一行xid事务标识
        utl_file.put_line(fhandle, tb_dml_XID(i - 1));
        --dbms_output.put_line('a3_' || tb_dml_XID(i - 1));
        utl_file.put_line(logfile, 'a3_' || tb_dml_XID(i - 1));
        XID_str := tb_dml_XID(i - 1);
      end if;
    end loop;
    utl_file.fclose(fhandle);
    --dbms_output.put_line();
    utl_file.put_line(logfile, '5====关闭xid事务标识集合');
    --dbms_output.put_line();
    utl_file.put_line(logfile, '5====XID_str字符串: ' || XID_str);
    l_string := XID_str || ',';
    loop
      exit when l_string is null;
      n := instr(l_string, ',');
      l_data.extend;
      l_data(l_data.count) := ltrim(rtrim(substr(l_string, 1, n - 1)));
      l_string := substr(l_string, n + 1);
    end loop;
    --队列
    --读取队列开始序列
    fhandle := utl_file.fopen('SYNCDATACONFIG', 'queue_ck.txt', 'R');
    utl_file.get_line(fhandle, fp_buffer);
    utl_file.fclose(fhandle);
    --dbms_output.put_line();
    utl_file.put_line(logfile, '6====读取队列开始序列');
    --更新队列
    v_queueck   := to_number(substr(fp_buffer, 2, 10));
    v_queuename := 'q' || lpad((v_queueck + 1), 10, '0');
    fhandle     := utl_file.fopen('SYNCDATACONFIG', 'queue_ck.txt', 'w');
    utl_file.put_line(fhandle, v_queuename);
    utl_file.fclose(fhandle);
    --dbms_output.put_line();
    utl_file.put_line(logfile, '7====更新队列队列序列');
    ----创建队列文件
    fhandle := utl_file.fopen('SYNCDATA', v_queuename || '.txt', 'w');
    utl_file.fclose(fhandle);
    utl_file.put_line(logfile, '8====创建队列v_queuename:' || v_queuename);
    --添加dml
    fhandle := utl_file.fopen('SYNCDATA', v_queuename || '.txt', 'a');
    for c_sql in (select sql_redo, OPERATION
                    from v$logmnr_contents
                   where XID in (select *
                                   from THE (select cast(l_data as mytableType)
                                               from dual))
                     and operation in
                         ('INSERT', 'DELETE', 'UPDATE', 'COMMIT')
                   order by scn) loop
      if c_sql.sql_redo is not null then
        --判断如果是insert,不截取ROWID
        if c_sql.OPERATION = 'INSERT' or c_sql.OPERATION = 'COMMIT' then
          if c_sql.OPERATION = 'INSERT' then
            sql_txt := replace(c_sql.sql_redo,
                               '"' || v_tablename || '"',
                               '"' || v_tablename || '"' || v_dblink_name);
            utl_file.put_line(fhandle, sql_txt);
            utl_file.put_line(logfile, '9====sql_redo1:' || sql_txt);
          else
            utl_file.put_line(fhandle, c_sql.sql_redo);
            utl_file.put_line(logfile, '9====sql_redo2:' || c_sql.sql_redo);
          end if;
        else
          sql_txt := substr(c_sql.sql_redo,
                            0,
                            instr(c_sql.sql_redo, 'ROWID') - 5);
          sql_txt := replace(sql_txt,
                             '"' || v_tablename || '"',
                             '"' || v_tablename || '"' || v_dblink_name);
          utl_file.put_line(fhandle, sql_txt);
          utl_file.put_line(logfile, '9====sql_redo3:' || sql_txt);
        end if;
      end if;
    end loop;
    utl_file.fclose(fhandle);
    utl_file.put_line(logfile, '10====');
    --新增一行到 queue_arrary集合
    fhandle := utl_file.fopen('SYNCDATACONFIG', 'queue_array.txt', 'a');
    utl_file.put_line(fhandle, v_queuename);
    utl_file.fclose(fhandle);
    --dbms_output.put_line('11====新增到queue_arrary集合的队列名称:' || v_queuename);
    utl_file.put_line(logfile, '11====新增到queue_arrary集合的队列名称:' || v_queuename);
    dbms_logmnr.end_logmnr;
    --dbms_output.put_line('9====');
    utl_file.put_line(logfile, '9====');
    v_scn   := dbms_flashback.get_system_change_number();
    fhandle := utl_file.fopen('SYNCDATACONFIG', 'scn_ck.txt', 'w');
    utl_file.put_line(fhandle, v_scn);
    utl_file.fclose(fhandle);
    --dbms_output.put_line('10====v_scn:' || v_scn);
    utl_file.put_line(logfile, '10====v_scn:' || v_scn);
  end if;
  --结束后变更检查点
  --归档检查点
  fhandle := utl_file.fopen('SYNCDATACONFIG', 'archivelog_ck.txt', 'W');
  utl_file.put_line(fhandle, v_recid);
  utl_file.fclose(fhandle);
  --dbms_output.put_line('11====v_recid_ck:' || v_recid);
  utl_file.put_line(logfile, '11====v_recid_ck:' || v_recid);
  v_scn   := dbms_flashback.get_system_change_number();
  fhandle := utl_file.fopen('SYNCDATACONFIG', 'scn_ck.txt', 'w');
  utl_file.put_line(fhandle, v_scn);
  utl_file.fclose(fhandle);
  --dbms_output.put_line('12====v_scn:' || v_scn);
  utl_file.put_line(logfile, '12====v_scn:' || v_scn);
  utl_file.put_line(logfile, '2====Analyze Archivelog...End:'||to_char(sysdate,'yyyy-MM-dd hh24:mi:ss'));
  utl_file.fclose(logfile);
  if isCheckRedo = 0 then
    sys.datacapture_redo(tableowner, tablename, database_link_name);
  end if;
exception
  when others then
    dbms_logmnr.end_logmnr;
    utl_file.put_line(logfile, '2====Analyze Archivelog...End:'||to_char(sysdate,'yyyy-MM-dd hh24:mi:ss'));
    --dbms_output.put_line(sqlerrm);
    utl_file.put_line(logfile, '13====2sqlerrm: ' || sqlerrm);
    utl_file.fclose(logfile);
    return;
end DataCapture_Archivelog;
/

抓取在线日志程序

create or replace procedure DATACAPTURE_REDO(tableowner varchar2,tablename varchar2,database_link_name varchar2) is
  v_scn         number(30);
  fhandle       utl_file.file_type; --oracle文件类型
  fp_buffer     varchar2(4000); --文件输出缓存
  logfile        utl_file.file_type; --oracle文件类型
  logfile_buffer varchar2(4000); --文件输出缓存
  v_recid       number(10); --表archive log 序号
  v_recidck     number(10); --archivelog_ck 序号
  sql_txt       varchar2(4000); --待执行的 sql_redo
  v_dblink_name varchar2(20) := '@'||database_link_name; --目的数据库联接
  sqlwhere      varchar2(1000); --待执行的 sql_redo 条件语句
  v_tableowner  varchar2(20); --待同步的用户
  v_tablename   varchar2(20); --待同步的表
  v_queueck     number(10); --队列序列SCN
  v_queuename   varchar(20); --dml队列名称
  v_sqlcommitnum number(5) := 10000; --已经提交的事务dml 分批提交给目的数据库
  v_sqlnum    number:= 0; --已经提交的事务dml当前个数
  l_string long;
  l_data   myTableType := myTableType();
  n        number;
  isDDL      number(1) := 0; --是否是ddl操作 0不是 1是
  isCommited number(1) := 0; --是否已经commit操作 0不是 1是
  XID_str varchar2(8000); --事务序号数组
  j       number := 0; --tb_dml_XIDSQN序列下标
  type tb_XID_type is table of v$logmnr_contents.XID%type index by binary_integer; --定义待排除DLL
  tb_ddl_XID   tb_xid_type; --定义待排除DLL表
  tb_dml_XID   tb_xid_type; --定义待排除DLL表
  duplicat_xid number(1) := 0; -- 0  不重复 1 重复
  cursor c_ddl is
    select distinct XID
      from v$logmnr_contents
     where seg_name = v_tablename
       and OPERATION = 'DDL';
begin
--打开捕获日志
  logfile := utl_file.fopen('LOGMNR', 'capture_log.txt', 'a');
  --初始化schema
  v_tableowner := tableowner;
  v_tablename  := tablename;
  --捕获在线日志
  --dbms_output.put_line('Analyze Redo...Start');
  utl_file.put_line(logfile, '');
  utl_file.put_line(logfile, 'Analyze Redo...Start :'||to_char(sysdate,'yyyy-MM-dd hh24:mi:ss'));
  fhandle := utl_file.fopen('SYNCDATACONFIG', 'scn_ck.txt', 'R');
  utl_file.get_line(fhandle, fp_buffer);
  v_scn := fp_buffer;
  utl_file.fclose(fhandle);
  --dbms_output.put_line('1====v_scn:' || v_scn);
  utl_file.put_line(logfile, '1====v_scn:' || v_scn);
  for rs in (select f.MEMBER
               from v$logfile f, v$log l
              where f.group# = l.GROUP#
                and l.STATUS = 'CURRENT') loop
    dbms_logmnr.add_logfile(options     => dbms_logmnr.addfile,
                            logfilename => rs.member);
    dbms_logmnr.start_logmnr(options => dbms_logmnr.dict_from_online_catalog);
    --dbms_logmnr.start_logmnr(dictfilename => '/backup/logmnr/rac1emz_testdict.ora');
    --dbms_output.put_line('2====logfilename:' || rs.member);
    utl_file.put_line(logfile, '2====logfilename:' || rs.member);
    --待排除DLL
    open c_ddl;
    fetch c_ddl bulk collect
      into tb_ddl_XID;
    close c_ddl;
    --待查询的DML
    for c_tranXID in (select distinct XID
                        from v$logmnr_contents
                       where seg_name = v_tablename) loop
      isDDL:= 0;
      duplicat_xid := 0;
      isCommited   := 0;
      --判断是否ddl操作
      for i in 1 .. tb_ddl_XID.count loop
        if c_tranXID.XID = tb_ddl_XID(i) then
          isDDL := 1;
          --dbms_output.put_line('3==1 ddl xid:'||c_tranXID.XID);
          utl_file.put_line(logfile, '3==1 ddl xid:'||c_tranXID.XID);
        end if;
      end loop;
      --不是dll,添加dml xid
      if isDDL = 0 then
      --dbms_output.put_line('3==1 dml xid:'||c_tranXID.XID);
      utl_file.put_line(logfile, '3==1 dml xid:'||c_tranXID.XID);
        --是否已经commit
        for  c_commit in (select XID from v$logmnr_contents
                             where xid = c_tranXID.XID
                               and operation = 'COMMIT') loop
            isCommited := 1;
            --dbms_output.put_line('3==1 COMMITED xid:'||c_commit.xid);
            utl_file.put_line(logfile, '3==1 COMMITED xid:'||c_commit.xid);
        end loop;
        if isCommited = 1 then
          --判断xid事务标识是否已经存在,如果存在不添加,防止重复分析xid
          fhandle := utl_file.fopen('SYNCDATACONFIG', 'xid_array.txt', 'R');
          LOOP
            BEGIN
              utl_file.get_line(fhandle, fp_buffer);
              if rtrim(ltrim(fp_buffer)) = c_tranXID.XID then
                duplicat_xid := 1;
                --dbms_output.put_line('3====2 已分析过xid :' || c_tranXID.XID); --重复xid
                utl_file.put_line(logfile, '3====2 已分析过xid :' || c_tranXID.XID);
              end if;
            exception
              when no_data_found then
                exit;
            END;
          END LOOP;
          --判断没有重复的插入表tb_dml_XID
          if duplicat_xid = 0 then
            tb_dml_XID(j) := c_tranXID.XID;
            j := j + 1;
          end if;
          utl_file.fclose(fhandle);
        end if;
      end if;
    end loop;
    --没有dml tran 退出程序
    --dbms_output.put_line('3====3 dml tran count :' || tb_dml_XID.count);
    utl_file.put_line(logfile, '3====3 dml tran count :' || tb_dml_XID.count);
    if tb_dml_XID.count = 0 then
      dbms_logmnr.end_logmnr;
      utl_file.put_line(logfile, 'Analyze Redo...End:'||to_char(sysdate,'yyyy-MM-dd hh24:mi:ss'));
      utl_file.fclose(logfile);
      return;
    end if;
    --打开xid事务标识集合,准备新增xid事务标识
    fhandle := utl_file.fopen('SYNCDATACONFIG', 'xid_array.txt', 'a');
    --dbms_output.put_line('4====打开xid事务标识集合');
    utl_file.put_line(logfile, '4====打开xid事务标识集合');
    --创建XID_str字符串
    --dbms_output.put_line('4====创建XID_str字符串');
    utl_file.put_line(logfile, '4====创建XID_str字符串');
    for i in 1 .. tb_dml_XID.count loop
      if tb_dml_XID.count != 1 then
        if i = 1 then
          --新增入一行xid事务标识
          utl_file.put_line(fhandle, tb_dml_XID(i - 1));
          --dbms_output.put_line('a1_' || tb_dml_XID(i - 1));
          utl_file.put_line(logfile, 'a1_' || tb_dml_XID(i - 1));
          XID_str := tb_dml_XID(i - 1);
        else
          --新增入一行xid事务标识
          utl_file.put_line(fhandle, tb_dml_XID(i - 1));
          --dbms_output.put_line('a2_' || tb_dml_XID(i - 1));
          utl_file.put_line(logfile, 'a2_' || tb_dml_XID(i - 1));
          XID_str := XID_str || ',' || tb_dml_XID(i - 1);
        end if;
      else
        --新增入一行xid事务标识
        utl_file.put_line(fhandle, tb_dml_XID(i - 1));
        --dbms_output.put_line('a3_' || tb_dml_XID(i - 1));
        utl_file.put_line(logfile, 'a3_' || tb_dml_XID(i - 1));
        XID_str := tb_dml_XID(i - 1);
      end if;
    end loop;
    utl_file.fclose(fhandle);
    --dbms_output.put_line('4====关闭xid事务标识集合');
    utl_file.put_line(logfile, '4====关闭xid事务标识集合');
    --dbms_output.put_line('4====XID_str字符串: ' || XID_str);
    utl_file.put_line(logfile, '4====XID_str字符串: ' || XID_str);
    l_string := XID_str || ',';
    loop
      exit when l_string is null;
      n := instr(l_string, ',');
      l_data.extend;
      l_data(l_data.count) := ltrim(rtrim(substr(l_string, 1, n - 1)));
      l_string := substr(l_string, n + 1);
    end loop;
    --队列
    --读取队列开始序列
    fhandle := utl_file.fopen('SYNCDATACONFIG', 'queue_ck.txt', 'R');
    utl_file.get_line(fhandle, fp_buffer);
    utl_file.fclose(fhandle);
    --dbms_output.put_line('5====读取队列开始序列');
    utl_file.put_line(logfile, '5====读取队列开始序列');
    --更新队列
    v_queueck   := to_number(substr(fp_buffer, 2, 10));
    v_queuename := 'q' || lpad((v_queueck + 1), 10, '0');
    fhandle     := utl_file.fopen('SYNCDATACONFIG', 'queue_ck.txt', 'w');
    utl_file.put_line(fhandle, v_queuename);
    utl_file.fclose(fhandle);
    --dbms_output.put_line('6====更新队列队列序列');
    utl_file.put_line(logfile, '6====更新队列队列序列');
    ----创建队列
    fhandle := utl_file.fopen('SYNCDATA', v_queuename || '.txt', 'w');
    utl_file.fclose(fhandle);
    utl_file.put_line(logfile, '7====创建队列v_queuename:' || v_queuename);
    --添加dml
    fhandle := utl_file.fopen('SYNCDATA', v_queuename || '.txt', 'a');
    for c_sql in (select sql_redo, OPERATION
                    from v$logmnr_contents
                   where XID in (select *
                                   from THE (select cast(l_data as mytableType)
                                               from dual))
                     and operation in
                         ('INSERT', 'DELETE', 'UPDATE', 'COMMIT')
                   order by scn) loop
      if c_sql.sql_redo is not null then
        --判断如果是insert,不截取ROWID
        if c_sql.OPERATION = 'INSERT' or c_sql.OPERATION = 'COMMIT' then
          if c_sql.OPERATION = 'INSERT' then
            sql_txt := replace(c_sql.sql_redo,
                               '"' || v_tablename || '"',
                               '"' || v_tablename || '"' || v_dblink_name);
            utl_file.put_line(fhandle, sql_txt);
            --dbms_output.put_line('8====sql_redo1:' || sql_txt);
            utl_file.put_line(logfile, '8====sql_redo1:' || sql_txt);
          else
            utl_file.put_line(fhandle, c_sql.sql_redo);
            --dbms_output.put_line('8====sql_redo2:' || c_sql.sql_redo);
            utl_file.put_line(logfile, '8====sql_redo2:' || c_sql.sql_redo);
          end if;
        else
          sql_txt := substr(c_sql.sql_redo,
                            0,
                            instr(c_sql.sql_redo, 'ROWID') - 5);
          sql_txt := replace(sql_txt,
                             '"' || v_tablename || '"',
                             '"' || v_tablename || '"' || v_dblink_name);
          utl_file.put_line(fhandle, sql_txt);
          --dbms_output.put_line('8====sql_redo3:' || sql_txt);
          utl_file.put_line(logfile, '8====sql_redo3:' || sql_txt);
        end if;
      end if;
    end loop;
    utl_file.fclose(fhandle);
    --新增一行到 queue_arrary集合
    fhandle := utl_file.fopen('SYNCDATACONFIG', 'queue_array.txt', 'a');
    utl_file.put_line(fhandle, v_queuename);
    utl_file.fclose(fhandle);
    --dbms_output.put_line('11====新增到queue_arrary集合的队列名称:' || v_queuename);
    utl_file.put_line(logfile, '10====新增到queue_arrary集合的队列名称:' || v_queuename);
    --dbms_output.put_line('9====fclose');
    utl_file.put_line(logfile, '11====fclose');
    dbms_logmnr.end_logmnr;
    --dbms_output.put_line('10====end_logmnr');
    utl_file.put_line(logfile, '10====end_logmnr');
  end loop;
  v_scn   := dbms_flashback.get_system_change_number();
  fhandle := utl_file.fopen('SYNCDATACONFIG', 'scn_ck.txt', 'w');
  utl_file.put_line(fhandle, v_scn);
  utl_file.fclose(fhandle);
  --dbms_output.put_line('11====v_scn:' || v_scn);
  utl_file.put_line(logfile, '12====v_scn:' || v_scn);
  utl_file.put_line(logfile, 'Analyze Redo...End:'||to_char(sysdate,'yyyy-MM-dd hh24:mi:ss'));
  utl_file.fclose(logfile);
exception
  when others then
    dbms_logmnr.end_logmnr;
    --dbms_output.put_line(sqlerrm||': '||sqlcode);
    utl_file.put_line(logfile, 'Analyze Redo...End:'||to_char(sysdate,'yyyy-MM-dd hh24:mi:ss'));
    utl_file.put_line(logfile, sqlerrm||': '||sqlcode);
    utl_file.fclose(logfile);
    return;
end DataCapture_Redo;
/

数据应用,以dblink方式插入到目标端

create or replace procedure DataReplicat is
  fhandle     utl_file.file_type; --oracle文件类型
  fp_buffer   varchar2(4000); --文件输出缓存
  v_queuename varchar(20); --queue_arrary.txt中的dml队列名称
  v_send_ck   varchar(20); --send_ck.txt的dml队列名称
  v_run_sql   varchar(4000); --dml
  logfile        utl_file.file_type; --oracle文件类型
  logfile_buffer varchar2(4000); --文件输出缓存
begin
  --打开发送日志
  logfile := utl_file.fopen('LOGMNR', 'send_log.txt', 'a');
  utl_file.put_line(logfile, '');
  utl_file.put_line(logfile, 'DataReplicat......Start:'||to_char(sysdate,'yyyy-MM-dd hh24:mi:ss'));
  --读取send_ck.txt,获取检查点
  fhandle := utl_file.fopen('SYNCDATACONFIG', 'send_ck.txt', 'R');
  BEGIN
    utl_file.get_line(fhandle, fp_buffer);
    v_send_ck := fp_buffer;
    --dbms_output.put_line();
    utl_file.put_line(logfile, '1====1v_send_ck:' || v_send_ck);
  exception
    when no_data_found then
      v_send_ck := null;
      --dbms_output.put_line();
      utl_file.put_line(logfile, '1====1v_send_ck is null');
  END;
  utl_file.fclose(fhandle);
  --如果send_ck.txt为空,从queue_array.txt读取第一个队列名
  if v_send_ck is null then
    fhandle := utl_file.fopen('SYNCDATACONFIG', 'queue_array.txt', 'R');
    BEGIN
      utl_file.get_line(fhandle, fp_buffer); --读取第一个队列名
      v_queuename := fp_buffer;
      --dbms_output.put_line();
      utl_file.put_line(logfile, '2====1queue_array.txt head->v_queuename:' ||v_queuename);
      v_send_ck := v_queuename;
    exception
      when no_data_found then
        --queue_arrary.txt没有队列直接退出
        --dbms_output.put_line();
        utl_file.put_line(logfile,'2====2queue_array.txt->no queuename!');
        --dbms_output.put_line();
        utl_file.put_line(logfile,'2====2DataReplicat......End:'||to_char(sysdate,'yyyy-MM-dd hh24:mi:ss'));
        return;
    END;
    utl_file.fclose(fhandle);
  else
    --如果send_ck.txt不为空,读取下一个队列
    v_queuename := 'q' ||
                   lpad((to_number(substr(v_send_ck, 2, 10)) + 1), 10, '0');
    --dbms_output.put_line();
    utl_file.put_line(logfile,'2====3next queuename:' || v_queuename);
    begin
      fhandle := utl_file.fopen('SYNCDATA', v_queuename || '.txt', 'R');
      utl_file.get_line(fhandle, fp_buffer);
      utl_file.fclose(fhandle);
      --dbms_output.put_line();
      utl_file.put_line(logfile,'2====4queue_array.txt->' || v_queuename);
      v_send_ck := v_queuename;
    exception
      when others then
        --没有队列直接退出
        --dbms_output.put_line();
        utl_file.put_line(logfile,'2====5queue_array.txt->no new queuename or dml in queue file!');
        --dbms_output.put_line();
        utl_file.put_line(logfile,'2====5DataReplicat......End:'||to_char(sysdate,'yyyy-MM-dd hh24:mi:ss'));
        utl_file.fclose(logfile);
        return;
    end;
  end if;
  --读取syncdata队列 v_queuename,执行队列q***********.txt命令
  fhandle := utl_file.fopen('SYNCDATA', v_queuename || '.txt', 'R');
  LOOP
    BEGIN
      utl_file.get_line(fhandle, fp_buffer);
      v_run_sql := replace(fp_buffer, ';', null);
      if v_run_sql != 'commit' then
        execute immediate v_run_sql;
        --dbms_output.put_line();
        utl_file.put_line(logfile,'3====1run_sql:' || v_run_sql);
      else
        commit;
        --dbms_output.put_line();
        utl_file.put_line(logfile,'3====2commit');
      end if;
    exception
      when no_data_found then
        exit;
    END;
  END LOOP;
  utl_file.fclose(fhandle);
  --执行成功后 send_ck.txt记录该检查点
  fhandle := utl_file.fopen('SYNCDATACONFIG', 'send_ck.txt', 'w');
  utl_file.put_line(fhandle, v_send_ck);
  utl_file.fclose(fhandle);
  --dbms_output.put_line();
  utl_file.put_line(logfile,'4====v_send_ck changed:' || v_send_ck);
  --dbms_output.put_line();
  utl_file.put_line(logfile,'4====DataReplicat......End:'||to_char(sysdate,'yyyy-MM-dd hh24:mi:ss'));
exception
  when others then
    --dbms_output.put_line();
    utl_file.put_line(logfile,'DataReplicat......End:'||to_char(sysdate,'yyyy-MM-dd hh24:mi:ss'));
    --dbms_output.put_line();
    utl_file.put_line(logfile,sqlerrm);
    rollback;
    utl_file.put_line(logfile,'rollback');
    utl_file.fclose(logfile);
    return;
end DataReplicat;
/

部分内容摘自网络,如有侵权,请联系

请使用浏览器的分享功能分享到微信等