dbms: using Snappy (experimental); removed compressed iostreams [#CONV-2546].

This commit is contained in:
Alexey Milovidov 2011-06-17 20:44:10 +00:00
parent 36517e32a2
commit 425fc839b0
10 changed files with 46 additions and 504 deletions

View File

@ -46,6 +46,9 @@ namespace ErrorCodes
TOO_LESS_ARGUMENTS_FOR_FUNCTION,
UNKNOWN_ELEMENT_IN_AST,
CANNOT_PARSE_DATE,
TOO_SMALL_SIZE_COMPRESSED,
TOO_LARGE_SIZE_COMPRESSED,
CANNOT_DECOMPRESS_CORRUPTED_DATA,
};
}

View File

@ -1,74 +0,0 @@
#ifndef DBMS_COMMON_COMPRESSED_INPUT_STREAM_H
#define DBMS_COMMON_COMPRESSED_INPUT_STREAM_H
#include <istream>
#include <ostream>
#include <vector>
#include <Poco/BufferedStreamBuf.h>
#include <DB/IO/CompressedStream.h>
namespace DB
{
/** Аналогично Poco::InflatingStreamBuf, но используется библиотека QuickLZ,
* а также поддерживается только istream.
*/
class DecompressingStreamBuf : public Poco::BufferedStreamBuf
{
public:
DecompressingStreamBuf(std::istream & istr);
/** прочитать целиком один сжатый блок данных;
*/
void getChunk(std::vector<char> & res);
protected:
int readFromDevice(char * buffer, std::streamsize length);
private:
size_t pos_in_buffer;
std::istream * p_istr;
std::vector<char> uncompressed_buffer;
std::vector<char> compressed_buffer;
std::vector<char> scratch;
/** Читает и разжимает следующий кусок сжатых данных. */
void readCompressedChunk();
};
/** Базовый класс для CompressedInputStream; содержит DecompressingStreamBuf
*/
class DecompressingIOS : public virtual std::ios
{
public:
DecompressingIOS(std::istream & istr);
DecompressingStreamBuf * rdbuf();
protected:
DecompressingStreamBuf buf;
};
/** Разжимает данные, сжатые с помощью алгоритма QuickLZ.
*/
class CompressedInputStream : public DecompressingIOS, public std::istream
{
public:
CompressedInputStream(std::istream & istr);
int close();
/** прочитать целиком один сжатый блок данных
*/
void getChunk(std::vector<char> & res);
};
}
#endif

View File

@ -1,66 +0,0 @@
#ifndef DBMS_COMMON_COMPRESSED_OUTPUT_STREAM_H
#define DBMS_COMMON_COMPRESSED_OUTPUT_STREAM_H
#include <istream>
#include <ostream>
#include <vector>
#include <Poco/BufferedStreamBuf.h>
#include <quicklz/quicklz_level1.h>
#include <DB/IO/CompressedStream.h>
namespace DB
{
/** Аналогично Poco::DeflatingStreamBuf, но используется библиотека QuickLZ,
* а также поддерживается только ostream.
*/
class CompressingStreamBuf : public Poco::BufferedStreamBuf
{
public:
CompressingStreamBuf(std::ostream & ostr);
~CompressingStreamBuf();
int close();
protected:
int writeToDevice(const char * buffer, std::streamsize length);
private:
std::ostream * p_ostr;
std::vector<char> compressed_buffer;
std::vector<char> scratch;
};
/** Базовый класс для CompressedOutputStream; содержит CompressingStreamBuf
*/
class CompressingIOS : public virtual std::ios
{
public:
CompressingIOS(std::ostream & ostr);
CompressingStreamBuf * rdbuf();
protected:
CompressingStreamBuf buf;
};
/** Сжимает всё с помощью алгоритма QuickLZ блоками не более DBMS_COMPRESSING_STREAM_BUFFER_SIZE.
* Для записи последнего блока, следует вызвать метод close().
*/
class CompressedOutputStream : public CompressingIOS, public std::ostream
{
public:
CompressedOutputStream(std::ostream & ostr);
int close();
};
}
#endif

View File

