ibmmq-process
unknown
golang
a year ago
2.8 kB
21
Indexable
package main
import (
"fmt"
"github.com/ibm-messaging/mq-golang/v5/ibmmq"
"log"
)
// Función para procesar los mensajes
func processMessage(msgData []byte, id int) {
fmt.Printf("Procesando mensaje %d: %s\n", id, string(msgData))
// Aquí puedes añadir tu lógica de procesamiento
}
func main() {
// Configuración de conexión
queueManager := "QM1"
queueName := "QUEUE1"
channel := "CHANNEL1"
connName := "localhost(1414)" // Hostname y puerto
user := "username" // Cambia esto según sea necesario
password := "password" // Cambia esto según sea necesario
// Conexión al gestor de colas
mqConn := ibmmq.NewMQCNO()
mqCD := ibmmq.NewMQCD()
mqCD.ChannelName = channel
mqCD.ConnectionName = connName
mqConn.ClientConn = mqCD
// Autenticación (opcional)
mqCSP := ibmmq.NewMQCSP()
mqCSP.UserId = user
mqCSP.Password = password
mqConn.SecurityParms = mqCSP
qMgr, err := ibmmq.Connx(queueManager, mqConn)
if err != nil {
log.Fatalf("Error conectando al gestor de colas: %v", err)
}
defer qMgr.Disc()
// Abrir la cola
mqOD := ibmmq.NewMQOD()
mqOD.ObjectName = queueName
mqOD.ObjectType = ibmmq.MQOT_Q
queue, err := qMgr.Open(mqOD, ibmmq.MQOO_INQUIRE|ibmmq.MQOO_INPUT_AS_Q_DEF)
if err != nil {
log.Fatalf("Error abriendo la cola: %v", err)
}
defer queue.Close(0)
// Obtener atributos de la cola
status, err := queue.Inq([]int32{ibmmq.MQIA_CURRENT_Q_DEPTH})
if err != nil {
log.Fatalf("Error obteniendo atributos de la cola: %v", err)
}
// Mostrar la cantidad de mensajes en la cola
qDepth := status[ibmmq.MQIA_CURRENT_Q_DEPTH]
fmt.Printf("La cantidad de mensajes en la cola %s es: %d\n", queueName, qDepth)
// Canal para pasar los mensajes leídos a las goroutines
msgChan := make(chan []byte, qDepth)
doneChan := make(chan bool, qDepth)
// Goroutine para procesar los mensajes
go func() {
for i := 0; i < qDepth; i++ {
msgData := <-msgChan
go func(id int, data []byte) {
processMessage(data, id)
doneChan <- true
}(i+1, msgData)
}
}()
// Leer y enviar los mensajes al canal
for i := 0; i < qDepth; i++ {
message := ibmmq.NewMQMessage()
gmo := ibmmq.NewMQGMO()
gmo.Options = ibmmq.MQGMO_WAIT
gmo.WaitInterval = 3000 // Esperar 3 segundos por mensaje
err := queue.Get(message, gmo)
if err != nil {
if err.MQRC == ibmmq.MQRC_NO_MSG_AVAILABLE {
fmt.Println("No hay más mensajes en la cola")
break
}
log.Fatalf("Error leyendo mensaje de la cola: %v", err)
}
msgData := make([]byte, message.DataLength)
_, err = message.Read(msgData)
if err != nil {
log.Fatalf("Error leyendo datos del mensaje: %v", err)
}
msgChan <- msgData
}
// Esperar a que todas las goroutines terminen
for i := 0; i < qDepth; i++ {
<-doneChan
}
close(msgChan)
close(doneChan)
}
Editor is loading...
Leave a Comment