INET Framework for OMNeT++/OMNEST
inet::SctpPeer Class Reference

Implements the SctpPeer simple module. More...

#include <SctpPeer.h>

Inheritance diagram for inet::SctpPeer:
inet::SctpSocket::ICallback inet::LifecycleUnsupported inet::ILifecycle

Classes

struct  PathStatus
 

Public Member Functions

 SctpPeer ()
 
 ~SctpPeer ()
 
- Public Member Functions inherited from inet::SctpSocket::ICallback
virtual ~ICallback ()
 
virtual void socketOptionsArrived (SctpSocket *socket, Indication *indication)
 
virtual void socketDeleted (SctpSocket *socket)
 
virtual void sendqueueAbatedArrived (SctpSocket *socket, unsigned long int buffer)
 
virtual void addressAddedArrived (SctpSocket *socket, L3Address localAddr, L3Address remoteAddr)
 
- Public Member Functions inherited from inet::LifecycleUnsupported
virtual bool handleOperationStage (LifecycleOperation *operation, IDoneCallback *doneCallback) override
 Perform one stage of a lifecycle operation. More...
 
- Public Member Functions inherited from inet::ILifecycle
virtual ~ILifecycle ()
 

Protected Types

typedef std::map< int, long > RcvdPacketsPerAssoc
 
typedef std::map< int, long > SentPacketsPerAssoc
 
typedef std::map< int, long > RcvdBytesPerAssoc
 
typedef std::map< int, cOutVector * > BytesPerAssoc
 
typedef std::map< int, cHistogram * > HistEndToEndDelay
 
typedef std::map< int, cOutVector * > EndToEndDelay
 
typedef std::map< L3Address, PathStatus > SctpPathStatus
 

Protected Member Functions

virtual void initialize (int stage) override
 
virtual int numInitStages () const override
 
virtual void handleMessage (cMessage *msg) override
 
virtual void finish () override
 
void handleTimer (cMessage *msg)
 
void connect ()
 
virtual void socketAvailable (SctpSocket *socket, Indication *indication) override
 
void socketEstablished (SctpSocket *socket, unsigned long int buffer) override
 
void socketDataArrived (SctpSocket *socket, Packet *msg, bool urgent) override
 
void socketDataNotificationArrived (SctpSocket *socket, Message *msg) override
 
void socketPeerClosed (SctpSocket *socket) override
 
void socketClosed (SctpSocket *socket) override
 
void socketFailure (SctpSocket *socket, int code) override
 
void socketStatusArrived (SctpSocket *socket, SctpStatusReq *status) override
 
void sendRequest (bool last=true)
 
void sendOrSchedule (cMessage *msg)
 
void generateAndSend ()
 
void sendRequestArrived (SctpSocket *socket) override
 
void sendQueueRequest ()
 
void shutdownReceivedArrived (SctpSocket *socket) override
 
void sendqueueFullArrived (SctpSocket *socket) override
 
void msgAbandonedArrived (SctpSocket *socket) override
 
void setStatusString (const char *s)
 

Protected Attributes

double delay
 
bool echo
 
bool ordered
 
bool schedule
 
int queueSize
 
int outboundStreams
 
int inboundStreams
 
SctpPathStatus sctpPathStatus
 
SctpSocket clientSocket
 
SctpSocket listeningSocket
 
cMessage * timeoutMsg
 
cMessage * timeMsg
 
cMessage * connectTimer
 
bool shutdownReceived
 
bool sendAllowed
 
int serverAssocId
 
int numRequestsToSend
 
int lastStream
 
int numPacketsToReceive
 
RcvdPacketsPerAssoc rcvdPacketsPerAssoc
 
SentPacketsPerAssoc sentPacketsPerAssoc
 
RcvdBytesPerAssoc rcvdBytesPerAssoc
 
BytesPerAssoc bytesPerAssoc
 
HistEndToEndDelay histEndToEndDelay
 
EndToEndDelay endToEndDelay
 
long bytesSent
 
int echoedBytesSent
 
int packetsSent
 
int bytesRcvd
 
int packetsRcvd
 
int notificationsReceived
 
int numSessions
 
int chunksAbandoned
 

Static Protected Attributes

static simsignal_t echoedPkSignal = registerSignal("echoedPk")
 

Detailed Description

Implements the SctpPeer simple module.

See the NED file for more info.

Member Typedef Documentation

◆ BytesPerAssoc

typedef std::map<int, cOutVector *> inet::SctpPeer::BytesPerAssoc
protected

◆ EndToEndDelay

typedef std::map<int, cOutVector *> inet::SctpPeer::EndToEndDelay
protected

◆ HistEndToEndDelay

typedef std::map<int, cHistogram *> inet::SctpPeer::HistEndToEndDelay
protected

◆ RcvdBytesPerAssoc

typedef std::map<int, long> inet::SctpPeer::RcvdBytesPerAssoc
protected

◆ RcvdPacketsPerAssoc

typedef std::map<int, long> inet::SctpPeer::RcvdPacketsPerAssoc
protected

◆ SctpPathStatus

typedef std::map<L3Address, PathStatus> inet::SctpPeer::SctpPathStatus
protected

◆ SentPacketsPerAssoc

typedef std::map<int, long> inet::SctpPeer::SentPacketsPerAssoc
protected

Constructor & Destructor Documentation

◆ SctpPeer()

inet::SctpPeer::SctpPeer ( )
34 {
35  timeoutMsg = nullptr;
36  timeMsg = nullptr;
37  connectTimer = nullptr;
38  delay = 0;
39  echo = false;
40  ordered = true;
41  schedule = false;
42  queueSize = 0;
43  outboundStreams = 1;
44  inboundStreams = 17;
45  shutdownReceived = false;
46  sendAllowed = true;
47  serverAssocId = 0;
49  lastStream = 0;
51  bytesSent = 0;
52  echoedBytesSent = 0;
53  packetsSent = 0;
54  bytesRcvd = 0;
55  packetsRcvd = 0;
57  numSessions = 0;
58  chunksAbandoned = 0;
59 }

◆ ~SctpPeer()

