network.cpp

00001 // network.cpp - written and placed in the public domain by Wei Dai
00002 
00003 #include "pch.h"
00004 #include "network.h"
00005 #include "wait.h"
00006 
00007 #define CRYPTOPP_TRACE_NETWORK 0
00008 
00009 NAMESPACE_BEGIN(CryptoPP)
00010 
00011 #ifdef HIGHRES_TIMER_AVAILABLE
00012 
00013 lword LimitedBandwidth::ComputeCurrentTransceiveLimit()
00014 {
00015         if (!m_maxBytesPerSecond)
00016                 return ULONG_MAX;
00017 
00018         double curTime = GetCurTimeAndCleanUp();
00019         lword total = 0;
00020         for (OpQueue::size_type i=0; i!=m_ops.size(); ++i)
00021                 total += m_ops[i].second;
00022         return SaturatingSubtract(m_maxBytesPerSecond, total);
00023 }
00024 
00025 double LimitedBandwidth::TimeToNextTransceive()
00026 {
00027         if (!m_maxBytesPerSecond)
00028                 return 0;
00029 
00030         if (!m_nextTransceiveTime)
00031                 ComputeNextTransceiveTime();
00032 
00033         return SaturatingSubtract(m_nextTransceiveTime, m_timer.ElapsedTimeAsDouble());
00034 }
00035 
00036 void LimitedBandwidth::NoteTransceive(lword size)
00037 {
00038         if (m_maxBytesPerSecond)
00039         {
00040                 double curTime = GetCurTimeAndCleanUp();
00041                 m_ops.push_back(std::make_pair(curTime, size));
00042                 m_nextTransceiveTime = 0;
00043         }
00044 }
00045 
00046 void LimitedBandwidth::ComputeNextTransceiveTime()
00047 {
00048         double curTime = GetCurTimeAndCleanUp();
00049         lword total = 0;
00050         for (unsigned int i=0; i!=m_ops.size(); ++i)
00051                 total += m_ops[i].second;
00052         m_nextTransceiveTime =
00053                 (total < m_maxBytesPerSecond) ? curTime : m_ops.front().first + 1000;
00054 }
00055 
00056 double LimitedBandwidth::GetCurTimeAndCleanUp()
00057 {
00058         if (!m_maxBytesPerSecond)
00059                 return 0;
00060 
00061         double curTime = m_timer.ElapsedTimeAsDouble();
00062         while (m_ops.size() && (m_ops.front().first + 1000 < curTime))
00063                 m_ops.pop_front();
00064         return curTime;
00065 }
00066 
00067 void LimitedBandwidth::GetWaitObjects(WaitObjectContainer &container, const CallStack &callStack)
00068 {
00069         double nextTransceiveTime = TimeToNextTransceive();
00070         if (nextTransceiveTime)
00071                 container.ScheduleEvent(nextTransceiveTime, CallStack("LimitedBandwidth::GetWaitObjects()", &callStack));
00072 }
00073 
00074 // *************************************************************
00075 
00076 size_t NonblockingSource::GeneralPump2(
00077         lword& byteCount, bool blockingOutput,
00078         unsigned long maxTime, bool checkDelimiter, byte delimiter)
00079 {
00080         m_blockedBySpeedLimit = false;
00081 
00082         if (!GetMaxBytesPerSecond())
00083         {
00084                 size_t ret = DoPump(byteCount, blockingOutput, maxTime, checkDelimiter, delimiter);
00085                 m_doPumpBlocked = (ret != 0);
00086                 return ret;
00087         }
00088 
00089         bool forever = (maxTime == INFINITE_TIME);
00090         unsigned long timeToGo = maxTime;
00091         Timer timer(Timer::MILLISECONDS, forever);
00092         lword maxSize = byteCount;
00093         byteCount = 0;
00094 
00095         timer.StartTimer();
00096 
00097         while (true)
00098         {
00099                 lword curMaxSize = UnsignedMin(ComputeCurrentTransceiveLimit(), maxSize - byteCount);
00100 
00101                 if (curMaxSize || m_doPumpBlocked)
00102                 {
00103                         if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
00104                         size_t ret = DoPump(curMaxSize, blockingOutput, timeToGo, checkDelimiter, delimiter);
00105                         m_doPumpBlocked = (ret != 0);
00106                         if (curMaxSize)
00107                         {
00108                                 NoteTransceive(curMaxSize);
00109                                 byteCount += curMaxSize;
00110                         }
00111                         if (ret)
00112                                 return ret;
00113                 }
00114 
00115                 if (maxSize != ULONG_MAX && byteCount >= maxSize)
00116                         break;
00117 
00118                 if (!forever)
00119                 {
00120                         timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
00121                         if (!timeToGo)
00122                                 break;
00123                 }
00124 
00125                 double waitTime = TimeToNextTransceive();
00126                 if (!forever && waitTime > timeToGo)
00127                 {
00128                         m_blockedBySpeedLimit = true;
00129                         break;
00130                 }
00131 
00132                 WaitObjectContainer container;
00133                 LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSource::GeneralPump2() - speed limit", 0));
00134                 container.Wait((unsigned long)waitTime);
00135         }
00136 
00137         return 0;
00138 }
00139 
00140 size_t NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blocking)
00141 {
00142         if (messageCount == 0)
00143                 return 0;
00144 
00145         messageCount = 0;
00146 
00147         lword byteCount;
00148         do {
00149                 byteCount = LWORD_MAX;
00150                 RETURN_IF_NONZERO(Pump2(byteCount, blocking));
00151         } while(byteCount == LWORD_MAX);
00152 
00153         if (!m_messageEndSent && SourceExhausted())
00154         {
00155                 RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(), true));
00156                 m_messageEndSent = true;
00157                 messageCount = 1;
00158         }
00159         return 0;
00160 }
00161 
00162 lword NonblockingSink::TimedFlush(unsigned long maxTime, size_t targetSize)
00163 {
00164         m_blockedBySpeedLimit = false;
00165 
00166         size_t curBufSize = GetCurrentBufferSize();
00167         if (curBufSize <= targetSize && (targetSize || !EofPending()))
00168                 return 0;
00169 
00170         if (!GetMaxBytesPerSecond())
00171                 return DoFlush(maxTime, targetSize);
00172 
00173         bool forever = (maxTime == INFINITE_TIME);
00174         unsigned long timeToGo = maxTime;
00175         Timer timer(Timer::MILLISECONDS, forever);
00176         lword totalFlushed = 0;
00177 
00178         timer.StartTimer();
00179 
00180         while (true)
00181         {       
00182                 size_t flushSize = UnsignedMin(curBufSize - targetSize, ComputeCurrentTransceiveLimit());
00183                 if (flushSize || EofPending())
00184                 {
00185                         if (!forever) timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
00186                         size_t ret = (size_t)DoFlush(timeToGo, curBufSize - flushSize);
00187                         if (ret)
00188                         {
00189                                 NoteTransceive(ret);
00190                                 curBufSize -= ret;
00191                                 totalFlushed += ret;
00192                         }
00193                 }
00194 
00195                 if (curBufSize <= targetSize && (targetSize || !EofPending()))
00196                         break;
00197 
00198                 if (!forever)
00199                 {
00200                         timeToGo = SaturatingSubtract(maxTime, timer.ElapsedTime());
00201                         if (!timeToGo)
00202                                 break;
00203                 }
00204 
00205                 double waitTime = TimeToNextTransceive();
00206                 if (!forever && waitTime > timeToGo)
00207                 {
00208                         m_blockedBySpeedLimit = true;
00209                         break;
00210                 }
00211 
00212                 WaitObjectContainer container;
00213                 LimitedBandwidth::GetWaitObjects(container, CallStack("NonblockingSink::TimedFlush() - speed limit", 0));
00214                 container.Wait((unsigned long)waitTime);
00215         }
00216 
00217         return totalFlushed;
00218 }
00219 
00220 bool NonblockingSink::IsolatedFlush(bool hardFlush, bool blocking)
00221 {
00222         TimedFlush(blocking ? INFINITE_TIME : 0);
00223         return hardFlush && (!!GetCurrentBufferSize() || EofPending());
00224 }
00225 
00226 // *************************************************************
00227 
00228 NetworkSource::NetworkSource(BufferedTransformation *attachment)
00229         : NonblockingSource(attachment), m_buf(1024*16)
00230         , m_waitingForResult(false), m_outputBlocked(false)
00231         , m_dataBegin(0), m_dataEnd(0)
00232 {
00233 }
00234 
00235 unsigned int NetworkSource::GetMaxWaitObjectCount() const
00236 {
00237         return LimitedBandwidth::GetMaxWaitObjectCount()
00238                 + GetReceiver().GetMaxWaitObjectCount()
00239                 + AttachedTransformation()->GetMaxWaitObjectCount();
00240 }
00241 
00242 void NetworkSource::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack)
00243 {
00244         if (BlockedBySpeedLimit())
00245                 LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - speed limit", &callStack));
00246         else if (!m_outputBlocked)
00247         {
00248                 if (m_dataBegin == m_dataEnd)
00249                         AccessReceiver().GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - no data", &callStack)); 
00250                 else
00251                         container.SetNoWait(CallStack("NetworkSource::GetWaitObjects() - have data", &callStack));
00252         }
00253 
00254         AttachedTransformation()->GetWaitObjects(container, CallStack("NetworkSource::GetWaitObjects() - attachment", &callStack));
00255 }
00256 
00257 size_t NetworkSource::DoPump(lword &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter)
00258 {
00259         NetworkReceiver &receiver = AccessReceiver();
00260 
00261         lword maxSize = byteCount;
00262         byteCount = 0;
00263         bool forever = maxTime == INFINITE_TIME;
00264         Timer timer(Timer::MILLISECONDS, forever);
00265         BufferedTransformation *t = AttachedTransformation();
00266 
00267         if (m_outputBlocked)
00268                 goto DoOutput;
00269 
00270         while (true)
00271         {
00272                 if (m_dataBegin == m_dataEnd)
00273                 {
00274                         if (receiver.EofReceived())
00275                                 break;
00276 
00277                         if (m_waitingForResult)
00278                         {
00279                                 if (receiver.MustWaitForResult() &&
00280                                         !receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
00281                                                 CallStack("NetworkSource::DoPump() - wait receive result", 0)))
00282                                         break;
00283 
00284                                 unsigned int recvResult = receiver.GetReceiveResult();
00285 #if CRYPTOPP_TRACE_NETWORK
00286                                 OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
00287 #endif
00288                                 m_dataEnd += recvResult;
00289                                 m_waitingForResult = false;
00290 
00291                                 if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size())
00292                                         goto ReceiveNoWait;
00293                         }
00294                         else
00295                         {
00296                                 m_dataEnd = m_dataBegin = 0;
00297 
00298                                 if (receiver.MustWaitToReceive())
00299                                 {
00300                                         if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
00301                                                         CallStack("NetworkSource::DoPump() - wait receive", 0)))
00302                                                 break;
00303 
00304                                         receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd);
00305                                         m_waitingForResult = true;
00306                                 }
00307                                 else
00308                                 {
00309 ReceiveNoWait:
00310                                         m_waitingForResult = true;
00311                                         // call Receive repeatedly as long as data is immediately available,
00312                                         // because some receivers tend to return data in small pieces
00313 #if CRYPTOPP_TRACE_NETWORK
00314                                         OutputDebugString((IntToString((unsigned int)this) + ": Receiving " + IntToString(m_buf.size()-m_dataEnd) + " bytes\n").c_str());
00315 #endif
00316                                         while (receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd))
00317                                         {
00318                                                 unsigned int recvResult = receiver.GetReceiveResult();
00319 #if CRYPTOPP_TRACE_NETWORK
00320                                                 OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
00321 #endif
00322                                                 m_dataEnd += recvResult;
00323                                                 if (receiver.EofReceived() || m_dataEnd > m_buf.size() /2)
00324                                                 {
00325                                                         m_waitingForResult = false;
00326                                                         break;
00327                                                 }
00328                                         }
00329                                 }
00330                         }
00331                 }
00332                 else
00333                 {
00334                         m_putSize = UnsignedMin(m_dataEnd - m_dataBegin, maxSize - byteCount);
00335 
00336                         if (checkDelimiter)
00337                                 m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin);
00338 
00339 DoOutput:
00340                         size_t result = t->PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput);
00341                         if (result)
00342                         {
00343                                 if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
00344                                                 CallStack("NetworkSource::DoPump() - wait attachment", 0)))
00345                                         goto DoOutput;
00346                                 else
00347                                 {
00348                                         m_outputBlocked = true;
00349                                         return result;
00350                                 }
00351                         }
00352                         m_outputBlocked = false;
00353 
00354                         byteCount += m_putSize;
00355                         m_dataBegin += m_putSize;
00356                         if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter)
00357                                 break;
00358                         if (maxSize != ULONG_MAX && byteCount == maxSize)
00359                                 break;
00360                         // once time limit is reached, return even if there is more data waiting
00361                         // but make 0 a special case so caller can request a large amount of data to be
00362                         // pumped as long as it is immediately available
00363                         if (maxTime > 0 && timer.ElapsedTime() > maxTime)
00364                                 break;
00365                 }
00366         }
00367 
00368         return 0;
00369 }
00370 
00371 // *************************************************************
00372 
00373 NetworkSink::NetworkSink(unsigned int maxBufferSize, unsigned int autoFlushBound)
00374         : m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound)
00375         , m_needSendResult(false), m_wasBlocked(false), m_eofState(EOF_NONE)
00376         , m_buffer(STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0) 
00377         , m_speedTimer(Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0)
00378         , m_currentSpeed(0), m_maxObservedSpeed(0)
00379 {
00380 }
00381 
00382 float NetworkSink::ComputeCurrentSpeed()
00383 {
00384         if (m_speedTimer.ElapsedTime() > 1000)
00385         {
00386                 m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.ElapsedTime();
00387                 m_maxObservedSpeed = STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98f);
00388                 m_byteCountSinceLastTimerReset = 0;
00389                 m_speedTimer.StartTimer();
00390 //              OutputDebugString(("max speed: " + IntToString((int)m_maxObservedSpeed) + " current speed: " + IntToString((int)m_currentSpeed) + "\n").c_str());
00391         }
00392         return m_currentSpeed;
00393 }
00394 
00395 float NetworkSink::GetMaxObservedSpeed() const
00396 {
00397         lword m = GetMaxBytesPerSecond();
00398         return m ? STDMIN(m_maxObservedSpeed, float(m)) : m_maxObservedSpeed;
00399 }
00400 
00401 unsigned int NetworkSink::GetMaxWaitObjectCount() const
00402 {
00403         return LimitedBandwidth::GetMaxWaitObjectCount() + GetSender().GetMaxWaitObjectCount();
00404 }
00405 
00406 void NetworkSink::GetWaitObjects(WaitObjectContainer &container, CallStack const& callStack)
00407 {
00408         if (BlockedBySpeedLimit())
00409                 LimitedBandwidth::GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - speed limit", &callStack));
00410         else if (m_wasBlocked)
00411                 AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - was blocked", &callStack));
00412         else if (!m_buffer.IsEmpty())
00413                 AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - buffer not empty", &callStack));
00414         else if (EofPending())
00415                 AccessSender().GetWaitObjects(container, CallStack("NetworkSink::GetWaitObjects() - EOF pending", &callStack));
00416 }
00417 
00418 size_t NetworkSink::Put2(const byte *inString, size_t length, int messageEnd, bool blocking)
00419 {
00420         if (m_eofState == EOF_DONE)
00421         {
00422                 if (length || messageEnd)
00423                         throw Exception(Exception::OTHER_ERROR, "NetworkSink::Put2() being called after EOF had been sent");
00424 
00425                 return 0;
00426         }
00427 
00428         if (m_eofState > EOF_NONE)
00429                 goto EofSite;
00430 
00431         {
00432                 if (m_skipBytes)
00433                 {
00434                         assert(length >= m_skipBytes);
00435                         inString += m_skipBytes;
00436                         length -= m_skipBytes;
00437                 }
00438 
00439                 m_buffer.Put(inString, length);
00440 
00441                 if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound)
00442                         TimedFlush(0, 0);
00443 
00444                 size_t targetSize = messageEnd ? 0 : m_maxBufferSize;
00445                 if (blocking)
00446                         TimedFlush(INFINITE_TIME, targetSize);
00447 
00448                 if (m_buffer.CurrentSize() > targetSize)
00449                 {
00450                         assert(!blocking);
00451                         m_wasBlocked = true;
00452                         m_skipBytes += length;
00453                         size_t blockedBytes = UnsignedMin(length, m_buffer.CurrentSize() - targetSize);
00454                         return STDMAX<size_t>(blockedBytes, 1);
00455                 }
00456 
00457                 m_wasBlocked = false;
00458                 m_skipBytes = 0;
00459         }
00460 
00461         if (messageEnd)
00462         {
00463                 m_eofState = EOF_PENDING_SEND;
00464 
00465         EofSite:
00466                 TimedFlush(blocking ? INFINITE_TIME : 0, 0);
00467                 if (m_eofState != EOF_DONE)
00468                         return 1;
00469         }
00470 
00471         return 0;
00472 }
00473 
00474 lword NetworkSink::DoFlush(unsigned long maxTime, size_t targetSize)
00475 {
00476         NetworkSender &sender = AccessSender();
00477 
00478         bool forever = maxTime == INFINITE_TIME;
00479         Timer timer(Timer::MILLISECONDS, forever);
00480         unsigned int totalFlushSize = 0;
00481 
00482         while (true)
00483         {
00484                 if (m_buffer.CurrentSize() <= targetSize)
00485                         break;
00486                 
00487                 if (m_needSendResult)
00488                 {
00489                         if (sender.MustWaitForResult() &&
00490                                 !sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()),
00491                                         CallStack("NetworkSink::DoFlush() - wait send result", 0)))
00492                                 break;
00493 
00494                         unsigned int sendResult = sender.GetSendResult();
00495 #if CRYPTOPP_TRACE_NETWORK
00496                         OutputDebugString((IntToString((unsigned int)this) + ": Sent " + IntToString(sendResult) + " bytes\n").c_str());
00497 #endif
00498                         m_buffer.Skip(sendResult);
00499                         totalFlushSize += sendResult;
00500                         m_needSendResult = false;
00501 
00502                         if (!m_buffer.AnyRetrievable())
00503                                 break;
00504                 }
00505 
00506                 unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
00507                 if (sender.MustWaitToSend() && !sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait send", 0)))
00508                         break;
00509 
00510                 size_t contiguousSize = 0;
00511                 const byte *block = m_buffer.Spy(contiguousSize);
00512 
00513 #if CRYPTOPP_TRACE_NETWORK
00514                 OutputDebugString((IntToString((unsigned int)this) + ": Sending " + IntToString(contiguousSize) + " bytes\n").c_str());
00515 #endif
00516                 sender.Send(block, contiguousSize);
00517                 m_needSendResult = true;
00518 
00519                 if (maxTime > 0 && timeOut == 0)
00520                         break;  // once time limit is reached, return even if there is more data waiting
00521         }
00522 
00523         m_byteCountSinceLastTimerReset += totalFlushSize;
00524         ComputeCurrentSpeed();
00525         
00526         if (m_buffer.IsEmpty() && !m_needSendResult)
00527         {
00528                 if (m_eofState == EOF_PENDING_SEND)
00529                 {
00530                         sender.SendEof();
00531                         m_eofState = sender.MustWaitForEof() ? EOF_PENDING_DELIVERY : EOF_DONE;
00532                 }
00533 
00534                 while (m_eofState == EOF_PENDING_DELIVERY)
00535                 {
00536                         unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
00537                         if (!sender.Wait(timeOut, CallStack("NetworkSink::DoFlush() - wait EOF", 0)))
00538                                 break;
00539 
00540                         if (sender.EofSent())
00541                                 m_eofState = EOF_DONE;
00542                 }
00543         }
00544 
00545         return totalFlushSize;
00546 }
00547 
00548 #endif  // #ifdef HIGHRES_TIMER_AVAILABLE
00549 
00550 NAMESPACE_END

Generated on Sat Dec 23 02:07:08 2006 for Crypto++ by  doxygen 1.5.1-p1