Untitled

 avatar
unknown
scala
3 years ago
2.7 kB
9
Indexable
//Server
import zhttp.http._
import zhttp.service.Server
import zhttp.socket._
import zio._
import zio.stream.ZStream
import zio.Duration._

object main extends ZIOAppDefault {

  // Message Handlers
  private val open = Socket.succeed(WebSocketFrame.text("Greetings!"))

  private val echo = Socket.collect[WebSocketFrame] {
    case WebSocketFrame.Text(text) =>
      ZStream
        .repeat(WebSocketFrame.text(s"Received: $text"))
        .schedule(Schedule.spaced(Duration.fromMillis(1000L)))
        .take(3)
  }

  val heartBeat = Socket.fromStream(
    ZStream
      .succeed(WebSocketFrame.text("HEARTBEAT"))
      .repeat(Schedule.forever && Schedule.spaced(1.seconds))
  )

  // Setup protocol settings
  private val protocol = SocketProtocol.subProtocol("json")

  // Setup decoder settings
  private val decoder = SocketDecoder.allowExtensions

  // Combine all channel handlers together
  private val socketApp = {

    SocketApp(
      echo merge heartBeat
    ) // Called after each message being received on the channel

      // Called after the request is successfully upgraded to websocket
      .onOpen(open)

      // Called after the connection is closed
      .onClose(_ => Console.printLine("Closed!").ignore)

      // Called whenever there is an error on the socket channel
      .onError(_ => Console.printLine("Error!").ignore)

      // Setup websocket decoder config
      .withDecoder(decoder)

      // Setup websocket protocol config
      .withProtocol(protocol)
  }

  private val app =
    Http.collectZIO[Request] {
      case Method.GET -> !! / "greet" / name =>
        UIO(Response.text(s"Greetings ${name}!"))
      case Method.GET -> !! / "subscriptions" => socketApp.toResponse
    }

  override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] =
    Server.start(8090, app).exitCode
}

//Client
import zhttp.service.{ChannelFactory, EventLoopGroup}
import zhttp.socket.{
  Socket,
  SocketApp,
  SocketDecoder,
  SocketProtocol,
  WebSocketFrame
}
import zio._
import zio.stream.ZStream

object main extends ZIOAppDefault {
  private val env = EventLoopGroup.auto(8) ++ ChannelFactory.auto
  private val url = "ws://localhost:8090/subscriptions"

  private val app =
    Socket
      .collect[WebSocketFrame] { case fr @ WebSocketFrame.Text(_) =>
        ZStream.succeed(fr)
      }
      .toSocketApp
      .onOpen(Socket.succeed(WebSocketFrame.text("Hellow")))
      .onClose(cn => Console.printLine(s"Connection closed: ${cn}").!)
      .onError(thr => ZIO.die(thr))
      .connect(url)

  def run =
    app
      .provide(env)
      .exitCode
}

/*Client reponse when running 

Connection closed: localhost/127.0.0.1:8090
Process finished with exit code 0
*/