Untitled

mail@pastecode.io avatar
unknown
plain_text
a year ago
3.4 kB
3
Indexable
Never
CREATE OR REPLACE TYPE EVENTO_TYPE2 AS OBJECT (
    EVENTO   VARCHAR2(100),
    ORIGEN  VARCHAR2(100),
    PAYLOAD   CLOB
);


begin
    dbms_aqadm.create_queue_table('BOOK_QUEUE_TAB2', 'EVENTO_TYPE2');
    dbms_aqadm.create_queue('DEMO_BOOK_QUEUE2', 'BOOK_QUEUE_TAB2');
    dbms_aqadm.start_queue('DEMO_BOOK_QUEUE2');
end;




DECLARE 
   enqueue_options     dbms_aq.enqueue_options_t; 
   message_properties  dbms_aq.message_properties_t; 
   message_handle      RAW(16); 
   message   EVENTO_TYPE2;
  
BEGIN 

FOR i in (Select XMLELEMENT(
    NAME "acre",
    XMLFOREST(
     act.idefact as idefact,
     act.numacre as numacre,
     act.stsacre as stsacre,
     act.fecsts as fecsts,
     (SELECT
                  XMLAGG(
                    XMLELEMENT(
                      NAME "det_acre",
                      XMLFOREST( da.NUMACRE as "NUMACRE",      
                                 da.NUMDETACRE   as "NUMDETACRE",
                                 da.CODGRUPOACRE as "CODGRUPOACRE",
                                 da.CODCPTOACRE as "CODCPTOACRE",
                                 da.CODMONEDA as "CODMONEDA"
                      )
                    )
                  )
                FROM det_acre da
                WHERE da.numacre = act.numacre) as detacres
    )
    )as XMLDOC
from acreencia_table act
where rownum < 100) Loop
   message := EVENTO_TYPE2('EVENTO_EMISION1', 'ACSELX', i.XMLDOC.getCLOBval() );
   DBMS_AQ.ENQUEUE(queue_name => 'DEMO_BOOK_QUEUE2',            
           enqueue_options    => enqueue_options,        
           message_properties => message_properties,      
                     payload  => message,                
                      msgid   => message_handle); 

   COMMIT; 
END LOOP;

END; 



----- Opcional consumo interno
create or replace PROCEDURE POC_CONSUMIR_COLA AS
dequeue_options DBMS_AQ.dequeue_options_t;
 msg_prop_array DBMS_AQ.message_properties_array_t :=
 DBMS_AQ.message_properties_array_t();
 payload_array aq_message_array;
 msgid_array DBMS_AQ.msgid_array_t;
 retval PLS_INTEGER;
 payload_tmp clob;
BEGIN
DBMS_OUTPUT.PUT_LINE('Antes del bulce ' );
--dequeue_options.wait := 10;
  LOOP
    DBMS_OUTPUT.PUT_LINE('comienzo de iteracion');
    retval := DBMS_AQ.DEQUEUE_ARRAY(
    queue_name => 'DEMO_BOOK_QUEUE2',
    dequeue_options => dequeue_options,
    array_size => 1000,
    message_properties_array => msg_prop_array,
    payload_array => payload_array,
    msgid_array => msgid_array);
    DBMS_OUTPUT.PUT_LINE('Number of messages dequeued: ' || retval);
    DBMS_OUTPUT.PUT_LINE('Number of messages payload: ' || payload_array.count);
    FOR i IN 1 .. payload_array.count LOOP
        payload_array(i).get_text(payload_tmp);
        DBMS_OUTPUT.PUT_LINE(payload_tmp  );
    END LOOP;
    --; --eliminar
    -- Dormir el proceso por un segundo para no consumir todos los recursos del sistema
    DBMS_LOCK.SLEEP(1);
  END LOOP;
END;


BEGIN
  DBMS_SCHEDULER.CREATE_JOB(
    job_name        => 'job_consumir_cola',
    job_type        => 'PLSQL_BLOCK',
    job_action      => 'BEGIN poc_consumir_cola(); END;',
    start_date      => SYSTIMESTAMP,
    repeat_interval => 'FREQ=SECONDLY;INTERVAL=5',
    end_date        => NULL,
    enabled         => TRUE );
End;