Untitled

mail@pastecode.io avatar
unknown
golang
a month ago
2.6 kB
1
Indexable
Never
func (c MessageController) ProcessMessage(r *ghttp.Request) {
	conn := util.MustNewWebSocket(r)
	defer conn.Close()

	chatUserID, err := service.GetChatUserCtx(r.Context())
	if err != nil {
		return
	}
	ctx, cancel := context.WithCancel(r.GetCtx())

	client := client{
		conn:       conn,
		chatUserID: chatUserID,
		subChan:    c.subscriber.Channel(util.GetRedisTopic(chatUserID), ctx.Done()),
		cancel:     cancel,
		errChan:    make(chan error),
	}

	go func() {
		if err = c.read(ctx, client); err != nil && !errors.Is(err, websocket.ErrCloseSent) {
			logger.WithContext(ctx).Error("c.read", zap.Error(err))
		}
	}()
	go c.write(ctx, client)

	<-ctx.Done()
	logger.WithContext(ctx).Info("exit request")
}

func (c MessageController) read(ctx context.Context, client client) (err error) {
	defer func() {
		close(client.errChan)
		client.cancel()
	}()
	client.conn.SetReadDeadline(time.Now().Add(pongWait))
	client.conn.SetPongHandler(func(string) error { client.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })

	for {
		// Continuously receive event websocket
		var rawMessage message.RawMessage
		if err := client.conn.ReadJSON(&rawMessage); err != nil {
			if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
				return nil
			}
			return errors.Wrap(err, "conn.ReadJSON")
		}
		if err = c.handleRawMessage(ctx, rawMessage, client); err != nil {
			client.errChan <- err
		}
	}
}

func (c MessageController) handleRawMessage(ctx context.Context, rawMessage message.RawMessage, cli client) error {
	mapHandlers := map[string]func(context.Context, message.RawMessage, client) error{
		consts.MsgRedisSendMessage: c.registerMessage,
		consts.MsgRedisSeenMessage: c.seenMessage,
	}
	if handler, ok := mapHandlers[rawMessage.Type]; ok {
		return handler(ctx, rawMessage, cli)
	}
	return errors.New("unknown message type")
}

func (c MessageController) write(ctx context.Context, client client) (err error) {
	ticker := time.NewTicker(pingPeriod)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done(): // Case disconnect
			return nil

		case <-c.ctx.Done(): // Case server shutdown
			return disconnect(client.conn) // Cleanly close the connection by sending a close message

		case <-ticker.C: // Case server check ping pong
			if err := client.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return err
			}
		case msg := <-client.subChan: // Case receive event
			c.handleWriteMessage(msg, client)

		case err := <-client.errChan: // Case receive error
			c.handleWriteError(err, client)
			return disconnect(client.conn) // Write error response then disconnect
		}
	}
}
Leave a Comment