INET Framework for OMNeT++/OMNEST
inet::PreemptableStreamer Class Reference

#include <PreemptableStreamer.h>

Inheritance diagram for inet::PreemptableStreamer:
inet::ClockUserModuleMixin< PacketProcessorBase > inet::queueing::IPacketFlow inet::queueing::PacketProcessorBase inet::queueing::IPacketSink inet::queueing::IPacketSource inet::queueing::IPacketProcessor inet::StringFormat::IDirectiveResolver inet::queueing::IPassivePacketSink inet::queueing::IActivePacketSink inet::queueing::IPassivePacketSource inet::queueing::IActivePacketSource

Public Member Functions

virtual ~PreemptableStreamer ()
 
virtual IPassivePacketSink * getConsumer (cGate *gate) override
 Returns the passive packet sink where packets are pushed or nullptr if the connected module doesn't implement the interface. More...
 
virtual IPassivePacketSource * getProvider (cGate *gate) override
 Returns the passive packet source from where packets are pulled or nullptr if the connected module doesn't implement the interface. More...
 
virtual bool supportsPacketPushing (cGate *gate) const override
 Returns true if the processor supports pushing packets at the given gate. More...
 
virtual bool supportsPacketPulling (cGate *gate) const override
 Returns true if the processor supports pulling packets at the given gate. More...
 
virtual bool supportsPacketPassing (cGate *gate) const override
 Returns true if the processor supports passing packets as a whole at the given gate. More...
 
virtual bool supportsPacketStreaming (cGate *gate) const override
 Returns true if the processor supports streaming packets at the given gate. More...
 
virtual bool canPushSomePacket (cGate *gate) const override
 Returns false if the packet sink is full at the given gate and no more packets can be pushed into it without raising an error. More...
 
virtual bool canPushPacket (Packet *packet, cGate *gate) const override
 Returns true if the given packet can be pushed at the given gate into the packet sink without raising an error. More...
 
virtual void pushPacket (Packet *packet, cGate *gate) override
 Pushes the packet into the packet sink at the given gate. More...
 
virtual void pushPacketStart (Packet *packet, cGate *gate, bps datarate) override
 Starts pushing the packet into the packet sink at the given gate. More...
 
virtual void pushPacketEnd (Packet *packet, cGate *gate) override
 Ends pushing the packet into the packet sink at the given gate. More...
 
virtual void pushPacketProgress (Packet *packet, cGate *gate, bps datarate, b position, b extraProcessableLength=b(0)) override
 Progresses pushing the packet into the packet sink at the given gate. More...
 
virtual void handleCanPushPacketChanged (cGate *gate) override
 Notifies about a change in the possibility of pushing some packet into the passive packet sink at the given gate. More...
 
virtual void handlePushPacketProcessed (Packet *packet, cGate *gate, bool successful) override
 Notifies about the completion of the packet processing for a packet that was pushed earlier, regardless of whether the packet is passed or streamed. More...
 
virtual bool canPullSomePacket (cGate *gate) const override
 Returns false if the packet source is empty at the given gate and no more packets can be pulled from it without raising an error. More...
 
virtual Packet * canPullPacket (cGate *gate) const override
 Returns the packet that can be pulled at the given gate. More...
 
virtual Packet * pullPacket (cGate *gate) override
 Pulls the packet from the packet source at the given gate. More...
 
virtual Packet * pullPacketStart (cGate *gate, bps datarate) override
 Starts pulling the packet from the packet source at the given gate. More...
 
virtual Packet * pullPacketEnd (cGate *gate) override
 Ends pulling the packet from the packet source at the given gate. More...
 
virtual Packet * pullPacketProgress (cGate *gate, bps datarate, b position, b extraProcessableLength) override
 Progresses pulling the packet from the packet source at the given gate. More...
 
virtual void handleCanPullPacketChanged (cGate *gate) override
 Notifies about a change in the possibility of pulling some packet from the passive packet source at the given gate. More...
 
virtual void handlePullPacketProcessed (Packet *packet, cGate *gate, bool successful) override
 Notifies about the completion of the packet processing for a packet that was pulled earlier, regardless of whether the packet is passed or streamed. More...
 
- Public Member Functions inherited from inet::ClockUserModuleMixin< PacketProcessorBase >
virtual void scheduleClockEventAt (clocktime_t time, ClockEvent *msg)
 
virtual void scheduleClockEventAfter (clocktime_t delay, ClockEvent *msg)
 
virtual cMessage * cancelClockEvent (ClockEvent *msg)
 
virtual void cancelAndDeleteClockEvent (ClockEvent *msg)
 
virtual void rescheduleClockEventAt (clocktime_t time, ClockEvent *msg)
 
virtual void rescheduleClockEventAfter (clocktime_t time, ClockEvent *msg)
 
virtual clocktime_t computeClockTimeFromSimTime (simtime_t time) const
 
virtual simtime_t computeSimTimeFromClockTime (clocktime_t time) const
 
virtual clocktime_t getClockTime () const
 
virtual clocktime_t getArrivalClockTime (ClockEvent *msg) const
 
