package main
import (
"container/list"
"encoding/json"
"fmt"
"log"
"os"
"strconv"
"github.com/cmu440/bitcoin"
"github.com/cmu440/lsp"
)
// Constants shared by the scheduling functions in this file.
const (
// chunksize is the number of nonces represented by one entry of a client's
// nonce_chunk_status slice; distributeTask_SRTF hands out ranges of this width.
chunksize = 10000
// max_uint64 is the largest uint64 value (all bits set); addNewClient uses it
// as the starting min_hash so that any reported hash compares lower.
max_uint64 = ^uint64(0)
)
// NonceStatus records the progress of one chunk of a client's nonce range.
type NonceStatus int
const (
// Unstarted is the zero value: the chunk is not assigned to any miner.
// closeMinerOrClient also resets a chunk to this state when its miner closes.
Unstarted NonceStatus = iota
// Distributed marks a chunk that distributeTask_SRTF has assigned to a miner.
Distributed
// Finished marks a chunk for which combineResult has processed a result.
Finished
)
// server bundles the LSP server with the channels and lists used by
// readRoutine and mainRoutine.
type server struct {
// lspServer is the value returned by lsp.NewServer.
lspServer lsp.Server
// readRoutineMsgChan carries each decoded message from readRoutine to mainRoutine.
readRoutineMsgChan chan bitcoin.Message
// readRoutineIDChan carries the connection ID of the message that was just
// sent on readRoutineMsgChan.
readRoutineIDChan chan int
// readRoutineClosedIDChan carries the connection ID reported alongside a
// failed Read.
readRoutineClosedIDChan chan int
clientList *list.List // client values; addNewClient inserts in ascending remaining_nonce order
idleMinerList *list.List // miner values that currently have no chunk assigned
busyMinerList *list.List // miner values that distributeTask_SRTF has assigned a chunk
}
// client is the bookkeeping record for one outstanding request.
type client struct {
// conn_id is the connection ID the request arrived on.
conn_id int
// max_nonce is the Upper value taken from the request message.
max_nonce uint64
// nonce_chunk_status holds one status per chunksize-wide slice of the nonce
// range; it is allocated with max_nonce/chunksize+1 entries.
nonce_chunk_status []NonceStatus
// remaining_nonce starts at max_nonce; it is the sort key for clientList and
// is compared against chunksize to decide when the request is finished.
remaining_nonce uint64
// min_hash is the smallest hash reported so far (starts at max_uint64).
min_hash uint64
// nonce is the nonce that was reported together with min_hash.
nonce uint64
}
// miner is the bookkeeping record for one joined miner.
type miner struct {
// conn_id is the miner's connection ID.
conn_id int
// client_id is the conn_id of the client whose chunk was most recently
// assigned to this miner by distributeTask_SRTF.
client_id int
// lower and upper bound the nonce range most recently assigned to this miner.
lower, upper uint64
}
// startServer creates an LSP server on the given port and wraps it in a
// server whose channels and bookkeeping lists are ready for use.
//
// On failure it returns a nil server together with the error reported by
// lsp.NewServer.
func startServer(port int) (*server, error) {
	srv, err := lsp.NewServer(port, lsp.NewParams())
	if err != nil {
		return nil, err
	}
	// The channels and lists must be allocated here: a nil *list.List panics
	// on Len/PushBack, and sends or receives on a nil channel block forever.
	return &server{
		lspServer:               srv,
		readRoutineMsgChan:      make(chan bitcoin.Message),
		readRoutineIDChan:       make(chan int),
		readRoutineClosedIDChan: make(chan int),
		clientList:              list.New(),
		idleMinerList:           list.New(),
		busyMinerList:           list.New(),
	}, nil
}
// LOGF is the package-level debug logger; main points it at serverLog.txt.
var LOGF *log.Logger
// main opens the debug log, parses the port from the command line, starts the
// server, and then runs the scheduler until the process is terminated.
func main() {
	// You may need a logger for debug purpose
	const (
		name = "serverLog.txt"
		flag = os.O_RDWR | os.O_CREATE
		perm = os.FileMode(0666)
	)
	file, err := os.OpenFile(name, flag, perm)
	if err != nil {
		// Report the reason instead of exiting silently.
		fmt.Println("Cannot open log file:", err)
		return
	}
	defer file.Close()
	LOGF = log.New(file, "", log.Lshortfile|log.Lmicroseconds)
	// Usage: LOGF.Println() or LOGF.Printf()

	const numArgs = 2
	if len(os.Args) != numArgs {
		fmt.Printf("Usage: ./%s <port>", os.Args[0])
		return
	}
	port, err := strconv.Atoi(os.Args[1])
	if err != nil {
		fmt.Println("Port must be a number:", err)
		return
	}
	srv, err := startServer(port)
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	fmt.Println("Server listening on port", port)
	defer srv.lspServer.Close()

	// The reader runs in the background; the scheduler loop runs on the main
	// goroutine so the process stays alive by blocking on channel receives
	// rather than by spinning in an empty for-loop.
	go readRoutine(srv)
	mainRoutine(srv)
}
// mainRoutine is the scheduler loop. Each iteration first hands out work if
// any miner is idle, then blocks until readRoutine delivers either a decoded
// message (followed by its connection ID) or the ID of a failed connection,
// and dispatches it to the matching handler. It never returns.
func mainRoutine(s *server) {
	for {
		if s.idleMinerList.Len() != 0 {
			distributeTask_SRTF(s)
		}

		select {
		case closedID := <-s.readRoutineClosedIDChan:
			closeMinerOrClient(s, closedID)

		case incoming := <-s.readRoutineMsgChan:
			// readRoutine always sends the connection ID right after the message.
			senderID := <-s.readRoutineIDChan
			switch incoming.Type {
			case bitcoin.Result: // miner reports (hash, nonce)
				combineResult(s, senderID, incoming.Hash, incoming.Nonce)
			case bitcoin.Request: // client asks for (data, lower, upper)
				addNewClient(s, senderID, incoming.Data, incoming.Upper)
			case bitcoin.Join: // miner announces itself
				addNewMiner(s, senderID)
			}
		}
	}
}
// readRoutine loops on lspServer.Read and forwards what it gets to
// mainRoutine: a failed Read is reported on readRoutineClosedIDChan, and a
// successfully decoded message is sent on readRoutineMsgChan followed by its
// connection ID on readRoutineIDChan. Payloads that are not valid JSON for a
// bitcoin.Message are skipped.
func readRoutine(s *server) {
	for {
		connID, payload, err := s.lspServer.Read()
		if err != nil { // a miner or a client closed
			s.readRoutineClosedIDChan <- connID
			continue
		}

		var msg bitcoin.Message
		if err := json.Unmarshal(payload, &msg); err != nil {
			// One malformed payload must not stop the reader: returning here
			// would end message delivery for every other connection as well.
			continue
		}

		s.readRoutineMsgChan <- msg
		s.readRoutineIDChan <- connID
	}
}
// addNewMiner appends a miner record for conn_id to the back of the idle list.
func addNewMiner(s *server, conn_id int) {
	joined := miner{conn_id: conn_id}
	s.idleMinerList.PushBack(joined)
}
// addNewClient builds a client record for conn_id and inserts it into
// clientList so the list stays in ascending order of remaining_nonce. The new
// record goes before the first entry with a strictly larger remaining_nonce,
// or at the back if there is none. The data argument is not stored.
func addNewClient(s *server, conn_id int, data string, max_nonce uint64) {
	chunkCount := max_nonce/chunksize + 1
	entry := client{
		conn_id:            conn_id,
		max_nonce:          max_nonce,
		nonce_chunk_status: make([]NonceStatus, chunkCount),
		remaining_nonce:    max_nonce,
		min_hash:           max_uint64,
	}

	// Walk past every existing client whose remaining work is not larger.
	pos := s.clientList.Front()
	for pos != nil && max_nonce >= pos.Value.(client).remaining_nonce {
		pos = pos.Next()
	}

	if pos == nil {
		s.clientList.PushBack(entry)
		return
	}
	s.clientList.InsertBefore(entry, pos)
}
// combineResult processes a Result from the miner on conn_id: it moves the
// miner from the busy list back to the idle list and folds (hash, nonce) into
// the client the miner was working for.
//
// A result from a connection that is not in the busy list is ignored. If the
// client is no longer in clientList, the miner is still returned to the idle
// list.
func combineResult(s *server, conn_id int, hash uint64, nonce uint64) {
	// Find and remove the reporting miner from busyMinerList.
	var worker miner
	found := false
	for e := s.busyMinerList.Front(); e != nil; e = e.Next() {
		if m := e.Value.(miner); m.conn_id == conn_id {
			worker = m
			s.busyMinerList.Remove(e)
			found = true
			break
		}
	}
	if !found {
		// Nothing was assigned to this connection; there is no element to read.
		return
	}

	// Find the owning client and merge the miner's result into it.
	for e := s.clientList.Front(); e != nil; e = e.Next() {
		cli := e.Value.(client)
		if cli.conn_id != worker.client_id {
			continue
		}

		// Keep the smallest hash seen so far and the nonce that produced it.
		if hash < cli.min_hash {
			cli.min_hash = hash
			cli.nonce = nonce
		}

		if cli.remaining_nonce <= chunksize { // finished
			s.clientList.Remove(e)
			// TODO: write result to client
		} else {
			cli.nonce_chunk_status[worker.lower/chunksize] = Finished
			cli.remaining_nonce -= chunksize
			// The list stores client values, so cli is a copy; write it back
			// or the min_hash/nonce/remaining_nonce updates are lost.
			e.Value = cli
		}
		break
	}

	s.idleMinerList.PushBack(worker)
}
// distributeTask_SRTF assigns Unstarted chunks to idle miners, visiting
// clients in clientList order (front first) and each client's chunks in index
// order. Every assignment moves one miner from the idle list to the busy list
// and marks the chunk Distributed. It returns as soon as no idle miner is left,
// or when every client has been scanned.
func distributeTask_SRTF(s *server) {
	// Guard against being called with nothing to hand out; Front() would
	// return nil below.
	if s.idleMinerList.Len() == 0 {
		return
	}

	for e := s.clientList.Front(); e != nil; e = e.Next() {
		cli := e.Value.(client)
		for i, status := range cli.nonce_chunk_status {
			if status != Unstarted {
				continue
			}

			slot := s.idleMinerList.Front()
			worker := slot.Value.(miner)
			s.idleMinerList.Remove(slot)

			worker.client_id = cli.conn_id
			// Convert the index first so the multiplication happens in uint64
			// and cannot overflow a 32-bit int.
			worker.lower = uint64(i) * chunksize
			worker.upper = min(uint64(i+1)*chunksize, cli.max_nonce)
			// TODO: write request to miner

			// The status slice shares its backing array with the list entry,
			// so this write is visible through clientList.
			cli.nonce_chunk_status[i] = Distributed
			s.busyMinerList.PushBack(worker)

			if s.idleMinerList.Len() == 0 {
				return
			}
		}
	}
}
// min returns the smaller of a and b.
func min(a, b uint64) uint64 {
	if b < a {
		return b
	}
	return a
}
// closeMinerOrClient removes the record for a closed connection. The ID is
// looked up in the idle miners, then the busy miners, then the clients, and
// the first match is handled:
//   - idle miner: removed;
//   - busy miner: removed, and the chunk it held is reset to Unstarted on its
//     client (if that client is still present) so it can be handed out again;
//   - client: removed.
//
// An ID that matches nothing is ignored.
func closeMinerOrClient(s *server, close_id int) {
	for e := s.idleMinerList.Front(); e != nil; e = e.Next() {
		if e.Value.(miner).conn_id == close_id {
			s.idleMinerList.Remove(e)
			return
		}
	}

	for e := s.busyMinerList.Front(); e != nil; e = e.Next() {
		lost := e.Value.(miner)
		if lost.conn_id != close_id {
			continue
		}
		s.busyMinerList.Remove(e)

		for el := s.clientList.Front(); el != nil; el = el.Next() {
			if cli := el.Value.(client); cli.conn_id == lost.client_id {
				// cli is a copy, but the status slice shares its backing
				// array with the list entry, so this write is visible there.
				cli.nonce_chunk_status[lost.lower/chunksize] = Unstarted
				break
			}
		}
		// The connection has been handled even when its client is already
		// gone; stop here instead of iterating on from a removed element.
		return
	}

	for e := s.clientList.Front(); e != nil; e = e.Next() {
		if e.Value.(client).conn_id == close_id {
			s.clientList.Remove(e)
			return
		}
	}
}