inet::SctpPeer::~SctpPeer ( )
62 {
63  cancelAndDelete(timeMsg);
64  cancelAndDelete(timeoutMsg);
65  cancelAndDelete(connectTimer);
66  for (auto& elem : bytesPerAssoc)
67  delete elem.second;
68  bytesPerAssoc.clear();
69 
70  for (auto& elem : endToEndDelay)
71  delete elem.second;
72  endToEndDelay.clear();
73 
74  for (auto& elem : histEndToEndDelay)
75  delete elem.second;
76  histEndToEndDelay.clear();
77 
78  rcvdPacketsPerAssoc.clear();
79  sentPacketsPerAssoc.clear();
80  rcvdBytesPerAssoc.clear();
81 }

Member Function Documentation

◆ connect()

void inet::SctpPeer::connect ( )
protected
174 {
175  const char *connectAddress = par("connectAddress");
176  int connectPort = par("connectPort");
177  int outStreams = par("outboundStreams");
178  clientSocket.setOutboundStreams(outStreams);
179 
180  EV_INFO << "issuing OPEN command\n";
181  EV_INFO << "Assoc " << clientSocket.getSocketId() << "::connect to address " << connectAddress << ", port " << connectPort << "\n";
182  numSessions++;
183  bool streamReset = par("streamReset");
184  L3Address destination;
185  L3AddressResolver().tryResolve(connectAddress, destination);
186  if (destination.isUnspecified())
187  EV << "cannot resolve destination address: " << connectAddress << endl;
188  else {
189  clientSocket.connect(destination, connectPort, streamReset, par("prMethod"), par("numRequestsPerSession"));
190  }
191 
192  if (streamReset) {
193  cMessage *cmsg = new cMessage("StreamReset", MSGKIND_RESET);
194  EV_INFO << "StreamReset Timer scheduled at " << simTime() << "\n";
195  scheduleAfter(par("streamRequestTime"), cmsg);
196  }
197 
198  unsigned int streamNum = 0;
199  cStringTokenizer tokenizer(par("streamPriorities"));
200  while (tokenizer.hasMoreTokens()) {
201  const char *token = tokenizer.nextToken();
202  clientSocket.setStreamPriority(streamNum, (unsigned int)atoi(token));
203 
204  streamNum++;
205  }
206 }

Referenced by handleTimer().

◆ finish()

void inet::SctpPeer::finish ( )
override protected virtual
703 {
704  EV_INFO << getFullPath() << ": opened " << numSessions << " sessions\n";
705  EV_INFO << getFullPath() << ": sent " << bytesSent << " bytes in " << packetsSent << " packets\n";
706 
707  for (auto& elem : rcvdBytesPerAssoc)
708  EV_DETAIL << getFullPath() << ": received " << elem.second << " bytes in assoc " << elem.first << "\n";
709 
710  EV_INFO << getFullPath() << "Over all " << packetsRcvd << " packets received\n ";
711  EV_INFO << getFullPath() << "Over all " << notificationsReceived << " notifications received\n ";
712 }

◆ generateAndSend()

void inet::SctpPeer::generateAndSend ( )
protected
147 {
148  auto applicationPacket = new Packet("ApplicationPacket");
149  auto applicationData = makeShared<BytesChunk>();
150  int numBytes = par("requestLength");
151  std::vector<uint8_t> vec;
152  vec.resize(numBytes);
153  for (int i = 0; i < numBytes; i++)
154  vec[i] = (bytesSent + i) & 0xFF;
155  applicationData->setBytes(vec);
156  applicationData->addTag<CreationTimeTag>()->setCreationTime(simTime());
157  applicationPacket->insertAtBack(applicationData);
158  auto sctpSendReq = applicationPacket->addTag<SctpSendReq>();
159  sctpSendReq->setLast(true);
160  sctpSendReq->setPrMethod(par("prMethod"));
161  sctpSendReq->setPrValue(par("prValue"));
163  sctpSendReq->setSid(lastStream);
164  sctpSendReq->setSocketId(serverAssocId);
165  applicationPacket->setKind(ordered ? SCTP_C_SEND_ORDERED : SCTP_C_SEND_UNORDERED);
166  applicationPacket->addTag<SocketReq>()->setSocketId(serverAssocId);
167  applicationPacket->addTag<DispatchProtocolReq>()->setProtocol(&Protocol::sctp);
168  bytesSent += numBytes;
169  packetsSent++;
170  sendOrSchedule(applicationPacket);
171 }

Referenced by handleMessage(), and handleTimer().

◆ handleMessage()