- Public Member Functions inherited from inet::queueing::PacketProcessorBase
virtual bool supportsPacketSending (cGate *gate) const override
 Returns true if the processor supports sending packets at the given gate. More...
 
virtual const char * resolveDirective (char directive) const override
 
- Public Member Functions inherited from inet::queueing::IPacketProcessor
virtual ~IPacketProcessor ()
 
- Public Member Functions inherited from inet::queueing::IPassivePacketSink
virtual ~IPassivePacketSink ()
 
- Public Member Functions inherited from inet::queueing::IActivePacketSink
virtual ~IActivePacketSink ()
 
- Public Member Functions inherited from inet::queueing::IPassivePacketSource
virtual ~IPassivePacketSource ()
 
- Public Member Functions inherited from inet::queueing::IActivePacketSource
virtual ~IActivePacketSource ()
 

Protected Member Functions

virtual void initialize (int stage) override
 
virtual void handleMessage (cMessage *message) override
 
virtual bool isStreaming () const
 
virtual void endStreaming ()
 
- Protected Member Functions inherited from inet::queueing::PacketProcessorBase
virtual int numInitStages () const override
 
virtual void refreshDisplay () const override
 
virtual void handlePacketProcessed (Packet *packet)
 
virtual void checkPacketOperationSupport (cGate *gate) const
 
virtual void checkPacketOperationSupport (cGate *startGate, cGate *endGate) const
 
virtual void animate (Packet *packet, cGate *gate, const SendOptions &sendOptions, Action action) const
 
virtual void animatePacket (Packet *packet, cGate *gate, Action action) const
 
virtual void animatePacketStart (Packet *packet, cGate *gate, bps datarate, long transmissionId, Action action) const
 
virtual void animatePacketStart (Packet *packet, cGate *gate, bps datarate, const SendOptions &sendOptions, Action action) const
 
virtual void animatePacketEnd (Packet *packet, cGate *gate, long transmissionId, Action action) const
 
virtual void animatePacketEnd (Packet *packet, cGate *gate, const SendOptions &sendOptions, Action action) const
 
virtual void animatePacketProgress (Packet *packet, cGate *gate, bps datarate, b position, b extraProcessableLength, long transmissionId, Action action) const
 
virtual void animatePacketProgress (Packet *packet, cGate *gate, bps datarate, b position, b extraProcessableLength, const SendOptions &sendOptions, Action action) const
 
virtual void pushOrSendPacket (Packet *packet, cGate *gate, IPassivePacketSink *consumer)
 
virtual void pushOrSendPacketStart (Packet *packet, cGate *gate, IPassivePacketSink *consumer, bps datarate, int transmissionId)
 
virtual void pushOrSendPacketEnd (Packet *packet, cGate *gate, IPassivePacketSink *consumer, int transmissionId)
 
virtual void pushOrSendPacketProgress (Packet *packet, cGate *gate, IPassivePacketSink *consumer, bps datarate, b position, b extraProcessableLength, int transmissionId)
 
virtual void animatePush (Packet *packet, cGate *gate, const SendOptions &sendOptions) const
 
virtual void animatePushPacket (Packet *packet, cGate *gate) const
 
virtual void animatePushPacketStart (Packet *packet, cGate *gate, bps datarate, long transmissionId) const
 
virtual void animatePushPacketStart (Packet *packet, cGate *gate, bps datarate, const SendOptions &sendOptions) const
 
virtual void animatePushPacketEnd (Packet *packet, cGate *gate, long transmissionId) const
 
virtual void animatePushPacketEnd (Packet *packet, cGate *gate, const SendOptions &sendOptions) const
 
virtual void animatePushPacketProgress (Packet *packet, cGate *gate, bps datarate, b position, b extraProcessableLength, long transmissionId) const
 
virtual void animatePushPacketProgress (Packet *packet, cGate *gate, bps datarate, b position, b extraProcessableLength, const SendOptions &sendOptions) const
 
virtual void animatePull (Packet *packet, cGate *gate, const SendOptions &sendOptions) const
 
virtual void animatePullPacket (Packet *packet, cGate *gate) const
 
virtual void animatePullPacketStart (Packet *packet, cGate *gate, bps datarate, long transmissionId) const
 
virtual void animatePullPacketStart (Packet *packet, cGate *gate, bps datarate, const SendOptions &sendOptions) const
 
virtual void animatePullPacketEnd (Packet *packet, cGate *gate, long transmissionId) const
 
virtual void animatePullPacketEnd (Packet *packet, cGate *gate, const SendOptions &sendOptions) const
 
virtual void animatePullPacketProgress (Packet *packet, cGate *gate, bps datarate, b position, b extraProcessableLength, long transmissionId) const
 
virtual void animatePullPacketProgress (Packet *packet, cGate *gate, bps datarate, b position, b extraProcessableLength, const SendOptions &sendOptions) const
 
virtual void dropPacket (Packet *packet, PacketDropReason reason, int limit=-1)
 
virtual void updateDisplayString () const
 

Protected Attributes

bps datarate = bps(NaN)
 
b minPacketLength = b(-1)
 
b roundingLength = b(-1)
 
cGate * inputGate = nullptr
 
IActivePacketSource * producer = nullptr
 
IPassivePacketSource * provider = nullptr
 
cGate * outputGate = nullptr
 
IPassivePacketSink * consumer = nullptr
 
IActivePacketSink * collector = nullptr
 
simtime_t streamStart
 
bps streamDatarate = bps(NaN)
 
Packet * streamedPacket = nullptr
 
Packet * remainingPacket = nullptr
 
ClockEvent * endStreamingTimer = nullptr
 
- Protected Attributes inherited from inet::queueing::PacketProcessorBase
const char * displayStringTextFormat = nullptr
 
int numProcessedPackets = -1
 
b processedTotalLength = b(-1)
 

Additional Inherited Members

- Protected Types inherited from inet::queueing::PacketProcessorBase
enum  Action { PUSH, PULL }
 

Constructor & Destructor Documentation

◆ ~PreemptableStreamer()

inet::PreemptableStreamer::~PreemptableStreamer ( )
virtual
19 {
20  delete streamedPacket;
21  delete remainingPacket;
23 }

Member Function Documentation

◆ canPullPacket()

Packet * inet::PreemptableStreamer::canPullPacket ( cGate *  gate) const
overridevirtual

Returns the packet that can be pulled at the given gate.

The returned value is nullptr if there is no such packet.

The gate must be a valid gate of this module and it must support pulling packets.

Implements inet::queueing::IPassivePacketSource.

113 {
114  return isStreaming() ? nullptr : remainingPacket != nullptr ? remainingPacket : provider->canPullPacket(inputGate->getPathStartGate());
115 }

◆ canPullSomePacket()

bool inet::PreemptableStreamer::canPullSomePacket ( cGate *  gate) const
overridevirtual

Returns false if the packet source is empty at the given gate and no more packets can be pulled from it without raising an error.

The gate must be a valid gate of this module and it must support pulling packets.

Implements inet::queueing::IPassivePacketSource.

108 {
109  return !isStreaming() && (remainingPacket != nullptr || provider->canPullSomePacket(inputGate->getPathStartGate()));
110 }

◆ canPushPacket()

bool inet::PreemptableStreamer::canPushPacket ( Packet *  packet,
cGate *  gate 
) const
overridevirtual

Returns true if the given packet can be pushed at the given gate into the packet sink without raising an error.

The packet must not be nullptr. The gate must be a valid gate of this module and it must support pushing packets.

Implements inet::queueing::IPassivePacketSink.

74 {
75  return !isStreaming() && consumer->canPushPacket(packet, outputGate->getPathEndGate());
76 }

◆ canPushSomePacket()

bool inet::PreemptableStreamer::canPushSomePacket ( cGate *  gate) const
overridevirtual

Returns false if the packet sink is full at the given gate and no more packets can be pushed into it without raising an error.

The gate must be a valid gate of this module and it must support pushing packets.

Implements inet::queueing::IPassivePacketSink.

69 {
70  return !isStreaming() && consumer->canPushSomePacket(outputGate->getPathEndGate());
71 }

◆ endStreaming()

void inet::PreemptableStreamer::endStreaming ( )
protectedvirtual
57 {
58  auto packetLength = streamedPacket->getTotalLength();
59  EV_INFO << "Ending streaming packet" << EV_FIELD(packet, *streamedPacket) << EV_ENDL;
62  streamedPacket = nullptr;
64  processedTotalLength += packetLength;
66 }

Referenced by handleMessage(), and pushPacket().

◆ getConsumer()

virtual IPassivePacketSink* inet::PreemptableStreamer::getConsumer ( cGate *  gate)
inlineoverridevirtual

Returns the passive packet sink where packets are pushed or nullptr if the connected module doesn't implement the interface.

The gate parameter must be a valid gate of this module.

Implements inet::queueing::IActivePacketSource.

51 { return this; }

◆ getProvider()

virtual IPassivePacketSource* inet::PreemptableStreamer::getProvider ( cGate *  gate)
inlineoverridevirtual

Returns the passive packet source from where packets are pulled or nullptr if the connected module doesn't implement the interface.

The gate parameter must be a valid gate of this module.

Implements inet::queueing::IActivePacketSink.

52 { return this; }

◆ handleCanPullPacketChanged()

void inet::PreemptableStreamer::handleCanPullPacketChanged ( cGate *  gate)
overridevirtual

Notifies about a change in the possibility of pulling some packet from the passive packet source at the given gate.

This method is called, for example, when a new packet is inserted into a queue. It allows the sink to pull a new packet from the queue.

The gate parameter must be a valid gate of this module.

Implements inet::queueing::IActivePacketSink.

194 {
195  Enter_Method("handleCanPullPacketChanged");
196  if (collector != nullptr && !isStreaming())
197  collector->handleCanPullPacketChanged(outputGate->getPathEndGate());
198 }

◆ handleCanPushPacketChanged()

void inet::PreemptableStreamer::handleCanPushPacketChanged ( cGate *  gate)
overridevirtual

Notifies about a change in the possibility of pushing some packet into the passive packet sink at the given gate.

This method is called, for example, when a new packet can be inserted into a queue. It allows the source to push a new packet into the queue.

The gate parameter must be a valid gate of this module.

Implements inet::queueing::IActivePacketSource.

94 {
95  Enter_Method("handleCanPushPacketChanged");
96  if (producer != nullptr)
97  producer->handleCanPushPacketChanged(inputGate->getPathStartGate());
98 }

◆ handleMessage()

void inet::PreemptableStreamer::handleMessage ( cMessage *  message)
overrideprotectedvirtual
47 {
48  if (message == endStreamingTimer)
49  endStreaming();
50  else {
51  auto packet = check_and_cast<Packet *>(message);
52  pushPacket(packet, packet->getArrivalGate());
53  }
54 }

◆ handlePullPacketProcessed()

void inet::PreemptableStreamer::handlePullPacketProcessed ( Packet *  packet,
cGate *  gate,
bool  successful 
)
overridevirtual

Notifies about the completion of the packet processing for a packet that was pulled earlier, regardless of whether the packet is passed or streamed.

This method is called, for example, when a previously pulled packet could not be processed successfully. It allows the sink to retry the operation.

The gate parameter must be a valid gate of this module. The packet must not be nullptr.

Implements inet::queueing::IActivePacketSink.

201 {
202  Enter_Method("handlePullPacketConfirmation");
203  if (collector != nullptr)
204  collector->handlePullPacketProcessed(packet, gate, successful);
205 }

◆ handlePushPacketProcessed()

void inet::PreemptableStreamer::handlePushPacketProcessed ( Packet *  packet,
cGate *  gate,
bool  successful 
)
overridevirtual

Notifies about the completion of the packet processing for a packet that was pushed earlier, regardless of whether the packet is passed or streamed.

This method is called, for example, when a previously pushed packet could not be processed successfully. It allows the source to retry the operation.

The gate parameter must be a valid gate of this module. The packet must not be nullptr.

Implements inet::queueing::IActivePacketSource.

101 {
102  Enter_Method("handlePushPacketProcessed");
103  if (producer != nullptr)
104  producer->handlePushPacketProcessed(packet, inputGate->getPathStartGate(), successful);
105 }

◆ initialize()

void inet::PreemptableStreamer::initialize ( int  stage)
overrideprotectedvirtual

Reimplemented from inet::queueing::PacketProcessorBase.

26 {
27  ClockUserModuleMixin::initialize(stage);
28  if (stage == INITSTAGE_LOCAL) {
29  datarate = bps(par("datarate"));
30  minPacketLength = b(par("minPacketLength"));
31  roundingLength = b(par("roundingLength"));
32  inputGate = gate("in");
33  outputGate = gate("out");
34  producer = findConnectedModule<IActivePacketSource>(inputGate);
35  provider = findConnectedModule<IPassivePacketSource>(inputGate);
36  consumer = findConnectedModule<IPassivePacketSink>(outputGate);
37  collector = findConnectedModule<IActivePacketSink>(outputGate);
38  endStreamingTimer = new ClockEvent("EndStreamingTimer");
39  }
40  else if (stage == INITSTAGE_QUEUEING) {
43  }
44 }

◆ isStreaming()

virtual bool inet::PreemptableStreamer::isStreaming ( ) const
inlineprotectedvirtual

◆ pullPacket()

virtual Packet* inet::PreemptableStreamer::pullPacket ( cGate *  gate)
inlineoverridevirtual

Pulls the packet from the packet source at the given gate.

This operation pulls the packet as a whole. The ownership of the packet is transferred to the sink.

The source must not be empty at the given gate. The returned packet must not be nullptr. The gate must be a valid gate of this module and it must support pulling and passing packets.

Implements inet::queueing::IPassivePacketSource.

72 { throw cRuntimeError("Invalid operation"); }

◆ pullPacketEnd()

Packet * inet::PreemptableStreamer::pullPacketEnd ( cGate *  gate)
overridevirtual

Ends pulling the packet from the packet source at the given gate.

This is a packet streaming operation. The ownership of the packet is transferred to the sink.

Packet streaming can be started with any of the streaming operations, and ends when the streaming position plus the extra processable packet length equals the total packet length.

This method is called, for example, when a preemption supporting server module ends streaming a packet from the source.

The source must not be empty at the gate and no other packet streaming can be in progress. The gate must be a valid gate of this module and it must support pulling and streaming packets. The returned packet must not be nullptr.

Implements inet::queueing::IPassivePacketSource.

146 {
147  Enter_Method("pullPacketEnd");
148  EV_INFO << "Ending streaming packet" << EV_FIELD(packet, *streamedPacket) << EV_ENDL;
149  auto packet = streamedPacket;
150  b pulledLength = streamDatarate * s((simTime() - streamStart).dbl());
151  b preemptedLength = roundingLength * ((pulledLength + roundingLength - b(1)) / roundingLength);
152  if (preemptedLength < minPacketLength)
153  preemptedLength = minPacketLength;
154  if (preemptedLength + minPacketLength <= packet->getTotalLength()) {
155  // already pulled part
156  const auto& fragmentTag = packet->getTagForUpdate<FragmentTag>();
157  fragmentTag->setLastFragment(false);
158  auto fragmentNumber = fragmentTag->getFragmentNumber();
159  std::string basePacketName = packet->getName();
160  if (fragmentNumber != 0)
161  basePacketName = basePacketName.substr(0, basePacketName.find("-frag"));
162  std::string packetName = basePacketName + "-frag" + std::to_string(fragmentNumber);
163  packet->setName(packetName.c_str());
164  packet->removeTagIfPresent<PacketProtocolTag>();
165  // remaining part
166  std::string remainingPacketName = basePacketName + "-frag" + std::to_string(fragmentNumber + 1);
167  const auto& remainingData = packet->removeAtBack(packet->getTotalLength() - preemptedLength);
168  remainingPacket = new Packet(remainingPacketName.c_str(), remainingData);
169  remainingPacket->copyTags(*packet);
170  const auto& remainingPacketFragmentTag = remainingPacket->getTagForUpdate<FragmentTag>();
171  remainingPacketFragmentTag->setFirstFragment(false);
172  remainingPacketFragmentTag->setLastFragment(true);
173  remainingPacketFragmentTag->setFragmentNumber(fragmentNumber + 1);
174  }
175  handlePacketProcessed(packet);
177  streamedPacket = nullptr;
179  return packet;
180 }

◆ pullPacketProgress()

Packet * inet::PreemptableStreamer::pullPacketProgress ( cGate *  gate,
bps  datarate,
b  position,
b  extraProcessableLength 
)
overridevirtual

Progresses pulling the packet from the packet source at the given gate.

This is a packet streaming operation. The position specifies where the packet streaming is at the moment. The extra length parameter partially fixes the future of the packet streaming operation. The ownership of the packet is transferred to the sink.

Packet streaming can be started with any of the streaming operations, and ends when the streaming position plus the extra processable packet length equals the total packet length.

This method is called, for example, to notify the source about a change in the packet data when a preemption occurs.

The source must not be empty at the gate and no other packet streaming can be in progress. The gate must be a valid gate of this module and it must support pulling and streaming packets. The returned packet must not be nullptr.

Implements inet::queueing::IPassivePacketSource.

183 {
184  Enter_Method("pullPacketProgress");
186  EV_INFO << "Progressing streaming" << EV_FIELD(packet, *streamedPacket) << EV_ENDL;
187  auto packet = streamedPacket->dup();
188  animatePullPacketProgress(packet, outputGate, streamDatarate, position, extraProcessableLength, streamedPacket->getId());
190  return packet;
191 }

◆ pullPacketStart()

Packet * inet::PreemptableStreamer::pullPacketStart ( cGate *  gate,
bps  datarate 
)
overridevirtual

Starts pulling the packet from the packet source at the given gate.

This is a packet streaming operation. The ownership of the packet is transferred to the sink.

Packet streaming can be started with any of the streaming operations, and ends when the streaming position plus the extra processable packet length equals the total packet length.

This method is called, for example, when a preemption supporting server module starts streaming a packet from the source.

The source must not be empty at the gate and no other packet streaming can be in progress. The gate must be a valid gate of this module and it must support pulling and streaming packets. The returned packet must not be nullptr.

Implements inet::queueing::IPassivePacketSource.

118 {
119  Enter_Method("pullPacketStart");
121  if (remainingPacket == nullptr) {
122  streamedPacket = provider->pullPacket(inputGate->getPathStartGate());
123  take(streamedPacket);
124  }
125  else {
127  remainingPacket = nullptr;
128  }
129  auto fragmentTag = streamedPacket->findTagForUpdate<FragmentTag>();
130  if (fragmentTag == nullptr) {
131  fragmentTag = streamedPacket->addTag<FragmentTag>();
132  fragmentTag->setFirstFragment(true);
133  fragmentTag->setLastFragment(true);
134  fragmentTag->setFragmentNumber(0);
135  fragmentTag->setNumFragments(-1);
136  }
137  streamStart = simTime();
138  auto packet = streamedPacket->dup();
139  EV_INFO << "Starting streaming packet" << EV_FIELD(packet) << EV_ENDL;
142  return packet;
143 }

◆ pushPacket()

void inet::PreemptableStreamer::pushPacket ( Packet *  packet,
cGate *  gate 
)
overridevirtual

Pushes the packet into the packet sink at the given gate.

This operation pushes the packet as a whole. The ownership of the packet is transferred to the sink.

This method is called, for example, when a packet source module pushes a packet into a queue module.

The sink must not be full at the gate. The packet must not be nullptr. The gate must be a valid gate of this module and it must support pushing and passing packets.

Implements inet::queueing::IPassivePacketSink.

79 {
80  Enter_Method("pushPacket");
81  ASSERT(!isStreaming());
82  take(packet);
84  streamedPacket = packet;
85  EV_INFO << "Starting streaming packet" << EV_FIELD(packet) << EV_ENDL;
87  if (std::isnan(streamDatarate.get()))
88  endStreaming();
89  else
91 }

Referenced by handleMessage().

◆ pushPacketEnd()

virtual void inet::PreemptableStreamer::pushPacketEnd ( Packet *  packet,
cGate *  gate 
)
inlineoverridevirtual

Ends pushing the packet into the packet sink at the given gate.

This is a packet streaming operation. The ownership of the packet is transferred to the sink.

Packet streaming can be started with any of the streaming operations, and ends when the streaming position plus the extra processable packet length equals the total packet length.

This method is called, for example, when a preemption supporting server module ends streaming a packet to the sink.

The sink must not be full at the gate and no other packet streaming can be in progress. The packet must not be nullptr. The gate must be a valid gate of this module and it must support pushing and streaming packets.

Implements inet::queueing::IPassivePacketSink.

64 { throw cRuntimeError("Invalid operation"); }

◆ pushPacketProgress()

virtual void inet::PreemptableStreamer::pushPacketProgress ( Packet *  packet,
cGate *  gate,
bps  datarate,
b  position,
b  extraProcessableLength = b(0) 
)
inlineoverridevirtual

Progresses pushing the packet into the packet sink at the given gate.

This is a packet streaming operation. The position specifies where the packet streaming is at the moment. The extra length parameter partially fixes the future of the packet streaming operation. The ownership of the packet is transferred to the sink.

Packet streaming can be started with any of the streaming operations, and ends when the streaming position plus the extra processable packet length equals the total packet length.

This method is called, for example, to notify the sink about a change in the packet data when a preemption occurs.

The sink must not be full at the gate and no other packet streaming can be in progress. The packet must not be nullptr. The gate must be a valid gate of this module and it must support pushing and streaming packets.

Implements inet::queueing::IPassivePacketSink.

65 { throw cRuntimeError("Invalid operation"); }

◆ pushPacketStart()

virtual void inet::PreemptableStreamer::pushPacketStart ( Packet *  packet,
cGate *  gate,
bps  datarate 
)
inlineoverridevirtual

Starts pushing the packet into the packet sink at the given gate.

This is a packet streaming operation. The ownership of the packet is transferred to the sink.

Packet streaming can be started with any of the streaming operations, and ends when the streaming position plus the extra processable packet length equals the total packet length.

This method is called, for example, when a preemption supporting server module starts streaming a packet to the sink.

The sink must not be full at the gate and no other packet streaming can be in progress. The packet must not be nullptr. The gate must be a valid gate of this module and it must support pushing and streaming packets.

Implements inet::queueing::IPassivePacketSink.

63 { throw cRuntimeError("Invalid operation"); }

◆ supportsPacketPassing()

virtual bool inet::PreemptableStreamer::supportsPacketPassing ( cGate *  gate) const
inlineoverridevirtual

Returns true if the processor supports passing packets as a whole at the given gate.

A passed packet is handed over from one module to another by either the standard OMNeT++ handleMessage() mechanism, or by directly calling pushPacket() or pullPacket().

For example, packets are passed as a whole to and from a queue module.

Connecting incompatible gates raises an error during initialize. The gate parameter must be a valid gate of this module. The gate should be marked with @labels(pass) in the NED file.

Reimplemented from inet::queueing::PacketProcessorBase.

56 { return gate == inputGate; }

◆ supportsPacketPulling()

virtual bool inet::PreemptableStreamer::supportsPacketPulling ( cGate *  gate) const
inlineoverridevirtual

Returns true if the processor supports pulling packets at the given gate.

Pulling a packet is a synchronous operation that is initiated by the sink module. A pulled packet can be passed as a whole using pullPacket(), or it can be streamed from the source to the sink using pullPacketStart(), pullPacketEnd(), and pullPacketProgress().

For output gates, true means that the connected module can pull packets from this module. For input gates, true means that this module can pull packets from the connected module. For example, a packet server module can pull packets from a queue module.

Connecting incompatible gates raises an error during initialize. The gate parameter must be a valid gate of this module. The gate should be marked with @labels(pull) in the NED file.

Implements inet::queueing::IPacketProcessor.

55 { return true; }

◆ supportsPacketPushing()

virtual bool inet::PreemptableStreamer::supportsPacketPushing ( cGate *  gate) const
inlineoverridevirtual

Returns true if the processor supports pushing packets at the given gate.

Pushing a packet is a synchronous operation that is initiated by the source module. A pushed packet can be passed as a whole using pushPacket(), or it can be streamed from the source to the sink using pushPacketStart(), pushPacketEnd(), and pushPacketProgress().

For output gates, true means that this module can push packets into the connected module. For input gates, true means that the connected module can push packets into this module. For example, a packet generator module can push packets into a queue module.

Connecting incompatible gates raises an error during initialize. The gate parameter must be a valid gate of this module. The gate should be marked with @labels(push) in the NED file.

Implements inet::queueing::IPacketProcessor.

54 { return true; }

◆ supportsPacketStreaming()

virtual bool inet::PreemptableStreamer::supportsPacketStreaming ( cGate *  gate) const
inlineoverridevirtual

Returns true if the processor supports streaming packets at the given gate.

A streamed packet is handed over from one module to another using several method calls and potentially extending to a non-zero simulation duration.

For example, packets are streamed to a preemptable signal transmitter module.

Connecting incompatible gates raises an error during initialize. The gate parameter must be a valid gate of this module. The gate should be marked with @labels(stream) in the NED file.

Reimplemented from inet::queueing::PacketProcessorBase.

57 { return gate == outputGate; }

Member Data Documentation

◆ collector

IActivePacketSink* inet::PreemptableStreamer::collector = nullptr
protected

◆ consumer

IPassivePacketSink* inet::PreemptableStreamer::consumer = nullptr
protected

◆ datarate

bps inet::PreemptableStreamer::datarate = bps(NaN)
protected

◆ endStreamingTimer

ClockEvent* inet::PreemptableStreamer::endStreamingTimer = nullptr
protected

◆ inputGate

cGate* inet::PreemptableStreamer::inputGate = nullptr
protected

◆ minPacketLength

b inet::PreemptableStreamer::minPacketLength = b(-1)
protected

Referenced by initialize(), and pullPacketEnd().

◆ outputGate

cGate* inet::PreemptableStreamer::outputGate = nullptr
protected

◆ producer

IActivePacketSource* inet::PreemptableStreamer::producer = nullptr
protected

◆ provider

IPassivePacketSource* inet::PreemptableStreamer::provider = nullptr
protected

◆ remainingPacket

Packet* inet::PreemptableStreamer::remainingPacket = nullptr
protected

◆ roundingLength

b inet::PreemptableStreamer::roundingLength = b(-1)
protected

Referenced by initialize(), and pullPacketEnd().

◆ streamDatarate

bps inet::PreemptableStreamer::streamDatarate = bps(NaN)
protected

◆ streamedPacket

Packet* inet::PreemptableStreamer::streamedPacket = nullptr
protected

◆ streamStart

simtime_t inet::PreemptableStreamer::streamStart
protected

Referenced by pullPacketEnd(), and pullPacketStart().


The documentation for this class was generated from the following files:
inet::PreemptableStreamer::streamedPacket
Packet * streamedPacket
Definition: PreemptableStreamer.h:36
inet::Packet::addTag
const Ptr< T > addTag()
Returns a newly added packet tag for the provided type, or throws an exception if such a packet tag i...
Definition: Packet.h:1310
inet::queueing::IPassivePacketSink::canPushPacket
virtual bool canPushPacket(Packet *packet, cGate *gate) const =0
Returns true if the given packet can be pushed at the given gate into the packet sink without raising...
inet::PreemptableStreamer::inputGate
cGate * inputGate
Definition: PreemptableStreamer.h:26
inet::PreemptableStreamer::remainingPacket
Packet * remainingPacket
Definition: PreemptableStreamer.h:37
inet::Packet::getTagForUpdate
const Ptr< T > getTagForUpdate()
Returns the packet tag for the provided type or throws an exception if no such packet tag is found.
Definition: Packet.h:1303
inet::units::units::bps
compose< b, pow< s, -1 > > bps
Definition: Units.h:1169
inet::INITSTAGE_QUEUEING
INET_API InitStage INITSTAGE_QUEUEING
Initialization of queueing modules.
inet::ClockUserModuleMixin< PacketProcessorBase >::scheduleClockEventAfter
virtual void scheduleClockEventAfter(clocktime_t delay, ClockEvent *msg)
Definition: ClockUserModuleMixin.h:65
inet::PreemptableStreamer::pushPacket
virtual void pushPacket(Packet *packet, cGate *gate) override
Pushes the packet into the packet sink at the given gate.
Definition: PreemptableStreamer.cc:78
inet::queueing::IPassivePacketSource::canPullPacket
virtual Packet * canPullPacket(cGate *gate) const =0
Returns the packet that can be pulled at the given gate.
inet::PreemptableStreamer::endStreamingTimer
ClockEvent * endStreamingTimer
Definition: PreemptableStreamer.h:39
inet::queueing::PacketProcessorBase::animatePullPacketStart
virtual void animatePullPacketStart(Packet *packet, cGate *gate, bps datarate, long transmissionId) const
Definition: PacketProcessorBase.cc:360
inet::PreemptableStreamer::streamStart
simtime_t streamStart
Definition: PreemptableStreamer.h:34
inet::queueing::IPassivePacketSink::canPushSomePacket
virtual bool canPushSomePacket(cGate *gate) const =0
Returns false if the packet sink is full at the given gate and no more packets can be pushed into it ...
PacketProtocolTag
removed DscpReq Ipv4ControlInfo Ipv6ControlInfo up L3AddressInd DispatchProtocolReq L4PortInd Ipv4ControlInfo Ipv6ControlInfo down PacketProtocolTag
Definition: IUdp-gates.txt:25
inet::queueing::PacketProcessorBase::handlePacketProcessed
virtual void handlePacketProcessed(Packet *packet)
Definition: PacketProcessorBase.cc:34
inet::queueing::PacketProcessorBase::processedTotalLength
b processedTotalLength
Definition: PacketProcessorBase.h:31
inet::ClockEvent
cMessage ClockEvent
Definition: contract/ClockEvent.h:18
inet::Packet::findTagForUpdate
const Ptr< T > findTagForUpdate()
Returns the packet tag for the provided type or returns nullptr if no such packet tag is found.
Definition: Packet.h:1289
inet::units::values::s
value< double, units::s > s
Definition: Units.h:1235
inet::Packet::dup
virtual Packet * dup() const override
Definition: Packet.h:171
inet::PreemptableStreamer::datarate
bps datarate
Definition: PreemptableStreamer.h:22
EV_FIELD
#define EV_FIELD(...)
Definition: INETDefs.h:112
inet::PreemptableStreamer::streamDatarate
bps streamDatarate
Definition: PreemptableStreamer.h:35
inet::PreemptableStreamer::outputGate
cGate * outputGate
Definition: PreemptableStreamer.h:30
NaN
#define NaN
Definition: INETMath.h:91
inet::INITSTAGE_LOCAL
INET_API InitStage INITSTAGE_LOCAL
Initialization of local state that doesn't use or affect other modules includes:
inet::queueing::PacketProcessorBase::numProcessedPackets
int numProcessedPackets
Definition: PacketProcessorBase.h:30
inet::queueing::IActivePacketSource::handleCanPushPacketChanged
virtual void handleCanPushPacketChanged(cGate *gate)=0
Notifies about a change in the possibility of pushing some packet into the passive packet sink at the...
inet::Packet::getTotalLength
b getTotalLength() const
Returns the total packet length ignoring front and back offsets.
Definition: Packet.h:193
inet::queueing::PacketProcessorBase::pushOrSendPacketEnd
virtual void pushOrSendPacketEnd(Packet *packet, cGate *gate, IPassivePacketSink *consumer, int transmissionId)
Definition: PacketProcessorBase.cc:154
inet::ClockUserModuleMixin< PacketProcessorBase >::cancelAndDeleteClockEvent
virtual void cancelAndDeleteClockEvent(ClockEvent *msg)
Definition: ClockUserModuleMixin.h:67
inet::units::values::b
value< int64_t, units::b > b
Definition: Units.h:1241
inet::PreemptableStreamer::collector
IActivePacketSink * collector
Definition: PreemptableStreamer.h:32
inet::queueing::IActivePacketSource::handlePushPacketProcessed
virtual void handlePushPacketProcessed(Packet *packet, cGate *gate, bool successful)=0
Notifies about the completion of the packet processing for a packet that was pushed earlier independe...
inet::queueing::PacketProcessorBase::animatePullPacketProgress
virtual void animatePullPacketProgress(Packet *packet, cGate *gate, bps datarate, b position, b extraProcessableLength, long transmissionId) const
Definition: PacketProcessorBase.cc:380
inet::queueing::PacketProcessorBase::updateDisplayString
virtual void updateDisplayString() const
Definition: PacketProcessorBase.cc:399
inet::queueing::PacketProcessorBase::checkPacketOperationSupport
virtual void checkPacketOperationSupport(cGate *gate) const
Definition: PacketProcessorBase.cc:40
inet::PreemptableStreamer::producer
IActivePacketSource * producer
Definition: PreemptableStreamer.h:27
Enter_Method
#define Enter_Method(...)
Definition: SelfDoc.h:71
inet::units::value::get
const value_type & get() const
Definition: Units.h:108
inet::queueing::PacketProcessorBase::pushOrSendPacketStart
virtual void pushOrSendPacketStart(Packet *packet, cGate *gate, IPassivePacketSink *consumer, bps datarate, int transmissionId)
Definition: PacketProcessorBase.cc:136
inet::PreemptableStreamer::roundingLength
b roundingLength
Definition: PreemptableStreamer.h:24
inet::queueing::PacketProcessorBase::animatePullPacketEnd
virtual void animatePullPacketEnd(Packet *packet, cGate *gate, long transmissionId) const
Definition: PacketProcessorBase.cc:370
inet::PreemptableStreamer::consumer
IPassivePacketSink * consumer
Definition: PreemptableStreamer.h:31
inet::queueing::IActivePacketSink::handlePullPacketProcessed
virtual void handlePullPacketProcessed(Packet *packet, cGate *gate, bool successful)=0
Notifies about the completion of the packet processing for a packet that was pulled earlier independe...
inet::queueing::IPassivePacketSource::canPullSomePacket
virtual bool canPullSomePacket(cGate *gate) const =0
Returns false if the packet source is empty at the given gate and no more packets can be pulled from ...
inet::PreemptableStreamer::provider
IPassivePacketSource * provider
Definition: PreemptableStreamer.h:28
EV_ENDL
#define EV_ENDL
Definition: INETDefs.h:114
inet::PreemptableStreamer::minPacketLength
b minPacketLength
Definition: PreemptableStreamer.h:23
inet::PreemptableStreamer::isStreaming
virtual bool isStreaming() const
Definition: PreemptableStreamer.h:45
inet::queueing::IPassivePacketSource::pullPacket
virtual Packet * pullPacket(cGate *gate)=0
Pulls the packet from the packet source at the given gate.
inet::PreemptableStreamer::endStreaming
virtual void endStreaming()
Definition: PreemptableStreamer.cc:56
inet::Packet::copyTags
void copyTags(const Packet &source)
Copies the set of packet tags from the other packet.
Definition: Packet.h:1275
inet::queueing::IActivePacketSink::handleCanPullPacketChanged
virtual void handleCanPullPacketChanged(cGate *gate)=0
Notifies about a change in the possibility of pulling some packet from the passive packet source at t...