Pumping Data

From Crypto++ Wiki
Jump to: navigation, search

Pumping data at certain rates, or in user-supplied blocks, is a subject that surfaces on occasion. The need often arises with custom protocols and resource-constrained devices like mobile phones and development boards. This wiki article will show you how to manually pump data.

When manually pumping data you normally select a block size, like 4K or 64K. You create the source, like a StringSource or FileSource, and set pumpAll to false to signal that the program will control the flow. Then you call Pump on the source to process data from the source. You can also call Flush on occasion to ensure processed data is cleared from the pipeline (but you should not have to if things are working as expected).

There are two types of flushes available: a soft flush, Flush(false), and a hard flush, Flush(true). You have to be careful about flushing data in the pipeline. According to the source code comments in filters.h:

Hard flushes must be used with care. It means try to process and output everything, even if there may not be enough data to complete the action. For example, hard flushing a HexDecoder would cause an error if you do it after inputting an odd number of hex encoded characters.
For some types of filters, like ZlibDecompressor, hard flushes can only be done at "synchronization points". These synchronization points are positions in the data stream that are created by hard flushes on the corresponding reverse filters, in this example ZlibCompressor. This is useful when zlib compressed data is moved across a network in packets and compression state is preserved across packets, as in the SSH2 protocol.

The section on low memory below provides a quick test environment to verify operations with approximately 16 MB of memory.

Low Memory

You can simulate low memory in Bash using ulimit in its own sub-shell so the limit does not affect other processes. The commands below restrict the process's virtual memory; 16 MB was enough to run the program, while 8 MB and 12 MB were too small to even start it.

$ (ulimit -v 8192; ./pump.exe)
./pump.exe: error while loading shared libraries: libm.so.6: failed to map segment from shared object

$ (ulimit -v 12288; ./pump.exe)
./pump.exe: error while loading shared libraries: libc.so.6: failed to map segment from shared object

$ (ulimit -v 16368; ./pump.exe)
Processed: 10485760
Processed: 20971520
Processed: 31457280
Processed: 41943040
...
Processed: 503316480
Processed: 513802240
Processed: 524288000
Processed: 534773760

Filter Framework

The code below is the basic skeleton or framework to use when manually pumping data. The sample program in this section does not do much at all; about all it is capable of is counting bytes with a MeterFilter. Note that the bytes are counted between the filter and the sink, so they are the bytes being written to storage. Cache managers could affect writing of the data to storage, so be sure you understand what your OS and other libraries are doing.

SomeFilter is not a real filter; it stands in for an encoder, decoder, encryptor, decryptor, compressor, decompressor, etc. The Redirector breaks the ownership chain in a pipeline. It avoids nested calls to new and ensures objects like the MeterFilter survive long enough to read the measurement.

The remaining counter is needed because of the way SourceExhausted operates in a pipeline. Effectively, SourceExhausted returns false until MessageEnd() arrives because the machinery is message-oriented, not byte-oriented.

int main(int argc, char* argv[])
{
  try
  {
      MeterFilter meter;
      SomeFilter filter(...);
      SomeSource source(...);
      SomeSink sink(...);
 
      source.Attach(new Redirector(filter));
      filter.Attach(new Redirector(meter));
      meter.Attach(new Redirector(sink));

      const word64 BLOCK_SIZE = 4096;
      word64 remaining = ...;
      word64 processed = 0;
	  
      while(remaining && !source.SourceExhausted())
      {
        word64 req = STDMIN(remaining, BLOCK_SIZE);

        source.Pump(req);
        filter.Flush(false);

        processed += req;
        remaining -= req;

        if (processed % (1024*1024*10) == 0)
          cout << "Processed: " << meter.GetTotalBytes() << endl;
      }

      // Signal there is no more data to process.
      // The destructors will do this automatically.
      filter.MessageEnd();
  }
  catch(const Exception& ex)
  {
    cerr << ex.what() << endl;
  }

  return 0;
}

The code above elides word64 remaining = .... It's easy enough to determine when using a StringSource because the size is readily at hand. When using a FileSource you can get the file size with the following code:

inline word64 FileSize(const FileSource& file)
{
  std::istream* stream = const_cast<FileSource&>(file).GetStream();

  std::ifstream::pos_type old = stream->tellg();
  std::ifstream::pos_type end = stream->seekg(0, std::ios_base::end).tellg();
  stream->seekg(old);

  return static_cast<word64>(end);
}

Or, you could just use end-of-file as a predicate (the const_cast is not needed after Commit ca9e788fbfada9c4):

inline bool EndOfFile(const FileSource& file)
{
  std::istream* stream = const_cast<FileSource&>(file).GetStream();
  return stream->eof();
}

Compression

The following sample compresses random data with Gzip. The data is read from a FileSource, but a StringSource or other source could be used as well. Because the data is random, it should be incompressible. If you need data that actually compresses, read from /dev/zero instead.

To create the data before running the program use dd:

$ dd if=/dev/urandom of=uncompress.bin bs=512 count=1048576
1048576+0 records in
1048576+0 records out
536870912 bytes (537 MB, 512 MiB) copied, 4.55431 s, 118 MB/s

The program is shown below. It's a slightly modified version of the one presented in Filter Framework.

inline word64 FileSize(const FileSource& file)
{
  std::istream* stream = const_cast<FileSource&>(file).GetStream();

  std::ifstream::pos_type old = stream->tellg();
  std::ifstream::pos_type end = stream->seekg(0, std::ios_base::end).tellg();
  stream->seekg(old);

  return static_cast<word64>(end);
}

