Untitled

mail@pastecode.io avatar
unknown
golang
2 years ago
6.0 kB
5
Indexable
package main

import (
	"container/list"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"strconv"

	"github.com/cmu440/bitcoin"
	"github.com/cmu440/lsp"
)

const (
	// chunksize is the nonce stride used to split a client's request into
	// per-miner chunks. It is left untyped so it can be combined with both
	// int indices and uint64 nonces.
	chunksize = 10000
	// max_uint64 is the largest uint64 value (all 64 bits set); it is the
	// starting "no hash seen yet" value for a client's min_hash.
	max_uint64 uint64 = 1<<64 - 1
)

// NonceStatus describes the scheduling state of one chunk of a client's
// nonce range.
type NonceStatus int

// Unstarted is the zero value: the chunk has not been handed to any miner.
const Unstarted NonceStatus = 0

// Distributed means the chunk has been assigned to a miner and its result
// has not been combined yet.
const Distributed NonceStatus = 1

// Finished means a miner's result for the chunk has been combined.
const Finished NonceStatus = 2

// server holds the scheduler state shared by mainRoutine and readRoutine.
type server struct {
	// lspServer is the LSP server returned by lsp.NewServer.
	lspServer               lsp.Server
	// readRoutineMsgChan carries each decoded message from readRoutine to
	// mainRoutine.
	readRoutineMsgChan      chan bitcoin.Message
	// readRoutineIDChan carries the sender's connection ID; readRoutine sends
	// it right after the matching message on readRoutineMsgChan.
	readRoutineIDChan       chan int
	// readRoutineClosedIDChan carries the connection ID reported by a failed
	// Read, which is treated as a miner or client having closed.
	readRoutineClosedIDChan chan int
	// clientList holds client values; addNewClient inserts them in ascending
	// order of remaining_nonce.
	clientList              *list.List // sorted by remaining_nonce
	// idleMinerList holds miner values that currently have no chunk assigned.
	idleMinerList           *list.List
	// busyMinerList holds miner values that have been assigned a chunk.
	busyMinerList           *list.List
}

// client tracks one outstanding Request and the best result seen so far.
type client struct {
	// conn_id is the connection ID the Request arrived on.
	conn_id            int
	// max_nonce is the Upper value taken from the Request.
	max_nonce          uint64
	// nonce_chunk_status has one entry per chunk; entry i describes the chunk
	// whose lower bound is i*chunksize.
	nonce_chunk_status []NonceStatus
	// remaining_nonce starts at max_nonce and is the key clientList is
	// ordered by when a client is inserted.
	remaining_nonce    uint64
	// min_hash is the smallest hash reported so far; it starts at max_uint64.
	min_hash           uint64
	// nonce is the nonce that produced min_hash.
	nonce              uint64
}
// miner is one joined worker connection together with the chunk it was most
// recently assigned.
type miner struct {
	// conn_id is the miner's own connection ID.
	conn_id int
	// client_id is the connection ID of the client whose chunk was assigned.
	client_id int
	// lower is the first nonce of the assigned chunk.
	lower uint64
	// upper is the last nonce of the assigned chunk, capped at the client's
	// max_nonce.
	upper uint64
}

// startServer creates the LSP server listening on port and returns a fully
// initialised scheduler state.
//
// All channels and lists are allocated here: mainRoutine calls Len on the
// lists and receives from the channels, and readRoutine sends on them, so
// leaving any of them nil would panic or block forever.
//
// On failure it returns a nil *server together with the error from
// lsp.NewServer.
func startServer(port int) (*server, error) {
	srv, err := lsp.NewServer(port, lsp.NewParams())
	if err != nil {
		return nil, err
	}

	return &server{
		lspServer: srv,
		// Unbuffered: readRoutine hands each message (then its ID) directly
		// to mainRoutine.
		readRoutineMsgChan:      make(chan bitcoin.Message),
		readRoutineIDChan:       make(chan int),
		readRoutineClosedIDChan: make(chan int),
		clientList:              list.New(),
		idleMinerList:           list.New(),
		busyMinerList:           list.New(),
	}, nil
}

// LOGF is the debug logger. It is nil until main assigns it a logger that
// writes to the server log file.
var LOGF *log.Logger

// main opens the debug log, parses the port from the command line, starts the
// LSP server and then runs the scheduler.
//
// Usage: ./server <port>
func main() {
	// You may need a logger for debug purpose
	const (
		name = "serverLog.txt"
		flag = os.O_RDWR | os.O_CREATE
		perm = os.FileMode(0666)
	)

	file, err := os.OpenFile(name, flag, perm)
	if err != nil {
		// Report the failure instead of exiting without any output.
		fmt.Println("Failed to open log file:", err)
		return
	}
	defer file.Close()

	LOGF = log.New(file, "", log.Lshortfile|log.Lmicroseconds)
	// Usage: LOGF.Println() or LOGF.Printf()

	const numArgs = 2
	if len(os.Args) != numArgs {
		fmt.Printf("Usage: ./%s <port>\n", os.Args[0])
		return
	}

	port, err := strconv.Atoi(os.Args[1])
	if err != nil {
		fmt.Println("Port must be a number:", err)
		return
	}

	srv, err := startServer(port)
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	fmt.Println("Server listening on port", port)

	defer srv.lspServer.Close()

	// readRoutine feeds the scheduler from its own goroutine. The scheduler
	// loop runs on the main goroutine, which keeps the process alive without
	// the CPU-burning empty `for {}` spin.
	go readRoutine(srv)
	mainRoutine(srv)
}

