您的位置:首页 > 其它

【原创】利用存储过程处理流复制产…

2014-03-10 10:08 274 查看
流复制不能保证本地和远端的数据库数据完全一致,数据不一致的可能是完全存在的。而且进行插入,更新等操作时,可能因为主键冲突而导致操作失败。以下语句可以查看错误信息:


select * from dba_apply_error;






根据具体事务查找ERROR_MESSAGE:

select * from
dba_apply_error_messages where local_transaction_id
='81.23.112827';









如果手工的去查找并排除错误将会非常的耗时,此时就可以利用存储过程来自动化处理这些错误。

这个例子是针对某个事务(p_local_transaction_id )的错误而写的:

核心算法:

IF 操作类型是 insert THEN


获取新行的主键列;

ELSE

获取旧行的主键列;

END IF;

IF 源数据库不存在该条记录 THEN

在远端数据库中删掉该记录;

ELSE

在远端数据库中删掉该记录;

在远端数据库中插入源端对应的记录;

END IF;

具体代码如下:

CREATE OR REPLACE PROCEDURE MANUAL_ERROR(p_local_transaction_id
IN VARCHAR2, --处理事务ID

p_output_sql
IN VARCHAR2, --是否输出检查和改错SQL

p_deal_error
IN VARCHAR2, --是否执行改错SQL

p_delete_errormsg
IN VARCHAR2) --是否删除错误信息

is

--edit by leiyao
v1.5
2013-12-5


--**********************说明********************************

--1.p_output_sql=1,则输出不同步记录的检查和改错SQL语句。

--2.p_deal_error=1,则执行改错SQL语句。


--3.p_delete_errormsg=1,则删除dba_apply_error里的错误信息。


--**********************************************************

v_count_out int;


ad
ANYDATA;


data
ANYDATA;


message
ANYDATA;


newlist
SYS.LCR$_ROW_LIST;


oldlist
SYS.LCR$_ROW_LIST;


ret
PLS_INTEGER;


lcr
SYS.LCR$_ROW_RECORD;

errlog_rec
errorlog%ROWTYPE;

--MIS中列最大长度20,表名最大长度24

v_colname VARCHAR2(32);

--err_messages1 VARCHAR2(400);

ob_owner VARCHAR2(32);

ob_name VARCHAR2(32);

cmd_type VARCHAR2(30);

--主键列名及类型

cursor PK_COLS is

Select
B.COLUMN_NAME


from SYS.dba_cons_columns a, SYS.Dba_Tab_Columns b


where a.table_name = B.TABLE_NAME


AND A.table_name = ob_name


and a.column_name = b.COLUMN_NAME


and a.owner = ob_owner


and b.OWNER = A.owner


AND a.constraint_name =


(select constraint_name


from SYS.DBA_CONSTRAINTS


where owner = ob_owner


and table_name = ob_name


and constraint_type = 'P');

--取数非主键列

--cursor FILTER_COLS is