@ -4,12 +4,16 @@
#include <vector>
#include <algorithm>
#include <quicklz/quicklz_level1.h>
#include <snappy.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/CompressedStream.h>
#include <DB/IO/VarInt.h>
/// Если сжатый кусок больше 1GB - значит ошибка
#define DB_COMPRESSED_BUFFER_MAX_COMPRESSED_SIZE 0x40000000
namespace DB
@ -21,54 +25,39 @@ private:
ReadBuffer & in;
std::vector<char> compressed_buffer;
std::vector<char> decompressed_buffer;
char scratch[QLZ_SCRATCH_DECOMPRESS];
size_t pos_in_buffer;
public:
CompressedReadBuffer(ReadBuffer & in_)
: in(in_),
compressed_buffer(QUICKLZ_HEADER_SIZE),
pos_in_buffer(0)
: in(in_)
{
}
/** Читает и разжимает следующий кусок сжатых данных. */
void readCompressedChunk()
{
in.readStrict(&compressed_buffer[0], QUICKLZ_HEADER_SIZE);
size_t size_compressed = qlz_size_compressed(&compressed_buffer[0]);
size_t size_decompressed = qlz_size_decompressed(&compressed_buffer[0]);
compressed_buffer.resize(size_compressed);
decompressed_buffer.resize(size_decompressed);
in.readStrict(&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed - QUICKLZ_HEADER_SIZE);
qlz_decompress(&compressed_buffer[0], &decompressed_buffer[0], scratch);
pos_in_buffer = 0;
}
bool next()
{
if (pos_in_buffer == decompressed_buffer.size())
{
if (in.eof())
return false;
if (in.eof())
return false;
readCompressedChunk();
}
size_t bytes_to_copy = std::min(decompressed_buffer.size() - pos_in_buffer,
static_cast<size_t>(DEFAULT_READ_BUFFER_SIZE));
std::memcpy(working_buffer.begin(), &decompressed_buffer[pos_in_buffer], bytes_to_copy);
size_t size_compressed = 0;
readVarUInt(size_compressed, in);
if (size_compressed == 0)
throw Exception("Too small size_compressed", ErrorCodes::TOO_SMALL_SIZE_COMPRESSED);
if (size_compressed > DB_COMPRESSED_BUFFER_MAX_COMPRESSED_SIZE)
throw Exception("Too large size_compressed", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
pos_in_buffer += bytes_to_copy;
compressed_buffer.resize(size_compressed);
in.readStrict(&compressed_buffer[0], size_compressed);
size_t size_decompressed = 0;
if (!snappy::GetUncompressedLength(&compressed_buffer[0], size_compressed, &size_decompressed))
throw Exception("Cannot decompress corrupted data", ErrorCodes::CANNOT_DECOMPRESS_CORRUPTED_DATA);
internal_buffer.resize(size_decompressed);
if (!snappy::RawUncompress(&compressed_buffer[0], size_compressed, &internal_buffer[0]))
throw Exception("Cannot decompress corrupted data", ErrorCodes::CANNOT_DECOMPRESS_CORRUPTED_DATA);
working_buffer = Buffer(working_buffer.begin(), working_buffer.begin() + size_decompressed);
pos = working_buffer.begin();
working_buffer = Buffer(working_buffer.begin(), working_buffer.begin() + bytes_to_copy);
return true;
}
@ -76,4 +65,6 @@ public:
}
#undef DB_COMPRESSED_BUFFER_MAX_COMPRESSED_SIZE
#endif

View File

@ -1,11 +0,0 @@
#ifndef DBMS_COMMON_COMPRESSING_STREAM_DEFINES_H
#define DBMS_COMMON_COMPRESSING_STREAM_DEFINES_H
/** Общие для CompressingStream.h и DecompressingStream.h дефайны */
#define DBMS_STREAM_BUFFER_SIZE 4096
#define DBMS_COMPRESSING_STREAM_BUFFER_SIZE 1048576
#define QUICKLZ_ADDITIONAL_SPACE 400
#define QUICKLZ_HEADER_SIZE 9
#endif

View File

