-- 续oracle10g simple AQ step by step(一)
-------------------session db1 sys
-- Grant privileges to the Streams administrator (run as SYS).
GRANT DBA, SELECT_CATALOG_ROLE TO strmadmin;
GRANT EXECUTE ON DBMS_APPLY_ADM TO strmadmin;
GRANT EXECUTE ON DBMS_AQ TO strmadmin;
GRANT EXECUTE ON DBMS_AQADM TO strmadmin;
GRANT EXECUTE ON DBMS_STREAMS_ADM TO strmadmin;

-- Rule-engine system privileges: creating rule sets, rules and evaluation
-- contexts. grant_option => FALSE means strmadmin cannot pass them on.
BEGIN
  DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
    grantee      => 'strmadmin',
    grant_option => FALSE);
  DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.CREATE_RULE_OBJ,
    grantee      => 'strmadmin',
    grant_option => FALSE);
  DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ,
    grantee      => 'strmadmin',
    grant_option => FALSE);
END;
/
-----------------------session db1 strmadmin
-- Create the queue table oe_queue_table and the queue oe_queue.
BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table => 'oe_queue_table',
    queue_name  => 'oe_queue'
  );
END;
/
-- Give the OE user enqueue and dequeue privileges on the queue, then
-- create the explicit_enq agent and associate it with the OE database user.
BEGIN
  SYS.DBMS_AQADM.GRANT_QUEUE_PRIVILEGE(
    privilege  => 'ALL',
    queue_name => 'strmadmin.oe_queue',
    grantee    => 'oe'
  );

  SYS.DBMS_AQADM.CREATE_AQ_AGENT(
    agent_name => 'explicit_enq'
  );

  DBMS_AQADM.ENABLE_DB_ACCESS(
    agent_name  => 'explicit_enq',
    db_username => 'oe'
  );
END;
/
--------------------------session db1 oe
-- OE creates the tables and types used by the later tests.
-- Target table for order events.
CREATE TABLE orders (
  order_id      NUMBER(12),
  order_date    TIMESTAMP(6) WITH LOCAL TIME ZONE,
  order_mode    VARCHAR2(8),
  customer_id   NUMBER(6),
  order_status  NUMBER(2),
  order_total   NUMBER(8,2),
  sales_rep_id  NUMBER(6),
  promotion_id  NUMBER(6)
);
-- Target table for customer events.
-- CUST_ADDRESS_TYPE is an object type that must already exist; it is not
-- created anywhere in this script.
CREATE TABLE customer (
  customer_id      NUMBER(6),
  cust_first_name  VARCHAR2(20),
  cust_last_name   VARCHAR2(20),
  cust_address     CUST_ADDRESS_TYPE,
  phone_numbers    NUMBER,
  nls_language     VARCHAR2(3),
  nls_territory    VARCHAR2(30),
  credit_limit     NUMBER(9,2),
  cust_email       VARCHAR2(30),
  account_mgr_id   NUMBER(6),
  date_of_birth    DATE,
  marital_status   VARCHAR2(20),
  gender           VARCHAR2(1),
  income_level     VARCHAR2(20)
);
-- Message payload type for order events: the columns of the orders table
-- plus an action attribute that the queue rules test.
CREATE TYPE order_event_typ AS OBJECT (
  order_id      NUMBER(12),
  order_date    TIMESTAMP(6) WITH LOCAL TIME ZONE,
  order_mode    VARCHAR2(8),
  customer_id   NUMBER(6),
  order_status  NUMBER(2),
  order_total   NUMBER(8,2),
  sales_rep_id  NUMBER(6),
  promotion_id  NUMBER(6),
  action        VARCHAR(7)
);
/
-- Message payload type for customer events: the columns of the customer
-- table plus an action attribute that the queue rules test.
CREATE TYPE customer_event_typ AS OBJECT (
  customer_id      NUMBER(6),
  cust_first_name  VARCHAR2(20),
  cust_last_name   VARCHAR2(20),
  cust_address     CUST_ADDRESS_TYPE,
  phone_numbers    NUMBER,
  nls_language     VARCHAR2(3),
  nls_territory    VARCHAR2(30),
  credit_limit     NUMBER(9,2),
  cust_email       VARCHAR2(30),
  account_mgr_id   NUMBER(6),
  date_of_birth    DATE,
  marital_status   VARCHAR2(20),
  gender           VARCHAR2(1),
  income_level     VARCHAR2(20),
  action           VARCHAR(7)
);
/
-- Procedure that enqueues a payload of any type (wrapped in ANYDATA)
-- into strmadmin.oe_queue, with the explicit_enq agent as the sender.
--   event : the ANYDATA payload to enqueue.
CREATE PROCEDURE oe.enq_proc_3 (event IN ANYDATA) IS
  enqueue_opts  DBMS_AQ.ENQUEUE_OPTIONS_T;
  msg_props     DBMS_AQ.MESSAGE_PROPERTIES_T;
  new_msgid     RAW(16);  -- receives the id of the enqueued message; not used further
