Untitled

 avatar
unknown
golang
3 years ago
14 kB
4
Indexable
// Contains the implementation of a LSP server.

package lsp

import (
	"errors"
	"github.com/cmu440/lspnet"
	"fmt" // need to delete this
	"container/list"
	"encoding/json" // for json Marshal
)


type pair struct {
    pair_id, pair_payload interface{} 
}


type server struct {
	connection *lspnet.UDPConn // ?

	// [EpochLimit: 5, EpochMillis: 2000, WindowSize: 1, MaxBackOffInterval: 0,MaxUnackedMessages: 1]
	EpochLimit int
	EpochMillis int
	WindowSize int
	MaxBackOffInterval int
	MaxUnackedMessages int

    read_request chan string
    read_response_id chan int
    read_response_msg chan string // need to turn back to [] byte in Read()

	client_cur_index int // current client_id number (e.g. current there are 5 clients already, then the number here can be 6, which means the next client will have an id value 6)
	m map[int]client_2 // map conn_id to client struct!! // keep track of client's IP address+port with its client_id (so if resend connection message, we do not generate a new client_id??)
	
	queue *list.List // use list so the size will not be fixed
}


type client_2 struct {
	client_id int
	server_sequence_number int // server's copy of sequence number for a specific client
	client_sequence_number int // server's expecting client's sequence number?
	ack_status int // which message it has acked?

    // client's addr
    client_addr *lspnet.UDPAddr
    // map (unordered data)
    unordered_queue map[int]string // type?

	// store un-ordered data message (priority queue, may use a heap) (for seq num that are too large, so we can not yet put into the queue)
	// priority_queue heap.Heap ??
	// probably also the sequence of message to send back to the client???
}


// NewServer creates, initiates, and returns a new server. This function should
// NOT block. Instead, it should spawn one or more goroutines (to handle things
// like accepting incoming client connections, triggering epoch events at
// fixed intervals, synchronizing events using a for-select loop like you saw in
// project 0, etc.) and immediately return. It should return a non-nil error if
// there was an error resolving or listening on the specified port number.
func NewServer(port int, params *Params) (Server, error) {
	// create a new server and initialize
	var LSPserver server

	// fmt.Println("port = ", port)
	// fmt.Println("params = ", params)
	port_string := fmt.Sprintf("%d", port)
    fmt.Println("port_string = ", port_string)


	addr_string := "127.0.0.1: " + port_string

    // addr_string := JoinHostPort("127.0.0.1", port_string)

	a, error := lspnet.ResolveUDPAddr("udp", addr_string)
	if error != nil {
        fmt.Println(error)
        return &LSPserver, error
    }

	connection, error := lspnet.ListenUDP("udp", a)
	if error != nil {
        fmt.Println(error)
        return &LSPserver, error
    }

    LSPserver.connection = connection
    LSPserver.EpochLimit = params.EpochLimit
    LSPserver.EpochMillis = params.EpochMillis
    LSPserver.WindowSize = params.WindowSize
    LSPserver.MaxBackOffInterval = params.MaxBackOffInterval
    LSPserver.MaxUnackedMessages = params.MaxUnackedMessages

    LSPserver.m = make(map[int]client_2) // !!!

    LSPserver.queue = list.New() // ??????

    LSPserver.read_request = make(chan string)
    LSPserver.read_response_id = make(chan int)
    LSPserver.read_response_msg = make(chan string)
    

    fmt.Println("Connection sets")

	// start main goroutine
	// go LSPserver.main_routine()

	// start read goroutine
	go LSPserver.read_routine() // ??????? always reading

	return &LSPserver, nil
}