void inet::SctpPeer::handleMessage ( cMessage *  msg)
override protected virtual
209 {
210  int id = -1;
211 
212  if (msg->isSelfMessage())
213  handleTimer(msg);
214 
215  switch (msg->getKind()) {
216  case SCTP_I_PEER_CLOSED:
217  case SCTP_I_ABORT: {
218  Message *message = check_and_cast<Message *>(msg);
219  auto& intags = message->getTags();
220  const auto& ind = intags.findTag<SctpCommandReq>();
221  Request *cmsg = new Request("SCTP_C_ABORT", SCTP_C_ABORT);
222  auto& cmd = cmsg->addTag<SctpSendReq>();
223  id = ind->getSocketId();
224  cmd->setSocketId(id);
225  cmd->setSid(ind->getSid());
226  cmd->setNumMsgs(ind->getNumMsgs());
227  delete msg;
228  sendOrSchedule(cmsg);
229  break;
230  }
231 
232  case SCTP_I_ESTABLISHED: {
235  else {
236  Message *message = check_and_cast<Message *>(msg);
237  auto& tags = message->getTags();
238  const auto& connectInfo = tags.findTag<SctpConnectReq>();
239  numSessions++;
240  serverAssocId = connectInfo->getSocketId();
241  id = serverAssocId;
242  outboundStreams = connectInfo->getOutboundStreams();
243  rcvdPacketsPerAssoc[serverAssocId] = par("numPacketsToReceivePerClient");
244  sentPacketsPerAssoc[serverAssocId] = par("numPacketsToSendPerClient");
245  char text[50];
246  sprintf(text, "App: Received Bytes of assoc %d", serverAssocId);
247  bytesPerAssoc[serverAssocId] = new cOutVector(text);
249  sprintf(text, "App: EndToEndDelay of assoc %d", serverAssocId);
250  endToEndDelay[serverAssocId] = new cOutVector(text);
251  sprintf(text, "Hist: EndToEndDelay of assoc %d", serverAssocId);
252  histEndToEndDelay[serverAssocId] = new cHistogram(text);
253 
254 // delete connectInfo;
255  delete msg;
256 
257  if (par("numPacketsToSendPerClient").intValue() > 0) {
258  auto i = sentPacketsPerAssoc.find(serverAssocId);
259  numRequestsToSend = i->second;
260  if (par("thinkTime").doubleValue() > 0) {
261  generateAndSend();
262  timeoutMsg->setKind(SCTP_C_SEND);
263  scheduleAfter(par("thinkTime"), timeoutMsg);
265  i->second = numRequestsToSend;
266  }
267  else {
268  if (queueSize == 0) {
269  while (numRequestsToSend > 0) {
270  generateAndSend();
272  i->second = numRequestsToSend;
273  }
274  }
275  else if (queueSize > 0) {
276  int count = 0;
277  while (numRequestsToSend > 0 && count++ < queueSize * 2) {
278  generateAndSend();
280  i->second = numRequestsToSend;
281  }
282 
283  Request *cmsg = new Request("SCTP_C_QUEUE_MSGS_LIMIT", SCTP_C_QUEUE_MSGS_LIMIT);
284  auto& qinfo = cmsg->addTag<SctpInfoReq>();
285  qinfo->setText(queueSize);
286  qinfo->setSocketId(id);
287  sendOrSchedule(cmsg);
288  }
289 
290  EV_INFO << "!!!!!!!!!!!!!!!All data sent from Server !!!!!!!!!!\n";
291 
292  auto j = rcvdPacketsPerAssoc.find(serverAssocId);
293  if (j->second == 0 && par("waitToClose").doubleValue() > 0) {
294  char as[5];
295  sprintf(as, "%d", serverAssocId);
296  cMessage *abortMsg = new cMessage(as, SCTP_I_ABORT);
297  scheduleAfter(par("waitToClose"), abortMsg);
298  }
299  else {
300  EV_INFO << "no more packets to send, call shutdown for assoc " << serverAssocId << "\n";
301  Request *cmsg = new Request("ShutdownRequest", SCTP_C_SHUTDOWN);
302  auto& cmd = cmsg->addTag<SctpCommandReq>();
303  cmd->setSocketId(serverAssocId);
304  sendOrSchedule(cmsg);
305  }
306  }
307  }
308  }
309  break;
310  }
311 
314  Message *message = check_and_cast<Message *>(msg);
315  auto& intags = message->getTags();
316  const auto& ind = intags.findTag<SctpCommandReq>();
317  id = ind->getSocketId();
318  Request *cmsg = new Request("ReceiveRequest", SCTP_C_RECEIVE);
319  auto cmd = cmsg->addTag<SctpSendReq>();
320  cmsg->addTag<SocketReq>()->setSocketId(id);
321  cmsg->addTag<DispatchProtocolReq>()->setProtocol(&Protocol::sctp);
322  cmd->setSocketId(id);
323  cmd->setSid(ind->getSid());
324  cmd->setNumMsgs(ind->getNumMsgs());
325  delete msg;
326  if (!cmsg->isScheduled() && schedule == false) {
327  scheduleAfter(par("delayFirstRead"), cmsg);
328  }
329  else if (schedule == true)
330  sendOrSchedule(cmsg);
331  break;
332  }
333 
334  case SCTP_I_DATA: {
335  Packet *message = check_and_cast<Packet *>(msg);
336  auto& tags = message->getTags();
337  const auto& ind = tags.findTag<SctpRcvReq>();
338  id = ind->getSocketId();
339  auto j = rcvdBytesPerAssoc.find(id);
342  else if (j != rcvdBytesPerAssoc.end()) {
343  j->second += PK(msg)->getByteLength();
344  auto k = bytesPerAssoc.find(id);
345  k->second->record(j->second);
346  packetsRcvd++;
347 
348  if (!echo) {
349  if (par("numPacketsToReceivePerClient").intValue() > 0) {
350  auto i = rcvdPacketsPerAssoc.find(id);
351  i->second--;
352  SctpSimpleMessage *smsg = check_and_cast<SctpSimpleMessage *>(msg);
353  auto j = endToEndDelay.find(id);
354  j->second->record(simTime() - smsg->getCreationTime());
355  auto k = histEndToEndDelay.find(id);
356  k->second->collect(simTime() - smsg->getCreationTime());
357 
358  if (i->second == 0) {
359  Request *cmsg = new Request("SCTP_C_NO_OUTSTANDING", SCTP_C_NO_OUTSTANDING);
360  auto& qinfo = cmsg->addTag<SctpCommandReq>();
361  qinfo->setSocketId(id);
362  sendOrSchedule(cmsg);
363  }
364  }
365  delete msg;
366  }
367  else {
368  auto m = endToEndDelay.find(id);
369  auto k = histEndToEndDelay.find(id);
370  const auto& smsg = message->peekData();
371 
372  for (auto& region : smsg->getAllTags<CreationTimeTag>()) {
373  m->second->record(simTime() - region.getTag()->getCreationTime());
374  k->second->collect(simTime() - region.getTag()->getCreationTime());
375  }
376 
377  auto cmsg = new Packet("ApplicationPacket");
378  cmsg->insertAtBack(smsg);
379  auto cmd = cmsg->addTag<SctpSendReq>();
381  cmd->setLast(true);
382  cmd->setSocketId(id);
383  cmd->setPrValue(0);
384  cmd->setSid(lastStream);
385  cmsg->setKind(cmd->getSendUnordered() ? SCTP_C_SEND_UNORDERED : SCTP_C_SEND_ORDERED);
386  bytesSent += B(smsg->getChunkLength()).get();
387  packetsSent++;
388  sendOrSchedule(cmsg);
389  }
390  }
391  else {
392  delete msg;
393  }
394  break;
395  }
396 
398  Message *message = check_and_cast<Message *>(msg);
399  id = message->getTag<SocketInd>()->getSocketId();
400  EV_INFO << "server: SCTP_I_SHUTDOWN_RECEIVED for assoc " << id << "\n";
401  auto i = rcvdPacketsPerAssoc.find(id);
402 
405  else if (i != rcvdPacketsPerAssoc.end()) {
406  if (i->second == 0) {
407  Request *cmsg = new Request("SCTP_C_NO_OUTSTANDING", SCTP_C_NO_OUTSTANDING);
408  auto& qinfo = cmsg->addTag<SctpCommandReq>();
409  qinfo->setSocketId(id);
410  sendOrSchedule(cmsg);
411  }
412 
413  shutdownReceived = true;
414  }
415  delete msg;
416  break;
417  }
418 
421  EV_INFO << "Streams have been resetted\n";
422  break;
423  }
424 
425  case SCTP_I_CLOSED:
426  delete msg;
427  break;
428  }
429 
430  if (hasGUI()) {
431  char buf[32];
432  auto l = rcvdBytesPerAssoc.find(id);
433  sprintf(buf, "rcvd: %ld bytes\nsent: %ld bytes", l->second, bytesSent);
434  getDisplayString().setTagArg("t", 0, buf);
435  }
436 }