int main(int argc, char* argv[])
{
  try
  {
      MeterFilter meter;
      Gzip filter;
     
      FileSource source("uncompress.bin", false);
      FileSink sink("compress.bin");
 
      source.Attach(new Redirector(filter));
      filter.Attach(new Redirector(meter));
      meter.Attach(new Redirector(sink));

      const word64 BLOCK_SIZE = 4096;
      word64 remaining = FileSize(source);
      word64 processed = 0;
	  
      while(remaining && !source.SourceExhausted())
      {
        word64 req = STDMIN(remaining, BLOCK_SIZE);

        source.Pump(req);
        filter.Flush(false);

        processed += req;
        remaining -= req;

        if (processed % (1024*1024*10) == 0)
          cout << "Processed: " << meter.GetTotalBytes() << endl;
      }

      // Signal there is no more data to process.
      // The destructors will do this automatically.
      filter.MessageEnd();
  }
  catch(const Exception& ex)
  {
    cerr << ex.what() << endl;
  }

  return 0;
}

Running the program on a 6th generation Intel Core processor results in the following output:

$ time ./pump.exe
Processed: 10498570
Processed: 20997130
Processed: 31495690
Processed: 41994250
...
Processed: 503930890
Processed: 514429450
Processed: 524928010
Processed: 535426570

real    0m5.837s
user    0m2.549s
sys     0m0.628s

And checking the file sizes results in:

$ ls -Al *.bin
-rw-rw-r--. 1 user  user 537526282 May  2 17:10 compress.bin
-rw-rw-r--. 1 user  user 536870912 May  2 17:10 uncompress.bin

Encryption

The next example performs encryption on a file. It's the same basic example as in Filter Framework and Compression, but it uses EndOfFile rather than FileSize. The sample data is called plain.bin, and it can be created with:

$ dd if=/dev/zero of=./plain.bin bs=512 count=1048576
1048576+0 records in
1048576+0 records out
536870912 bytes (537 MB, 512 MiB) copied, 1.22499 s, 438 MB/s

The modified program is shown below. The const_cast in EndOfFile is not needed after Commit ca9e788fbfada9c4.

inline bool EndOfFile(const FileSource& file)
{
  std::istream* stream = const_cast<FileSource&>(file).GetStream();
  return stream->eof();
}

int main(int argc, char* argv[])
{
  try
  {
      byte key[AES::DEFAULT_KEYLENGTH]={}, iv[AES::BLOCKSIZE]={};
      CTR_Mode<AES>::Encryption encryptor;
      encryptor.SetKeyWithIV(key, sizeof(key), iv);

      MeterFilter meter;
      StreamTransformationFilter filter(encryptor);
     
      FileSource source("plain.bin", false);
      FileSink sink("cipher.bin");
 
      source.Attach(new Redirector(filter));
      filter.Attach(new Redirector(meter));
      meter.Attach(new Redirector(sink));

      const word64 BLOCK_SIZE = 4096;
      word64 processed = 0;
	  
      while(!EndOfFile(source) && !source.SourceExhausted())
      {
        source.Pump(BLOCK_SIZE);
        filter.Flush(false);

        processed += BLOCK_SIZE;

        if (processed % (1024*1024*10) == 0)
          cout << "Processed: " << meter.GetTotalBytes() << endl;
      }
  
      // Signal there is no more data to process.
      // The destructors will do this automatically.
      filter.MessageEnd();
  }
  catch(const Exception& ex)
  {
    cerr << ex.what() << endl;
  }

  return 0;
}

Running the program results in:

$ time ./pump.exe
Processed: 10485760
Processed: 20971520
Processed: 31457280
Processed: 41943040
...
Processed: 503316480
Processed: 513802240
Processed: 524288000
Processed: 534773760

real    0m0.951s
user    0m0.240s
sys     0m0.710s

Finally, the respective file sizes:

$ ls -Al *.bin
-rw-rw-r--. 1 user  user 536870912 May  2 18:13 cipher.bin
-rw-rw-r--. 1 user  user 536870912 May  2 18:12 plain.bin

Skipping Data

Sometimes you need to skip data in the stream. Skip is part of the Filter interface, and it operates on the output buffer. More precisely, Skip applies to the AttachedTransformation, so you can't skip over the input data without some extra work.

To Skip bytes on a Source, use a NULL AttachedTransformation. Also see Skip'ing on a Source does not work as expected on Stack Overflow and Issue 248: Skip'ing on a Source does not work.

int main(int argc, char* argv[])
{
  string str1, str2;
  HexEncoder enc(new StringSink(str1));
  for(unsigned int i=0; i < 32; i++)
    enc.Put((byte)i);
  enc.MessageEnd();

  cout << "str1: " << str1 <<endl;

  // 'ss' has a NULL AttachedTransformation()
  StringSource ss(str1, false);
  ss.Pump(10);

  // Attach the real filter chain to 'ss'
  ss.Attach(new StringSink(str2));
  ss.PumpAll();

  cout << "str2: " << str2 << endl;

  return 0;
}

If you have to skip in the middle of a stream, you can swap in TheBitBucket by attaching NULLPTR; bytes pumped while it is attached are discarded.

int main(int argc, char* argv[])
{
  string str1, str2;
  HexEncoder enc(new StringSink(str1));
  for(unsigned int i=0; i < 32; i++)
    enc.Put((byte)i);
  enc.MessageEnd();

  cout << "str1: " << str1 <<endl;

  // Pump 10 bytes into the sink
  StringSource ss(str1, false, new StringSink(str2));
  ss.Pump(10);

  cout << "str2: " << str2 <<endl;

  // Skip 4 bytes
  ss.Attach(NULLPTR); ss.Pump(4);

  // Pump remaining bytes to sink
  ss.Attach(new StringSink(str2));
  ss.PumpAll();

  cout << "str2: " << str2 << endl;

  return 0;
}

The program above produces the following output.

$ ./test.exe
str1: 000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F
str2: 0001020304
str2: 00010203040708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F