// Untitled
//
// avatar · unknown · golang · 3 years ago · 12 kB · 3 · Indexable
// Basic1 ok, Basic2 ok

// Contains the implementation of an LSP server.

package lsp

import (
	"container/list"
	"encoding/json" // for json Marshal
	"errors"
	"fmt" // need to delete this
	"github.com/cmu440/lspnet"
)

// pair bundles two arbitrary values, named as an ID and a payload. Both
// fields are empty interfaces, so any value can be stored in either one.
type pair struct {
	pair_id      interface{}
	pair_payload interface{}
}

// server is the state of one LSP server: the UDP socket, the protocol
// parameters, the channels that connect Read and read_routine to
// main_routine, and the per-client bookkeeping.
type server struct {
	// connection is the UDP socket used for every read and write.
	connection *lspnet.UDPConn

	// Protocol parameters copied from the Params given to NewServer, e.g.
	// [EpochLimit: 5, EpochMillis: 2000, WindowSize: 1, MaxBackOffInterval: 0, MaxUnackedMessages: 1].
	EpochLimit, EpochMillis, WindowSize    int
	MaxBackOffInterval, MaxUnackedMessages int

	// read_request carries a token from Read to main_routine asking for the
	// next queued message.
	read_request chan string

	// read_response_id and read_response_msg carry main_routine's reply to
	// Read: the connection ID first, then the payload as a string (Read
	// converts it back to []byte).
	read_response_id  chan int
	read_response_msg chan string

	// read_routine_msg and address carry each decoded datagram and the
	// address it came from, in that order, from read_routine to main_routine.
	read_routine_msg chan Message
	address          chan *lspnet.UDPAddr

	// client_cur_index is the connection ID the next connecting client will
	// receive (with 5 clients already connected it would be 6).
	client_cur_index int

	// m holds the per-client state, indexed by connection ID.
	m []client_2

	// queue holds the *Message values that are ready for Read, in delivery
	// order. A list is used so that its size is not fixed.
	queue *list.List
}

// client_2 is the server-side record kept for one connected client.
type client_2 struct {
	// client_id is the connection ID assigned to the client when it connected.
	client_id int

	// client_addr is the UDP address that acks and data for this client are
	// written to.
	client_addr *lspnet.UDPAddr

	// server_sequence_number is the server's own sequence counter for this
	// client; Write increments it before sending each data message.
	server_sequence_number int

	// client_sequence_number is the sequence number of the next data message
	// the server expects from this client.
	client_sequence_number int

	// ack_status is meant to record which message has been acked; nothing in
	// this file reads or writes it yet.
	ack_status int

	// unordered_queue holds received *Message values that cannot be
	// delivered yet because an earlier sequence number is still missing.
	unordered_queue *list.List
}

// NewServer creates, initiates, and returns a new server listening for UDP
// traffic on 127.0.0.1 at the given port. It does not block: it spawns the
// main event loop (main_routine) and the socket reader (read_routine) and
// returns immediately.
//
// It returns a nil Server and a non-nil error if params is nil or if there was
// an error resolving or listening on the specified port number.
func NewServer(port int, params *Params) (Server, error) {
	if params == nil {
		return nil, errors.New("lsp: NewServer called with nil params")
	}

	// The address must be exactly "host:port". A space after the colon only
	// resolves on platforms whose resolver happens to tolerate it.
	addr, err := lspnet.ResolveUDPAddr("udp", fmt.Sprintf("127.0.0.1:%d", port))
	if err != nil {
		return nil, fmt.Errorf("resolving server address for port %d: %w", port, err)
	}

	conn, err := lspnet.ListenUDP("udp", addr)
	if err != nil {
		return nil, fmt.Errorf("listening on port %d: %w", port, err)
	}

	s := &server{
		connection: conn,

		EpochLimit:         params.EpochLimit,
		EpochMillis:        params.EpochMillis,
		WindowSize:         params.WindowSize,
		MaxBackOffInterval: params.MaxBackOffInterval,
		MaxUnackedMessages: params.MaxUnackedMessages,

		queue: list.New(),

		// All channels are unbuffered: every hand-off between Read,
		// read_routine and main_routine is a rendezvous.
		read_request:      make(chan string),
		read_response_id:  make(chan int),
		read_response_msg: make(chan string),
		read_routine_msg:  make(chan Message),
		address:           make(chan *lspnet.UDPAddr),
	}

	go s.main_routine()
	go s.read_routine()

	return s, nil
}