◆ handleTimer()

void inet::SctpPeer::handleTimer ( cMessage *  msg)
protected
439 {
440  EV_TRACE << "SctpPeer::handleTimer\n";
441 
442  switch (msg->getKind()) {
443  case MSGKIND_CONNECT:
444  EV_INFO << "starting session call connect\n";
445  connect();
446  break;
447 
448  case SCTP_C_SEND:
449  if (numRequestsToSend > 0) {
450  generateAndSend();
451  if (par("thinkTime").doubleValue() > 0)
452  scheduleAfter(par("thinkTime"), timeoutMsg);
454  }
455  break;
456 
457  case SCTP_I_ABORT: {
458  Request *cmsg = new Request("SCTP_C_CLOSE", SCTP_C_CLOSE);
459  auto& cmd = cmsg->addTag<SctpCommandReq>();
460  int id = atoi(msg->getName());
461  cmd->setSocketId(id);
462  sendOrSchedule(cmsg);
463  }
464  break;
465 
466  case SCTP_C_RECEIVE:
467  schedule = true;
468  sendOrSchedule(PK(msg));
469  break;
470 
471  default:
472  break;
473  }
474 }

Referenced by handleMessage().

◆ initialize()

void inet::SctpPeer::initialize ( int  stage)
override protected virtual
84 {
85  cSimpleModule::initialize(stage);
86 
87  if (stage == INITSTAGE_LOCAL) {
88  WATCH(numSessions);
89  WATCH(packetsSent);
90  WATCH(packetsRcvd);
91  WATCH(bytesSent);
92  WATCH(numRequestsToSend);
93  }
94  else if (stage == INITSTAGE_APPLICATION_LAYER) {
95  // parameters
96  const char *addressesString = par("localAddress");
97  AddressVector addresses = L3AddressResolver().resolve(cStringTokenizer(addressesString).asVector());
98  int port = par("localPort");
99  echo = par("echo");
100  delay = par("echoDelay");
101  outboundStreams = par("outboundStreams");
102  inboundStreams = par("inboundStreams");
103  ordered = par("ordered");
104  queueSize = par("queueSize");
105  timeoutMsg = new cMessage("SrvAppTimer");
106  listeningSocket.setOutputGate(gate("socketOut"));
109 
110  if (addresses.size() == 0) {
111  listeningSocket.bind(port);
112  clientSocket.bind(port);
113  }
114  else {
115  listeningSocket.bindx(addresses, port);
116  clientSocket.bindx(addresses, port);
117  }
118  listeningSocket.listen(true, par("streamReset"), par("numPacketsToSendPerClient"));
119  EV_DEBUG << "SctpPeer::initialized listen port=" << port << "\n";
121  clientSocket.setOutputGate(gate("socketOut"));
122 
123  if (simtime_t(par("startTime")) > SIMTIME_ZERO) { // FIXME is invalid the startTime == 0 ????
124  connectTimer = new cMessage("ConnectTimer", MSGKIND_CONNECT);
125  scheduleAt(par("startTime"), connectTimer);
126  }
127 
128  cModule *node = findContainingNode(this);
129  NodeStatus *nodeStatus = node ? check_and_cast_nullable<NodeStatus *>(node->getSubmodule("status")) : nullptr;
130  bool isOperational = (!nodeStatus) || nodeStatus->getState() == NodeStatus::UP;
131  if (!isOperational)
132  throw cRuntimeError("This module doesn't support starting in node DOWN state");
133  }
134 }

◆ msgAbandonedArrived()

void inet::SctpPeer::msgAbandonedArrived ( SctpSocket *  socket)
override protected virtual

Reimplemented from inet::SctpSocket::ICallback.

693 {
694  chunksAbandoned++;
695 }

◆ numInitStages()

virtual int inet::SctpPeer::numInitStages ( ) const
inline override protected virtual
85 { return NUM_INIT_STAGES; }

◆ sendOrSchedule()

void inet::SctpPeer::sendOrSchedule ( cMessage *  msg)
protected
137 {
138  if (delay == 0) {
139  send(msg, "socketOut");
140  }
141  else {
142  scheduleAfter(delay, msg);
143  }
144 }

Referenced by generateAndSend(), handleMessage(), and handleTimer().

◆ sendqueueFullArrived()

void inet::SctpPeer::sendqueueFullArrived ( SctpSocket *  socket)
override protected virtual

Reimplemented from inet::SctpSocket::ICallback.

698 {
699  sendAllowed = false;
700 }

◆ sendQueueRequest()

void inet::SctpPeer::sendQueueRequest ( )
protected
621 {
622  Request *cmsg = new Request("SCTP_C_QUEUE_MSGS_LIMIT", SCTP_C_QUEUE_MSGS_LIMIT);
623  auto& qinfo = cmsg->addTag<SctpInfoReq>();
624  qinfo->setText(queueSize);
625  qinfo->setSocketId(clientSocket.getSocketId());
627 }

Referenced by socketEstablished().

◆ sendRequest()

