Untitled

 avatar
unknown
golang
2 years ago
3.9 kB
2
Indexable
package impl

import (
	"errors"
	"go.dedis.ch/cs438/peer"
	"go.dedis.ch/cs438/transport"
	"golang.org/x/xerrors"
	"sync"
	"time"
)

// Package dela defines the logger.
//
// Dela stands for DEDIS Ledger Architecture. It defines the modules that will
// be combined to deploy a distributed public ledger.
//
// Dela is using a global logger with some default parameters. It is disabled by
// default and the level can be increased using an environment variable:
//
//   LLVL=trace go test ./...
//   LLVL=info go test ./...
//

// ConcurRouteTbl bundles a peer.RoutingTable (tbl) with the mutex that guards
// it. The Insert, Read and Delete methods each hold mutex for the duration of
// their access to tbl. The struct embeds a sync.Mutex by value, so it must be
// used through a pointer and never copied once in use.
type ConcurRouteTbl struct {
	tbl   peer.RoutingTable
	mutex sync.Mutex
}

// Insert stores val as the table entry for key, replacing any earlier entry
// for the same key. The mutex is held for the whole operation.
//
// The wrapped map is allocated on first use, so a zero-value ConcurRouteTbl
// can be written to without panicking on a nil map.
func (t *ConcurRouteTbl) Insert(key, val string) {
	t.mutex.Lock()
	defer t.mutex.Unlock()

	if t.tbl == nil {
		t.tbl = make(peer.RoutingTable)
	}
	t.tbl[key] = val
}

// Read looks up key while holding the mutex and returns the stored value.
// A key with no entry yields the empty string.
func (t *ConcurRouteTbl) Read(key string) string {
	t.mutex.Lock()
	found := t.tbl[key]
	t.mutex.Unlock()

	return found
}

// Delete drops the entry for key while holding the mutex. Removing a key that
// has no entry is a no-op.
func (t *ConcurRouteTbl) Delete(key string) {
	t.mutex.Lock()
	delete(t.tbl, key)
	t.mutex.Unlock()
}

// NewPeer builds the node backing a peer from the given configuration. Its
// content and location may change, but its signature and package location
// MUST NOT.
//
// The returned node keeps conf, starts with an empty routing table, and then
// registers its own socket address in that table through AddPeer.
func NewPeer(conf peer.Configuration) peer.Peer {
	nd := &node{conf: conf}
	nd.concurTbl.tbl = make(peer.RoutingTable)

	// A fresh node's only known route is the one to its own address.
	nd.AddPeer(nd.conf.Socket.GetAddress())

	return nd
}

// node implements a peer to build a Peerster system
//
// - implements peer.Peer
//
// Fields:
//   - peer.Peer is embedded as an interface value; NewPeer does not set it, so
//     only the methods defined explicitly on *node are backed by real code.
//   - conf is the configuration handed to NewPeer; the socket and the message
//     registry are reached through it.
//   - concurTbl is the mutex-guarded routing table.
//   - active is set to true by Start and to false by Stop, and is polled by
//     the receive loop that Start launches.
type node struct {
	peer.Peer
	// The peer.Configuration is kept on the struct so every method can use it.
	conf      peer.Configuration
	concurTbl ConcurRouteTbl
	active    bool
}

// Start implements peer.Service
//
// It flags the node as active and launches a background goroutine that polls
// the socket with a one-second receive timeout for as long as the flag stays
// set (Stop clears it). Each received packet is handled in its own goroutine:
//
//   - a packet whose destination is this node's address is handed to the
//     message registry;
//   - any other packet is relayed to the next hop found in the routing table,
//     with RelayedBy set to this node's address, or dropped when the table
//     has no entry for the destination.
//
// Start itself does not block and always returns nil. Errors from processing
// or relaying a packet are discarded because there is no caller to report
// them to.
//
// NOTE(review): n.active is written here and in Stop and read by the receive
// goroutine without synchronization; consider an atomic flag or a stop
// channel.
func (n *node) Start() error {
	n.active = true

	go func() {
		for n.active {
			pkt, err := n.conf.Socket.Recv(time.Second * 1)
			if errors.Is(err, transport.TimeoutError(0)) {
				// Nothing arrived in time; loop around to re-check the flag.
				continue
			}
			if err != nil || pkt.Header == nil {
				// A failed receive yields no usable packet. Skip it rather
				// than dereferencing a nil header below.
				continue
			}

			self := n.conf.Socket.GetAddress()

			if pkt.Header.Destination == self {
				go func() {
					// Best effort: the handler's error has nowhere to go.
					_ = n.conf.MessageRegistry.ProcessPacket(pkt)
				}()
				continue
			}

			nextHop := n.concurTbl.Read(pkt.Header.Destination)
			if nextHop == "" {
				// No known route: the packet is dropped.
				continue
			}

			pkt.Header.RelayedBy = self
			go func() {
				// Best effort: a failed relay is not retried.
				_ = n.conf.Socket.Send(nextHop, pkt, time.Second*1)
			}()
		}
	}()

	return nil
}