// mainRoutine is the scheduler loop. Each iteration first hands out work if
// any miner is idle, then blocks until readRoutine delivers either a message
// or the ID of a closed connection, and dispatches on it. It never returns.
func mainRoutine(s *server) {
	for {
		if s.idleMinerList.Len() > 0 {
			distributeTask_SRTF(s)
		}

		select {
		case closedID := <-s.readRoutineClosedIDChan:
			closeMinerOrClient(s, closedID)

		case incoming := <-s.readRoutineMsgChan:
			// readRoutine always follows a message with its sender's ID.
			sender := <-s.readRoutineIDChan

			switch incoming.Type {
			case bitcoin.Result: // from miner (hash, nonce)
				combineResult(s, sender, incoming.Hash, incoming.Nonce)

			case bitcoin.Request: // from client (data, lower, upper)
				addNewClient(s, sender, incoming.Data, incoming.Upper)

			case bitcoin.Join: // from miner
				addNewMiner(s, sender)
			}
		}
	}
}
// readRoutine loops forever reading from the LSP server and forwarding what
// it gets to mainRoutine.
//
//   - A Read error is treated as a miner or client having closed; the
//     connection ID is sent on readRoutineClosedIDChan.
//   - A payload that is not valid JSON for bitcoin.Message is dropped and
//     reading continues, so one malformed packet cannot stop the server from
//     reading every other connection.
//   - Otherwise the decoded message is sent on readRoutineMsgChan, followed
//     by the sender's connection ID on readRoutineIDChan.
func readRoutine(s *server) {
	for {
		conn_id, message, err := s.lspServer.Read()
		if err != nil { // a miner or a client closed
			s.readRoutineClosedIDChan <- conn_id
			continue
		}

		var msg bitcoin.Message
		if err := json.Unmarshal(message, &msg); err != nil {
			// Skip the bad payload but keep serving the other connections.
			continue
		}

		// Order matters: mainRoutine receives the message first, then the ID.
		s.readRoutineMsgChan <- msg
		s.readRoutineIDChan <- conn_id
	}
}

// addNewMiner registers the connection conn_id as a miner with no work
// assigned by appending it to the back of the idle list.
func addNewMiner(s *server, conn_id int) {
	joined := miner{conn_id: conn_id}
	s.idleMinerList.PushBack(joined)
}

// addNewClient records a new request from connection conn_id covering nonces
// 0..max_nonce and inserts it into clientList, keeping the list in ascending
// order of remaining_nonce.
//
// The nonce range is split into ceil(max_nonce/chunksize) chunks, with a
// minimum of one. Chunk i starts at i*chunksize and ends at
// min((i+1)*chunksize, max_nonce), so when max_nonce is an exact multiple of
// chunksize the previous chunk already ends at max_nonce. Allocating
// max_nonce/chunksize+1 slots in that case created a degenerate extra chunk
// [max_nonce, max_nonce] and made the chunk count disagree with the
// one-chunksize-per-result completion counting.
//
// NOTE(review): data is accepted but not stored; the client struct has no
// field for it.
func addNewClient(s *server, conn_id int, data string, max_nonce uint64) {
	// Ceiling division without the overflow that max_nonce+chunksize-1 could
	// hit for very large requests.
	chunks := max_nonce / chunksize
	if chunks == 0 || max_nonce%chunksize != 0 {
		chunks++
	}

	cli := client{
		conn_id:            conn_id,
		max_nonce:          max_nonce,
		nonce_chunk_status: make([]NonceStatus, chunks),
		remaining_nonce:    max_nonce,
		min_hash:           max_uint64,
	}

	// insert client so that clientList is sorted in the ascending order of remaining_nonce
	for e := s.clientList.Front(); e != nil; e = e.Next() {
		if cli.remaining_nonce < e.Value.(client).remaining_nonce {
			s.clientList.InsertBefore(cli, e)
			return
		}
	}
	s.clientList.PushBack(cli)
}

