ibmmq-process

 avatar
unknown
golang
a year ago
2.8 kB
10
Indexable
package main

import (
	"fmt"
	"github.com/ibm-messaging/mq-golang/v5/ibmmq"
	"log"
)

// Función para procesar los mensajes
func processMessage(msgData []byte, id int) {
	fmt.Printf("Procesando mensaje %d: %s\n", id, string(msgData))
	// Aquí puedes añadir tu lógica de procesamiento
}

func main() {
	// Configuración de conexión
	queueManager := "QM1"
	queueName := "QUEUE1"
	channel := "CHANNEL1"
	connName := "localhost(1414)" // Hostname y puerto
	user := "username"            // Cambia esto según sea necesario
	password := "password"        // Cambia esto según sea necesario

	// Conexión al gestor de colas
	mqConn := ibmmq.NewMQCNO()
	mqCD := ibmmq.NewMQCD()
	mqCD.ChannelName = channel
	mqCD.ConnectionName = connName
	mqConn.ClientConn = mqCD

	// Autenticación (opcional)
	mqCSP := ibmmq.NewMQCSP()
	mqCSP.UserId = user
	mqCSP.Password = password
	mqConn.SecurityParms = mqCSP

	qMgr, err := ibmmq.Connx(queueManager, mqConn)
	if err != nil {
		log.Fatalf("Error conectando al gestor de colas: %v", err)
	}
	defer qMgr.Disc()

	// Abrir la cola
	mqOD := ibmmq.NewMQOD()
	mqOD.ObjectName = queueName
	mqOD.ObjectType = ibmmq.MQOT_Q
	queue, err := qMgr.Open(mqOD, ibmmq.MQOO_INQUIRE|ibmmq.MQOO_INPUT_AS_Q_DEF)
	if err != nil {
		log.Fatalf("Error abriendo la cola: %v", err)
	}
	defer queue.Close(0)

	// Obtener atributos de la cola
	status, err := queue.Inq([]int32{ibmmq.MQIA_CURRENT_Q_DEPTH})
	if err != nil {
		log.Fatalf("Error obteniendo atributos de la cola: %v", err)
	}

	// Mostrar la cantidad de mensajes en la cola
	qDepth := status[ibmmq.MQIA_CURRENT_Q_DEPTH]
	fmt.Printf("La cantidad de mensajes en la cola %s es: %d\n", queueName, qDepth)

	// Canal para pasar los mensajes leídos a las goroutines
	msgChan := make(chan []byte, qDepth)
	doneChan := make(chan bool, qDepth)

	// Goroutine para procesar los mensajes
	go func() {
		for i := 0; i < qDepth; i++ {
			msgData := <-msgChan
			go func(id int, data []byte) {
				processMessage(data, id)
				doneChan <- true
			}(i+1, msgData)
		}
	}()

	// Leer y enviar los mensajes al canal
	for i := 0; i < qDepth; i++ {
		message := ibmmq.NewMQMessage()
		gmo := ibmmq.NewMQGMO()
		gmo.Options = ibmmq.MQGMO_WAIT
		gmo.WaitInterval = 3000 // Esperar 3 segundos por mensaje

		err := queue.Get(message, gmo)
		if err != nil {
			if err.MQRC == ibmmq.MQRC_NO_MSG_AVAILABLE {
				fmt.Println("No hay más mensajes en la cola")
				break
			}
			log.Fatalf("Error leyendo mensaje de la cola: %v", err)
		}

		msgData := make([]byte, message.DataLength)
		_, err = message.Read(msgData)
		if err != nil {
			log.Fatalf("Error leyendo datos del mensaje: %v", err)
		}

		msgChan <- msgData
	}

	// Esperar a que todas las goroutines terminen
	for i := 0; i < qDepth; i++ {
		<-doneChan
	}

	close(msgChan)
	close(doneChan)
}
Editor is loading...
Leave a Comment