/*

This is where we turn UDP into LSP

We continue to read message, do some processing, and store it in the queue.
For the 3 different cases, we need to do different processing and send it to the main routine

we need to know the latest sequence number that is put into the queue, and only the number that is continuously larger than the current sequence number can be added to the queue

we do not need to call read_routine again and again, because it runs when server starts and never ends
Just continue to send sth into the queue

We just need tone read_routine
*/
func (s *server) read_routine(){
	fmt.Println("in read_routine")
    s.client_cur_index += 1

	for {
        select {
        case <- s.read_request:
            fmt.Println("server: !!!!!!!!!!!!!!!!!!!")
            fmt.Println("server: size = ", s.queue.Len())
            if s.queue.Len() > 0 {
                fmt.Println("server: Len > 0!!!!!!")

                elem := s.queue.Front().Value.(*pair)
                temp := (elem.pair_payload).(string)

                s.read_response_id <- elem.pair_id.(int)
                s.read_response_msg <- temp // this is string

                fmt.Println("channel sent in case read_request")
            }

        default:
            // fmt.Println("in read_routine!!!!!!")
            message_temp := make([]byte, 1000)
            // message_read, _, _ := s.connection.Read(message_temp[:]) // ???
            n, addr_, _ := s.connection.ReadFromUDP(message_temp) // ???
            // fmt.Println("message_temp = ", string(message_temp))
            
            var message Message // ??????

            error := json.Unmarshal(message_temp[:n], &message) // need to read limited length
            if error != nil{
                fmt.Println("error in Unmarshal")
                return 
            }

            // fmt.Println("message_read = ", message_read)
            fmt.Println("server: message after unmarshal = ", message) // ?

            if message.Type == 0{ // or 0? connect case (do not need to forward to main routine?)  lspnet.MsgConnect?
                fmt.Println("server: Type Connect")

                var new_client client_2 
                new_client.client_id = s.client_cur_index
                new_client.server_sequence_number = message.SeqNum
                new_client.client_sequence_number = message.SeqNum

                new_client.client_addr = addr_ // added

                // also need to initialize a priority queue? (no for now, and we can just use a sliding window to achieve this)

                s.m[s.client_cur_index] = new_client
                s.client_cur_index += 1

                // m["uid"] = make(map[string]T)
                // s.m[s.client_cur_index].unordered_queue = make(map[int]string) // added, initialize

                if entry, ok := s.m[s.client_cur_index]; ok { // added
                   entry.unordered_queue = make(map[int]string)
                
                   s.m[s.client_cur_index] = entry
                }

                // send ack message to client (with client's connect_id)
                ack_message := NewAck(new_client.client_id, message.SeqNum)
                message_bytes, error := json.Marshal(ack_message)
                if error != nil{
                    return 
                }

                // fmt.Println("server new connection before write to udp")

                s.connection.WriteToUDP(message_bytes, addr_) // 需要看會不會block (如果會block就用write_routine)

                // fmt.Println("server new connection after write to udp")
                
            }

            if message.Type == 1{ // data (按照順序傳進list/queue)
                fmt.Println("server Type Data")

                this_seq_num := message.SeqNum
                this_client_id := message.ConnID

                // check checksum and size first
                this_size := message.Size
                this_payload := message.Payload
                // fmt.Println("message.Size = ", message.Size)
                // fmt.Println("len of payload = ", len(message.Payload))

                if len(this_payload) != this_size{
                    fmt.Println("the size is incorrect")
                    return 
                }

                // checksum
                this_checksum := CalculateChecksum(message.ConnID, message.SeqNum, message.Size, message.Payload)
                // fmt.Println("calculated checksum = ", this_checksum)
                // fmt.Println("message.Checksum = ", message.Checksum)
                if this_checksum != message.Checksum{
                    fmt.Println("the checksum is incorrect")
                    return 
                }

                if entry, ok := s.m[this_client_id]; ok {
                    entry.unordered_queue = make(map[int]string)
                    s.m[this_client_id] = entry // ???
                }

                // s.m[this_client_id].unordered_queue = make(map[int]string)

                if entry2, ok2 := s.m[this_client_id]; ok2 {
                    entry2.unordered_queue[this_seq_num] = string(this_payload)
                    s.m[this_client_id] = entry2 // ???
                }

                // s.m[this_client_id].unordered_queue[this_seq_num] = string(this_payload)


                // send ack message back to client
                ack_message := NewAck(this_client_id, this_seq_num)
                message_bytes, error := json.Marshal(ack_message)
                if error != nil{
                    fmt.Println("error")
                    return 
                }
                // s.Write(this_client_id, message_bytes)
                s.connection.WriteToUDP(message_bytes, s.m[this_client_id].client_addr) // modified

                for {
                    if val, ok := s.m[this_client_id].unordered_queue[this_seq_num]; ok {
                        out_sequence := this_client_id
                        out_data := val

                        // if time out -> delete map?

                        if entry, ok_2 := s.m[this_client_id]; ok_2 { // because map is not addressable
                           entry.client_sequence_number += 1
                           s.m[this_client_id] = entry
                        }

                        s.queue.PushBack(&pair{out_sequence, out_data})

                    } else { // means that the sequence number we have currently has not yet arrived (so nothing we can do here)
                        break
                    }
                }

            }

            if message.Type == 2{ // ack (do NOT need to do anything for checkpoint?)
                fmt.Println("server: Type ACK")

                this_client_id := message.ConnID
                // fmt.Println("Receive ACK message from client")
                // fmt.Println("this_client_id = ", this_client_id)
                // s.m[this_client_id].server_sequence_number += 1

                if entry, ok := s.m[this_client_id]; ok {
                    entry.server_sequence_number += 1
                    s.m[this_client_id] = entry
                }

                // server's sequence number += 1 ?
                // move sliding window?
            }

            if message.Type == 3{ // cAck
                // fmt.Println("Type cACK")

                this_client_id := message.ConnID
                // fmt.Println("this_client_id = ", this_client_id)
                // fmt.Println("Receive cACK message from client")
                // s.m[this_client_id].server_sequence_number += 1

                if entry, ok := s.m[this_client_id]; ok {
                    entry.server_sequence_number += 1
                    s.m[this_client_id] = entry
                }

                // s.m[this_client_id].client_sequence_number
                // server's sequence number += 1 ?
                // move sliding window?
            }

        

        }
        
        
    }

}