// combineResult folds a Result (hash, nonce) reported on connection conn_id
// into the client that miner was working for, then returns the miner to the
// idle list.
//
// Behaviour:
//   - If conn_id is not a busy miner (for example a duplicate or unsolicited
//     Result), the message is ignored. This lookup used to dereference a nil
//     list element and panic.
//   - If the miner's client is no longer in clientList, the result is dropped
//     and the miner still becomes idle.
//   - Otherwise min_hash and nonce are updated, the miner's chunk is marked
//     Finished and remaining_nonce is lowered by one chunk. The list stores
//     client values, so the modified copy is written back to the element;
//     without that the scalar updates are lost.
//   - The client is removed once every chunk is Finished. Completion is
//     judged from the chunk statuses rather than from remaining_nonce, so a
//     request cannot be declared done while a chunk is still outstanding.
func combineResult(s *server, conn_id int, hash uint64, nonce uint64) {
	// find and remove the miner in busyMinerList
	var (
		worker miner
		found  bool
	)
	for e := s.busyMinerList.Front(); e != nil; e = e.Next() {
		if m := e.Value.(miner); m.conn_id == conn_id {
			s.busyMinerList.Remove(e)
			worker, found = m, true
			break
		}
	}
	if !found {
		return
	}

	// find the client in clientList and combine the result from the miner
	for e := s.clientList.Front(); e != nil; e = e.Next() {
		cli := e.Value.(client)
		if cli.conn_id != worker.client_id {
			continue
		}

		// update the min_hash and nonce
		if hash < cli.min_hash {
			cli.min_hash = hash
			cli.nonce = nonce
		}

		// The status slice is shared with the stored value, so this write is
		// visible even before the write-back below.
		if idx := worker.lower / chunksize; idx < uint64(len(cli.nonce_chunk_status)) {
			cli.nonce_chunk_status[idx] = Finished
		}
		if cli.remaining_nonce > chunksize {
			cli.remaining_nonce -= chunksize
		} else {
			cli.remaining_nonce = 0
		}

		// check if the result for the client is finished
		finished := true
		for _, st := range cli.nonce_chunk_status {
			if st != Finished {
				finished = false
				break
			}
		}

		if finished {
			s.clientList.Remove(e)
			// TODO: write result to client
		} else {
			// Persist the scalar fields changed on the local copy.
			e.Value = cli
		}
		break
	}

	s.idleMinerList.PushBack(worker)
}

// distributeTask_SRTF assigns Unstarted chunks to idle miners, walking
// clientList from the front so clients earlier in the list are served first.
// Each assignment moves the front idle miner to the busy list and marks the
// chunk Distributed; the function returns as soon as no idle miner is left.
//
// The caller is expected to ensure at least one miner is idle before calling.
func distributeTask_SRTF(s *server) {
	for ce := s.clientList.Front(); ce != nil; ce = ce.Next() {
		job := ce.Value.(client)

		for idx, status := range job.nonce_chunk_status {
			if status != Unstarted {
				continue
			}

			// Take the miner at the front of the idle list.
			front := s.idleMinerList.Front()
			worker := front.Value.(miner)
			s.idleMinerList.Remove(front)

			// Chunk idx spans idx*chunksize up to the next boundary, capped
			// at the client's max_nonce.
			worker.client_id = job.conn_id
			worker.lower = uint64(idx * chunksize)
			worker.upper = min(uint64((idx+1)*chunksize), job.max_nonce)
			// TODO: write request to miner

			// The status slice is shared with the value stored in the list,
			// so this update is visible there too.
			job.nonce_chunk_status[idx] = Distributed
			s.busyMinerList.PushBack(worker)

			if s.idleMinerList.Len() == 0 {
				return
			}
		}
	}
}

// min returns the smaller of the two uint64 values a and b.
func min(a, b uint64) uint64 {
	if b <= a {
		return b
	}
	return a
}
// closeMinerOrClient removes the closed connection close_id from the server
// state. The ID is looked up, in order, as:
//
//  1. an idle miner: removed from idleMinerList;
//  2. a busy miner: removed from busyMinerList and, if its client is still
//     present, the chunk it held is reset to Unstarted so it can be handed
//     out again;
//  3. a client: removed from clientList.
func closeMinerOrClient(s *server, close_id int) {
	for it := s.idleMinerList.Front(); it != nil; it = it.Next() {
		if it.Value.(miner).conn_id != close_id {
			continue
		}
		s.idleMinerList.Remove(it)
		return
	}

	for it := s.busyMinerList.Front(); it != nil; it = it.Next() {
		lost := it.Value.(miner)
		if lost.conn_id != close_id {
			continue
		}
		s.busyMinerList.Remove(it)

		chunk := lost.lower / chunksize
		for ce := s.clientList.Front(); ce != nil; ce = ce.Next() {
			owner := ce.Value.(client)
			if owner.conn_id == lost.client_id {
				// The status slice is shared with the stored client value.
				owner.nonce_chunk_status[chunk] = Unstarted
				return
			}
		}
		// The owning client is gone. A removed element has no successor, so
		// stop scanning the busy list and fall through to the client lookup.
		break
	}

	for it := s.clientList.Front(); it != nil; it = it.Next() {
		if it.Value.(client).conn_id != close_id {
			continue
		}
		s.clientList.Remove(it)
		return
	}
}