BEGIN
  msg_props.SENDER_ID := SYS.AQ$_AGENT('explicit_enq', NULL, NULL);

  DBMS_AQ.ENQUEUE(
    queue_name         => 'strmadmin.oe_queue',
    enqueue_options    => enqueue_opts,
    message_properties => msg_props,
    payload            => event,
    msgid              => new_msgid
  );
END;
/
-- Procedure that builds a row LCR from its arguments and enqueues it
-- into strmadmin.oe_queue, with the explicit_enq agent as the sender.
--   source_dbname : source database name recorded in the LCR
--   cmd_type      : command type of the LCR (the script passes 'INSERT' / 'UPDATE')
--   obj_owner     : owner of the target object
--   obj_name      : name of the target object
--   old_vals      : old column values (the script passes NULL for an insert)
--   new_vals      : new column values
CREATE PROCEDURE oe.enq_row_lcr(
  source_dbname  VARCHAR2,
  cmd_type       VARCHAR2,
  obj_owner      VARCHAR2,
  obj_name       VARCHAR2,
  old_vals       SYS.LCR$_ROW_LIST,
  new_vals       SYS.LCR$_ROW_LIST)
AS
  enqueue_opts  DBMS_AQ.ENQUEUE_OPTIONS_T;
  msg_props     DBMS_AQ.MESSAGE_PROPERTIES_T;
  new_msgid     RAW(16);
  lcr           SYS.LCR$_ROW_RECORD;
BEGIN
  msg_props.SENDER_ID := SYS.AQ$_AGENT('explicit_enq', NULL, NULL);

  -- Assemble the row LCR from the caller's arguments.
  lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT(
    source_database_name => source_dbname,
    command_type         => cmd_type,
    object_owner         => obj_owner,
    object_name          => obj_name,
    old_values           => old_vals,
    new_values           => new_vals
  );

  -- The queue payload is ANYDATA, so the LCR is wrapped before enqueueing.
  DBMS_AQ.ENQUEUE(
    queue_name         => 'strmadmin.oe_queue',
    enqueue_options    => enqueue_opts,
    message_properties => msg_props,
    payload            => ANYDATA.ConvertObject(lcr),
    msgid              => new_msgid
  );
END enq_row_lcr;
/
-- Function that returns the action attribute of a queued message.
--   event   : the ANYDATA payload of the message.
--   returns : the action of an order or customer event, or NULL when the
--             payload is of any other type.
CREATE FUNCTION oe.get_oe_action (event IN ANYDATA)
  RETURN VARCHAR2
IS
  order_event     oe.order_event_typ;
  customer_event  oe.customer_event_typ;
  rc              NUMBER;        -- return code of GETOBJECT; not inspected
  payload_type    VARCHAR2(61);
