#include <omnetpp.h>
using namespace omnetpp; //for Omnet++ ver. 5
class Server : public cSimpleModule
{
private:
cQueue queue; //the queue of jobs; it is assumed that the first job in the queue is the one being serviced
cMessage *departure; //special message; it reminds about the end of service and the need for job departure
simtime_t departure_time; //time of the next departure
simtime_t T;
simtime_t overflowStart;
simtime_t overflowEnd;
int maxQueueSize;
int lostMessages;
int allMessages;
int consecutiveLosses;
cOutVector process;
cDoubleHistogram *overflowHist;
cLongHistogram *consecutiveLossesHist;
bool isOverflowing;
protected:
virtual void initialize();
virtual void handleMessage(cMessage *msgin);
};
Define_Module(Server);
void Server::initialize()
{
departure = new cMessage("Departure");
overflowHist = new cDoubleHistogram("Buffer overflow period");
overflowHist->setRange(0, 4);
overflowHist->setNumCells(300);
consecutiveLossesHist = new cLongHistogram("Consecutive losses");
maxQueueSize = par("N");
consecutiveLosses = 0;
T = 1;
WATCH(lostMessages);
WATCH(allMessages);
overflowStart = simTime();
overflowEnd = simTime();
isOverflowing = false;
}
void Server::handleMessage(cMessage *msgin) //two types of messages may arrive: a job from the source, or the special message initiating job departure
{
if (msgin==departure) //job departure
{
cMessage *msg = (cMessage *)queue.pop(); //remove the finished job from the head of the queue
if(queue.getLength() + 1 == maxQueueSize && isOverflowing)
{
overflowHist->collect(simTime() - overflowStart);
isOverflowing = false;
consecutiveLossesHist->collect(consecutiveLosses);
consecutiveLosses = 0;
}
process.record(queue.getLength());
send(msg,"out"); //depart the finished job
if (!queue.isEmpty()) //if the queue is not empty, initiate the next service, i.e. schedule the next departure event in the future
{
departure_time=simTime()+par("service_time");
scheduleAt(departure_time,departure);
}
}
else //job arrival
{
if(queue.getLength() < maxQueueSize)
{
allMessages++;
if (queue.isEmpty()) //if the queue is empty, the job that has just arrived has to be served immediately, i.e. the departure event of this job has to be scheduled in the future
{
departure_time=simTime()+par("service_time");
scheduleAt(departure_time,departure);
}
queue.insert(msgin); //insert the job at the end of the queue
if(queue.getLength() >= maxQueueSize)
{
isOverflowing = true;
overflowStart = simTime();
consecutiveLosses++;
}
}
else
{
lostMessages++;
consecutiveLosses++;
}
}
EV << "L = " << (double)lostMessages / (double)allMessages << std::endl;
EV << "The average number of consecutive losses = " << consecutiveLossesHist->getMean() << std::endl;
}