// InsertMsgToList_server inserts msg into c.unordered_queue so that the list
// stays sorted by ascending SeqNum.
//
// The list is scanned from the back, because new messages usually carry the
// largest sequence number seen so far. A message whose SeqNum is already
// present (a retransmission) is dropped. A message smaller than every queued
// one is placed at the front of the list.
//
// Every element of c.unordered_queue must be a *Message; anything else makes
// the type assertion panic.
func InsertMsgToList_server(c *client_2, msg *Message) {
	for e := c.unordered_queue.Back(); e != nil; e = e.Prev() {
		queued := e.Value.(*Message).SeqNum
		if queued == msg.SeqNum {
			// Duplicate of a message that is already waiting.
			return
		}
		if queued < msg.SeqNum {
			c.unordered_queue.InsertAfter(msg, e)
			return
		}
	}
	// Nothing queued is smaller than msg, so it belongs at the head.
	c.unordered_queue.PushFront(msg)
}

// PrintList_server is a debugging aid that writes the contents of l to
// standard output: a START marker line, one line per element with its
// position and the element formatted with %s, and an END marker line.
// Every element must be a *Message, otherwise the type assertion panics.
func PrintList_server(l *list.List) {
	fmt.Println("Client LIST:  START")
	for idx, elem := 0, l.Front(); elem != nil; idx, elem = idx+1, elem.Next() {
		fmt.Printf("Client:  %d: %s\n", idx, elem.Value.(*Message))
	}
	fmt.Println("Client LIST:  END")
}


// main_routine is the server's event loop. In this file it is the only
// goroutine that uses s.queue and the per-client reorder lists. It serves two
// kinds of events:
//
//   - A read request from Read: the oldest message in s.queue, if any, is
//     handed back over read_response_id and read_response_msg. When the queue
//     is empty nothing is sent and Read simply asks again.
//   - A decoded datagram from read_routine, followed by its sender address.
//     Connect requests (Type 0) register a new client and are acked. Data
//     messages (Type 1) are validated, reordered, queued and acked. All other
//     types are ignored.
//
// A malformed or unexpected datagram is dropped and the loop keeps running;
// the loop never returns.
//
// NOTE(review): Write uses s.m from its caller's goroutine while this loop
// appends to it; that access is not synchronized here.
func (s *server) main_routine() {
	// Reserve index 0 with a placeholder so that connection IDs start at 1
	// and can be used directly as indexes into s.m.
	s.client_cur_index += 1
	s.m = append(s.m, client_2{})

	for {
		select {
		case <-s.read_request:
			s.deliverNextMessage()

		case message := <-s.read_routine_msg:
			// read_routine always sends the sender address right after the
			// message, so it has to be received even if the message is
			// ignored.
			addr := <-s.address

			switch message.Type {
			case 0: // connect request
				s.handleConnect(&message, addr)
			case 1: // data message
				s.handleData(&message)
			}
		}
	}
}

// deliverNextMessage answers one read request with the message at the front of
// s.queue and removes it; it does nothing when the queue is empty.
func (s *server) deliverNextMessage() {
	front := s.queue.Front()
	if front == nil {
		return
	}
	msg := front.Value.(*Message)
	s.read_response_id <- msg.ConnID
	s.read_response_msg <- string(msg.Payload)
	s.queue.Remove(front)
}

// handleConnect registers a new client for a connect request received from
// addr and acks it with the freshly assigned connection ID.
func (s *server) handleConnect(message *Message, addr *lspnet.UDPAddr) {
	connID := s.client_cur_index

	// Encode the ack first so that a client is only registered when the
	// reply can actually be produced.
	ack, err := json.Marshal(NewAck(connID, message.SeqNum))
	if err != nil {
		fmt.Println(err.Error())
		return
	}

	s.m = append(s.m, client_2{
		client_id:              connID,
		server_sequence_number: message.SeqNum,
		client_sequence_number: message.SeqNum + 1,
		client_addr:            addr,
		unordered_queue:        list.New(),
	})
	s.client_cur_index += 1

	if _, err := s.connection.WriteToUDP(ack, addr); err != nil {
		fmt.Println(err.Error())
	}
}