void inet::SctpPeer::sendRequest ( bool  last = true)
protected
539 {
540  EV_INFO << "sending request, " << numRequestsToSend - 1 << " more to go\n";
541  long numBytes = par("requestLength");
542 
543  if (numBytes < 1)
544  numBytes = 1;
545 
546  EV_INFO << "SctpClient: sending " << numBytes << " data bytes\n";
547 
548  auto cmsg = new Packet("ApplicationPacket");
549  auto msg = makeShared<BytesChunk>();
550  std::vector<uint8_t> vec;
551  vec.resize(numBytes);
552  for (int i = 0; i < numBytes; i++)
553  vec[i] = (bytesSent + i) & 0xFF;
554  msg->setBytes(vec);
555  msg->addTag<CreationTimeTag>()->setCreationTime(simTime());
556  cmsg->insertAtBack(msg);
558  auto sendCommand = cmsg->addTag<SctpSendReq>();
559  sendCommand->setLast(true);
560  // send SctpMessage with SctpSimpleMessage enclosed
561  clientSocket.send(cmsg);
562  bytesSent += numBytes;
563 }

Referenced by sendRequestArrived(), and socketEstablished().

◆ sendRequestArrived()

void inet::SctpPeer::sendRequestArrived ( SctpSocket *  socket)
override protected virtual

Reimplemented from inet::SctpSocket::ICallback.

630 {
631  ASSERT(socket == &clientSocket);
632  int count = 0;
633 
634  EV_INFO << "sendRequestArrived numRequestsToSend=" << numRequestsToSend << "\n";
635 
636  while (numRequestsToSend > 0 && count++ < queueSize && sendAllowed) {
639  if (numRequestsToSend == 0) {
640  EV_INFO << "no more packets to send, call shutdown\n";
642  }
643  }
644 }

◆ setStatusString()

void inet::SctpPeer::setStatusString ( const char *  s)
protected
533 {
534  if (hasGUI())
535  getDisplayString().setTagArg("t", 0, s);
536 }

Referenced by socketClosed(), socketDataArrived(), socketEstablished(), socketFailure(), and socketPeerClosed().

◆ shutdownReceivedArrived()

void inet::SctpPeer::shutdownReceivedArrived ( SctpSocket *  socket)
override protected virtual

Reimplemented from inet::SctpSocket::ICallback.

683 {
684  if (numRequestsToSend == 0) {
685  Message *cmsg = new Message("SCTP_C_NO_OUTSTANDING", SCTP_C_NO_OUTSTANDING);
686  auto& qinfo = cmsg->addTag<SctpCommandReq>();
687  qinfo->setSocketId(socket->getSocketId());
689  }
690 }

◆ socketAvailable()

virtual void inet::SctpPeer::socketAvailable ( SctpSocket *  socket,
Indication *  indication 
)
inline override protected virtual

Implements inet::SctpSocket::ICallback.

91 { throw cRuntimeError("Model error, this module doesn't use any listener SCTP sockets"); }

◆ socketClosed()

void inet::SctpPeer::socketClosed ( SctpSocket *  socket)
override protected virtual

Reimplemented from inet::SctpSocket::ICallback.

500 {
501  // *redefine* to start another session etc.
502  EV_INFO << "connection closed\n";
503  setStatusString("closed");
504 }

◆ socketDataArrived()

void inet::SctpPeer::socketDataArrived ( SctpSocket *  socket,
Packet *  msg,
bool  urgent 
)
override protected virtual

Implements inet::SctpSocket::ICallback.

647 {
648  // *redefine* to perform or schedule next sending
649  packetsRcvd++;
650 
651  EV_INFO << "Client received packet Nr " << packetsRcvd << " from SCTP\n";
652 
653  auto& tags = msg->getTags();
654  const auto& ind = tags.findTag<SctpRcvReq>();
655 
656  emit(packetReceivedSignal, msg);
657  bytesRcvd += msg->getByteLength();
658 
659  if (echo) {
660  const auto& smsg = msg->peekData();
661  auto cmsg = new Packet("ApplicationPacket");
662  cmsg->setKind(ind->getSendUnordered() ? SCTP_C_SEND_UNORDERED : SCTP_C_SEND_ORDERED);
663  cmsg->insertAtBack(smsg);
664  auto cmd = cmsg->addTag<SctpSendReq>();
665  cmd->setLast(true);
666  cmd->setSocketId(ind->getSocketId());
667  cmd->setPrValue(0);
668  cmd->setSid(ind->getSid());
669  packetsSent++;
670  clientSocket.send(cmsg);
671  }
672 
673  if (par("numPacketsToReceive").intValue() > 0) {
675  if (numPacketsToReceive == 0) {
676  setStatusString("closing");
678  }
679  }
680 }

◆ socketDataNotificationArrived()

void inet::SctpPeer::socketDataNotificationArrived ( SctpSocket *  socket,
Message *  msg 
)
override protected virtual

Implements inet::SctpSocket::ICallback.

477 {
478  Message *message = check_and_cast<Message *>(msg);
479  auto& intags = message->getTags();
480  const auto& ind = intags.findTag<SctpCommandReq>();
481  Request *cmesg = new Request("SCTP_C_RECEIVE", SCTP_C_RECEIVE);
482  auto& cmd = cmesg->addTag<SctpSendReq>();
483  cmd->setSocketId(ind->getSocketId());
484  cmd->setSid(ind->getSid());
485  cmd->setNumMsgs(ind->getNumMsgs());
487 }

◆ socketEstablished()

void inet::SctpPeer::socketEstablished ( SctpSocket *  socket,
unsigned long int  buffer 
)
override protected virtual

Reimplemented from inet::SctpSocket::ICallback.

