Untitled
unknown
golang
2 years ago
3.9 kB
3
Indexable
package impl import ( "errors" "go.dedis.ch/cs438/peer" "go.dedis.ch/cs438/transport" "golang.org/x/xerrors" "sync" "time" ) // Package dela defines the logger. // // Dela stands for DEDIS Ledger Architecture. It defines the modules that will // be combined to deploy a distributed public ledger. // // Dela is using a global logger with some default parameters. It is disabled by // default and the level can be increased using a environment variable: // // LLVL=trace go test ./... // LLVL=info go test ./... // type ConcurRouteTbl struct { tbl peer.RoutingTable mutex sync.Mutex } func (concurTbl *ConcurRouteTbl) Insert(key, val string) { concurTbl.mutex.Lock() defer concurTbl.mutex.Unlock() concurTbl.tbl[key] = val } func (concurTbl *ConcurRouteTbl) Read(key string) string { concurTbl.mutex.Lock() defer concurTbl.mutex.Unlock() return concurTbl.tbl[key] } func (concurTbl *ConcurRouteTbl) Delete(key string) { concurTbl.mutex.Lock() defer concurTbl.mutex.Unlock() delete(concurTbl.tbl, key) } // NewPeer creates a new peer. You can change the content and location of this // function but you MUST NOT change its signature and package location. func NewPeer(conf peer.Configuration) peer.Peer { // here you must return a struct that implements the peer.Peer functions. // Therefore, you are free to rename and change it as you want. n := &node{concurTbl: ConcurRouteTbl{tbl: make(peer.RoutingTable)}, conf: conf} n.AddPeer(n.conf.Socket.GetAddress()) return n } // node implements a peer to build a Peerster system // // - implements peer.Peer type node struct { peer.Peer // You probably want to keep the peer.Configuration on this struct: conf peer.Configuration concurTbl ConcurRouteTbl active bool } // Start implements peer.Service func (n *node) Start() error { n.active = true go func() { for { if !n.active { return } pkt, err := n.conf.Socket.Recv(time.Second * 1) if err != nil { if errors.Is(err, transport.TimeoutError(0)) { continue } } if pkt.Header.Destination != n.conf.Socket.GetAddress() { nxtNode := n.concurTbl.Read(pkt.Header.Destination) if nxtNode != "" { pkt.Header.RelayedBy = n.conf.Socket.GetAddress() go func() { err := func() error { return n.conf.Socket.Send(nxtNode, pkt, time.Second*1) }() if err != nil { return } }() } } else { go func() { err := func() error { return n.conf.MessageRegistry.ProcessPacket(pkt) }() if err != nil { return } }() } } }() return nil } // Stop implements peer.Service func (n *node) Stop() error { n.active = false return nil } // Unicast implements peer.Messaging func (n *node) Unicast(dest string, msg transport.Message) error { if n.concurTbl.Read(dest) == "" { return xerrors.Errorf("failed uni-cast: destination not known\n") } const TTL, NoTimeout = uint(0), int(0) addr := n.conf.Socket.GetAddress() hdr := transport.NewHeader(addr, addr, dest, TTL) pkt := transport.Packet{ Header: &hdr, Msg: &msg, } err := n.conf.Socket.Send(n.concurTbl.Read(dest), pkt, time.Duration(NoTimeout)) if err != nil { return xerrors.Errorf("failed uni-cast: %w", err) } return nil } // AddPeer implements peer.Service func (n *node) AddPeer(addr ...string) { for _, addr := range addr { n.concurTbl.Insert(addr, addr) } } // GetRoutingTable implements peer.Service func (n *node) GetRoutingTable() peer.RoutingTable { t := &n.concurTbl t.mutex.Lock() defer t.mutex.Unlock() return t.tbl } // SetRoutingEntry implements peer.Service func (n *node) SetRoutingEntry(origin, relayAddr string) { if relayAddr == "" { n.concurTbl.Delete(origin) return } if n.concurTbl.Read(origin) != origin { n.concurTbl.Insert(origin, relayAddr) return } }
Editor is loading...