Pumping Data

From Crypto++ Wiki

Pumping data at certain rates or in user-supplied blocks is a subject that surfaces on occasion. The need often arises with custom protocols and resource-constrained devices like mobile phones and development boards. This wiki article shows you how to manually pump data.

When manually pumping data you normally select a block size, like 4K or 64K. You create the source, like StringSource or FileSource, and you set pumpAll to false to signal the program will control the flow. Then, you call Pump on the source to process data from the source. You can also call Flush on occasion to ensure processed data is cleared from the pipeline (but you should not have to if things are working as expected).

There are two types of flushes available - a soft flush and a hard flush. The soft flush is Flush(false), and a hard flush is Flush(true). You have to be careful about flushing data in the pipeline. According to the source code comments in filters.h:

Hard flushes must be used with care. It means try to process and output everything, even if there may not be enough data to complete the action. For example, hard flushing a HexDecoder would cause an error if you do it after inputing an odd number of hex encoded characters.

For some types of filters, like ZlibDecompressor, hard flushes can only be done at "synchronization points". These synchronization points are positions in the data stream that are created by hard flushes on the corresponding reverse filters, in this example ZlibCompressor. This is useful when zlib compressed data is moved across a network in packets and compression state is preserved across packets, as in the SSH2 protocol.

The section on low memory below provides a quick test environment to verify operations with approximately 16 MB of memory.

Low Memory

You can simulate low memory in Bash by running ulimit in its own sub-shell so the limit does not persist in the parent shell. The value passed to ulimit -v is in units of 1024 bytes. The last command below limits the process to approximately 16 MB of virtual memory. 8 MB and 12 MB were too small to start the process.

$ (ulimit -v 8192; ./pump.exe)
./pump.exe: error while loading shared libraries: libm.so.6: failed to map segment from shared object

$ (ulimit -v 12288; ./pump.exe)
./pump.exe: error while loading shared libraries: libc.so.6: failed to map segment from shared object

$ (ulimit -v 16368; ./pump.exe)
Processed: 10485760
Processed: 20971520
Processed: 31457280
Processed: 41943040
...
Processed: 503316480
Processed: 513802240
Processed: 524288000
Processed: 534773760

Keep It Simple

Folks who pump their own data usually do so because of low memory conditions. The library does fine on its own and usually does not need help. The library chunks data in 4K blocks and can easily handle gigabytes of data. If you are trying to pump your own data, you are probably overthinking the solution.

For example, the program below calculates multiple hashes over a large, 1 GB file. It does so using less than 4 MB of RAM for the program and data.

#include "cryptlib.h"
#include "channels.h"
#include "filters.h"
#include "files.h"
#include "sha.h"
#include "crc.h"
#include "hex.h"

#include <string>
#include <iostream>

int main(int argc, char* argv[])
{
    using namespace CryptoPP;

    try
    {
        std::string s1, s2;
        CRC32 crc;
        SHA1 sha1;

        HashFilter f1(crc, new HexEncoder(new StringSink(s1)));
        HashFilter f2(sha1, new HexEncoder(new StringSink(s2)));

        ChannelSwitch cs;
        cs.AddDefaultRoute(f1);
        cs.AddDefaultRoute(f2);

        FileSource("data.bin", true /*pumpAll*/, new Redirector(cs));

        std::cout << "Filename: " << "data.bin" << std::endl;
        std::cout << "   CRC32: " << s1 << std::endl;
        std::cout << "    SHA1: " << s2 << std::endl;
    }
    catch(const Exception& ex)
    {
        std::cerr << ex.what() << std::endl;
    }

    return 0;
}

And the file:

$ dd if=/dev/urandom of=data.bin bs=1M count=1024
1024+0 records in
1024+0 records out
1073741824 bytes (1.1 GB, 1.0 GiB) copied, 5.48884 s, 196 MB/s

The result with 16 MB of RAM is shown below. About 12 MB is used to map the process, and less than 4 MB is used by the program:

$ (ulimit -v 16368; ./test.exe)
Filename: data.bin
   CRC32: 20F45F3E
    SHA1: F0857F8E46112BB08A04D5CE51BDBEA0C4539032

Filter Framework

The code below is the basic skeleton or framework to use when manually pumping data. The sample program in this section does not do much at all. About all it is capable of is counting bytes with a MeterFilter. Note that the bytes are counted between the filter and the sink, so they are bytes being written to storage. Cache managers could affect writing of the data to storage, so be sure you understand what your OS or other libraries are doing.

SomeFilter is not a real filter, but it is often an encoder, decoder, encryptor, decryptor, compressor, decompressor, etc. The Redirector breaks the ownership chain in a pipeline. It avoids nested calls to new and ensures objects like MeterFilter survive to read the measurement.

The remaining variable is needed because of the way SourceExhausted operates in a pipeline. Effectively, SourceExhausted returns false until MessageEnd() arrives because the machinery is message oriented, not byte oriented.

int main(int argc, char* argv[])
{
  try
  {
      MeterFilter meter;
      SomeFilter filter(...);
      SomeSource source(...);
      SomeSink sink(...);
 
      source.Attach(new Redirector(filter));
      filter.Attach(new Redirector(meter));
      meter.Attach(new Redirector(sink));

      // lword, so both STDMIN arguments have the same type on every platform
      const lword BLOCK_SIZE = 4096;
      lword remaining = ...;
      lword processed = 0;
      
      while(remaining && !source.SourceExhausted())
      {
        unsigned int req = STDMIN(remaining, BLOCK_SIZE);

        source.Pump(req);
        filter.Flush(false);

        processed += req;
        remaining -= req;

        if (processed % (1024*1024*10) == 0)
          cout << "Processed: " << meter.GetTotalBytes() << endl;
      }

      // Signal there is no more data to process.
      // The dtor's will do this automatically.
      filter.MessageEnd();
  }
  catch(const Exception& ex)
  {
    cerr << ex.what() << endl;
  }

  return 0;
}

The code above elides lword remaining = .... The value is easy to determine when using a StringSource because the size is readily at hand. For a FileSource, you can get the file size with the following code:

inline lword FileSize(const FileSource& file)
{
  std::istream* stream = const_cast<FileSource&>(file).GetStream();

  std::ifstream::pos_type old = stream->tellg();
  std::ifstream::pos_type end = stream->seekg(0, std::ios_base::end).tellg();
  stream->seekg(old);

  return static_cast<lword>(end);
}
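
If only the file name is at hand, the size can also be determined before the FileSource is created. The sketch below uses only the standard library; FileSizeByName is a hypothetical helper name, not part of Crypto++. It opens the file positioned at its end and reports that position, returning 0 if the file cannot be opened.

```cpp
#include <fstream>
#include <string>

// Hypothetical helper (not part of Crypto++): size of a file in bytes,
// or 0 if the file cannot be opened or the position cannot be read.
inline unsigned long long FileSizeByName(const std::string& filename)
{
    // std::ios::ate positions the stream at the end right after opening.
    std::ifstream in(filename.c_str(), std::ios::binary | std::ios::ate);
    if (!in)
        return 0;

    // tellg() reports the current position, which is the file size here.
    const std::streamoff end = in.tellg();
    return end < 0 ? 0 : static_cast<unsigned long long>(end);
}
```

The returned value can be assigned to an lword and used as the initial value of remaining.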

Or, you could just use the end-of-file as a predicate (the const_cast is not needed after Commit ca9e788fbfada9c4).

inline bool EndOfFile(const FileSource& file)
{
  std::istream* stream = const_cast<FileSource&>(file).GetStream();
  return stream->eof();
}

Compression

The following sample compresses random data with Gzip. The data is read from a FileSource, but a StringSource or another source could be used as well. Because the data is random, it should be incompressible. You can read from /dev/zero if you need something to compress.

To create the data before running the program use dd:

$ dd if=/dev/urandom of=uncompress.bin bs=512 count=1048576
1048576+0 records in
1048576+0 records out
536870912 bytes (537 MB, 512 MiB) copied, 4.55431 s, 118 MB/s

The program is shown below, and it is a slightly modified version of the one presented in Filter Framework.

inline lword FileSize(const FileSource& file)
{
  std::istream* stream = const_cast<FileSource&>(file).GetStream();

  std::ifstream::pos_type old = stream->tellg();
  std::ifstream::pos_type end = stream->seekg(0, std::ios_base::end).tellg();
  stream->seekg(old);

  return static_cast<lword>(end);
}

int main(int argc, char* argv[])
{
  try
  {
      MeterFilter meter;
      Gzip filter;
     
      FileSource source("uncompress.bin", false);
      FileSink sink("compress.bin");
 
      source.Attach(new Redirector(filter));
      filter.Attach(new Redirector(meter));
      meter.Attach(new Redirector(sink));

      // lword, so both STDMIN arguments have the same type on every platform
      const lword BLOCK_SIZE = 4096;
      lword remaining = FileSize(source);
      lword processed = 0;
      
      while(remaining && !source.SourceExhausted())
      {
        unsigned int req = STDMIN(remaining, BLOCK_SIZE);

        source.Pump(req);
        filter.Flush(false);

        processed += req;
        remaining -= req;

        if (processed % (1024*1024*10) == 0)
          cout << "Processed: " << meter.GetTotalBytes() << endl;
      }

      // Signal there is no more data to process.
      // The dtor's will do this automatically.
      filter.MessageEnd();
  }
  catch(const Exception& ex)
  {
    cerr << ex.what() << endl;
  }

  return 0;
}

Running the program on a 6th generation Intel Core processor results in the following output:

$ time ./pump.exe
Processed: 10498570
Processed: 20997130
Processed: 31495690
Processed: 41994250
...
Processed: 503930890
Processed: 514429450
Processed: 524928010
Processed: 535426570

real    0m5.837s
user    0m2.549s
sys     0m0.628s

And checking the file sizes results in:

$ ls -Al *.bin
-rw-rw-r--. 1 user  user 537526282 May  2 17:10 compress.bin
-rw-rw-r--. 1 user  user 536870912 May  2 17:10 uncompress.bin

The compressed file is 655,370 bytes larger than the input (about 0.12%), which is consistent with the random input being incompressible.

Encryption

The next example performs encryption on a file. It's the same basic example as in Filter Framework and Compression, but it uses EndOfFile rather than FileSize. The sample data is called plain.bin, and it can be created with:

$ dd if=/dev/zero of=./plain.bin bs=512 count=1048576
1048576+0 records in
1048576+0 records out
536870912 bytes (537 MB, 512 MiB) copied, 1.22499 s, 438 MB/s

The modified program is shown below. The const_cast in EndOfFile is not needed after Commit ca9e788fbfada9c4.

inline bool EndOfFile(const FileSource& file)
{
  std::istream* stream = const_cast<FileSource&>(file).GetStream();
  return stream->eof();
}

int main(int argc, char* argv[])
{
  try
  {
      byte key[AES::DEFAULT_KEYLENGTH]={}, iv[AES::BLOCKSIZE]={};
      CTR_Mode<AES>::Encryption encryptor;
      encryptor.SetKeyWithIV(key, sizeof(key), iv);

      MeterFilter meter;
      StreamTransformationFilter filter(encryptor);
     
      FileSource source("plain.bin", false);
      FileSink sink("cipher.bin");
 
      source.Attach(new Redirector(filter));
      filter.Attach(new Redirector(meter));
      meter.Attach(new Redirector(sink));

      const size_t BLOCK_SIZE = 4096;
      lword processed = 0;
      
      while(!EndOfFile(source) && !source.SourceExhausted())
      {
        source.Pump(BLOCK_SIZE);
        filter.Flush(false);

        processed += BLOCK_SIZE;

        if (processed % (1024*1024*10) == 0)
          cout << "Processed: " << meter.GetTotalBytes() << endl;
      }
  
      // Signal there is no more data to process.
      // The dtor's will do this automatically.
      filter.MessageEnd();
  }
  catch(const Exception& ex)
  {
    cerr << ex.what() << endl;
  }

  return 0;
}

Running the program results in:

$ time ./pump.exe
Processed: 10485760
Processed: 20971520
Processed: 31457280
Processed: 41943040
...
Processed: 503316480
Processed: 513802240
Processed: 524288000
Processed: 534773760

real    0m0.951s
user    0m0.240s
sys     0m0.710s

Finally, the respective file sizes:

$ ls -Al *.bin
-rw-rw-r--. 1 user  user 536870912 May  2 18:13 cipher.bin
-rw-rw-r--. 1 user  user 536870912 May  2 18:12 plain.bin

Skipping Data

Sometimes you need to skip data in the stream. Skip is part of the Filter interface, and it works on the output buffer and not the source buffer. More precisely, Skip applies to the AttachedTransformation, so you can't skip the input data without a little extra work.

To Skip bytes on a Source, use a NULL AttachedTransformation. Also see "Skip'ing on a Source does not work as expected" on Stack Overflow and "Issue 248: Skip'ing on a Source does not work".

int main(int argc, char* argv[])
{
  string str1, str2;
  HexEncoder enc(new StringSink(str1));
  for(unsigned int i=0; i < 32; i++)
    enc.Put((byte)i);
  enc.MessageEnd();

  cout << "str1: " << str1 <<endl;

  // 'ss' has a NULL AttachedTransformation()
  StringSource ss(str1, false);
  ss.Pump(10);

  // Attach the real filter chain to 'ss'
  ss.Attach(new StringSink(str2));
  ss.PumpAll();

  cout << "str2: " << str2 << endl;

  return 0;
}

If you have to skip in the middle of a stream you can swap-in TheBitBucket.

int main(int argc, char* argv[])
{
  string str1, str2;
  HexEncoder enc(new StringSink(str1));
  for(unsigned int i=0; i < 32; i++)
    enc.Put((byte)i);
  enc.MessageEnd();

  cout << "str1: " << str1 <<endl;

  // Pump 10 bytes into the sink
  StringSource ss(str1, false, new StringSink(str2));
  ss.Pump(10);

  cout << "str2: " << str2 <<endl;

  // Skip 4 bytes
  ss.Attach(NULLPTR); ss.Pump(4);

  // Pump remaining bytes to sink
  ss.Attach(new StringSink(str2));
  ss.PumpAll();

  cout << "str2: " << str2 << endl;

  return 0;
}

The program above produces the following output.

$ ./test.exe
str1: 000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F
str2: 0001020304
str2: 00010203040708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F

MeterFilter

You can also use a MeterFilter to skip bytes in a pipeline. The MeterFilter has a really cool feature that allows you to skip a range of bytes.

The code below creates a file with binary data. The binary data is just the values 0 to 255.

$ hexdump -C data.bin 
00000000  00 01 02 03 04 05 06 07  08 09 0a 0b 0c 0d 0e 0f  |................|
00000010  10 11 12 13 14 15 16 17  18 19 1a 1b 1c 1d 1e 1f  |................|
00000020  20 21 22 23 24 25 26 27  28 29 2a 2b 2c 2d 2e 2f  | !"#$%&'()*+,-./|
00000030  30 31 32 33 34 35 36 37  38 39 3a 3b 3c 3d 3e 3f  |0123456789:;<=>?|
00000040  40 41 42 43 44 45 46 47  48 49 4a 4b 4c 4d 4e 4f  |@ABCDEFGHIJKLMNO|
00000050  50 51 52 53 54 55 56 57  58 59 5a 5b 5c 5d 5e 5f  |PQRSTUVWXYZ[\]^_|
00000060  60 61 62 63 64 65 66 67  68 69 6a 6b 6c 6d 6e 6f  |`abcdefghijklmno|
00000070  70 71 72 73 74 75 76 77  78 79 7a 7b 7c 7d 7e 7f  |pqrstuvwxyz{|}~.|
00000080  80 81 82 83 84 85 86 87  88 89 8a 8b 8c 8d 8e 8f  |................|
00000090  90 91 92 93 94 95 96 97  98 99 9a 9b 9c 9d 9e 9f  |................|
000000a0  a0 a1 a2 a3 a4 a5 a6 a7  a8 a9 aa ab ac ad ae af  |................|
000000b0  b0 b1 b2 b3 b4 b5 b6 b7  b8 b9 ba bb bc bd be bf  |................|
000000c0  c0 c1 c2 c3 c4 c5 c6 c7  c8 c9 ca cb cc cd ce cf  |................|
000000d0  d0 d1 d2 d3 d4 d5 d6 d7  d8 d9 da db dc dd de df  |................|
000000e0  e0 e1 e2 e3 e4 e5 e6 e7  e8 e9 ea eb ec ed ee ef  |................|
000000f0  f0 f1 f2 f3 f4 f5 f6 f7  f8 f9 fa fb fc fd fe ff  |................|

Then the code skips the first 100 bytes and the last 33 bytes. Bytes that are not skipped are processed by a HexEncoder and printed to stdout. The Redirector stops ownership (and destruction) since the MeterFilter is stack based. The MeterFilter will be destroyed when the stack frame goes out of scope, so the FileSource does not need to destroy it.

#include "cryptlib.h"
#include "filters.h"
#include "files.h"
#include "hex.h"

#include <iostream>
#include <fstream>

using namespace CryptoPP;

inline lword FileSize(const FileSource& file)
{
  std::istream* stream = const_cast<FileSource&>(file).GetStream();

  std::ifstream::pos_type old = stream->tellg();
  std::ifstream::pos_type end = stream->seekg(0, std::ios_base::end).tellg();
  stream->seekg(old);

  return static_cast<lword>(end);
}

int main(int argc, char* argv[])
{
    const std::string filename = "data.bin";
    const size_t skip_lead = 100;
    const size_t skip_tail = 33;

    // Create a test file with some data
    std::ofstream output(filename.c_str(), std::ios::binary);
    for (size_t i=0; i<256; ++i) {
        char ch = (char)i;
        output.write(&ch, 1);
    }
    output.close();

    // Create a FileSource, but do not pumpAll
    FileSource fs(filename.c_str(), false /*pumpAll*/);
    const lword fileSize = FileSize(fs);

    // Create a MeterFilter to skip bytes
    MeterFilter meter(new HexEncoder(new FileSink(std::cout)));
    meter.AddRangeToSkip(0, 0, skip_lead, false);
    meter.AddRangeToSkip(0, fileSize-skip_tail, skip_tail, true);

    // Process the stream
    fs.Attach(new Redirector(meter));
    fs.PumpAll();
    std::cout << std::endl;

    return 0;
}

Running the program results in the following output.

$ ./pump_data.exe
6465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B
8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3
B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADB
DCDDDE
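
The output can be cross-checked by hand: 256 - 100 - 33 = 123 bytes remain, starting at 0x64 (100) and ending at 0xDE (222). The library-free sketch below performs the same range arithmetic on an in-memory buffer; KeepMiddle is a hypothetical helper, not part of Crypto++ and not how MeterFilter is implemented.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical helper: return the bytes that remain after dropping
// 'skipLead' bytes from the front and 'skipTail' bytes from the back.
inline std::vector<unsigned char> KeepMiddle(
    const std::vector<unsigned char>& data,
    std::size_t skipLead, std::size_t skipTail)
{
    // Nothing remains if the two skipped ranges cover the whole buffer.
    if (skipLead >= data.size() || skipTail >= data.size() - skipLead)
        return std::vector<unsigned char>();

    return std::vector<unsigned char>(
        data.begin() + static_cast<std::ptrdiff_t>(skipLead),
        data.end() - static_cast<std::ptrdiff_t>(skipTail));
}
```

For the 256-byte test file, KeepMiddle(data, 100, 33) yields 123 bytes, which matches the 246 hex characters printed above.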

Multiple Sources

Sometimes you need to use multiple sources to transform data. For example, you may need a MySource(s1+f1, ...), where s1 is a signature and f1 is file data, but f1 is too large to fit in memory. Also see How to combine two Sources into new one in Crypto++? on Stack Overflow.

To set up the example, use the following code. It writes random alphanumeric ASCII characters to a string and a file.

void random_string(std::string& str, size_t len)
{
    const char alphanum[] =
        "0123456789"
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        "abcdefghijklmnopqrstuvwxyz";
    const size_t size = sizeof(alphanum) - 1;

    str.reserve(len);
    for (size_t i = 0; i < len; ++i)
        str.push_back(alphanum[rand() % size]);
}

int main(int argc, char* argv[])
{
    // Deterministic for a machine
    std::srand(0);

    std::string s1, s2, r;
    const size_t size = 1024*16+1;
    
    random_string(s1, size);
    random_string(s2, size);
    
    // Write s2 to file
    StringSource(s2, true, new FileSink("test.dat"));

    ...
}

Then, to use the multiple sources on the data perform the following.

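// Assumed to be declared earlier: 'hash' (a hash object such as SHA1)
// and 'hex' (a HexEncoder attached to a FileSink on std::cout).
StringSource ss1(s1, false);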
FileSource fs1("test.dat", false);

HashFilter hf1(hash, new StringSink(r));

ss1.Attach(new Redirector(hf1));
ss1.Pump(LWORD_MAX);
ss1.Detach();

fs1.Attach(new Redirector(hf1));
fs1.Pump(LWORD_MAX);
fs1.Detach();

hf1.MessageEnd();

std::cout << "s1 + f1: ";
hex.Put((const byte*)r.data(), r.size());
std::cout << std::endl;

There are several things going on in the code above. First, we dynamically attach and detach the hash filter chain to sources ss1 and fs1.

Second, once the filter is attached we use Pump(LWORD_MAX) to pump all the data from the source into the filter chain. We don't use PumpAll() because PumpAll() signals the end of the current message and generates a MessageEnd().

We don't want a MessageEnd() at that point because we are processing one message in multiple parts, not multiple messages. We want exactly one MessageEnd(), sent at a time we choose.

Third, once we are done with a source, we call Detach() so the StringSource and FileSource destructors don't cause a spurious MessageEnd() to enter the filter chain. Again, the data is one message in multiple parts, so only one MessageEnd() should be sent.

Fourth, when we are done sending our data into the filter, we call hf1.MessageEnd() to tell the filter to process all pending or buffered data. This is when we want the MessageEnd() call, and not before.

Fifth, we call Detach() when done rather than Attach(). Detach() deletes the existing filter chain and avoids memory leaks. Attach() attaches a new chain but does not delete the existing filter or chain. Since we are using a Redirector our HashFilter survives. The HashFilter is eventually cleaned as an automatic stack variable.

As an aside, if ss1.PumpAll() or fs1.PumpAll() were used (or destructors allowed to send MessageEnd() into the filter chain) then you would get a concatenation of Hash(s1) and Hash(s2) because it would look like two different messages to the filter instead of one message over two parts.
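
The difference between one message in two parts and two separate messages can be illustrated without the library using any incremental hash. The sketch below uses a toy 64-bit FNV-1a hash as a stand-in for a real hash; Fnv1a is a hypothetical class, not part of Crypto++, and its Update and Final members only mimic the general shape of an incremental hash interface.

```cpp
#include <cstdint>
#include <string>

// FNV-1a 64-bit parameters.
const std::uint64_t kFnvOffset = 14695981039346656037ULL;
const std::uint64_t kFnvPrime = 1099511628211ULL;

// Toy incremental hash (64-bit FNV-1a). Not cryptographic; it only
// illustrates how the end of a message affects the result.
class Fnv1a
{
public:
    Fnv1a() : m_state(kFnvOffset) {}

    // Absorb another part of the current message.
    void Update(const std::string& part)
    {
        for (std::string::size_type i = 0; i < part.size(); ++i)
        {
            m_state ^= static_cast<unsigned char>(part[i]);
            m_state *= kFnvPrime;
        }
    }

    // End the message: return its digest and reset for the next message.
    std::uint64_t Final()
    {
        const std::uint64_t digest = m_state;
        m_state = kFnvOffset;
        return digest;
    }

private:
    std::uint64_t m_state;
};
```

Calling Update("abc"), Update("def") and then Final() once gives the same digest as hashing "abcdef" in one call. Calling Final() after each part instead gives two separate digests, which is the situation a stray MessageEnd() creates.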
