package impl

import (
	"errors"
	"sync"
	"sync/atomic"
	"time"

	"go.dedis.ch/cs438/peer"
	"go.dedis.ch/cs438/transport"
	"golang.org/x/xerrors"
)
// ConcurRouteTbl wraps a peer.RoutingTable with a mutex so that the routing
// table can be read and updated safely from concurrent goroutines.
type ConcurRouteTbl struct {
tbl peer.RoutingTable
mutex sync.Mutex
}
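// Insert adds or overwrites the relay address used to reach key.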
func (concurTbl *ConcurRouteTbl) Insert(key, val string) {
concurTbl.mutex.Lock()
defer concurTbl.mutex.Unlock()
concurTbl.tbl[key] = val
}
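// Read returns the relay address stored for key, or "" if no route is known.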
func (concurTbl *ConcurRouteTbl) Read(key string) string {
concurTbl.mutex.Lock()
defer concurTbl.mutex.Unlock()
return concurTbl.tbl[key]
}
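// Delete removes the routing entry for key, if any.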
func (concurTbl *ConcurRouteTbl) Delete(key string) {
concurTbl.mutex.Lock()
defer concurTbl.mutex.Unlock()
delete(concurTbl.tbl, key)
}
// NewPeer creates a new peer. You can change the content and location of this
// function but you MUST NOT change its signature and package location.
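//
// A minimal usage sketch, assuming a peer.Configuration named conf and a
// transport.Message named msg have been built elsewhere (both names are
// placeholders):
//
//	p := NewPeer(conf)
//	if err := p.Start(); err != nil {
//		// handle the error
//	}
//	defer p.Stop()
//	p.AddPeer("127.0.0.1:2001") // example neighbour address
//	if err := p.Unicast("127.0.0.1:2001", msg); err != nil {
//		// handle the error
//	}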
func NewPeer(conf peer.Configuration) peer.Peer {
// here you must return a struct that implements the peer.Peer functions.
// Therefore, you are free to rename and change it as you want.
n := &node{concurTbl: ConcurRouteTbl{tbl: make(peer.RoutingTable)}, conf: conf}
n.AddPeer(n.conf.Socket.GetAddress())
return n
}
// node implements a peer to build a Peerster system
//
// - implements peer.Peer
type node struct {
	peer.Peer
	// You probably want to keep the peer.Configuration on this struct:
	conf      peer.Configuration
	concurTbl ConcurRouteTbl
	// active is read by the receive loop and written by Start/Stop from other
	// goroutines, so it is accessed atomically to avoid a data race.
	active atomic.Bool
}
// Start implements peer.Service
func (n *node) Start() error {
	n.active.Store(true)
	go func() {
		for n.active.Load() {
			pkt, err := n.conf.Socket.Recv(time.Second * 1)
			if err != nil {
				if errors.Is(err, transport.TimeoutError(0)) {
					// No packet arrived in this interval; loop again so the
					// stop flag gets re-checked.
					continue
				}
				// Any other receive error also leaves pkt unusable, so skip
				// this iteration instead of dereferencing an empty packet.
				continue
			}
			if pkt.Header.Destination != n.conf.Socket.GetAddress() {
				// The packet is addressed to another peer: relay it if a
				// route is known.
				nxtNode := n.concurTbl.Read(pkt.Header.Destination)
				if nxtNode != "" {
					pkt.Header.RelayedBy = n.conf.Socket.GetAddress()
					go func() {
						// Best-effort relay: a send error is currently dropped.
						_ = n.conf.Socket.Send(nxtNode, pkt, time.Second*1)
					}()
				}
			} else {
				// The packet is addressed to this node: hand it to the
				// registered handler.
				go func() {
					// Processing errors are currently dropped as well.
					_ = n.conf.MessageRegistry.ProcessPacket(pkt)
				}()
			}
		}
	}()
	return nil
}
// Stop implements peer.Service
func (n *node) Stop() error {
	n.active.Store(false)
return nil
}
// Unicast implements peer.Messaging
func (n *node) Unicast(dest string, msg transport.Message) error {
	relay := n.concurTbl.Read(dest)
	if relay == "" {
		return xerrors.Errorf("failed unicast: destination %s not known", dest)
	}
	const TTL, NoTimeout = uint(0), time.Duration(0)
	addr := n.conf.Socket.GetAddress()
	hdr := transport.NewHeader(addr, addr, dest, TTL)
	pkt := transport.Packet{
		Header: &hdr,
		Msg:    &msg,
	}
	err := n.conf.Socket.Send(relay, pkt, NoTimeout)
	if err != nil {
		return xerrors.Errorf("failed unicast: %w", err)
	}
	return nil
}
// AddPeer implements peer.Service
func (n *node) AddPeer(addr ...string) {
	for _, a := range addr {
		// A direct neighbour is reached through itself.
		n.concurTbl.Insert(a, a)
	}
}
// GetRoutingTable implements peer.Service
func (n *node) GetRoutingTable() peer.RoutingTable {
	t := &n.concurTbl
	t.mutex.Lock()
	defer t.mutex.Unlock()
	// Return a copy so that callers cannot race with the node on the live map.
	cpy := make(peer.RoutingTable, len(t.tbl))
	for k, v := range t.tbl {
		cpy[k] = v
	}
	return cpy
}
// SetRoutingEntry implements peer.Service
func (n *node) SetRoutingEntry(origin, relayAddr string) {
	if relayAddr == "" {
		// An empty relay address means the route must be removed.
		n.concurTbl.Delete(origin)
		return
	}
	// Keep an existing direct-neighbour entry (origin -> origin) untouched;
	// otherwise set or overwrite the relay used to reach origin.
	if n.concurTbl.Read(origin) != origin {
		n.concurTbl.Insert(origin, relayAddr)
	}
}