@ -1,10 +1,10 @@
#ifndef DBMS_COMMON_COMPRESSED_WRITEBUFFER_H
#define DBMS_COMMON_COMPRESSED_WRITEBUFFER_H
#include <quicklz/quicklz_level1.h>
#include <snappy.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/CompressedStream.h>
#include <DB/IO/VarInt.h>
namespace DB
@ -15,9 +15,7 @@ class CompressedWriteBuffer : public WriteBuffer
private:
WriteBuffer & out;
char compressed_buffer[DBMS_COMPRESSING_STREAM_BUFFER_SIZE + QUICKLZ_ADDITIONAL_SPACE];
char scratch[QLZ_SCRATCH_COMPRESS];
std::vector<char> compressed_buffer;
size_t compressed_bytes;
public:
@ -25,15 +23,18 @@ public:
void next()
{
size_t compressed_size = qlz_compress(
compressed_buffer.resize(snappy::MaxCompressedLength(pos - working_buffer.begin()));
size_t compressed_size = 0;
snappy::RawCompress(
working_buffer.begin(),
compressed_buffer,
pos - working_buffer.begin(),
scratch);
&compressed_buffer[0],
&compressed_size);
out.write(compressed_buffer, compressed_size);
DB::writeVarUInt(compressed_size, out);
out.write(&compressed_buffer[0], compressed_size);
pos = working_buffer.begin();
compressed_bytes += compressed_size;
compressed_bytes += compressed_size + DB::getLengthOfVarUInt(compressed_size);
}
/// Объём данных, которые были сжаты

View File

@ -1,116 +0,0 @@
#include <algorithm>
#include <quicklz/quicklz_level1.h>
#include <DB/IO/CompressedInputStream.h>
namespace DB
{
DecompressingStreamBuf::DecompressingStreamBuf(std::istream & istr)
: Poco::BufferedStreamBuf(DBMS_STREAM_BUFFER_SIZE, std::ios::in),
pos_in_buffer(0),
p_istr(&istr),
compressed_buffer(QUICKLZ_HEADER_SIZE),
scratch(QLZ_SCRATCH_DECOMPRESS)
{
}
void DecompressingStreamBuf::getChunk(std::vector<char> & res)
{
readCompressedChunk();
pos_in_buffer = uncompressed_buffer.size();
res.resize(pos_in_buffer);
memcpy(&res[0], &uncompressed_buffer[0], pos_in_buffer);
}
void DecompressingStreamBuf::readCompressedChunk()
{
/// прочитаем заголовок
p_istr->read(&compressed_buffer[0], QUICKLZ_HEADER_SIZE);
if (!p_istr->good())
return;
size_t size_compressed = qlz_size_compressed(&compressed_buffer[0]);
size_t size_decompressed = qlz_size_decompressed(&compressed_buffer[0]);
compressed_buffer.resize(size_compressed);
uncompressed_buffer.resize(size_decompressed);
/// считаем остаток сжатого блока
p_istr->read(&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed - QUICKLZ_HEADER_SIZE);
if (!p_istr->good())
return;
/// разжимаем блок
qlz_decompress(&compressed_buffer[0], &uncompressed_buffer[0], &scratch[0]);
}
int DecompressingStreamBuf::readFromDevice(char * buffer, std::streamsize length)
{
if (length == 0 || !p_istr)
return 0;
size_t bytes_processed = 0;
while (bytes_processed < static_cast<size_t>(length))
{
if (pos_in_buffer == uncompressed_buffer.size())
{
readCompressedChunk();
pos_in_buffer = 0;
if (!p_istr->good())
{
p_istr = 0;
return bytes_processed;
}
}
size_t bytes_to_copy = std::min(
uncompressed_buffer.size() - pos_in_buffer,
static_cast<size_t>(length) - bytes_processed);
memcpy(buffer + bytes_processed, &uncompressed_buffer[pos_in_buffer], bytes_to_copy);
pos_in_buffer += bytes_to_copy;
bytes_processed += bytes_to_copy;
}
return static_cast<int>(length);
}
DecompressingIOS::DecompressingIOS(std::istream & istr)
: buf(istr)
{
poco_ios_init(&buf);
}
DecompressingStreamBuf * DecompressingIOS::rdbuf()
{
return &buf;
}
CompressedInputStream::CompressedInputStream(std::istream & istr)
: DecompressingIOS(istr),
std::istream(&buf)
{
}
void CompressedInputStream::getChunk(std::vector<char> & res)
{
buf.getChunk(res);
}
}

View File

