Saturday, February 12, 2011

Oracle Advanced Queuing - Queue Propagation

EDIT: The problem seems to be related to having both queues in the same schema.

I’m trying to experiment with queue propagation but I’m not seeing records in the destination queue. But that could easily be because I don’t have all the pieces in place.

Does anyone have a test case they could post? I’ll include what I tried below. I found the troubleshooting section in the docs a little light, and propagation is such a black box that it’s hard to know why the messages aren’t moving.

Here’s what I have; no laughing.


-- Payload object type shared by the source and destination queue tables:
-- a numeric id plus a date.
CREATE OR REPLACE TYPE test_payload AS OBJECT(
   test_id   NUMBER,
   test_dt   DATE);
/


-- One-time setup: creates and starts the source queue (Test_Q) and the
-- destination queue (Dest_Q), registers a subscriber on Test_Q whose address
-- is Dest_Q, and schedules queue-to-queue propagation between the two.
DECLARE
   -- Agent for the propagated copies: consumer name 'test_local_sub',
   -- address Dest_Q, protocol left NULL.
   subscriber   SYS.aq$_agent;
BEGIN
--- Create Originating Queue and start it

   DBMS_AQADM.create_queue_table( queue_table => 'Test_MQT', queue_payload_type => 'Test_Payload',
                                  multiple_consumers => TRUE ); --- allows multiple subscribers

   DBMS_AQADM.create_queue( 'Test_Q', 'Test_MQT' );
   DBMS_AQADM.start_queue( queue_name => 'Test_Q' );

--- Create Destination Queue and start it

   DBMS_AQADM.create_queue_table( queue_table => 'Dest_MQT', queue_payload_type => 'Test_Payload',
                                  multiple_consumers => TRUE );
   DBMS_AQADM.create_queue( 'Dest_Q', 'Dest_MQT' );
   DBMS_AQADM.start_queue( queue_name => 'Dest_Q' );

--- Add Subscriber and schedule propagation

   subscriber := SYS.aq$_agent( 'test_local_sub', 'Dest_Q', NULL );

   -- The schedule below names a destination_queue, i.e. it is a queue-to-queue
   -- schedule, so the subscriber has to be registered as queue-to-queue as well
   -- (queue_to_queue defaults to FALSE); otherwise subscriber and schedule do
   -- not match and nothing is propagated.
   DBMS_AQADM.add_subscriber( queue_name => 'Test_Q', subscriber => subscriber,
                              queue_to_queue => TRUE );

   -- latency defaults to 60 seconds; 0 asks for propagation as soon as a
   -- message is enqueued, which is what a quick test expects.
   -- NOTE(review): propagation runs as a background job, so the instance must
   -- allow job queue processes (job_queue_processes > 0) -- confirm.
   DBMS_AQADM.schedule_propagation( queue_name => 'Test_Q', destination_queue => 'Dest_Q',
                                    latency => 0 );
END;
/

-- Enqueues a single test_payload message (id 2, current date) on Test_Q using
-- default enqueue options and message properties, then commits so that the
-- message becomes visible (default enqueue visibility is ON_COMMIT).
DECLARE
   enq_options   DBMS_AQ.enqueue_options_t;
   msg_props     DBMS_AQ.message_properties_t;
   msg_id        RAW( 16 );        -- receives the id assigned to the message
   payload_obj   test_payload;
BEGIN
   payload_obj := test_payload( 2, SYSDATE );

   DBMS_AQ.enqueue( queue_name         => 'Test_Q',
                    enqueue_options    => enq_options,
                    message_properties => msg_props,
                    payload            => payload_obj,
                    msgid              => msg_id );
   COMMIT;
END;
/

-- Dequeues one message from Dest_Q as consumer 'test_local_sub' and prints its
-- fields via DBMS_OUTPUT. Gives up with a diagnostic line instead of blocking
-- forever when nothing arrives within the wait period.
DECLARE
   deq_options   DBMS_AQ.dequeue_options_t;
   msg_props     DBMS_AQ.message_properties_t;
   msg_id        RAW( 16 );        -- receives the id of the dequeued message
   payload_obj   test_payload;

   -- ORA-25228: timeout or end-of-fetch during message dequeue
   dequeue_timeout   EXCEPTION;
   PRAGMA EXCEPTION_INIT( dequeue_timeout, -25228 );
BEGIN
   deq_options.navigation := DBMS_AQ.first_message;

   -- Dest_Q lives in a multiple-consumer queue table, so a consumer name is
   -- mandatory (ORA-25231 otherwise). Propagated messages are addressed to the
   -- agent name used when the subscriber was added on Test_Q.
   deq_options.consumer_name := 'test_local_sub';

   -- The default wait is FOREVER; bound it (in seconds) so a propagation
   -- problem shows up as a timeout instead of a hung session. 90 s is longer
   -- than the default 60 s propagation latency.
   deq_options.wait := 90;

   DBMS_AQ.dequeue( queue_name         => 'Dest_Q',
                    dequeue_options    => deq_options,
                    message_properties => msg_props,
                    payload            => payload_obj,
                    msgid              => msg_id );

   DBMS_OUTPUT.put_line( 'Test_ID: ' || payload_obj.test_id );
   -- Explicit format: implicit conversion depends on NLS_DATE_FORMAT and
   -- typically drops the time portion.
   DBMS_OUTPUT.put_line( 'Test_Date: ' || TO_CHAR( payload_obj.test_dt, 'YYYY-MM-DD HH24:MI:SS' ) );
   COMMIT;
EXCEPTION
   WHEN dequeue_timeout THEN
      DBMS_OUTPUT.put_line( 'No message for test_local_sub arrived in Dest_Q within 90 seconds' );
END;
/

0 comments:

Post a Comment