566 {
567  ASSERT(socket == &clientSocket);
568  int count = 0;
569  // *redefine* to perform or schedule first sending
570  EV_INFO << "SctpClient: connected\n";
571  setStatusString("connected");
572  // determine number of requests in this session
573  numRequestsToSend = par("numRequestsPerSession");
574  numPacketsToReceive = par("numPacketsToReceive");
575 
576  if (numRequestsToSend < 1)
577  numRequestsToSend = 0;
578 
579  // perform first request (next one will be sent when reply arrives)
580  if (numRequestsToSend > 0) {
581  if (par("thinkTime").doubleValue() > 0) {
582  if (sendAllowed) {
583  sendRequest();
585  }
586 
587  timeMsg->setKind(MSGKIND_SEND);
588  scheduleAfter(par("thinkTime"), timeMsg);
589  }
590  else {
591  if (queueSize > 0) {
592  while (numRequestsToSend > 0 && count++ < queueSize * 2 && sendAllowed) {
593  sendRequest(count == queueSize * 2);
595  }
596 
597  if (numRequestsToSend > 0 && sendAllowed)
599  }
600  else {
601  while (numRequestsToSend > 0 && sendAllowed) {
602  sendRequest();
604  }
605  }
606 
607  if (numPacketsToReceive == 0 && par("waitToClose").doubleValue() > 0) {
608  timeMsg->setKind(MSGKIND_ABORT);
609  scheduleAfter(par("waitToClose"), timeMsg);
610  }
611 
612  if (numRequestsToSend == 0 && par("waitToClose").doubleValue() == 0) {
613  EV_INFO << "socketEstablished:no more packets to send, call shutdown\n";
615  }
616  }
617  }
618 }

◆ socketFailure()

void inet::SctpPeer::socketFailure ( SctpSocket *  socket,
int  code 
)
override protected virtual

Reimplemented from inet::SctpSocket::ICallback.

507 {
508  // subclasses may override this function, and add code try to reconnect after a delay.
509  EV_WARN << "connection broken\n";
510  setStatusString("broken");
511  // reconnect after a delay
512  timeMsg->setKind(MSGKIND_CONNECT);
513  scheduleAfter(par("reconnectInterval"), timeMsg);
514 }

◆ socketPeerClosed()

void inet::SctpPeer::socketPeerClosed ( SctpSocket *  socket)
override protected virtual

Reimplemented from inet::SctpSocket::ICallback.

490 {
491  // close the connection (if not already closed)
493  EV_INFO << "remote SCTP closed, closing here as well\n";
494  setStatusString("closing");
496  }
497 }

◆ socketStatusArrived()

void inet::SctpPeer::socketStatusArrived ( SctpSocket *  socket,
SctpStatusReq *  status 
)
override protected virtual

Reimplemented from inet::SctpSocket::ICallback.

517 {
518  struct PathStatus ps;
519  auto i = sctpPathStatus.find(status->getPathId());
520 
521  if (i != sctpPathStatus.end()) {
522  ps = i->second;
523  ps.active = status->getActive();
524  }
525  else {
526  ps.active = status->getActive();
527  ps.primaryPath = false;
528  sctpPathStatus[ps.pid] = ps;
529  }
530 }

Member Data Documentation

◆ bytesPerAssoc

BytesPerAssoc inet::SctpPeer::bytesPerAssoc
protected

Referenced by handleMessage(), and ~SctpPeer().

◆ bytesRcvd

int inet::SctpPeer::bytesRcvd
protected

Referenced by SctpPeer(), and socketDataArrived().

◆ bytesSent

long inet::SctpPeer::bytesSent
protected

◆ chunksAbandoned

int inet::SctpPeer::chunksAbandoned
protected

Referenced by msgAbandonedArrived(), and SctpPeer().

◆ clientSocket

SctpSocket inet::SctpPeer::clientSocket
protected

◆ connectTimer

cMessage* inet::SctpPeer::connectTimer
protected

Referenced by initialize(), SctpPeer(), and ~SctpPeer().

◆ delay

double inet::SctpPeer::delay
protected

◆ echo

bool inet::SctpPeer::echo
protected

◆ echoedBytesSent

int inet::SctpPeer::echoedBytesSent
protected

Referenced by SctpPeer().

◆ echoedPkSignal

simsignal_t inet::SctpPeer::echoedPkSignal = registerSignal("echoedPk")
static protected

◆ endToEndDelay

EndToEndDelay inet::SctpPeer::endToEndDelay
protected

Referenced by handleMessage(), and ~SctpPeer().

◆ histEndToEndDelay

HistEndToEndDelay inet::SctpPeer::histEndToEndDelay
protected

Referenced by handleMessage(), and ~SctpPeer().

◆ inboundStreams

int inet::SctpPeer::inboundStreams
protected

Referenced by initialize(), and SctpPeer().

◆ lastStream

int inet::SctpPeer::lastStream
protected

◆ listeningSocket

SctpSocket inet::SctpPeer::listeningSocket
protected

Referenced by initialize().

◆ notificationsReceived

int inet::SctpPeer::notificationsReceived
protected

Referenced by finish(), handleMessage(), and SctpPeer().

◆ numPacketsToReceive

int inet::SctpPeer::numPacketsToReceive
protected

◆ numRequestsToSend

int inet::SctpPeer::numRequestsToSend
protected

◆ numSessions

int inet::SctpPeer::numSessions
protected

◆ ordered

bool inet::SctpPeer::ordered
protected

◆ outboundStreams

int inet::SctpPeer::outboundStreams
protected

◆ packetsRcvd

int inet::SctpPeer::packetsRcvd
protected

◆ packetsSent

int inet::SctpPeer::packetsSent
protected

◆ queueSize

int inet::SctpPeer::queueSize
protected

◆ rcvdBytesPerAssoc

RcvdBytesPerAssoc inet::SctpPeer::rcvdBytesPerAssoc
protected

Referenced by finish(), handleMessage(), and ~SctpPeer().

◆ rcvdPacketsPerAssoc

RcvdPacketsPerAssoc inet::SctpPeer::rcvdPacketsPerAssoc
protected

Referenced by handleMessage(), and ~SctpPeer().

◆ schedule

bool inet::SctpPeer::schedule
protected

◆ sctpPathStatus

SctpPathStatus inet::SctpPeer::sctpPathStatus
protected

Referenced by socketStatusArrived().

◆ sendAllowed

bool inet::SctpPeer::sendAllowed
protected

◆ sentPacketsPerAssoc

SentPacketsPerAssoc inet::SctpPeer::sentPacketsPerAssoc
protected

Referenced by handleMessage(), and ~SctpPeer().

◆ serverAssocId

int inet::SctpPeer::serverAssocId
protected

◆ shutdownReceived

bool inet::SctpPeer::shutdownReceived
protected

Referenced by handleMessage(), and SctpPeer().