BEGIN
  payload_type := event.GETTYPENAME;

  CASE payload_type
    WHEN 'OE.ORDER_EVENT_TYP' THEN
      rc := event.GETOBJECT(order_event);
      RETURN order_event.action;
    WHEN 'OE.CUSTOMER_EVENT_TYP' THEN
      rc := event.GETOBJECT(customer_event);
      RETURN customer_event.action;
    ELSE
      RETURN NULL;
  END CASE;
END;
/
-- Allow strmadmin to execute the get_oe_action function.
GRANT EXECUTE
  ON get_oe_action
  TO strmadmin;
-- Message handler procedure: unpacks a queued event and inserts it into
-- the oe.orders or oe.customer table, depending on the payload type.
-- Payloads of any other type are ignored.
--   event : the ANYDATA payload handed over by the apply process.
-- The customer insert targets oe.customer, the table created earlier in
-- this script. Explicit column lists keep the inserts valid if columns
-- are later added to either table.
CREATE OR REPLACE PROCEDURE oe.mes_handler (event IN ANYDATA) IS
  ord        oe.order_event_typ;
  cust       oe.customer_event_typ;
  num        NUMBER;        -- return code of GETOBJECT; not inspected
  type_name  VARCHAR2(61);
BEGIN
  type_name := event.GETTYPENAME;

  IF type_name = 'OE.ORDER_EVENT_TYP' THEN
    num := event.GETOBJECT(ord);
    INSERT INTO oe.orders (
      order_id, order_date, order_mode, customer_id,
      order_status, order_total, sales_rep_id, promotion_id)
    VALUES (
      ord.order_id, ord.order_date, ord.order_mode, ord.customer_id,
      ord.order_status, ord.order_total, ord.sales_rep_id, ord.promotion_id);
  ELSIF type_name = 'OE.CUSTOMER_EVENT_TYP' THEN
    num := event.GETOBJECT(cust);
    INSERT INTO oe.customer (
      customer_id, cust_first_name, cust_last_name, cust_address,
      phone_numbers, nls_language, nls_territory, credit_limit,
      cust_email, account_mgr_id, date_of_birth, marital_status,
      gender, income_level)
    VALUES (
      cust.customer_id, cust.cust_first_name, cust.cust_last_name, cust.cust_address,
      cust.phone_numbers, cust.nls_language, cust.nls_territory, cust.credit_limit,
      cust.cust_email, cust.account_mgr_id, cust.date_of_birth, cust.marital_status,
      cust.gender, cust.income_level);
  END IF;
END;
/
-- Allow strmadmin to execute the message handler.
GRANT EXECUTE
  ON mes_handler
  TO strmadmin;
---------------------session db1 strmadmin
-- Create an evaluation context in which the alias "tab" refers to the
-- queue table strmadmin.oe_queue_table.
DECLARE
  queue_aliases SYS.RE$TABLE_ALIAS_LIST :=
    SYS.RE$TABLE_ALIAS_LIST(
      SYS.RE$TABLE_ALIAS('tab', 'strmadmin.oe_queue_table'));
BEGIN
  DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT(
    evaluation_context_name => 'oe_eval_context',
    table_aliases           => queue_aliases
  );
END;
/
-- Create the rule set apply_oe_rs, evaluated in oe_eval_context.
BEGIN
  DBMS_RULE_ADM.CREATE_RULE_SET(
    rule_set_name      => 'apply_oe_rs',
    evaluation_context => 'strmadmin.oe_eval_context'
  );
END;
/
-- Create a rule that is true for queued messages whose action is 'APPLY'.
BEGIN
  DBMS_RULE_ADM.CREATE_RULE(
    rule_name => 'strmadmin.apply_action',
    condition => 'oe.get_oe_action(tab.user_data) = ''APPLY'' '
  );
END;
/
-- Create a rule that is true for row LCRs describing DML on the OE tables
-- ORDERS and CUSTOMER, so the apply process handles them from the queue.
-- The object name is CUSTOMER to match the table created earlier in this
-- script.
BEGIN
  DBMS_RULE_ADM.CREATE_RULE(
    rule_name          => 'apply_lcrs',
    condition          => ':dml.GET_OBJECT_OWNER() = ''OE'' AND ' ||
                          ' (:dml.GET_OBJECT_NAME() = ''ORDERS'' OR ' ||
                          ':dml.GET_OBJECT_NAME() = ''CUSTOMER'') ',
    evaluation_context => 'SYS.STREAMS$_EVALUATION_CONTEXT');
