ibmmq-process
unknown
golang
a year ago
2.8 kB
10
Indexable
package main import ( "fmt" "github.com/ibm-messaging/mq-golang/v5/ibmmq" "log" ) // Función para procesar los mensajes func processMessage(msgData []byte, id int) { fmt.Printf("Procesando mensaje %d: %s\n", id, string(msgData)) // Aquí puedes añadir tu lógica de procesamiento } func main() { // Configuración de conexión queueManager := "QM1" queueName := "QUEUE1" channel := "CHANNEL1" connName := "localhost(1414)" // Hostname y puerto user := "username" // Cambia esto según sea necesario password := "password" // Cambia esto según sea necesario // Conexión al gestor de colas mqConn := ibmmq.NewMQCNO() mqCD := ibmmq.NewMQCD() mqCD.ChannelName = channel mqCD.ConnectionName = connName mqConn.ClientConn = mqCD // Autenticación (opcional) mqCSP := ibmmq.NewMQCSP() mqCSP.UserId = user mqCSP.Password = password mqConn.SecurityParms = mqCSP qMgr, err := ibmmq.Connx(queueManager, mqConn) if err != nil { log.Fatalf("Error conectando al gestor de colas: %v", err) } defer qMgr.Disc() // Abrir la cola mqOD := ibmmq.NewMQOD() mqOD.ObjectName = queueName mqOD.ObjectType = ibmmq.MQOT_Q queue, err := qMgr.Open(mqOD, ibmmq.MQOO_INQUIRE|ibmmq.MQOO_INPUT_AS_Q_DEF) if err != nil { log.Fatalf("Error abriendo la cola: %v", err) } defer queue.Close(0) // Obtener atributos de la cola status, err := queue.Inq([]int32{ibmmq.MQIA_CURRENT_Q_DEPTH}) if err != nil { log.Fatalf("Error obteniendo atributos de la cola: %v", err) } // Mostrar la cantidad de mensajes en la cola qDepth := status[ibmmq.MQIA_CURRENT_Q_DEPTH] fmt.Printf("La cantidad de mensajes en la cola %s es: %d\n", queueName, qDepth) // Canal para pasar los mensajes leídos a las goroutines msgChan := make(chan []byte, qDepth) doneChan := make(chan bool, qDepth) // Goroutine para procesar los mensajes go func() { for i := 0; i < qDepth; i++ { msgData := <-msgChan go func(id int, data []byte) { processMessage(data, id) doneChan <- true }(i+1, msgData) } }() // Leer y enviar los mensajes al canal for i := 0; i < qDepth; i++ { message := ibmmq.NewMQMessage() gmo := ibmmq.NewMQGMO() gmo.Options = ibmmq.MQGMO_WAIT gmo.WaitInterval = 3000 // Esperar 3 segundos por mensaje err := queue.Get(message, gmo) if err != nil { if err.MQRC == ibmmq.MQRC_NO_MSG_AVAILABLE { fmt.Println("No hay más mensajes en la cola") break } log.Fatalf("Error leyendo mensaje de la cola: %v", err) } msgData := make([]byte, message.DataLength) _, err = message.Read(msgData) if err != nil { log.Fatalf("Error leyendo datos del mensaje: %v", err) } msgChan <- msgData } // Esperar a que todas las goroutines terminen for i := 0; i < qDepth; i++ { <-doneChan } close(msgChan) close(doneChan) }
Editor is loading...
Leave a Comment