◆ timeMsg

cMessage* inet::SctpPeer::timeMsg
protected

◆ timeoutMsg

cMessage* inet::SctpPeer::timeoutMsg
protected

The documentation for this class was generated from the following files:
SctpPeer.h
SctpPeer.cc
inet::SCTP_I_RCV_STREAMS_RESETTED
@ SCTP_I_RCV_STREAMS_RESETTED
Definition: SctpCommand_m.h:214
inet::findContainingNode
cModule * findContainingNode(const cModule *from)
Find the node containing the given module.
Definition: ModuleAccess.cc:31
inet::SctpSocket::processMessage
void processMessage(cMessage *msg) override
Examines the message (which should have arrived from SctpMain), updates socket state,...
Definition: SctpSocket.cc:461
inet::SctpPeer::delay
double delay
Definition: SctpPeer.h:43
inet::SCTP_C_SEND_UNORDERED
@ SCTP_C_SEND_UNORDERED
Definition: SctpCommand_m.h:138
inet::SctpPeer::shutdownReceived
bool shutdownReceived
Definition: SctpPeer.h:58
inet::SctpPeer::schedule
bool schedule
Definition: SctpPeer.h:46
inet::SctpSocket::setStreamPriority
void setStreamPriority(uint32_t stream, uint32_t priority)
Definition: SctpSocket.cc:620
inet::SctpPeer::timeMsg
cMessage * timeMsg
Definition: SctpPeer.h:56
inet::AddressVector
std::vector< L3Address > AddressVector
Definition: SctpCommand_m.h:70
inet::SctpSocket::setOutputGate
void setOutputGate(cGate *toSctp)
Sets the gate on which to send to SCTP.
Definition: SctpSocket.h:174
inet::SctpPeer::sendRequest
void sendRequest(bool last=true)
Definition: SctpPeer.cc:538
MSGKIND_RESET
#define MSGKIND_RESET
Definition: SctpPeer.cc:26
inet::SCTP_I_CLOSED
@ SCTP_I_CLOSED
Definition: SctpCommand_m.h:201
inet::SctpSocket::getSocketId
int getSocketId() const override
Returns the internal connection Id.
Definition: SctpSocket.h:138
inet::SctpSocket::CONNECTED
@ CONNECTED
Definition: SctpSocket.h:80
inet::count
int count(const std::vector< T > &v, const Tk &a)
Definition: stlutils.h:54
inet::SctpSocket::send
virtual void send(Packet *packet) override
Send data message.
Definition: SctpSocket.cc:355
inet::SctpSocket::shutdown
void shutdown(int id=-1)
Definition: SctpSocket.cc:408
inet::SctpSocket::sendRequest
void sendRequest(cMessage *msg)
Send request.
Definition: SctpSocket.cc:389
inet::SctpSocket::getState
int getState() const
Returns the socket state, one of NOT_BOUND, CLOSED, LISTENING, CONNECTING, CONNECTED,...
Definition: SctpSocket.h:151
inet::SctpPeer::packetsSent
int packetsSent
Definition: SctpPeer.h:74
inet::SctpPeer::echoedBytesSent
int echoedBytesSent
Definition: SctpPeer.h:73
inet::SctpSocket::setCallback
void setCallback(ICallback *cb)
Sets a callback object, to be used with processMessage().
Definition: SctpSocket.cc:456
DispatchProtocolReq
removed DscpReq Ipv4ControlInfo Ipv6ControlInfo up L3AddressInd DispatchProtocolReq L4PortInd Ipv4ControlInfo Ipv6ControlInfo down DispatchProtocolReq
Definition: IUdp-gates.txt:25
inet::SCTP_C_SEND
@ SCTP_C_SEND
Definition: SctpCommand_m.h:132
inet::SctpPeer::bytesRcvd
int bytesRcvd
Definition: SctpPeer.h:75
inet::SCTP_C_SHUTDOWN
@ SCTP_C_SHUTDOWN
Definition: SctpCommand_m.h:142
inet::SCTP_I_SEND_STREAMS_RESETTED
@ SCTP_I_SEND_STREAMS_RESETTED
Definition: SctpCommand_m.h:213
inet::SctpSocket::sendNotification
void sendNotification(cMessage *msg)
Send notification.
Definition: SctpSocket.cc:377
inet::SctpPeer::inboundStreams
int inboundStreams
Definition: SctpPeer.h:49
MSGKIND_SEND
#define MSGKIND_SEND
Definition: SctpPeer.cc:23
inet::SctpPeer::queueSize
int queueSize
Definition: SctpPeer.h:47
inet::SctpPeer::bytesSent
long bytesSent
Definition: SctpPeer.h:72
inet::SctpPeer::outboundStreams
int outboundStreams
Definition: SctpPeer.h:48
inet::SCTP_C_RECEIVE
@ SCTP_C_RECEIVE
Definition: SctpCommand_m.h:136
inet::SCTP_I_DATA
@ SCTP_I_DATA
Definition: SctpCommand_m.h:197
inet::SctpPeer::notificationsReceived
int notificationsReceived
Definition: SctpPeer.h:77
inet::SctpPeer::echo
bool echo
Definition: SctpPeer.h:44
inet::SctpPeer::endToEndDelay
EndToEndDelay endToEndDelay
Definition: SctpPeer.h:71
inet::SCTP_C_ABORT
@ SCTP_C_ABORT
Definition: SctpCommand_m.h:134
inet::SctpPeer::numPacketsToReceive
int numPacketsToReceive
Definition: SctpPeer.h:63
inet::SctpSocket::close
void close(int id)
Closes the local end of the connection.
Definition: SctpSocket.cc:394
MSGKIND_CONNECT
#define MSGKIND_CONNECT
Definition: SctpPeer.cc:22
inet::SctpPeer::histEndToEndDelay
HistEndToEndDelay histEndToEndDelay
Definition: SctpPeer.h:70
inet::SctpPeer::sctpPathStatus
SctpPathStatus sctpPathStatus
Definition: SctpPeer.h:52
inet::SCTP_I_ESTABLISHED
@ SCTP_I_ESTABLISHED
Definition: SctpCommand_m.h:199
inet::units::values::s
value< double, units::s > s
Definition: Units.h:1235
inet::units::units::B
intscale< b, 1, 8 > B
Definition: Units.h:1168
inet::SctpSocket::listen
void listen(bool fork=true, bool streamReset=false, uint32_t requests=0, uint32_t messagesToPush=0)
Initiates passive OPEN.
Definition: SctpSocket.cc:183
inet::SCTP_C_SEND_ORDERED
@ SCTP_C_SEND_ORDERED
Definition: SctpCommand_m.h:137
inet::SctpSocket::setOutboundStreams
void setOutboundStreams(int streams)
Setter and getter methods for socket and API Parameters.
Definition: SctpSocket.h:179
inet::packetReceivedSignal
simsignal_t packetReceivedSignal
Definition: Simsignals.cc:97
inet::SCTP_I_SHUTDOWN_RECEIVED
@ SCTP_I_SHUTDOWN_RECEIVED
Definition: SctpCommand_m.h:209
inet::SCTP_C_NO_OUTSTANDING
@ SCTP_C_NO_OUTSTANDING
Definition: SctpCommand_m.h:143
inet::SCTP_I_PEER_CLOSED
@ SCTP_I_PEER_CLOSED
Definition: SctpCommand_m.h:200
inet::SctpPeer::timeoutMsg
cMessage * timeoutMsg
Definition: SctpPeer.h:55
inet::SctpPeer::clientSocket
SctpSocket clientSocket
Definition: SctpPeer.h:53
inet::SctpPeer::setStatusString
void setStatusString(const char *s)
Definition: SctpPeer.cc:532
inet::SCTP_C_QUEUE_MSGS_LIMIT
@ SCTP_C_QUEUE_MSGS_LIMIT
Definition: SctpCommand_m.h:141
inet::SctpSocket::bindx
void bindx(AddressVector localAddr, int localPort)
Definition: SctpSocket.cc:171
inet::INITSTAGE_LOCAL
INET_API InitStage INITSTAGE_LOCAL
Initialization of local state that doesn't use or affect other modules includes:
NUM_INIT_STAGES
#define NUM_INIT_STAGES
Definition: InitStageRegistry.h:73
inet::SCTP_I_DATA_NOTIFICATION
@ SCTP_I_DATA_NOTIFICATION
Definition: SctpCommand_m.h:198
inet::SctpPeer::numRequestsToSend
int numRequestsToSend
Definition: SctpPeer.h:61
inet::physicallayer::k
const double k
Definition: Qam1024Modulation.cc:14
PK
#define PK(msg)
Definition: INETDefs.h:89
inet::SctpPeer::lastStream
int lastStream
Definition: SctpPeer.h:62
inet::SctpSocket::connect
void connect(L3Address remoteAddress, int32_t remotePort, bool streamReset=false, int32_t prMethod=0, uint32_t numRequests=0)
Active OPEN to the given remote socket.
Definition: SctpSocket.cc:241
inet::SctpPeer::sentPacketsPerAssoc
SentPacketsPerAssoc sentPacketsPerAssoc
Definition: SctpPeer.h:67
inet::INITSTAGE_APPLICATION_LAYER
INET_API InitStage INITSTAGE_APPLICATION_LAYER
Initialization of applications.
inet::SctpSocket::CONNECTING
@ CONNECTING
Definition: SctpSocket.h:80
inet::SctpPeer::rcvdBytesPerAssoc
RcvdBytesPerAssoc rcvdBytesPerAssoc
Definition: SctpPeer.h:68
inet::SCTP_I_ABORT
@ SCTP_I_ABORT
Definition: SctpCommand_m.h:206
inet::NodeStatus::UP
@ UP
Definition: NodeStatus.h:28
inet::Protocol::sctp
static const Protocol sctp
Definition: Protocol.h:108
inet::SctpPeer::sendOrSchedule
void sendOrSchedule(cMessage *msg)
Definition: SctpPeer.cc:136
inet::units::units::ps
pico< s >::type ps
Definition: Units.h:1073
inet::SctpPeer::connectTimer
cMessage * connectTimer
Definition: SctpPeer.h:57
inet::SctpPeer::chunksAbandoned
int chunksAbandoned
Definition: SctpPeer.h:79
tags
* tags
Definition: IUdp-gates.txt:3
inet::SctpPeer::handleTimer
void handleTimer(cMessage *msg)
Definition: SctpPeer.cc:438
inet::SctpPeer::rcvdPacketsPerAssoc
RcvdPacketsPerAssoc rcvdPacketsPerAssoc
Definition: SctpPeer.h:66
inet::SctpPeer::bytesPerAssoc
BytesPerAssoc bytesPerAssoc
Definition: SctpPeer.h:69
inet::SctpPeer::numSessions
int numSessions
Definition: SctpPeer.h:78
inet::SctpPeer::generateAndSend
void generateAndSend()
Definition: SctpPeer.cc:146
inet::units::values::m
value< double, units::m > m
Definition: Units.h:1233
inet::SctpPeer::sendAllowed
bool sendAllowed
Definition: SctpPeer.h:59
inet::SctpPeer::listeningSocket
SctpSocket listeningSocket
Definition: SctpPeer.h:54
MSGKIND_ABORT
#define MSGKIND_ABORT
Definition: SctpPeer.cc:24
inet::SctpPeer::connect
void connect()
Definition: SctpPeer.cc:173
inet::SctpSocket::bind
void bind(int localPort)
Bind the socket to a local port number.
Definition: SctpSocket.cc:142
inet::SCTP_C_CLOSE
@ SCTP_C_CLOSE
Definition: SctpCommand_m.h:133
inet::SctpSocket::PEER_CLOSED
@ PEER_CLOSED
Definition: SctpSocket.h:80
inet::SctpPeer::packetsRcvd
int packetsRcvd
Definition: SctpPeer.h:76
inet::SctpPeer::sendQueueRequest
void sendQueueRequest()
Definition: SctpPeer.cc:620
inet::SctpSocket::setInboundStreams
void setInboundStreams(int streams)
Definition: SctpSocket.h:180
inet::SctpPeer::serverAssocId
int serverAssocId
Definition: SctpPeer.h:60
inet::SctpPeer::ordered
bool ordered
Definition: SctpPeer.h:45