END;
/
-- Add the two rules above to the rule set apply_oe_rs.
BEGIN
  DBMS_RULE_ADM.ADD_RULE(
    rule_name     => 'apply_action',
    rule_set_name => 'apply_oe_rs'
  );

  DBMS_RULE_ADM.ADD_RULE(
    rule_name     => 'apply_lcrs',
    rule_set_name => 'apply_oe_rs'
  );
END;
/
-- Create the apply process apply_oe on strmadmin.oe_queue. It uses the rule
-- set apply_oe_rs, hands messages to oe.mes_handler and runs as user oe.
-- apply_captured => false: presumably restricts it to user-enqueued
-- messages (confirm against the DBMS_APPLY_ADM documentation).
BEGIN
  DBMS_APPLY_ADM.CREATE_APPLY(
    queue_name      => 'strmadmin.oe_queue',
    apply_name      => 'apply_oe',
    rule_set_name   => 'strmadmin.apply_oe_rs',
    message_handler => 'oe.mes_handler',
    apply_user      => 'oe',
    apply_captured  => false
  );
END;
/
-- Give the OE user the privilege to execute the rule set apply_oe_rs.
BEGIN
  DBMS_RULE_ADM.GRANT_OBJECT_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.EXECUTE_ON_RULE_SET,
    object_name  => 'strmadmin.apply_oe_rs',
    grantee      => 'oe',
    grant_option => FALSE
  );
END;
/
-- Set disable_on_error to 'n' so the apply process stays available when
-- errors occur, then start the apply process.
BEGIN
  DBMS_APPLY_ADM.SET_PARAMETER(
    apply_name => 'apply_oe',
    parameter  => 'disable_on_error',
    value      => 'n'
  );

  DBMS_APPLY_ADM.START_APPLY(
    apply_name => 'apply_oe'
  );
END;
/
---- Dequeue
-- Create the queue agent explicit_dq, which performs the dequeue
-- operations on the OE_QUEUE queue.
BEGIN
  SYS.DBMS_AQADM.CREATE_AQ_AGENT(
    agent_name => 'explicit_dq'
  );
END;
/
-- Let the OE user dequeue through the explicit_dq agent.
BEGIN
  DBMS_AQADM.ENABLE_DB_ACCESS(
    agent_name  => 'explicit_dq',
    db_username => 'oe'
  );
END;
/
-- Subscribe the explicit_dq agent to the queue, with a rule that selects
-- the messages whose action is not 'APPLY'.
DECLARE
  dequeue_agent SYS.AQ$_AGENT := SYS.AQ$_AGENT('explicit_dq', NULL, NULL);
BEGIN
  SYS.DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name => 'strmadmin.oe_queue',
    subscriber => dequeue_agent,
    rule       => 'oe.get_oe_action(tab.user_data) != ''APPLY'''
  );
END;
/
-- Explicit dequeue procedure: dequeues every message available to the
-- given consumer from strmadmin.oe_queue and prints the attributes of
-- order and customer events through DBMS_OUTPUT.
--   consumer : name of the consumer (subscriber agent) to dequeue for.
-- The loop ends when the dequeue raises ORA-25228 (mapped to no_messages).
-- On ORA-25235 (mapped to next_trans) navigation moves on to the next
-- transaction and the loop continues.
CREATE PROCEDURE oe.explicit_dq (consumer IN VARCHAR2) AS
  deqopt        DBMS_AQ.DEQUEUE_OPTIONS_T;
  mprop         DBMS_AQ.MESSAGE_PROPERTIES_T;
  msgid         RAW(16);
  payload       ANYDATA;
  payload_type  VARCHAR2(4000);  -- sized generously so a long type name cannot overflow
  new_messages  BOOLEAN := TRUE;
  ord           oe.order_event_typ;
  cust          oe.customer_event_typ;
  tc            pls_integer;     -- return code of GetObject; not inspected
  next_trans    EXCEPTION;
  no_messages   EXCEPTION;
  pragma exception_init (next_trans, -25235);
  pragma exception_init (no_messages, -25228);
