// Untitled
//
// mail@pastecode.io avatar
// unknown
// plain_text
// 3 years ago
// 3.2 kB
// 1
// Indexable
// Never
#include <omnetpp.h>
using namespace omnetpp; //for Omnet++ ver. 5

class Server : public cSimpleModule
{
  private:
    cQueue queue;	            //the queue of jobs; it is assumed that the first job in the queue is the one being serviced
	cMessage *departure;        //special message; it reminds about the end of service and the need for job departure
	simtime_t departure_time;   //time of the next departure
	simtime_t T;
	simtime_t overflowStart;
	simtime_t overflowEnd;
	int maxQueueSize;
	int lostMessages;
	int allMessages;
	int consecutiveLosses;

	cOutVector process;
	cDoubleHistogram *overflowHist;
	cLongHistogram *consecutiveLossesHist;
	bool isOverflowing;

  protected:
    virtual void initialize();
    virtual void handleMessage(cMessage *msgin);

};

// Registers the Server class with the OMNeT++ simulation kernel so that it can be instantiated as a simple module.
Define_Module(Server);


/**
 * Prepares the module for a simulation run: allocates the departure
 * self-message and the two histograms, reads the buffer capacity from the
 * module parameter "N" and resets all counters and state flags.
 */
void Server::initialize()
{
    departure = new cMessage("Departure");

    // Histogram of full-buffer period durations: 300 cells over the range [0, 4).
    overflowHist = new cDoubleHistogram("Buffer overflow period");
    overflowHist->setRange(0, 4);
    overflowHist->setNumCells(300);

    consecutiveLossesHist = new cLongHistogram("Consecutive losses");

    maxQueueSize = par("N");
    consecutiveLosses = 0;
    T = 1;

    // The counters must start from zero: they are incremented and divided in
    // handleMessage(), and reading them uninitialised is undefined behaviour.
    lostMessages = 0;
    allMessages = 0;

    WATCH(lostMessages);
    WATCH(allMessages);

    overflowStart = simTime();
    overflowEnd = simTime();

    isOverflowing = false;
}


/**
 * Handles the two kinds of messages that may arrive:
 *  - the `departure` self-message: the job at the head of the queue has
 *    finished service and leaves through gate "out";
 *  - any other message: a new job from the source. It is queued if the
 *    buffer has room, otherwise it is counted as lost and discarded.
 *
 * After every event the current loss ratio and the mean of the
 * consecutive-losses histogram are written to the log.
 *
 * @param msgin the received message; ownership is taken over (the job is
 *              either queued, forwarded, or deleted when lost).
 */
void Server::handleMessage(cMessage *msgin)
{
    // Draws a service time and schedules the departure self-message accordingly.
    const auto scheduleNextDeparture = [this]() {
        departure_time = simTime() + par("service_time");
        scheduleAt(departure_time, departure);
    };

    if (msgin == departure)   // job departure
    {
        // Remove the finished job from the head of the queue; check_and_cast
        // raises an error instead of silently yielding a bad pointer.
        cMessage *finished = check_and_cast<cMessage *>(queue.pop());

        // The buffer was full right before this pop: the full-buffer period ends now.
        if (isOverflowing && queue.getLength() + 1 == maxQueueSize)
        {
            overflowHist->collect(simTime() - overflowStart);
            isOverflowing = false;
            consecutiveLossesHist->collect(consecutiveLosses);
            consecutiveLosses = 0;
        }

        process.record(queue.getLength());
        send(finished, "out");                      // depart the finished job

        if (!queue.isEmpty())                       // more jobs waiting: start the next service
            scheduleNextDeparture();
    }
    else                      // job arrival
    {
        allMessages++;        // every arrival counts, so that L below is lost / all arrivals

        if (queue.getLength() < maxQueueSize)
        {
            if (queue.isEmpty())                    // idle server: the new job goes into service immediately
                scheduleNextDeparture();

            queue.insert(msgin);                    // insert the job at the end of the queue

            if (queue.getLength() >= maxQueueSize)  // this job filled the buffer: a full-buffer period starts
            {
                isOverflowing = true;
                overflowStart = simTime();
                // NOTE(review): this counts the accepted job that filled the buffer,
                // so every collected value is (real losses + 1) — confirm this is intended.
                consecutiveLosses++;
            }
        }
        else
        {
            lostMessages++;
            consecutiveLosses++;
            delete msgin;                           // the rejected job is owned by this module; free it to avoid a leak
        }
    }

    const double lossRatio = allMessages > 0
        ? static_cast<double>(lostMessages) / static_cast<double>(allMessages)
        : 0.0;
    EV << "L = " << lossRatio << std::endl;
    EV << "The average number of consecutive losses = " << consecutiveLossesHist->getMean() << std::endl;
}