// Stop implements peer.Service
//
// It clears the active flag that the receive loop launched by Start checks
// before each receive, so the loop exits on its next check. Stop does not wait
// for the loop to finish and always returns nil.
func (n *node) Stop() error {
	n.active = false
	return nil
}

// Unicast implements peer.Messaging
//
// It wraps msg in a packet whose source and relay are both this node's
// address and whose destination is dest, then sends it to the next hop the
// routing table holds for dest.
//
// An error is returned when the routing table has no entry for dest, or when
// the socket fails to send; the latter wraps the socket's error.
func (n *node) Unicast(dest string, msg transport.Message) error {
	// Resolve the next hop exactly once so that the existence check and the
	// send use the same value even if the table changes concurrently.
	nextHop := n.concurTbl.Read(dest)
	if nextHop == "" {
		return xerrors.Errorf("failed uni-cast: destination not known\n")
	}

	const (
		ttl = uint(0)
		// A zero duration is passed as the send timeout; the original code
		// named it NoTimeout, presumably meaning "no deadline" — TODO confirm
		// against the transport.Socket contract.
		noTimeout = time.Duration(0)
	)

	self := n.conf.Socket.GetAddress()
	hdr := transport.NewHeader(self, self, dest, ttl)
	pkt := transport.Packet{Header: &hdr, Msg: &msg}

	if err := n.conf.Socket.Send(nextHop, pkt, noTimeout); err != nil {
		return xerrors.Errorf("failed uni-cast: %w", err)
	}

	return nil
}

// AddPeer implements peer.Service
//
// Every given address is stored in the routing table as its own next hop.
// Calling it without any address changes nothing.
func (n *node) AddPeer(addr ...string) {
	for i := range addr {
		neighbor := addr[i]
		n.concurTbl.Insert(neighbor, neighbor)
	}
}

// GetRoutingTable implements peer.Service
//
// It returns a copy of the routing table taken while the table's mutex is
// held. Handing out the internal map itself would let callers read or modify
// it after the lock is released, racing with concurrent Insert and Delete
// calls. Changes made to the returned map do not affect the node.
func (n *node) GetRoutingTable() peer.RoutingTable {
	t := &n.concurTbl
	t.mutex.Lock()
	defer t.mutex.Unlock()

	snapshot := make(peer.RoutingTable, len(t.tbl))
	for origin, relay := range t.tbl {
		snapshot[origin] = relay
	}

	return snapshot
}

// SetRoutingEntry implements peer.Service
//
// An empty relayAddr removes the entry for origin. Otherwise relayAddr is
// stored as the next hop for origin, unless origin is currently routed to
// itself (the form of entry AddPeer writes), in which case the existing entry
// is left untouched.
//
// The whole decision runs under a single lock acquisition so that the check
// and the write cannot be interleaved with another update of the table.
//
// NOTE(review): keeping a self-routed entry instead of overwriting it is the
// original behaviour and is preserved here; confirm it matches the intended
// peer.Service contract.
func (n *node) SetRoutingEntry(origin, relayAddr string) {
	t := &n.concurTbl
	t.mutex.Lock()
	defer t.mutex.Unlock()

	if relayAddr == "" {
		delete(t.tbl, origin)
		return
	}

	if t.tbl[origin] != origin {
		t.tbl[origin] = relayAddr
	}
}