BEGIN
  deqopt.consumer_name := consumer;
  deqopt.wait := 1;
  WHILE (new_messages) LOOP
    BEGIN
      DBMS_AQ.DEQUEUE(
        queue_name         => 'strmadmin.oe_queue',
        dequeue_options    => deqopt,
        message_properties => mprop,
        payload            => payload,
        msgid              => msgid);
      -- The dequeue is committed before the message is printed.
      COMMIT;
      deqopt.navigation := DBMS_AQ.NEXT;

      -- Read the type name once and reuse it for the checks below.
      payload_type := payload.GetTypeName;
      DBMS_OUTPUT.PUT_LINE('Message Dequeued');
      DBMS_OUTPUT.PUT_LINE('Type Name := ' || payload_type);

      IF (payload_type = 'OE.ORDER_EVENT_TYP') THEN
        tc := payload.GetObject(ord);
        DBMS_OUTPUT.PUT_LINE('order_id - ' || ord.order_id);
        DBMS_OUTPUT.PUT_LINE('order_date - ' || ord.order_date);
        DBMS_OUTPUT.PUT_LINE('order_mode - ' || ord.order_mode);
        DBMS_OUTPUT.PUT_LINE('customer_id - ' || ord.customer_id);
        DBMS_OUTPUT.PUT_LINE('order_status - ' || ord.order_status);
        DBMS_OUTPUT.PUT_LINE('order_total - ' || ord.order_total);
        DBMS_OUTPUT.PUT_LINE('sales_rep_id - ' || ord.sales_rep_id);
        DBMS_OUTPUT.PUT_LINE('promotion_id - ' || ord.promotion_id);
      ELSIF (payload_type = 'OE.CUSTOMER_EVENT_TYP') THEN
        tc := payload.GetObject(cust);
        DBMS_OUTPUT.PUT_LINE('customer_id - ' || cust.customer_id);
        DBMS_OUTPUT.PUT_LINE('cust_first_name - ' || cust.cust_first_name);
        DBMS_OUTPUT.PUT_LINE('cust_last_name - ' || cust.cust_last_name);
        -- NOTE(review): the labels below do not match the attribute names
        -- of cust_address_type (address, nos, firstname, middname,
        -- lastname); confirm the intended mapping against the type.
        DBMS_OUTPUT.PUT_LINE('street_address - ' ||
          cust.cust_address.address);
        DBMS_OUTPUT.PUT_LINE('postal_code - ' ||
          cust.cust_address.nos);
        DBMS_OUTPUT.PUT_LINE('city - ' || cust.cust_address.firstname);
        DBMS_OUTPUT.PUT_LINE('state_province - ' ||
          cust.cust_address.middname);
        DBMS_OUTPUT.PUT_LINE('country_id - ' ||
          cust.cust_address.lastname);
        DBMS_OUTPUT.PUT_LINE('phone_number1 - ' || cust.phone_numbers);
        DBMS_OUTPUT.PUT_LINE('nls_language - ' || cust.nls_language);
        DBMS_OUTPUT.PUT_LINE('nls_territory - ' || cust.nls_territory);
        DBMS_OUTPUT.PUT_LINE('credit_limit - ' || cust.credit_limit);
        DBMS_OUTPUT.PUT_LINE('cust_email - ' || cust.cust_email);
        DBMS_OUTPUT.PUT_LINE('account_mgr_id - ' || cust.account_mgr_id);
        DBMS_OUTPUT.PUT_LINE('date_of_birth - ' || cust.date_of_birth);
        DBMS_OUTPUT.PUT_LINE('marital_status - ' || cust.marital_status);
        DBMS_OUTPUT.PUT_LINE('gender - ' || cust.gender);
        DBMS_OUTPUT.PUT_LINE('income_level - ' || cust.income_level);
      END IF;
    EXCEPTION
      WHEN next_trans THEN
        deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION;
      WHEN no_messages THEN
        new_messages := FALSE;
        DBMS_OUTPUT.PUT_LINE('No more messages');
    END;
  END LOOP;
