00001
00002
00003 #include "pch.h"
00004 #include "network.h"
00005 #include "wait.h"
00006
00007 #define CRYPTOPP_TRACE_NETWORK 0
00008
00009 NAMESPACE_BEGIN(CryptoPP)
00010
00011 #ifdef HIGHRES_TIMER_AVAILABLE
00012
00013 lword LimitedBandwidth::ComputeCurrentTransceiveLimit()
00014 {
00015 if (!m_maxBytesPerSecond)
00016 return ULONG_MAX;
00017
00018 double curTime = GetCurTimeAndCleanUp();
00019 lword total = 0;
00020 for (OpQueue::size_type i=0; i!=m_ops.size(); ++i)
00021 total += m_ops[i].second;
00022 return SaturatingSubtract(m_maxBytesPerSecond, total);
00023 }
00024
00025 double LimitedBandwidth::TimeToNextTransceive()
00026 {
00027 if (!m_maxBytesPerSecond)
00028 return 0;
00029
00030 if (!m_nextTransceiveTime)
00031 ComputeNextTransceiveTime();
00032
00033 return SaturatingSubtract(m_nextTransceiveTime, m_timer.ElapsedTimeAsDouble());
00034 }
00035
00036 void LimitedBandwidth::NoteTransceive(lword size)
00037 {
00038 if (m_maxBytesPerSecond)
00039 {
00040 double curTime = GetCurTimeAndCleanUp();
00041 m_ops.push_back(std::make_pair(curTime, size));
00042 m_nextTransceiveTime = 0;
00043 }
00044 }
00045
00046 void LimitedBandwidth::ComputeNextTransceiveTime()
00047 {
00048 double curTime = GetCurTimeAndCleanUp();
00049 lword total = 0;
00050 for (unsigned int i=0; i!=m_ops.size(); ++i)
00051 total += m_ops[i].second;
00052 m_nextTransceiveTime =
00053 (total < m_maxBytesPerSecond) ? curTime : m_ops.front().first + 1000;
00054 }
00055
00056 double LimitedBandwidth::GetCurTimeAndCleanUp()
00057 {
00058 if (!m_maxBytesPerSecond)
00059 return 0;
00060
00061 double curTime = m_timer.ElapsedTimeAsDouble();
00062 while (m_ops.size() && (m_ops.front().first + 1000 < curTime))
00063 m_ops.pop_front();
00064 return curTime;
00065 }
00066
00067 void LimitedBandwidth::GetWaitObjects(WaitObjectContainer &container, const CallStack &callStack)
00068 {
00069 double nextTransceiveTime = TimeToNextTransceive();
00070 if (nextTransceiveTime)
00071 container.ScheduleEvent(nextTransceiveTime, CallStack("LimitedBandwidth::GetWaitObjects()", &callStack));
00072 }
00073
00074
00075
00076 size_t NonblockingSource::GeneralPump2(
00077 lword& byteCount, bool blockingOutput,
00078 unsigned long maxTime, bool checkDelimiter, byte delimiter)
00079 {
00080 m_blockedBySpeedLimit = false;
00081
00082 if (!GetMaxBytesPerSecond())
00083 {
00084 size_t ret = DoPump(byteCount, blockingOutput, maxTime, checkDelimiter, delimiter);
00085 m_doPumpBlocked = (ret != 0);
00086 return ret;
00087 }
00088
00089 bool forever = (maxTime == INFINITE_TIME);
00090 unsigned long timeToGo = maxTime;
00091 Timer timer(Timer::MILLISECONDS, forever);
00092 lword maxSize = byteCount;
00093 byteCount = 0;
00094
00095 timer.StartTimer();
00096
00097 while (true)
00098 {
00099 lword curMaxSize = UnsignedMin(ComputeCurrentTransceiveLimit(), maxSize - byteCount);
00100
00101 if (curMaxSize || m_doPumpBlocked)
00102 {
00103 if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
00104 size_t ret = DoPump(curMaxSize, blockingOutput, timeToGo, checkDelimiter, delimiter);
00105 m_doPumpBlocked = (ret != 0);
00106 if (curMaxSize)
00107 {
00108 NoteTransceive(curMaxSize);
00109 byteCount += curMaxSize;
00110 }
00111 if (ret)
00112 return ret;
00113 }
00114
00115 if (maxSize != ULONG_MAX && byteCount >= maxSize)
00116 break;
00117
00118 if (!forever)
00119 {
00120 timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
00121 if (!timeToGo)
00122 break;
00123 }
00124
00125 double waitTime = TimeToNextTransceive();
00126 if (!forever && waitTime > timeToGo)
00127 {
00128 m_blockedBySpeedLimit = true;
00129 break;
00130 }
00131
00132 WaitObjectContainer container;
00133 LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSource::GeneralPump2() - speed limit", 0));
00134 container.Wait((unsigned long)waitTime);
00135 }
00136
00137 return 0;
00138 }
00139
00140 size_t NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blocking)
00141 {
00142 if (messageCount == 0)
00143 return 0;
00144
00145 messageCount = 0;
00146
00147 lword byteCount;
00148 do {
00149 byteCount = LWORD_MAX;
00150 RETURN_IF_NONZERO(Pump2(byteCount, blocking));
00151 } while(byteCount == LWORD_MAX);
00152
00153 if (!m_messageEndSent && SourceExhausted())
00154 {
00155 RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(), true));
00156 m_messageEndSent = true;
00157 messageCount = 1;
00158 }
00159 return 0;
00160 }
00161
00162 lword NonblockingSink::TimedFlush(unsigned long maxTime, size_t targetSize)
00163 {
00164 m_blockedBySpeedLimit = false;
00165
00166 size_t curBufSize = GetCurrentBufferSize();
00167 if (curBufSize <= targetSize && (targetSize || !EofPending()))
00168 return 0;
00169
00170 if (!GetMaxBytesPerSecond())
00171 return DoFlush(maxTime, targetSize);
00172
00173 bool forever = (maxTime == INFINITE_TIME);
00174 unsigned long timeToGo = maxTime;
00175 Timer timer(Timer::MILLISECONDS, forever);
00176 lword totalFlushed = 0;
00177
00178 timer.StartTimer();
00179
00180 while (true)
00181 {
00182 size_t flushSize = UnsignedMin(curBufSize - targetSize, ComputeCurrentTransceiveLimit());
00183 if (flushSize || EofPending())
00184 {
00185 if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
00186 size_t ret = (size_t)DoFlush(timeToGo, curBufSize - flushSize);
00187 if (ret)
00188 {
00189 NoteTransceive(ret);
00190 curBufSize -= ret;
00191 totalFlushed += ret;
00192 }
00193 }
00194
00195 if (curBufSize <= targetSize && (targetSize || !EofPending()))
00196 break;
00197
00198 if (!forever)
00199 {
00200 timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
00201 if (!timeToGo)
00202 break;
00203 }
00204
00205 double waitTime = TimeToNextTransceive();
00206 if (!forever && waitTime > timeToGo)
00207 {
00208 m_blockedBySpeedLimit = true;
00209 break;
00210 }
00211
00212 WaitObjectContainer container;
00213 LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSink::TimedFlush() - speed limit", 0));
00214 container.Wait((unsigned long)waitTime);
00215 }
00216
00217 return totalFlushed;
00218 }
00219
00220 bool NonblockingSink::IsolatedFlush(bool hardFlush, bool blocking)
00221 {
00222 TimedFlush(blocking ? INFINITE_TIME : 0);
00223 return hardFlush && (!!GetCurrentBufferSize() || EofPending());
00224 }
00225
00226
00227
00228 NetworkSource::NetworkSource(BufferedTransformation *attachment)
00229 : NonblockingSource(attachment), m_buf(1024*16)
00230 , m_waitingForResult(false), m_outputBlocked(false)
00231 , m_dataBegin(0), m_dataEnd(0)
00232 {
00233 }
00234
00235 unsigned int NetworkSource::GetMaxWaitObjectCount() const
00236 {
00237 return LimitedBandwidth::GetMaxWaitObjectCount()
00238 + GetReceiver().GetMaxWaitObjectCount()
00239 + AttachedTransformation()->GetMaxWaitObjectCount();
00240 }
00241
00242 void NetworkSource::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack)
00243 {
00244 if (BlockedBySpeedLimit())
00245 LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - speed limit", &callStack));
00246 else if (!m_outputBlocked)
00247 {
00248 if (m_dataBegin == m_dataEnd)
00249 AccessReceiver().GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - no data", &callStack));
00250 else
00251 container.SetNoWait(CallStack("NetworkSource::GetWaitObjects() - have data", &callStack));
00252 }
00253
00254 AttachedTransformation()->GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - attachment", &callStack));
00255 }
00256
// Receives data into m_buf and forwards it to the attached transformation.
//   byteCount      - in: maximum number of bytes to forward;
//                    out: number of bytes forwarded by this call.
//   blockingOutput - together with "maxTime == INFINITE_TIME" forms the
//                    blocking flag passed to PutModifiable2().
//   maxTime        - time budget; INFINITE_TIME means no limit.
//   checkDelimiter - when true, every forwarded chunk ends just before the
//                    first `delimiter` byte, and pumping stops once the next
//                    buffered byte is the delimiter.
// Returns 0, or the nonzero PutModifiable2() result when the attachment is
// still blocked after waiting; m_outputBlocked is then set so that the next
// call resumes at the DoOutput label.
00257 size_t NetworkSource::DoPump(lword &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter)
00258 {
00259 NetworkReceiver &receiver = AccessReceiver();
00260
00261 lword maxSize = byteCount;
00262 byteCount = 0;
00263 bool forever = maxTime == INFINITE_TIME;
00264 Timer timer(Timer::MILLISECONDS, forever);
00265 BufferedTransformation *t = AttachedTransformation();
00266
// A previous call returned with the output blocked: retry that same put
// (m_putSize still holds its size) before receiving anything new.
00267 if (m_outputBlocked)
00268 goto DoOutput;
00269
00270 while (true)
00271 {
// No unforwarded data in m_buf: try to receive more.
00272 if (m_dataBegin == m_dataEnd)
00273 {
00274 if (receiver.EofReceived())
00275 break;
00276
// A receive was started earlier; collect its result, waiting for it if the
// receiver requires that. Give up (break) if the wait fails.
00277 if (m_waitingForResult)
00278 {
00279 if (receiver.MustWaitForResult() &&
00280 !receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
00281 CallStack("NetworkSource::DoPump() - wait receive result", 0)))
00282 break;
00283
00284 unsigned int recvResult = receiver.GetReceiveResult();
00285 #if CRYPTOPP_TRACE_NETWORK
00286 OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
00287 #endif
00288 m_dataEnd += recvResult;
00289 m_waitingForResult = false;
00290
// If another receive can start immediately and there is still room in the
// buffer, jump into the no-wait receive loop below.
00291 if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size())
00292 goto ReceiveNoWait;
00293 }
00294 else
00295 {
// Everything buffered has been forwarded: start again at the buffer's front.
00296 m_dataEnd = m_dataBegin = 0;
00297
00298 if (receiver.MustWaitToReceive())
00299 {
00300 if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
00301 CallStack("NetworkSource::DoPump() - wait receive", 0)))
00302 break;
00303
// Start a receive; its result is collected on a later loop iteration.
00304 receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd);
00305 m_waitingForResult = true;
00306 }
00307 else
00308 {
// Also entered by goto from the result-collection branch above.
00309 ReceiveNoWait:
00310 m_waitingForResult = true;
00311
00312
00313 #if CRYPTOPP_TRACE_NETWORK
00314 OutputDebugString((IntToString((unsigned int)this) + ": Receiving " + IntToString(m_buf.size()-m_dataEnd) + " bytes\n").c_str());
00315 #endif
// Keep receiving while Receive() returns true (presumably: the receive
// completed immediately - confirm against NetworkReceiver). Stop at EOF or
// once the buffer is more than half full; if Receive() returns false,
// m_waitingForResult stays set so the result is collected later.
00316 while (receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd))
00317 {
00318 unsigned int recvResult = receiver.GetReceiveResult();
00319 #if CRYPTOPP_TRACE_NETWORK
00320 OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
00321 #endif
00322 m_dataEnd += recvResult;
00323 if (receiver.EofReceived() || m_dataEnd > m_buf.size() /2)
00324 {
00325 m_waitingForResult = false;
00326 break;
00327 }
00328 }
00329 }
00330 }
00331 }
00332 else
00333 {
// Buffered data is available: forward as much as the caller's quota allows.
00334 m_putSize = UnsignedMin(m_dataEnd - m_dataBegin, maxSize - byteCount);
00335
// Shorten the chunk so that it ends just before the first delimiter byte.
00336 if (checkDelimiter)
00337 m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin);
00338
// Also entered by goto at the top of the function when resuming a blocked put.
00339 DoOutput:
00340 size_t result = t->PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput);
00341 if (result)
00342 {
// The attachment reported a nonzero result: wait on it within the remaining
// time and retry, or record the blocked state and hand the result back.
00343 if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
00344 CallStack("NetworkSource::DoPump() - wait attachment", 0)))
00345 goto DoOutput;
00346 else
00347 {
00348 m_outputBlocked = true;
00349 return result;
00350 }
00351 }
00352 m_outputBlocked = false;
00353
00354 byteCount += m_putSize;
00355 m_dataBegin += m_putSize;
// Stop when the next buffered byte is the delimiter ...
00356 if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter)
00357 break;
// ... or when the requested byte count has been reached ...
00358 if (maxSize != ULONG_MAX && byteCount == maxSize)
00359 break;
00360
00361
00362
// ... or when a nonzero time budget has been used up.
00363 if (maxTime > 0 && timer.ElapsedTime() > maxTime)
00364 break;
00365 }
00366 }
00367
00368 return 0;
00369 }
00370
00371
00372
00373 NetworkSink::NetworkSink(unsigned int maxBufferSize, unsigned int autoFlushBound)
00374 : m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound)
00375 , m_needSendResult(false), m_wasBlocked(false), m_eofState(EOF_NONE)
00376 , m_buffer(STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0)
00377 , m_speedTimer(Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0)
00378 , m_currentSpeed(0), m_maxObservedSpeed(0)
00379 {
00380 }
00381
00382 float NetworkSink::ComputeCurrentSpeed()
00383 {
00384 if (m_speedTimer.ElapsedTime() > 1000)
00385 {
00386 m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.ElapsedTime();
00387 m_maxObservedSpeed = STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98f);
00388 m_byteCountSinceLastTimerReset = 0;
00389 m_speedTimer.StartTimer();
00390
00391 }
00392 return m_currentSpeed;
00393 }
00394
00395 float NetworkSink::GetMaxObservedSpeed() const
00396 {
00397 lword m = GetMaxBytesPerSecond();
00398 return m ? STDMIN(m_maxObservedSpeed, float(m)) : m_maxObservedSpeed;
00399 }
00400
00401 unsigned int NetworkSink::GetMaxWaitObjectCount() const
00402 {
00403 return LimitedBandwidth::GetMaxWaitObjectCount() + GetSender().GetMaxWaitObjectCount();
00404 }
00405
00406 void NetworkSink::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack)
00407 {
00408 if (BlockedBySpeedLimit())
00409 LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - speed limit", &callStack));
00410 else if (m_wasBlocked)
00411 AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - was blocked", &callStack));
00412 else if (!m_buffer.IsEmpty())
00413 AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - buffer not empty", &callStack));
00414 else if (EofPending())
00415 AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - EOF pending", &callStack));
00416 }
00417
00418 size_t NetworkSink::Put2(const byte *inString, size_t length, int messageEnd, bool blocking)
00419 {
00420 if (m_eofState == EOF_DONE)
00421 {
00422 if (length || messageEnd)
00423 throw Exception(Exception::OTHER_ERROR, "NetworkSink::Put2() being called after EOF had been sent");
00424
00425 return 0;
00426 }
00427
00428 if (m_eofState > EOF_NONE)
00429 goto EofSite;
00430
00431 {
00432 if (m_skipBytes)
00433 {
00434 assert(length >= m_skipBytes);
00435 inString += m_skipBytes;
00436 length -= m_skipBytes;
00437 }
00438
00439 m_buffer.Put(inString, length);
00440
00441 if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound)
00442 TimedFlush(0, 0);
00443
00444 size_t targetSize = messageEnd ? 0 : m_maxBufferSize;
00445 if (blocking)
00446 TimedFlush(INFINITE_TIME, targetSize);
00447
00448 if (m_buffer.CurrentSize() > targetSize)
00449 {
00450 assert(!blocking);
00451 m_wasBlocked = true;
00452 m_skipBytes += length;
00453 size_t blockedBytes = UnsignedMin(length, m_buffer.CurrentSize() - targetSize);
00454 return STDMAX<size_t>(blockedBytes, 1);
00455 }
00456
00457 m_wasBlocked = false;
00458 m_skipBytes = 0;
00459 }
00460
00461 if (messageEnd)
00462 {
00463 m_eofState = EOF_PENDING_SEND;
00464
00465 EofSite:
00466 TimedFlush(blocking ? INFINITE_TIME : 0, 0);
00467 if (m_eofState != EOF_DONE)
00468 return 1;
00469 }
00470
00471 return 0;
00472 }
00473
00474 lword NetworkSink::DoFlush(unsigned long maxTime, size_t targetSize)
00475 {
00476 NetworkSender &sender = AccessSender();
00477
00478 bool forever = maxTime == INFINITE_TIME;
00479 Timer timer(Timer::MILLISECONDS, forever);
00480 unsigned int totalFlushSize = 0;
00481
00482 while (true)
00483 {
00484 if (m_buffer.CurrentSize() <= targetSize)
00485 break;
00486
00487 if (m_needSendResult)
00488 {
00489 if (sender.MustWaitForResult() &&
00490 !sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
00491 CallStack("NetworkSink::DoFlush() - wait send result", 0)))
00492 break;
00493
00494 unsigned int sendResult = sender.GetSendResult();
00495 #if CRYPTOPP_TRACE_NETWORK
00496 OutputDebugString((IntToString((unsigned int)this) + ": Sent " + IntToString(sendResult) + " bytes\n").c_str());
00497 #endif
00498 m_buffer.Skip(sendResult);
00499 totalFlushSize += sendResult;
00500 m_needSendResult = false;
00501
00502 if (!m_buffer.AnyRetrievable())
00503 break;
00504 }
00505
00506 unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
00507 if (sender.MustWaitToSend() && !sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait send", 0)))
00508 break;
00509
00510 size_t contiguousSize = 0;
00511 const byte *block = m_buffer.Spy(contiguousSize);
00512
00513 #if CRYPTOPP_TRACE_NETWORK
00514 OutputDebugString((IntToString((unsigned int)this) + ": Sending " + IntToString(contiguousSize) + " bytes\n").c_str());
00515 #endif
00516 sender.Send(block, contiguousSize);
00517 m_needSendResult = true;
00518
00519 if (maxTime > 0 && timeOut == 0)
00520 break;
00521 }
00522
00523 m_byteCountSinceLastTimerReset += totalFlushSize;
00524 ComputeCurrentSpeed();
00525
00526 if (m_buffer.IsEmpty() && !m_needSendResult)
00527 {
00528 if (m_eofState == EOF_PENDING_SEND)
00529 {
00530 sender.SendEof();
00531 m_eofState = sender.MustWaitForEof() ? EOF_PENDING_DELIVERY : EOF_DONE;
00532 }
00533
00534 while (m_eofState == EOF_PENDING_DELIVERY)
00535 {
00536 unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
00537 if (!sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait EOF", 0)))
00538 break;
00539
00540 if (sender.EofSent())
00541 m_eofState = EOF_DONE;
00542 }
00543 }
00544
00545 return totalFlushSize;
00546 }
00547
00548 #endif // #ifdef HIGHRES_TIMER_AVAILABLE
00549
00550 NAMESPACE_END