Untitled
unknown
golang
10 months ago
2.6 kB
3
Indexable
func (c MessageController) ProcessMessage(r *ghttp.Request) { conn := util.MustNewWebSocket(r) defer conn.Close() chatUserID, err := service.GetChatUserCtx(r.Context()) if err != nil { return } ctx, cancel := context.WithCancel(r.GetCtx()) client := client{ conn: conn, chatUserID: chatUserID, subChan: c.subscriber.Channel(util.GetRedisTopic(chatUserID), ctx.Done()), cancel: cancel, errChan: make(chan error), } go func() { if err = c.read(ctx, client); err != nil && !errors.Is(err, websocket.ErrCloseSent) { logger.WithContext(ctx).Error("c.read", zap.Error(err)) } }() go c.write(ctx, client) <-ctx.Done() logger.WithContext(ctx).Info("exit request") } func (c MessageController) read(ctx context.Context, client client) (err error) { defer func() { close(client.errChan) client.cancel() }() client.conn.SetReadDeadline(time.Now().Add(pongWait)) client.conn.SetPongHandler(func(string) error { client.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) for { // Continuously receive event websocket var rawMessage message.RawMessage if err := client.conn.ReadJSON(&rawMessage); err != nil { if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { return nil } return errors.Wrap(err, "conn.ReadJSON") } if err = c.handleRawMessage(ctx, rawMessage, client); err != nil { client.errChan <- err } } } func (c MessageController) handleRawMessage(ctx context.Context, rawMessage message.RawMessage, cli client) error { mapHandlers := map[string]func(context.Context, message.RawMessage, client) error{ consts.MsgRedisSendMessage: c.registerMessage, consts.MsgRedisSeenMessage: c.seenMessage, } if handler, ok := mapHandlers[rawMessage.Type]; ok { return handler(ctx, rawMessage, cli) } return errors.New("unknown message type") } func (c MessageController) write(ctx context.Context, client client) (err error) { ticker := time.NewTicker(pingPeriod) defer ticker.Stop() for { select { case <-ctx.Done(): // Case disconnect return nil case <-c.ctx.Done(): // Case server shutdown return disconnect(client.conn) // Cleanly close the connection by sending a close message case <-ticker.C: // Case server check ping pong if err := client.conn.WriteMessage(websocket.PingMessage, nil); err != nil { return err } case msg := <-client.subChan: // Case receive event c.handleWriteMessage(msg, client) case err := <-client.errChan: // Case receive error c.handleWriteError(err, client) return disconnect(client.conn) // Write error response then disconnect } } }
Editor is loading...
Leave a Comment