END;
/
------ Enqueuing Messages
-- Enqueue an order event whose action is 'APPLY'.
DECLARE
  order_event oe.order_event_typ;
BEGIN
  order_event := oe.order_event_typ(
    2500, sysdate, 'online', 117, 3, 44699, 161, NULL, 'APPLY');
  oe.enq_proc_3(ANYDATA.ConvertObject(order_event));
END;
/
-- Enqueue a customer event whose action is 'APPLY'.
DECLARE
  customer_event oe.customer_event_typ;
BEGIN
  customer_event := oe.customer_event_typ(
    990, 'Hester', 'Prynne',
    oe.cust_address_type('555 Beacon Street', '02109', 'Boston', 'MA', 'US'),
    1234567, 'i', 'AMERICA', 5000,
    'a@scarlet_letter.com', 145, NULL, 'SINGLE', 'F', 'UNDER 50,000',
    'APPLY');
  oe.enq_proc_3(ANYDATA.ConvertObject(customer_event));
END;
/
-- Query the queue table view and show the current database time.
SELECT *
  FROM strmadmin.aq$oe_queue_table;

SELECT SYSDATE
  FROM dual;
-- Enqueue an order event whose action is 'DEQUEUE'.
DECLARE
  order_event oe.order_event_typ;
BEGIN
  order_event := oe.order_event_typ(
    2501, sysdate, 'direct', 117, 3, 22788, 161, NULL, 'DEQUEUE');
  oe.enq_proc_3(ANYDATA.ConvertObject(order_event));
END;
/
/*
BEGIN
oe.enq_proc_3(ANYDATA.convertobject(oe.customer_event_typ(
991,'Nick','Carraway',oe.cust_address_type('10th Street',
'11101','Long Island','NY','US'),12345,'i','AMERICA',3000,
'nick@great_gatsby.com',149,NULL,'MARRIED','M','OVER 150,000','DEQUEUE')));
END;
*/
-- Enqueue a customer event whose action is 'DEQUEUE'.
DECLARE
  customer_event oe.customer_event_typ;
BEGIN
  customer_event := oe.customer_event_typ(
    990, 'Hester', 'Prynne',
    oe.cust_address_type('555 Beacon Street', '02109', 'Boston', 'MA', 'US'),
    1234567, 'i', 'AMERICA', 5000,
    'a@scarlet_letter.com', 145, NULL, 'SINGLE', 'F', 'UNDER 50,000',
    'DEQUEUE');
  oe.enq_proc_3(ANYDATA.ConvertObject(customer_event));
END;
/
-- Look at the current contents of the orders table.
SELECT *
  FROM orders;
-- Enqueue a user-constructed row LCR: an INSERT into OE.ORDERS for
-- order 2502.
DECLARE
  order_columns SYS.LCR$_ROW_LIST;