// handleData validates one data message, moves every message that is now in
// sequence from the client's reorder list to s.queue, and acks the message.
func (s *server) handleData(message *Message) {
	connID := message.ConnID

	// The connection ID comes straight from the network. Index 0 is only a
	// placeholder and has no reorder list.
	if connID < 1 || connID >= len(s.m) {
		fmt.Println("the connection id is unknown")
		return
	}
	if len(message.Payload) != message.Size {
		fmt.Println("the size is incorrect")
		return
	}
	if CalculateChecksum(message.ConnID, message.SeqNum, message.Size, message.Payload) != message.Checksum {
		fmt.Println("the checksum is incorrect")
		return
	}

	client := &s.m[connID]

	// A sequence number below the expected one was already delivered.
	// Queueing it again would leave it stuck at the head of the list.
	if message.SeqNum >= client.client_sequence_number {
		InsertMsgToList_server(client, message)
	}

	for front := client.unordered_queue.Front(); front != nil; front = client.unordered_queue.Front() {
		queued := front.Value.(*Message)
		if queued.SeqNum > client.client_sequence_number {
			break // still waiting for an earlier message
		}
		if queued.SeqNum == client.client_sequence_number {
			s.queue.PushBack(queued)
			client.client_sequence_number += 1
		}
		// Entries below the expected number are stale duplicates and are
		// discarded together with the one that was just delivered.
		client.unordered_queue.Remove(front)
	}

	// Ack duplicates too, so that the client stops retransmitting them.
	ack, err := json.Marshal(NewAck(connID, message.SeqNum))
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	if _, err := s.connection.WriteToUDP(ack, client.client_addr); err != nil {
		fmt.Println(err.Error())
	}
}



// read_routine is where raw UDP datagrams become LSP messages. It is started
// once by NewServer and keeps running: it reads a datagram, decodes it from
// JSON into a Message, and hands the message followed by the sender's address
// to main_routine, which does the actual processing.
//
// A datagram that is not valid JSON is reported and skipped. The routine
// returns only when reading from the connection fails.
func (s *server) read_routine() {
	// One buffer is reused for every datagram; json.Unmarshal is given only
	// the first n bytes that were actually read.
	buf := make([]byte, 1000)

	for {
		n, addr, err := s.connection.ReadFromUDP(buf)
		if err != nil {
			fmt.Println(err.Error())
			return
		}

		var message Message
		if err := json.Unmarshal(buf[:n], &message); err != nil {
			// One bad packet must not stop the server from reading.
			fmt.Println(err.Error())
			continue
		}

		// main_routine receives these two values in exactly this order.
		s.read_routine_msg <- message
		s.address <- addr
	}
}

// probably do not need write_routine, because we can just write it back in the main routine?

// Read blocks until a message is available and returns the ID of the
// connection it belongs to together with its payload. The returned error is
// always nil.
//
// Read never touches s.queue itself. It talks to main_routine over channels so
// that the queue is only used from one goroutine, which avoids a race
// condition. Each pass of the loop either hands a request token to
// main_routine or picks up the connection ID of a reply; the payload then
// follows on read_response_msg as a string and is converted back to []byte.
func (s *server) Read() (int, []byte, error) {
	for {
		select {
		case id := <-s.read_response_id:
			payload := <-s.read_response_msg
			return id, []byte(payload), nil
		case s.read_request <- "request":
			// Request delivered; go around again and wait for the reply.
			continue
		}
	}
}

// Write sends payload to the client identified by connId as a data message.
// The server's sequence number for that client is incremented first and used
// for the message, whose checksum is computed with CalculateChecksum.
//
// It returns an error if connId does not refer to a registered client, if the
// message cannot be encoded as JSON, or if writing to the UDP socket fails.
//
// NOTE(review): s.m is also modified by main_routine, and the accesses here
// are not synchronized with it.
func (s *server) Write(connId int, payload []byte) error {
	// Index 0 is the placeholder entry created by main_routine; real
	// connection IDs start at 1. An unknown ID would otherwise panic.
	if connId < 1 || connId >= len(s.m) {
		return fmt.Errorf("connection %d does not exist", connId)
	}

	s.m[connId].server_sequence_number += 1
	seqNum := s.m[connId].server_sequence_number
	addr := s.m[connId].client_addr

	size := len(payload)
	checksum := CalculateChecksum(connId, seqNum, size, payload)

	data, err := json.Marshal(NewData(connId, seqNum, size, payload, checksum))
	if err != nil {
		return fmt.Errorf("encoding data message for connection %d: %w", connId, err)
	}

	if _, err := s.connection.WriteToUDP(data, addr); err != nil {
		return fmt.Errorf("sending data message to connection %d: %w", connId, err)
	}

	return nil
}

// CloseConn is not implemented yet. It prints a marker line to standard
// output and always returns a "not yet implemented" error; connId is unused.
// The intended behavior is to remove the client from the server's records.
func (s *server) CloseConn(connId int) error {
	fmt.Println("CloseConn!!!")

	err := errors.New("not yet implemented")
	return err
}

// Close is not implemented yet. It prints a marker line to standard output
// and always returns a "not yet implemented" error. The intended behavior is
// to stop the server and release all of its resources.
func (s *server) Close() error {
	fmt.Println("Close!!!")

	err := errors.New("not yet implemented")
	return err
}



// Editor is loading...