//Server
import zhttp.http._
import zhttp.service.Server
import zhttp.socket._
import zio._
import zio.stream.ZStream
import zio.Duration._
object main extends ZIOAppDefault {
// Message Handlers
private val open = Socket.succeed(WebSocketFrame.text("Greetings!"))
private val echo = Socket.collect[WebSocketFrame] {
case WebSocketFrame.Text(text) =>
ZStream
.repeat(WebSocketFrame.text(s"Received: $text"))
.schedule(Schedule.spaced(Duration.fromMillis(1000L)))
.take(3)
}
val heartBeat = Socket.fromStream(
ZStream
.succeed(WebSocketFrame.text("HEARTBEAT"))
.repeat(Schedule.forever && Schedule.spaced(1.seconds))
)
// Setup protocol settings
private val protocol = SocketProtocol.subProtocol("json")
// Setup decoder settings
private val decoder = SocketDecoder.allowExtensions
// Combine all channel handlers together
private val socketApp = {
SocketApp(
echo merge heartBeat
) // Called after each message being received on the channel
// Called after the request is successfully upgraded to websocket
.onOpen(open)
// Called after the connection is closed
.onClose(_ => Console.printLine("Closed!").ignore)
// Called whenever there is an error on the socket channel
.onError(_ => Console.printLine("Error!").ignore)
// Setup websocket decoder config
.withDecoder(decoder)
// Setup websocket protocol config
.withProtocol(protocol)
}
private val app =
Http.collectZIO[Request] {
case Method.GET -> !! / "greet" / name =>
UIO(Response.text(s"Greetings ${name}!"))
case Method.GET -> !! / "subscriptions" => socketApp.toResponse
}
override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] =
Server.start(8090, app).exitCode
}
//Client
import zhttp.service.{ChannelFactory, EventLoopGroup}
import zhttp.socket.{
Socket,
SocketApp,
SocketDecoder,
SocketProtocol,
WebSocketFrame
}
import zio._
import zio.stream.ZStream
object main extends ZIOAppDefault {
private val env = EventLoopGroup.auto(8) ++ ChannelFactory.auto
private val url = "ws://localhost:8090/subscriptions"
private val app =
Socket
.collect[WebSocketFrame] { case fr @ WebSocketFrame.Text(_) =>
ZStream.succeed(fr)
}
.toSocketApp
.onOpen(Socket.succeed(WebSocketFrame.text("Hellow")))
.onClose(cn => Console.printLine(s"Connection closed: ${cn}").!)
.onError(thr => ZIO.die(thr))
.connect(url)
def run =
app
.provide(env)
.exitCode
}
/*Client reponse when running
Connection closed: localhost/127.0.0.1:8090
Process finished with exit code 0
*/