@ -1,74 +0,0 @@
#include <algorithm>
#include <DB/IO/CompressedOutputStream.h>
namespace DB
{
CompressingStreamBuf::CompressingStreamBuf(std::ostream & ostr)
: Poco::BufferedStreamBuf(DBMS_COMPRESSING_STREAM_BUFFER_SIZE, std::ios::out),
p_ostr(&ostr),
compressed_buffer(DBMS_COMPRESSING_STREAM_BUFFER_SIZE + QUICKLZ_ADDITIONAL_SPACE),
scratch(QLZ_SCRATCH_COMPRESS)
{
}
CompressingStreamBuf::~CompressingStreamBuf()
{
close();
}
int CompressingStreamBuf::close()
{
sync();
return 0;
}
int CompressingStreamBuf::writeToDevice(const char * buffer, std::streamsize length)
{
if (length == 0 || !p_ostr)
return 0;
size_t compressed_size = qlz_compress(
buffer,
&compressed_buffer[0],
length,
&scratch[0]);
p_ostr->write(&compressed_buffer[0], compressed_size);
return static_cast<int>(length);
}
CompressingIOS::CompressingIOS(std::ostream & ostr)
: buf(ostr)
{
poco_ios_init(&buf);
}
CompressingStreamBuf * CompressingIOS::rdbuf()
{
return &buf;
}
CompressedOutputStream::CompressedOutputStream(std::ostream & ostr)
: CompressingIOS(ostr),
std::ostream(&buf)
{
}
int CompressedOutputStream::close()
{
return buf.close();
}
}

View File

@ -1,112 +0,0 @@
#include <string>
#include <iostream>
#include <sstream>
#include <fstream>
#include <Poco/Stopwatch.h>
#include <DB/Core/Types.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/IO/ReadBufferFromIStream.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/CompressedInputStream.h>
#include <DB/IO/CompressedOutputStream.h>
int main(int argc, char ** argv)
{
try
{
size_t n = 10000000;
Poco::Stopwatch stopwatch;
{
std::ofstream ostr("test1");
DB::WriteBufferFromOStream buf(ostr);
DB::CompressedWriteBuffer compressed_buf(buf);
stopwatch.restart();
for (size_t i = 0; i < n; ++i)
{
DB::writeIntText(i, compressed_buf);
DB::writeChar('\t', compressed_buf);
}
stopwatch.stop();
std::cout << "Writing done (1). Elapsed: " << static_cast<double>(stopwatch.elapsed()) / 1000000 << std::endl;
}
{
std::ofstream ostr("test2");
DB::CompressedOutputStream compressed_ostr(ostr);
DB::WriteBufferFromOStream compressed_buf(compressed_ostr);
stopwatch.restart();
for (size_t i = 0; i < n; ++i)
{
DB::writeIntText(i, compressed_buf);
DB::writeChar('\t', compressed_buf);
}
stopwatch.stop();
std::cout << "Writing done (2). Elapsed: " << static_cast<double>(stopwatch.elapsed()) / 1000000 << std::endl;
}
{
std::ifstream istr("test1");
DB::ReadBufferFromIStream buf(istr);
DB::CompressedReadBuffer compressed_buf(buf);
std::string s;
stopwatch.restart();
for (size_t i = 0; i < n; ++i)
{
size_t x;
DB::readIntText(x, compressed_buf);
compressed_buf.ignore();
if (x != i)
{
std::stringstream s;
s << "Failed!, read: " << x << ", expected: " << i;
throw DB::Exception(s.str());
}
}
stopwatch.stop();
std::cout << "Reading done (1). Elapsed: " << static_cast<double>(stopwatch.elapsed()) / 1000000 << std::endl;
}
{
std::ifstream istr("test2");
DB::CompressedInputStream compressed_istr(istr);
DB::ReadBufferFromIStream compressed_buf(compressed_istr);
std::string s;
stopwatch.restart();
for (size_t i = 0; i < n; ++i)
{
size_t x;
DB::readIntText(x, compressed_buf);
compressed_buf.ignore();
if (x != i)
{
std::stringstream s;
s << "Failed!, read: " << x << ", expected: " << i;
throw DB::Exception(s.str());
}
}
stopwatch.stop();
std::cout << "Reading done (2). Elapsed: " << static_cast<double>(stopwatch.elapsed()) / 1000000 << std::endl;
}
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.message() << std::endl;
return 1;
}
return 0;
}

View File

@ -1,5 +1,5 @@
#!/bin/sh
./compressor < compressor > compressor.qlz
./compressor -d < compressor.qlz > compressor2
./compressor < compressor > compressor.snp
./compressor -d < compressor.snp > compressor2
cmp compressor compressor2 && echo "Ok." || echo "Fail."