创建用户、分配权限
select * from dba_users t ;
---create on db1 SYS
create user oe identified by oe;
grant dba to oe;
create tablespace aq
datafile 'd:\oracle\aq.dbf' size 500m
extent management local
segment space management auto;
grant execute on dbms_aq to oe;
create user strmadmin identified by strmadmin
default tablespace aq
temporary tablespace temp;
grant dba to strmadmin;
grant execute on dbms_streams_adm to strmadmin;
grant execute on dbms_transform. to strmadmin;
----session db1 strmadmin
设置安全队列,创建对列
begin
dbms_streams_adm.set_up_queue(queue_table => 'oe_qtab_any',queue_name => 'oe_queue_any',queue_user => 'oe');
end;
/
设置队列的代理,并且为代理设置安全命名
declare
subscriber sys.aq$_agent;
begin
subscriber := sys.aq$_agent('LOCAL_AGENT',NULL,NULL);
sys.dbms_aqadm.add_subscriber(queue_name => 'strmadmin.oe_queue_any',subscriber => subscriber);
end;
/
begin
dbms_aqadm.enable_db_access(agent_name => 'local_agent',db_username => 'oe');
end;
/
-----session DB1 oe
创建一个队列对象类型
create or replace type cust_address_type as object (
address varchar2(40),
nos varchar2(10),
firstname varchar2(10),
middname varchar2(10),
lastname varchar2(10)
);
/
--------------SESSION DB1 OE
创建入队存储过程
create or replace procedure oe.enq_proc(payload anydata)
is
enqopt dbms_aq.enqueue_options_t;
mprop dbms_aq.message_properties_t;
enq_msgid raw(16);
begin
mprop.sender_id := sys.aq$_agent('LOCAL_AGENT',NULL,NULL);
dbms_aq.enqueue(queue_name => 'strmadmin.oe_queue_any',enqueue_options => enqopt,message_properties => mprop,payload => payload,msgid => enq_msgid);
end;
/
入队一个字符创
begin
oe.enq_proc(anydata.ConvertVarchar2('Chemicals - SW'));
COMMIT;
end;
入队一个数字
begin
oe.enq_proc(anydata.ConvertNumber('16'));
commit;
end;
/
入队一个用户自定义对象
begin
oe.enq_proc(anydata.ConvertObject(oe.cust_address_type('1666 aaaaa','3333333','qin','qiang','ai')));
commit;
end;
/
查看队列表
-----session strmadmin
select * from strmadmin.aq$oe_qtab_any;
-----session oe
创建出列存储过程
create or replace procedure oe.get_cust_address(consumer in varchar2) as
address oe.cust_address_type;
deq_address anydata;
msgid raw(16);
deqopt dbms_aq.dequeue_options_t;
mprop dbms_aq.message_properties_t;
new_address boolean := true;
next_trans exception;
no_message exception;
pragma exception_init (next_trans,-25235);
pragma exception_init (no_message,-25228);
num_var pls_integer;
begin
deqopt.consumer_name := consumer;
deqopt.wait :=1;
while (new_address) loop
begin
dbms_aq.dequeue(queue_name => 'strmadmin.oe_queue_any',dequeue_options => deqopt,message_properties => mprop,payload => deq_address,msgid => msgid);
deqopt.navigation := dbms_aq.next;
dbms_output.put_line('********************');
if(deq_address.getTypeName() = 'OE.CUST_ADDRESS_TYPE') then
dbms_output.put_line('Message type is:'||deq_address.getTypeName());
num_var := deq_address.getObject(address);
dbms_output.put_line('*****customer address *****');
dbms_output.put_line(address.address);
dbms_output.put_line(address.nos);
dbms_output.put_line(address.firstname);
dbms_output.put_line(address.middname);
dbms_output.put_line(address.lastname);
else
dbms_output.put_line('Message type is:'||deq_address.getTypeName());
end if;
commit;
exception
when next_trans then
deqopt.navigation := dbms_aq.next_transaction;
when no_message then
new_address := false;
dbms_output.put_line('no more messages');
end;
end loop;
end;
/
出列
begin
oe.get_cust_address('LOCAL_AGENT');
end;
简单的一个AQ demo,后面还有remot、LCRS、JMS慢慢增加。。哈哈。。