Untitled
unknown
scala
3 years ago
2.7 kB
10
Indexable
//Server import zhttp.http._ import zhttp.service.Server import zhttp.socket._ import zio._ import zio.stream.ZStream import zio.Duration._ object main extends ZIOAppDefault { // Message Handlers private val open = Socket.succeed(WebSocketFrame.text("Greetings!")) private val echo = Socket.collect[WebSocketFrame] { case WebSocketFrame.Text(text) => ZStream .repeat(WebSocketFrame.text(s"Received: $text")) .schedule(Schedule.spaced(Duration.fromMillis(1000L))) .take(3) } val heartBeat = Socket.fromStream( ZStream .succeed(WebSocketFrame.text("HEARTBEAT")) .repeat(Schedule.forever && Schedule.spaced(1.seconds)) ) // Setup protocol settings private val protocol = SocketProtocol.subProtocol("json") // Setup decoder settings private val decoder = SocketDecoder.allowExtensions // Combine all channel handlers together private val socketApp = { SocketApp( echo merge heartBeat ) // Called after each message being received on the channel // Called after the request is successfully upgraded to websocket .onOpen(open) // Called after the connection is closed .onClose(_ => Console.printLine("Closed!").ignore) // Called whenever there is an error on the socket channel .onError(_ => Console.printLine("Error!").ignore) // Setup websocket decoder config .withDecoder(decoder) // Setup websocket protocol config .withProtocol(protocol) } private val app = Http.collectZIO[Request] { case Method.GET -> !! / "greet" / name => UIO(Response.text(s"Greetings ${name}!")) case Method.GET -> !! / "subscriptions" => socketApp.toResponse } override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] = Server.start(8090, app).exitCode } //Client import zhttp.service.{ChannelFactory, EventLoopGroup} import zhttp.socket.{ Socket, SocketApp, SocketDecoder, SocketProtocol, WebSocketFrame } import zio._ import zio.stream.ZStream object main extends ZIOAppDefault { private val env = EventLoopGroup.auto(8) ++ ChannelFactory.auto private val url = "ws://localhost:8090/subscriptions" private val app = Socket .collect[WebSocketFrame] { case fr @ WebSocketFrame.Text(_) => ZStream.succeed(fr) } .toSocketApp .onOpen(Socket.succeed(WebSocketFrame.text("Hellow"))) .onClose(cn => Console.printLine(s"Connection closed: ${cn}").!) .onError(thr => ZIO.die(thr)) .connect(url) def run = app .provide(env) .exitCode } /*Client reponse when running Connection closed: localhost/127.0.0.1:8090 Process finished with exit code 0 */
Editor is loading...