// probably do not need write_routine, because we can just write it back in the main routine?


/*
LSP's read
will have a received message, but we need to know which client is from, so we need to return client_id

Just taking things out of server's queue 

從queue拿出來,回傳給他

main_routine -> 傳到read_routine,用channel溝通,不要直接在這邊改動s.queue (避免race condition)
*/
func (s *server) Read() (int, []byte, error) {

	fmt.Println("server: Read!!!")

	// take out from the server's queue

    s.read_request <- "read_request"
    fmt.Println("server: request sent to read_routine")

    conn_id := <- s.read_response_id
    fmt.Println("server: receive connd_id")

    msg_string := <- s.read_response_msg
    fmt.Println("server: receive msg")

    fmt.Println("server: conn_id = ", conn_id)
    fmt.Println("server: msg sent = ", msg_string)

    return conn_id, []byte(msg_string), nil


    // for {
    //     if s.queue.Len() > 0 {
    //         message := s.queue.Front() // queue or processed_queue??? return *element

    //         elem := s.queue.Front().Value

    //         if foo, ok := elem.(*pair); ok {
    //                cur_id, _ := foo.pair_id.(int)
    //                cur_payload, _ := foo.pair_payload.(string)

    //                s.queue.Remove(message)
    //                fmt.Println("queue finish")

    //                return cur_id, []byte(cur_payload), nil
    //         }
    //     }
    // }
	
   
}

func (s *server) Write(connId int, payload []byte) error {
	fmt.Println("Write starts!!!")

	// WriteToUDP ??

    // 用connID找到這個connection的address,然後再傳出去 (建立連線時,要用map記錄對方的addr)
    // seqNum + 1

    addr := s.m[connId].client_addr // added
    // s.connection.WriteTo(payload, addr) // added

    _, error := s.connection.WriteToUDP(payload, addr)
    if error != nil {
        return error
    }

    fmt.Println("Write finishes!!!")

	return nil
}

func (s *server) CloseConn(connId int) error {
	fmt.Println("CloseConn!!!")

	// delete the client from server's map

	return errors.New("not yet implemented")
}

func (s *server) Close() error {
	fmt.Println("Close!!!")

	// stop the server and clean all resources

	return errors.New("not yet implemented")
}





Editor is loading...