BEGIN
  -- One LCR$_ROW_UNIT per column of the new row.
  order_columns := SYS.LCR$_ROW_LIST(
    SYS.LCR$_ROW_UNIT(
      'ORDER_ID', ANYDATA.ConvertNumber(2502), DBMS_LCR.NOT_A_LOB, NULL, NULL),
    SYS.LCR$_ROW_UNIT(
      'ORDER_DATE', ANYDATA.ConvertTimestampLTZ(sysdate), DBMS_LCR.NOT_A_LOB,
      NULL, NULL),
    SYS.LCR$_ROW_UNIT(
      'ORDER_MODE', ANYDATA.ConvertVarchar2('online'), DBMS_LCR.NOT_A_LOB, NULL, NULL),
    SYS.LCR$_ROW_UNIT(
      'CUSTOMER_ID', ANYDATA.ConvertNumber(145), DBMS_LCR.NOT_A_LOB, NULL, NULL),
    SYS.LCR$_ROW_UNIT(
      'ORDER_STATUS', ANYDATA.ConvertNumber(3), DBMS_LCR.NOT_A_LOB, NULL, NULL),
    SYS.LCR$_ROW_UNIT(
      'ORDER_TOTAL', ANYDATA.ConvertNumber(35199), DBMS_LCR.NOT_A_LOB, NULL, NULL),
    SYS.LCR$_ROW_UNIT(
      'SALES_REP_ID', ANYDATA.ConvertNumber(160), DBMS_LCR.NOT_A_LOB, NULL, NULL),
    SYS.LCR$_ROW_UNIT(
      'PROMOTION_ID', ANYDATA.ConvertNumber(1), DBMS_LCR.NOT_A_LOB, NULL, NULL));

  -- An insert has no old values, so old_vals is NULL.
  oe.enq_row_lcr(
    source_dbname => 'DB01',
    cmd_type      => 'INSERT',
    obj_owner     => 'OE',
    obj_name      => 'ORDERS',
    old_vals      => NULL,
    new_vals      => order_columns);
END;
/
-- Enqueue a row LCR that updates the order enqueued above: ORDER_TOTAL of
-- order 2502 changes from 35199 to 5235.
DECLARE
  previous_columns SYS.LCR$_ROW_LIST;
  changed_columns  SYS.LCR$_ROW_LIST;
BEGIN
  -- Old values: the order id and the old total.
  previous_columns := SYS.LCR$_ROW_LIST(
    SYS.LCR$_ROW_UNIT(
      'ORDER_ID', ANYDATA.ConvertNumber(2502), DBMS_LCR.NOT_A_LOB, NULL, NULL),
    SYS.LCR$_ROW_UNIT(
      'ORDER_TOTAL', ANYDATA.ConvertNumber(35199), DBMS_LCR.NOT_A_LOB, NULL, NULL));

  -- New values: only the changed column.
  changed_columns := SYS.LCR$_ROW_LIST(
    SYS.LCR$_ROW_UNIT(
      'ORDER_TOTAL', ANYDATA.ConvertNumber(5235), DBMS_LCR.NOT_A_LOB, NULL, NULL));

  oe.enq_row_lcr(
    source_dbname => 'DB01',
    cmd_type      => 'UPDATE',
    obj_owner     => 'OE',
    obj_name      => 'ORDERS',
    old_vals      => previous_columns,
    new_vals      => changed_columns);
END;
/
-- Commit the enqueued messages.
COMMIT;

--------------------- Dequeuing Messages Explicitly and Querying for Applied Messages
-- Inspect the queue table view and both target tables.
SELECT *
  FROM strmadmin.aq$oe_queue_table;

SELECT *
  FROM orders;

SELECT *
  FROM customer;
---- Author's note: 3 records before the dequeue.
-- Call the dequeue procedure as consumer explicit_dq.
-- SERVEROUTPUT is switched on so the DBMS_OUTPUT lines are displayed.
-- The block needs the terminating "/" for SQL*Plus to execute it.
SET SERVEROUTPUT ON
BEGIN
  oe.explicit_dq('explicit_dq');
END;
/
---- Author's note: 1 record after the dequeue.
-- Query the results: the order and customer enqueued with action 'APPLY',
-- and the order inserted and then updated through row LCRs.
SELECT order_id,
       order_date,
       customer_id,
       order_total
  FROM oe.orders
 WHERE order_id = 2500;

SELECT cust_first_name,
       cust_last_name,
       cust_email
  FROM oe.customer
 WHERE customer_id = 990;

SELECT order_id,
       order_date,
       customer_id,
       order_total
  FROM oe.orders
 WHERE order_id = 2502;
-- ================================
-- 此文原创,转载请注明出处!