Untitled
unknown
golang
2 years ago
2.7 kB
16
Indexable
func (c MessageController) ProcessMessage(r *ghttp.Request) {
conn := util.MustNewWebSocket(r)
defer conn.Close()
chatUserID, err := service.GetChatUserCtx(r.Context())
if err != nil {
return
}
ctx, cancel := context.WithCancel(r.GetCtx())
client := client{
conn: conn,
chatUserID: chatUserID,
subChan: c.subscriber.Channel(util.GetRedisTopic(chatUserID), ctx.Done()),
cancel: cancel,
errChan: make(chan error),
}
go func() {
if err = c.read(ctx, client); err != nil && !errors.Is(err, websocket.ErrCloseSent) {
logger.WithContext(ctx).Error("c.read", zap.Error(err))
}
}()
go c.write(ctx, client)
<-ctx.Done()
logger.WithContext(ctx).Info("exit request")
}
func (c MessageController) read(ctx context.Context, client client) (err error) {
defer func() {
close(client.errChan)
client.cancel()
}()
client.conn.SetReadDeadline(time.Now().Add(pongWait))
client.conn.SetPongHandler(func(string) error { client.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
// Continuously receive event websocket
var rawMessage message.RawMessage
if err := client.conn.ReadJSON(&rawMessage); err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
return nil
}
return errors.Wrap(err, "conn.ReadJSON")
}
if err = c.handleRawMessage(ctx, rawMessage, client); err != nil {
client.errChan <- err
}
}
}
func (c MessageController) handleRawMessage(ctx context.Context, rawMessage message.RawMessage, cli client) error {
mapHandlers := map[string]func(context.Context, message.RawMessage, client) error{
consts.MsgRedisSendMessage: c.registerMessage,
consts.MsgRedisSeenMessage: c.seenMessage,
}
if handler, ok := mapHandlers[rawMessage.Type]; ok {
return handler(ctx, rawMessage, cli)
}
return errors.New("unknown message type")
}
func (c MessageController) write(ctx context.Context, client client) (err error) {
ticker := time.NewTicker(pingPeriod)
defer ticker.Stop()
for {
select {
case <-ctx.Done(): // Case disconnect
return nil
case <-c.ctx.Done(): // Case server shutdown
return disconnect(client.conn) // Cleanly close the connection by sending a close message
case <-ticker.C: // Case server check ping pong
if err := client.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return err
}
case msg := <-client.subChan: // Case receive event
c.handleWriteMessage(msg, client)
case err := <-client.errChan: // Case receive error
c.handleWriteError(err, client)
return disconnect(client.conn) // Write error response then disconnect
}
}
}
Editor is loading...
Leave a Comment