--SELECT column_value from
table(split(REPLACE(SUBSTR(err_messages1,INSTR(err_messages1,'ORA-26787:
The row with key (') + LENGTH('ORA-26787: The row with key ('),
INSTR(err_messages1,') =') - INSTR(err_messages1,'ORA-26787: The
row with key (') - LENGTH('ORA-26787: The row with key
(')),'"',''),','))

--WHERE column_value NOT
IN(Select
B.COLUMN_NAME
from SYS.dba_cons_columns
a, SYS.Dba_Tab_Columns b where
a.table_name= B.TABLE_NAME AND A.table_name=ob_name and
a.column_name = b.COLUMN_NAME and a.owner = ob_owner and
b.OWNER=A.owner AND
a.constraint_name
= (
select
constraint_name
from SYS.DBA_CONSTRAINTS where
owner = ob_owner and table_name
= ob_name
and
constraint_type ='P'));

--cursor c_messages is

--select message_count from dba_apply_error
where local_transaction_id = p_local_transaction_id;

v_cnt integer;

--
v_count
integer; --标识源端数据数目,若>=1,则存在该条记录,否则不存在该条记录。


v_sql
varchar2(4000);

v_sql_temp varchar2(4000);

v_select
varchar2(4000);


--v_id
varchar2(20);


--v_NAME
varchar2(32);

v_content_any ANYDATA;


tn
VARCHAR2(61);


str
VARCHAR2(4000);


chrs
VARCHAR2(1000);


num
NUMBER;


v_pos
NUMBER;


dat
DATE;


floats
float;

--res NUMBER;

oldlist SYS.LCR$_ROW_LIST;

BEGIN


--该过程是处理冲突的核心过程,可以处理ORA-26786及ORA-26787两类错误(实质上都是ORA-01403错误,no
data found)


--ORA-26787错误是更新或删除时提示找不到数据,ORA-26786错误是更新或删除时有数据,但部分列与LCR记录中值冲突


--处理ORA-26787错误思路:无论是UPDATE还是DELETE动作,均是从LCR中取出每个字段的旧值,然后组织成插入语句,插入后再执行错误


--处理ORA-26786错误思路:获取冲突列,再用冲突列名获取LCR中冲突字段值,组织为UPDATE语句,将目标中的冲突字段值改为LCR中的值,更新后再执行错误

----下面GETOBJECT语句必须一开始就执行

DBMS_OUTPUT.ENABLE(400000);

--初始化v_count_out

v_count_out := 0;

--获取该事务产生的错误个数

select message_count

into
v_cnt

from
dba_apply_error

where local_transaction_id =
p_local_transaction_id;

FOR j in 1 .. v_cnt LOOP

message :=
DBMS_APPLY_ADM.GET_ERROR_MESSAGE(j, p_local_transaction_id);


ret
:= message.GETOBJECT(lcr);


ad
:= DBMS_STREAMS.GET_INFORMATION('CONSTRAINT_NAME');


ret
:= ad.GetVarchar2(errlog_rec.text);


ad
:= DBMS_STREAMS.GET_INFORMATION('SENDER');


ret
:= ad.GETVARCHAR2(errlog_rec.sender);

-- Access
the LCR

-- Determine
the owner of the database object for the LCR用户名

ob_owner :=
lcr.GET_OBJECT_OWNER;

-- Determine
the name of the database object for the LCR

--表名

ob_name :=
lcr.GET_OBJECT_NAME;

-- Determine
the type of DML change


--DML类型

cmd_type :=
lcr.GET_COMMAND_TYPE;

IF
message.getTypeName() = 'SYS.LCR$_ROW_RECORD' THEN


v_sql
:= '';


v_sql_temp := '';


OPEN PK_COLS;


LOOP


FETCH PK_COLS


INTO v_colname;


EXIT WHEN PK_COLS%NOTFOUND;


--循环处理主键列,GET_VALUE获取主键的新值


IF cmd_type IN ('INSERT') THEN


v_content_any := lcr.GET_VALUE('new', v_colname);


ELSE


v_content_any := lcr.GET_VALUE('old', v_colname);


END IF;




IF v_content_any IS NOT NULL THEN


tn := v_content_any.GETTYPENAME();


--tn为字段类型,根据字段类型,使用不同方法获取LCR记录中的新值数据,然后组织成删除语句的WHERE条件


IF tn = 'SYS.VARCHAR2' THEN


str := v_content_any.AccessVarchar2();


IF str IS NULL THEN


v_sql := v_sql || ' AND ' || v_colname || ' IS NULL ';


ELSE


v_sql := v_sql || ' AND ' || v_colname || '=' || CHR(39) ||


to_char(str) || CHR(39);


END IF;


ELSIF tn = 'SYS.CHAR' then


chrs := v_content_any.AccessChar();


IF chrs IS NULL THEN


v_sql := v_sql || ' AND ' || v_colname || ' IS NULL ';


ELSE


v_sql := v_sql || ' AND ' || v_colname || '=' || CHR(39) || chrs
||


CHR(39);


END IF;


ELSIF tn = 'SYS.VARCHAR' THEN


chrs := v_content_any.AccessVarchar();


IF chrs IS NULL THEN


v_sql := v_sql || ' AND ' || v_colname || ' IS NULL ';


ELSE


v_sql := v_sql || ' AND ' || v_colname || '=' || CHR(39) ||


to_char(chrs) || CHR(39);


END IF;


ELSIF tn = 'SYS.NUMBER' THEN


num := v_content_any.AccessNumber();


IF NUM IS NULL THEN


v_sql := v_sql || ' AND ' || v_colname || ' IS NULL ';


ELSE


v_sql := v_sql || ' AND ' || v_colname || '=' ||
to_char(num);


END IF;


ELSIF tn = 'SYS.FLOAT' THEN


floats := v_content_any.AccessBFloat();


IF floats IS NULL THEN


v_sql := v_sql || ' AND ' || v_colname || ' IS NULL ';


ELSE


v_sql := v_sql || ' AND ' || v_colname || '=' || CHR(39) ||


to_char(floats) || CHR(39);


END IF;


ELSIF tn = 'SYS.DATE' THEN


dat := v_content_any.AccessDate();


IF dat IS NULL THEN


v_sql := v_sql || ' AND ' || v_colname || ' IS NULL ';


ELSE


v_sql := v_sql || ' AND ' || v_colname || '=' || 'TO_DATE('
||


CHR(39) || to_char(dat, 'yyyymmdd hh24:mi:ss') ||


CHR(39) || ',''yyyymmdd hh24:mi:ss'')';


END IF;


END IF;


END IF;


END LOOP;


CLOSE PK_COLS;


-------------------


v_sql_temp := 'SELECT COUNT(*) FROM ' || ob_owner
|| '.' || ob_name ||


'@SCDB.GZDS.GOV.CN' || '
WHERE ' || SUBSTR(v_sql, 5);


--DBMS_OUTPUT.PUT_LINE(v_sql_temp);


execute immediate v_sql_temp


into v_cnt; --获取源端记录数


----判断源端目标端记录是否相同


v_select := 'select count(*) from ' || '(SELECT * FROM ' ||
ob_owner || '.' ||


ob_name || ' WHERE ' || SUBSTR(v_sql, 5) || ' minus ' ||


'SELECT * FROM ' || ob_owner || '.' || ob_name ||


'@SCDB.GZDS.GOV.CN WHERE '
|| SUBSTR(v_sql, 5) || ')';


-- DBMS_OUTPUT.PUT_LINE('--检查语句:');


-- DBMS_OUTPUT.PUT_LINE(v_select);


execute immediate v_select


into v_count_out;


---------------


IF p_output_sql IN ('1') THEN


if v_count_out <> 0 then


-- 输出检查


DBMS_OUTPUT.PUT_LINE('--检查语句:');


DBMS_OUTPUT.PUT_LINE(v_select||';');


IF v_cnt >= 1 THEN


--DBMS_OUTPUT.PUT_LINE('>=1');


--输出改错SQL。


DBMS_OUTPUT.PUT_LINE('--源端有该记录,目标端先执行DELETE再INSERT!');


v_sql_temp := 'DELETE FROM ' || ob_owner || '.'
|| ob_name ||


' WHERE ' || SUBSTR(v_sql, 5);


DBMS_OUTPUT.PUT_LINE(v_sql_temp);


v_sql_temp := 'INSERT INTO ' || ob_owner || '.'
|| ob_name ||


' SELECT * FROM ' || ob_owner || '.' || ob_name ||


'@SCDB.GZDS.GOV.CN WHERE '
|| SUBSTR(v_sql, 5);


DBMS_OUTPUT.PUT_LINE(v_sql_temp || chr(10));


ELSE


--DBMS_OUTPUT.PUT_LINE('<1');


--输出改错SQL。


DBMS_OUTPUT.PUT_LINE('--源端无记录,目标端执行DELETE!');


v_sql_temp := 'DELETE FROM ' || ob_owner || '.'
|| ob_name ||


' WHERE ' || SUBSTR(v_sql, 5);


DBMS_OUTPUT.PUT_LINE(v_sql_temp || chr(10));


END IF;


end if;


END IF;


IF p_deal_error IN ('1') THEN


--处理错误


if v_count_out <> 0 then


IF v_cnt >= 1 THEN


--DBMS_OUTPUT.PUT_LINE('>=1');


--DBMS_OUTPUT.PUT_LINE('--源端有该记录,目标端先执行DELETE再INSERT!'||chr(10));


v_sql_temp := 'DELETE FROM ' || ob_owner || '.'
|| ob_name ||


' WHERE ' || SUBSTR(v_sql, 5);


execute immediate v_sql_temp;


v_sql_temp := 'INSERT INTO ' || ob_owner || '.'
|| ob_name ||


' SELECT * FROM ' || ob_owner || '.' || ob_name ||


'@SCDB.GZDS.GOV.CN WHERE '
|| SUBSTR(v_sql, 5);


execute immediate v_sql_temp;


commit;


ELSE


--DBMS_OUTPUT.PUT_LINE('<1');


--DBMS_OUTPUT.PUT_LINE('--源端无记录,目标端执行DELETE!'||chr(10));


v_sql_temp := 'DELETE FROM ' || ob_owner || '.'
|| ob_name ||


' WHERE ' || SUBSTR(v_sql, 5);


execute immediate v_sql_temp;


commit;


END IF;


end if;


END IF;

END
IF;

END LOOP;

if p_delete_errormsg in ('1') then


--删除错误信息


DBMS_APPLY_ADM.delete_error(p_local_transaction_id);

end if;

END MANUAL_ERROR;
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: