Crypto++  5.6.3
Free C++ class library of cryptographic schemes
network.cpp
1 // network.cpp - written and placed in the public domain by Wei Dai
2 
3 #include "pch.h"
4 
5 #include "network.h"
6 #include "wait.h"
7 
8 #define CRYPTOPP_TRACE_NETWORK 0
9 
10 NAMESPACE_BEGIN(CryptoPP)
11 
12 #ifdef HIGHRES_TIMER_AVAILABLE
13 
14 lword LimitedBandwidth::ComputeCurrentTransceiveLimit()
15 {
16  if (!m_maxBytesPerSecond)
17  return ULONG_MAX;
18 
19  const double curTime = GetCurTimeAndCleanUp();
20  CRYPTOPP_UNUSED(curTime);
21 
22  lword total = 0;
23  for (OpQueue::size_type i=0; i!=m_ops.size(); ++i)
24  total += m_ops[i].second;
25  return SaturatingSubtract(m_maxBytesPerSecond, total);
26 }
27 
28 double LimitedBandwidth::TimeToNextTransceive()
29 {
30  if (!m_maxBytesPerSecond)
31  return 0;
32 
33  if (!m_nextTransceiveTime)
34  ComputeNextTransceiveTime();
35 
36  return SaturatingSubtract(m_nextTransceiveTime, m_timer.ElapsedTimeAsDouble());
37 }
38 
39 void LimitedBandwidth::NoteTransceive(lword size)
40 {
41  if (m_maxBytesPerSecond)
42  {
43  double curTime = GetCurTimeAndCleanUp();
44  m_ops.push_back(std::make_pair(curTime, size));
45  m_nextTransceiveTime = 0;
46  }
47 }
48 
49 void LimitedBandwidth::ComputeNextTransceiveTime()
50 {
51  double curTime = GetCurTimeAndCleanUp();
52  lword total = 0;
53  for (unsigned int i=0; i!=m_ops.size(); ++i)
54  total += m_ops[i].second;
55  m_nextTransceiveTime =
56  (total < m_maxBytesPerSecond) ? curTime : m_ops.front().first + 1000;
57 }
58 
59 double LimitedBandwidth::GetCurTimeAndCleanUp()
60 {
61  if (!m_maxBytesPerSecond)
62  return 0;
63 
64  double curTime = m_timer.ElapsedTimeAsDouble();
65  while (m_ops.size() && (m_ops.front().first + 1000 < curTime))
66  m_ops.pop_front();
67  return curTime;
68 }
69 
70 void LimitedBandwidth::GetWaitObjects(WaitObjectContainer &container, const CallStack &callStack)
71 {
72  double nextTransceiveTime = TimeToNextTransceive();
73  if (nextTransceiveTime)
74  container.ScheduleEvent(nextTransceiveTime, CallStack("LimitedBandwidth::GetWaitObjects()", &callStack));
75 }
76 
77 // *************************************************************
78 
80  lword& byteCount, bool blockingOutput,
81  unsigned long maxTime, bool checkDelimiter, byte delimiter)
82 {
83  m_blockedBySpeedLimit = false;
84 
85  if (!GetMaxBytesPerSecond())
86  {
87  size_t ret = DoPump(byteCount, blockingOutput, maxTime, checkDelimiter, delimiter);
88  m_doPumpBlocked = (ret != 0);
89  return ret;
90  }
91 
92  bool forever = (maxTime == INFINITE_TIME);
93  unsigned long timeToGo = maxTime;
94  Timer timer(Timer::MILLISECONDS, forever);
95  lword maxSize = byteCount;
96  byteCount = 0;
97 
98  timer.StartTimer();
99 
100  while (true)
101  {
102  lword curMaxSize = UnsignedMin(ComputeCurrentTransceiveLimit(), maxSize - byteCount);
103 
104  if (curMaxSize || m_doPumpBlocked)
105  {
106  if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
107  size_t ret = DoPump(curMaxSize, blockingOutput, timeToGo, checkDelimiter, delimiter);
108  m_doPumpBlocked = (ret != 0);
109  if (curMaxSize)
110  {
111  NoteTransceive(curMaxSize);
112  byteCount += curMaxSize;
113  }
114  if (ret)
115  return ret;
116  }
117 
118  if (maxSize != ULONG_MAX && byteCount >= maxSize)
119  break;
120 
121  if (!forever)
122  {
123  timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
124  if (!timeToGo)
125  break;
126  }
127 
128  double waitTime = TimeToNextTransceive();
129  if (!forever && waitTime > timeToGo)
130  {
131  m_blockedBySpeedLimit = true;
132  break;
133  }
134 
135  WaitObjectContainer container;
136  LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSource::GeneralPump2() - speed limit", 0));
137  container.Wait((unsigned long)waitTime);
138  }
139 
140  return 0;
141 }
142 
143 size_t NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blocking)
144 {
145  if (messageCount == 0)
146  return 0;
147 
148  messageCount = 0;
149 
150  lword byteCount;
151  do {
152  byteCount = LWORD_MAX;
153  RETURN_IF_NONZERO(Pump2(byteCount, blocking));
154  } while(byteCount == LWORD_MAX);
155 
156  if (!m_messageEndSent && SourceExhausted())
157  {
158  RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(), true));
159  m_messageEndSent = true;
160  messageCount = 1;
161  }
162  return 0;
163 }
164 
165 lword NonblockingSink::TimedFlush(unsigned long maxTime, size_t targetSize)
166 {
167  m_blockedBySpeedLimit = false;
168 
169  size_t curBufSize = GetCurrentBufferSize();
170  if (curBufSize <= targetSize && (targetSize || !EofPending()))
171  return 0;
172 
173  if (!GetMaxBytesPerSecond())
174  return DoFlush(maxTime, targetSize);
175 
176  bool forever = (maxTime == INFINITE_TIME);
177  unsigned long timeToGo = maxTime;
178  Timer timer(Timer::MILLISECONDS, forever);
179  lword totalFlushed = 0;
180 
181  timer.StartTimer();
182 
183  while (true)
184  {
185  size_t flushSize = UnsignedMin(curBufSize - targetSize, ComputeCurrentTransceiveLimit());
186  if (flushSize || EofPending())
187  {
188  if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
189  size_t ret = (size_t)DoFlush(timeToGo, curBufSize - flushSize);
190  if (ret)
191  {
192  NoteTransceive(ret);
193  curBufSize -= ret;
194  totalFlushed += ret;
195  }
196  }
197 
198  if (curBufSize <= targetSize && (targetSize || !EofPending()))
199  break;
200 
201  if (!forever)
202  {
203  timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
204  if (!timeToGo)
205  break;
206  }
207 
208  double waitTime = TimeToNextTransceive();
209  if (!forever && waitTime > timeToGo)
210  {
211  m_blockedBySpeedLimit = true;
212  break;
213  }
214 
215  WaitObjectContainer container;
216  LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSink::TimedFlush() - speed limit", 0));
217  container.Wait((unsigned long)waitTime);
218  }
219 
220  return totalFlushed;
221 }
222 
223 bool NonblockingSink::IsolatedFlush(bool hardFlush, bool blocking)
224 {
225  TimedFlush(blocking ? INFINITE_TIME : 0);
226  return hardFlush && (!!GetCurrentBufferSize() || EofPending());
227 }
228 
229 // *************************************************************
230 
231 NetworkSource::NetworkSource(BufferedTransformation *attachment)
232  : NonblockingSource(attachment), m_buf(1024*16)
233  , m_putSize(0), m_dataBegin(0), m_dataEnd(0)
234  , m_waitingForResult(false), m_outputBlocked(false)
235 {
236 }
237 
239 {
241  + GetReceiver().GetMaxWaitObjectCount()
243 }
244 
246 {
247  if (BlockedBySpeedLimit())
248  LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - speed limit", &callStack));
249  else if (!m_outputBlocked)
250  {
251  if (m_dataBegin == m_dataEnd)
252  AccessReceiver().GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - no data", &callStack));
253  else
254  container.SetNoWait(CallStack("NetworkSource::GetWaitObjects() - have data", &callStack));
255  }
256 
257  AttachedTransformation()->GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - attachment", &callStack));
258 }
259 
260 size_t NetworkSource::DoPump(lword &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter)
261 {
262  NetworkReceiver &receiver = AccessReceiver();
263 
264  lword maxSize = byteCount;
265  byteCount = 0;
266  bool forever = maxTime == INFINITE_TIME;
267  Timer timer(Timer::MILLISECONDS, forever);
269 
270  if (m_outputBlocked)
271  goto DoOutput;
272 
273  while (true)
274  {
275  if (m_dataBegin == m_dataEnd)
276  {
277  if (receiver.EofReceived())
278  break;
279 
280  if (m_waitingForResult)
281  {
282  if (receiver.MustWaitForResult() &&
283  !receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
284  CallStack("NetworkSource::DoPump() - wait receive result", 0)))
285  break;
286 
287  unsigned int recvResult = receiver.GetReceiveResult();
288 #if CRYPTOPP_TRACE_NETWORK
289  OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
290 #endif
291  m_dataEnd += recvResult;
292  m_waitingForResult = false;
293 
294  if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size())
295  goto ReceiveNoWait;
296  }
297  else
298  {
299  m_dataEnd = m_dataBegin = 0;
300 
301  if (receiver.MustWaitToReceive())
302  {
303  if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
304  CallStack("NetworkSource::DoPump() - wait receive", 0)))
305  break;
306 
307  receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd);
308  m_waitingForResult = true;
309  }
310  else
311  {
312 ReceiveNoWait:
313  m_waitingForResult = true;
314  // call Receive repeatedly as long as data is immediately available,
315  // because some receivers tend to return data in small pieces
316 #if CRYPTOPP_TRACE_NETWORK
317  OutputDebugString((IntToString((unsigned int)this) + ": Receiving " + IntToString(m_buf.size()-m_dataEnd) + " bytes\n").c_str());
318 #endif
319  while (receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd))
320  {
321  unsigned int recvResult = receiver.GetReceiveResult();
322 #if CRYPTOPP_TRACE_NETWORK
323  OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
324 #endif
325  m_dataEnd += recvResult;
326  if (receiver.EofReceived() || m_dataEnd > m_buf.size() /2)
327  {
328  m_waitingForResult = false;
329  break;
330  }
331  }
332  }
333  }
334  }
335  else
336  {
337  m_putSize = UnsignedMin(m_dataEnd - m_dataBegin, maxSize - byteCount);
338 
339  if (checkDelimiter)
340  m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin);
341 
342 DoOutput:
343  size_t result = t->PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput);
344  if (result)
345  {
346  if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
347  CallStack("NetworkSource::DoPump() - wait attachment", 0)))
348  goto DoOutput;
349  else
350  {
351  m_outputBlocked = true;
352  return result;
353  }
354  }
355  m_outputBlocked = false;
356 
357  byteCount += m_putSize;
358  m_dataBegin += m_putSize;
359  if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter)
360  break;
361  if (maxSize != ULONG_MAX && byteCount == maxSize)
362  break;
363  // once time limit is reached, return even if there is more data waiting
364  // but make 0 a special case so caller can request a large amount of data to be
365  // pumped as long as it is immediately available
366  if (maxTime > 0 && timer.ElapsedTime() > maxTime)
367  break;
368  }
369  }
370 
371  return 0;
372 }
373 
374 // *************************************************************
375 
376 NetworkSink::NetworkSink(unsigned int maxBufferSize, unsigned int autoFlushBound)
377  : m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound)
378  , m_needSendResult(false), m_wasBlocked(false), m_eofState(EOF_NONE)
379  , m_buffer(STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0)
380  , m_speedTimer(Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0)
381  , m_currentSpeed(0), m_maxObservedSpeed(0)
382 {
383 }
384 
386 {
387  if (m_speedTimer.ElapsedTime() > 1000)
388  {
389  m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.ElapsedTime();
390  m_maxObservedSpeed = STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98f);
391  m_byteCountSinceLastTimerReset = 0;
392  m_speedTimer.StartTimer();
393 // OutputDebugString(("max speed: " + IntToString((int)m_maxObservedSpeed) + " current speed: " + IntToString((int)m_currentSpeed) + "\n").c_str());
394  }
395  return m_currentSpeed;
396 }
397 
399 {
400  lword m = GetMaxBytesPerSecond();
401  return m ? STDMIN(m_maxObservedSpeed, float(CRYPTOPP_VC6_INT64 m)) : m_maxObservedSpeed;
402 }
403 
405 {
407 }
408 
410 {
411  if (BlockedBySpeedLimit())
412  LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - speed limit", &callStack));
413  else if (m_wasBlocked)
414  AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - was blocked", &callStack));
415  else if (!m_buffer.IsEmpty())
416  AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - buffer not empty", &callStack));
417  else if (EofPending())
418  AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - EOF pending", &callStack));
419 }
420 
421 size_t NetworkSink::Put2(const byte *inString, size_t length, int messageEnd, bool blocking)
422 {
423  if (m_eofState == EOF_DONE)
424  {
425  if (length || messageEnd)
426  throw Exception(Exception::OTHER_ERROR, "NetworkSink::Put2() being called after EOF had been sent");
427 
428  return 0;
429  }
430 
431  if (m_eofState > EOF_NONE)
432  goto EofSite;
433 
434  {
435  if (m_skipBytes)
436  {
437  assert(length >= m_skipBytes);
438  inString += m_skipBytes;
439  length -= m_skipBytes;
440  }
441 
442  m_buffer.Put(inString, length);
443 
444  if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound)
445  TimedFlush(0, 0);
446 
447  size_t targetSize = messageEnd ? 0 : m_maxBufferSize;
448  if (blocking)
449  TimedFlush(INFINITE_TIME, targetSize);
450 
451  if (m_buffer.CurrentSize() > targetSize)
452  {
453  assert(!blocking);
454  m_wasBlocked = true;
455  m_skipBytes += length;
456  size_t blockedBytes = UnsignedMin(length, m_buffer.CurrentSize() - targetSize);
457  return STDMAX<size_t>(blockedBytes, 1);
458  }
459 
460  m_wasBlocked = false;
461  m_skipBytes = 0;
462  }
463 
464  if (messageEnd)
465  {
466  m_eofState = EOF_PENDING_SEND;
467 
468  EofSite:
469  TimedFlush(blocking ? INFINITE_TIME : 0, 0);
470  if (m_eofState != EOF_DONE)
471  return 1;
472  }
473 
474  return 0;
475 }
476 
477 lword NetworkSink::DoFlush(unsigned long maxTime, size_t targetSize)
478 {
479  NetworkSender &sender = AccessSender();
480 
481  bool forever = maxTime == INFINITE_TIME;
482  Timer timer(Timer::MILLISECONDS, forever);
483  unsigned int totalFlushSize = 0;
484 
485  while (true)
486  {
487  if (m_buffer.CurrentSize() <= targetSize)
488  break;
489 
490  if (m_needSendResult)
491  {
492  if (sender.MustWaitForResult() &&
493  !sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
494  CallStack("NetworkSink::DoFlush() - wait send result", 0)))
495  break;
496 
497  unsigned int sendResult = sender.GetSendResult();
498 #if CRYPTOPP_TRACE_NETWORK
499  OutputDebugString((IntToString((unsigned int)this) + ": Sent " + IntToString(sendResult) + " bytes\n").c_str());
500 #endif
501  m_buffer.Skip(sendResult);
502  totalFlushSize += sendResult;
503  m_needSendResult = false;
504 
505  if (!m_buffer.AnyRetrievable())
506  break;
507  }
508 
509  unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
510  if (sender.MustWaitToSend() && !sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait send", 0)))
511  break;
512 
513  size_t contiguousSize = 0;
514  const byte *block = m_buffer.Spy(contiguousSize);
515 
516 #if CRYPTOPP_TRACE_NETWORK
517  OutputDebugString((IntToString((unsigned int)this) + ": Sending " + IntToString(contiguousSize) + " bytes\n").c_str());
518 #endif
519  sender.Send(block, contiguousSize);
520  m_needSendResult = true;
521 
522  if (maxTime > 0 && timeOut == 0)
523  break; // once time limit is reached, return even if there is more data waiting
524  }
525 
526  m_byteCountSinceLastTimerReset += totalFlushSize;
528 
529  if (m_buffer.IsEmpty() && !m_needSendResult)
530  {
531  if (m_eofState == EOF_PENDING_SEND)
532  {
533  sender.SendEof();
534  m_eofState = sender.MustWaitForEof() ? EOF_PENDING_DELIVERY : EOF_DONE;
535  }
536 
537  while (m_eofState == EOF_PENDING_DELIVERY)
538  {
539  unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
540  if (!sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait EOF", 0)))
541  break;
542 
543  if (sender.EofSent())
544  m_eofState = EOF_DONE;
545  }
546  }
547 
548  return totalFlushSize;
549 }
550 
551 #endif // #ifdef HIGHRES_TIMER_AVAILABLE
552 
553 NAMESPACE_END
Base class for all exceptions thrown by the library.
Definition: cryptlib.h:139
container of wait objects
Definition: wait.h:151
float GetMaxObservedSpeed() const
get the maximum observed speed of this sink in bytes per second
Definition: network.cpp:398
high resolution timer
Definition: hrtimer.h:53
virtual bool SourceExhausted() const =0
Determines if the Source is exhausted.
lword TimedFlush(unsigned long maxTime, size_t targetSize=0)
flush to device for no more than maxTime milliseconds
Definition: network.cpp:165
size_type size() const
Provides the count of elements in the SecBlock.
Definition: secblock.h:516
Some other error occurred not belonging to other categories.
Definition: cryptlib.h:158
void GetWaitObjects(WaitObjectContainer &container, CallStack const &callStack)
Retrieves waitable objects.
Definition: network.cpp:245
Interface for buffered transformations.
Definition: cryptlib.h:1342
size_t Put2(const byte *inString, size_t length, int messageEnd, bool blocking)
Input multiple bytes for processing.
Definition: network.cpp:421
size_t PumpMessages2(unsigned int &messageCount, bool blocking=true)
Pump messages to attached transformation.
Definition: network.cpp:143
unsigned int GetMaxWaitObjectCount() const
Retrieves the maximum number of waitable objects.
Definition: network.cpp:404
virtual void GetWaitObjects(WaitObjectContainer &container, CallStack const &callStack)=0
Retrieves waitable objects.
size_t Put(byte inByte, bool blocking=true)
Input a byte for processing.
Definition: cryptlib.h:1363
size_t GeneralPump2(lword &byteCount, bool blockingOutput=true, unsigned long maxTime=INFINITE_TIME, bool checkDelimiter=false, byte delimiter='\n')
pump up to maxSize bytes using at most maxTime milliseconds
Definition: network.cpp:79
BufferedTransformation * AttachedTransformation()
Retrieve attached transformation.
Definition: filters.cpp:36
float ComputeCurrentSpeed()
compute the current speed of this sink in bytes per second
Definition: network.cpp:385
T1 SaturatingSubtract(const T1 &a, const T2 &b)
Performs a saturating subtract clamped at 0.
Definition: misc.h:865
const T1 UnsignedMin(const T1 &a, const T2 &b)
Safe comparison of values that could be neagtive and incorrectly promoted.
Definition: misc.h:433
virtual lword Skip(lword skipMax=LWORD_MAX)
Discard skipMax bytes from the output buffer.
Definition: cryptlib.cpp:556
const T & STDMIN(const T &a, const T &b)
Replacement function for std::min.
Definition: misc.h:397
size_t Put2(const byte *inString, size_t length, int messageEnd, bool blocking)
Input a byte array for processing.
Definition: simple.h:122
const unsigned long INFINITE_TIME
Represents infinite time.
Definition: cryptlib.h:110
void GetWaitObjects(WaitObjectContainer &container, CallStack const &callStack)
Retrieves waitable objects.
Definition: cryptlib.cpp:431
unsigned int GetMaxWaitObjectCount() const
Definition: network.h:38
bool Wait(unsigned long milliseconds, CallStack const &callStack)
Wait on this object.
Definition: wait.cpp:426
size_t Pump2(lword &byteCount, bool blocking=true)
Pump data to attached transformation.
Definition: network.h:78
bool AnyRetrievable() const
Determines whether bytes are ready for retrieval.
Definition: queue.h:37
std::string IntToString(T value, unsigned int base=10)
Converts a value to a string.
Definition: misc.h:460
void GetWaitObjects(WaitObjectContainer &container, CallStack const &callStack)
Retrieves waitable objects.
Definition: network.cpp:409
unsigned int GetMaxWaitObjectCount() const
Retrieves the maximum number of waitable objects.
Definition: cryptlib.cpp:425
const T & STDMAX(const T &a, const T &b)
Replacement function for std::max.
Definition: misc.h:407
unsigned int GetMaxWaitObjectCount() const
Retrieves the maximum number of waitable objects.
Definition: network.cpp:238
virtual bool Receive(byte *buf, size_t bufLen)=0
receive data from network source, returns whether result is immediately available ...
Crypto++ library namespace.
Network Receiver.
Definition: network.h:94
bool IsolatedFlush(bool hardFlush, bool blocking)
Flushes data buffered by this object, without signal propagation.
Definition: network.cpp:223
virtual unsigned int GetMaxWaitObjectCount() const =0
Maximum number of wait objects that this object can return.
virtual size_t PutModifiable2(byte *inString, size_t length, int messageEnd, bool blocking)
Input multiple bytes that may be modified by callee.
Definition: cryptlib.h:1451
a Source class that can pump from a device for a specified amount of time.
Definition: network.h:55
Network Sender.
Definition: network.h:152