From e60253110ad1d5ac2ab8ad7fb38c0a5dd62a07b5 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 17 Jun 2011 21:19:39 +0000 Subject: [PATCH] Merge --- dbms/include/DB/Core/ErrorCodes.h | 3 - dbms/include/DB/IO/CompressedInputStream.h | 74 +++++++++++++ dbms/include/DB/IO/CompressedOutputStream.h | 66 +++++++++++ dbms/include/DB/IO/CompressedReadBuffer.h | 69 +++++++----- dbms/include/DB/IO/CompressedStream.h | 11 ++ dbms/include/DB/IO/CompressedWriteBuffer.h | 21 ++-- dbms/src/IO/CompressedInputStream.cpp | 116 ++++++++++++++++++++ dbms/src/IO/CompressedOutputStream.cpp | 74 +++++++++++++ dbms/src/IO/tests/compressed_buffer.cpp | 112 +++++++++++++++++++ utils/compressor/test.sh | 4 +- 10 files changed, 504 insertions(+), 46 deletions(-) create mode 100644 dbms/include/DB/IO/CompressedInputStream.h create mode 100644 dbms/include/DB/IO/CompressedOutputStream.h create mode 100644 dbms/include/DB/IO/CompressedStream.h create mode 100644 dbms/src/IO/CompressedInputStream.cpp create mode 100644 dbms/src/IO/CompressedOutputStream.cpp create mode 100644 dbms/src/IO/tests/compressed_buffer.cpp diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index ac8f3dcf89b..ee18904589c 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -46,9 +46,6 @@ namespace ErrorCodes TOO_LESS_ARGUMENTS_FOR_FUNCTION, UNKNOWN_ELEMENT_IN_AST, CANNOT_PARSE_DATE, - TOO_SMALL_SIZE_COMPRESSED, - TOO_LARGE_SIZE_COMPRESSED, - CANNOT_DECOMPRESS_CORRUPTED_DATA, }; } diff --git a/dbms/include/DB/IO/CompressedInputStream.h b/dbms/include/DB/IO/CompressedInputStream.h new file mode 100644 index 00000000000..34f0659d38c --- /dev/null +++ b/dbms/include/DB/IO/CompressedInputStream.h @@ -0,0 +1,74 @@ +#ifndef DBMS_COMMON_COMPRESSED_INPUT_STREAM_H +#define DBMS_COMMON_COMPRESSED_INPUT_STREAM_H + +#include +#include +#include + +#include + +#include + + +namespace DB +{ + + +/** Аналогично Poco::InflatingStreamBuf, но используется библиотека QuickLZ, + * а также поддерживается только istream. + */ +class DecompressingStreamBuf : public Poco::BufferedStreamBuf +{ +public: + DecompressingStreamBuf(std::istream & istr); + + /** прочитать целиком один сжатый блок данных; + */ + void getChunk(std::vector & res); + +protected: + int readFromDevice(char * buffer, std::streamsize length); + +private: + size_t pos_in_buffer; + std::istream * p_istr; + std::vector uncompressed_buffer; + std::vector compressed_buffer; + std::vector scratch; + + /** Читает и разжимает следующий кусок сжатых данных. */ + void readCompressedChunk(); +}; + + +/** Базовый класс для CompressedInputStream; содержит DecompressingStreamBuf + */ +class DecompressingIOS : public virtual std::ios +{ +public: + DecompressingIOS(std::istream & istr); + DecompressingStreamBuf * rdbuf(); + +protected: + DecompressingStreamBuf buf; +}; + + +/** Разжимает данные, сжатые с помощью алгоритма QuickLZ. + */ +class CompressedInputStream : public DecompressingIOS, public std::istream +{ +public: + CompressedInputStream(std::istream & istr); + int close(); + + /** прочитать целиком один сжатый блок данных + */ + void getChunk(std::vector & res); +}; + + +} + + +#endif diff --git a/dbms/include/DB/IO/CompressedOutputStream.h b/dbms/include/DB/IO/CompressedOutputStream.h new file mode 100644 index 00000000000..fb673020dfd --- /dev/null +++ b/dbms/include/DB/IO/CompressedOutputStream.h @@ -0,0 +1,66 @@ +#ifndef DBMS_COMMON_COMPRESSED_OUTPUT_STREAM_H +#define DBMS_COMMON_COMPRESSED_OUTPUT_STREAM_H + +#include +#include +#include + +#include + +#include + +#include + + +namespace DB +{ + + +/** Аналогично Poco::DeflatingStreamBuf, но используется библиотека QuickLZ, + * а также поддерживается только ostream. + */ +class CompressingStreamBuf : public Poco::BufferedStreamBuf +{ +public: + CompressingStreamBuf(std::ostream & ostr); + ~CompressingStreamBuf(); + int close(); + +protected: + int writeToDevice(const char * buffer, std::streamsize length); + +private: + std::ostream * p_ostr; + std::vector compressed_buffer; + std::vector scratch; +}; + + +/** Базовый класс для CompressedOutputStream; содержит CompressingStreamBuf + */ +class CompressingIOS : public virtual std::ios +{ +public: + CompressingIOS(std::ostream & ostr); + CompressingStreamBuf * rdbuf(); + +protected: + CompressingStreamBuf buf; +}; + + +/** Сжимает всё с помощью алгоритма QuickLZ блоками не более DBMS_COMPRESSING_STREAM_BUFFER_SIZE. + * Для записи последнего блока, следует вызвать метод close(). + */ +class CompressedOutputStream : public CompressingIOS, public std::ostream +{ +public: + CompressedOutputStream(std::ostream & ostr); + int close(); +}; + + +} + + +#endif diff --git a/dbms/include/DB/IO/CompressedReadBuffer.h b/dbms/include/DB/IO/CompressedReadBuffer.h index cb373eeddac..7d10e334a62 100644 --- a/dbms/include/DB/IO/CompressedReadBuffer.h +++ b/dbms/include/DB/IO/CompressedReadBuffer.h @@ -4,16 +4,12 @@ #include #include -#include +#include #include #include #include -#include - - -/// Если сжатый кусок больше 1GB - значит ошибка -#define DB_COMPRESSED_BUFFER_MAX_COMPRESSED_SIZE 0x40000000 +#include namespace DB @@ -25,39 +21,54 @@ private: ReadBuffer & in; std::vector compressed_buffer; + std::vector decompressed_buffer; + char scratch[QLZ_SCRATCH_DECOMPRESS]; + + size_t pos_in_buffer; public: CompressedReadBuffer(ReadBuffer & in_) - : in(in_) + : in(in_), + compressed_buffer(QUICKLZ_HEADER_SIZE), + pos_in_buffer(0) { } + /** Читает и разжимает следующий кусок сжатых данных. */ + void readCompressedChunk() + { + in.readStrict(&compressed_buffer[0], QUICKLZ_HEADER_SIZE); + + size_t size_compressed = qlz_size_compressed(&compressed_buffer[0]); + size_t size_decompressed = qlz_size_decompressed(&compressed_buffer[0]); + + compressed_buffer.resize(size_compressed); + decompressed_buffer.resize(size_decompressed); + + in.readStrict(&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed - QUICKLZ_HEADER_SIZE); + + qlz_decompress(&compressed_buffer[0], &decompressed_buffer[0], scratch); + + pos_in_buffer = 0; + } + bool next() { - if (in.eof()) - return false; + if (pos_in_buffer == decompressed_buffer.size()) + { + if (in.eof()) + return false; - size_t size_compressed = 0; - readVarUInt(size_compressed, in); - if (size_compressed == 0) - throw Exception("Too small size_compressed", ErrorCodes::TOO_SMALL_SIZE_COMPRESSED); - if (size_compressed > DB_COMPRESSED_BUFFER_MAX_COMPRESSED_SIZE) - throw Exception("Too large size_compressed", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED); + readCompressedChunk(); + } + + size_t bytes_to_copy = std::min(decompressed_buffer.size() - pos_in_buffer, + static_cast(DEFAULT_READ_BUFFER_SIZE)); + std::memcpy(working_buffer.begin(), &decompressed_buffer[pos_in_buffer], bytes_to_copy); - compressed_buffer.resize(size_compressed); - in.readStrict(&compressed_buffer[0], size_compressed); - - size_t size_decompressed = 0; - if (!snappy::GetUncompressedLength(&compressed_buffer[0], size_compressed, &size_decompressed)) - throw Exception("Cannot decompress corrupted data", ErrorCodes::CANNOT_DECOMPRESS_CORRUPTED_DATA); - - internal_buffer.resize(size_decompressed); - - if (!snappy::RawUncompress(&compressed_buffer[0], size_compressed, &internal_buffer[0])) - throw Exception("Cannot decompress corrupted data", ErrorCodes::CANNOT_DECOMPRESS_CORRUPTED_DATA); - - working_buffer = Buffer(working_buffer.begin(), working_buffer.begin() + size_decompressed); + pos_in_buffer += bytes_to_copy; pos = working_buffer.begin(); + working_buffer = Buffer(working_buffer.begin(), working_buffer.begin() + bytes_to_copy); return true; } @@ -65,6 +76,4 @@ public: } -#undef DB_COMPRESSED_BUFFER_MAX_COMPRESSED_SIZE - #endif diff --git a/dbms/include/DB/IO/CompressedStream.h b/dbms/include/DB/IO/CompressedStream.h new file mode 100644 index 00000000000..1c1e0e55eff --- /dev/null +++ b/dbms/include/DB/IO/CompressedStream.h @@ -0,0 +1,11 @@ +#ifndef DBMS_COMMON_COMPRESSING_STREAM_DEFINES_H +#define DBMS_COMMON_COMPRESSING_STREAM_DEFINES_H + +/** Общие для CompressingStream.h и DecompressingStream.h дефайны */ + +#define DBMS_STREAM_BUFFER_SIZE 4096 +#define DBMS_COMPRESSING_STREAM_BUFFER_SIZE 1048576 +#define QUICKLZ_ADDITIONAL_SPACE 400 +#define QUICKLZ_HEADER_SIZE 9 + +#endif diff --git a/dbms/include/DB/IO/CompressedWriteBuffer.h b/dbms/include/DB/IO/CompressedWriteBuffer.h index f1f70a343f9..0e0a5019f03 100644 --- a/dbms/include/DB/IO/CompressedWriteBuffer.h +++ b/dbms/include/DB/IO/CompressedWriteBuffer.h @@ -1,10 +1,10 @@ #ifndef DBMS_COMMON_COMPRESSED_WRITEBUFFER_H #define DBMS_COMMON_COMPRESSED_WRITEBUFFER_H -#include +#include #include -#include +#include namespace DB @@ -15,7 +15,9 @@ class CompressedWriteBuffer : public WriteBuffer private: WriteBuffer & out; - std::vector compressed_buffer; + char compressed_buffer[DBMS_COMPRESSING_STREAM_BUFFER_SIZE + QUICKLZ_ADDITIONAL_SPACE]; + char scratch[QLZ_SCRATCH_COMPRESS]; + size_t compressed_bytes; public: @@ -23,18 +25,15 @@ public: void next() { - compressed_buffer.resize(snappy::MaxCompressedLength(pos - working_buffer.begin())); - size_t compressed_size = 0; - snappy::RawCompress( + size_t compressed_size = qlz_compress( working_buffer.begin(), + compressed_buffer, pos - working_buffer.begin(), - &compressed_buffer[0], - &compressed_size); + scratch); - DB::writeVarUInt(compressed_size, out); - out.write(&compressed_buffer[0], compressed_size); + out.write(compressed_buffer, compressed_size); pos = working_buffer.begin(); - compressed_bytes += compressed_size + DB::getLengthOfVarUInt(compressed_size); + compressed_bytes += compressed_size; } /// Объём данных, которые были сжаты diff --git a/dbms/src/IO/CompressedInputStream.cpp b/dbms/src/IO/CompressedInputStream.cpp new file mode 100644 index 00000000000..5a8edfe8e2c --- /dev/null +++ b/dbms/src/IO/CompressedInputStream.cpp @@ -0,0 +1,116 @@ +#include + +#include + +#include + + +namespace DB +{ + + +DecompressingStreamBuf::DecompressingStreamBuf(std::istream & istr) + : Poco::BufferedStreamBuf(DBMS_STREAM_BUFFER_SIZE, std::ios::in), + pos_in_buffer(0), + p_istr(&istr), + compressed_buffer(QUICKLZ_HEADER_SIZE), + scratch(QLZ_SCRATCH_DECOMPRESS) +{ +} + + +void DecompressingStreamBuf::getChunk(std::vector & res) +{ + readCompressedChunk(); + pos_in_buffer = uncompressed_buffer.size(); + res.resize(pos_in_buffer); + memcpy(&res[0], &uncompressed_buffer[0], pos_in_buffer); +} + + +void DecompressingStreamBuf::readCompressedChunk() +{ + /// прочитаем заголовок + p_istr->read(&compressed_buffer[0], QUICKLZ_HEADER_SIZE); + + if (!p_istr->good()) + return; + + size_t size_compressed = qlz_size_compressed(&compressed_buffer[0]); + size_t size_decompressed = qlz_size_decompressed(&compressed_buffer[0]); + + compressed_buffer.resize(size_compressed); + uncompressed_buffer.resize(size_decompressed); + + /// считаем остаток сжатого блока + + p_istr->read(&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed - QUICKLZ_HEADER_SIZE); + + if (!p_istr->good()) + return; + + /// разжимаем блок + qlz_decompress(&compressed_buffer[0], &uncompressed_buffer[0], &scratch[0]); +} + + +int DecompressingStreamBuf::readFromDevice(char * buffer, std::streamsize length) +{ + if (length == 0 || !p_istr) + return 0; + + size_t bytes_processed = 0; + + while (bytes_processed < static_cast(length)) + { + if (pos_in_buffer == uncompressed_buffer.size()) + { + readCompressedChunk(); + pos_in_buffer = 0; + + if (!p_istr->good()) + { + p_istr = 0; + return bytes_processed; + } + } + + size_t bytes_to_copy = std::min( + uncompressed_buffer.size() - pos_in_buffer, + static_cast(length) - bytes_processed); + + memcpy(buffer + bytes_processed, &uncompressed_buffer[pos_in_buffer], bytes_to_copy); + pos_in_buffer += bytes_to_copy; + bytes_processed += bytes_to_copy; + } + + return static_cast(length); +} + + +DecompressingIOS::DecompressingIOS(std::istream & istr) + : buf(istr) +{ + poco_ios_init(&buf); +} + + +DecompressingStreamBuf * DecompressingIOS::rdbuf() +{ + return &buf; +} + + +CompressedInputStream::CompressedInputStream(std::istream & istr) + : DecompressingIOS(istr), + std::istream(&buf) +{ +} + + +void CompressedInputStream::getChunk(std::vector & res) +{ + buf.getChunk(res); +} + +} diff --git a/dbms/src/IO/CompressedOutputStream.cpp b/dbms/src/IO/CompressedOutputStream.cpp new file mode 100644 index 00000000000..9877dbcac65 --- /dev/null +++ b/dbms/src/IO/CompressedOutputStream.cpp @@ -0,0 +1,74 @@ +#include + +#include + + +namespace DB +{ + + +CompressingStreamBuf::CompressingStreamBuf(std::ostream & ostr) + : Poco::BufferedStreamBuf(DBMS_COMPRESSING_STREAM_BUFFER_SIZE, std::ios::out), + p_ostr(&ostr), + compressed_buffer(DBMS_COMPRESSING_STREAM_BUFFER_SIZE + QUICKLZ_ADDITIONAL_SPACE), + scratch(QLZ_SCRATCH_COMPRESS) +{ +} + + +CompressingStreamBuf::~CompressingStreamBuf() +{ + close(); +} + + +int CompressingStreamBuf::close() +{ + sync(); + return 0; +} + + +int CompressingStreamBuf::writeToDevice(const char * buffer, std::streamsize length) +{ + if (length == 0 || !p_ostr) + return 0; + + size_t compressed_size = qlz_compress( + buffer, + &compressed_buffer[0], + length, + &scratch[0]); + + p_ostr->write(&compressed_buffer[0], compressed_size); + return static_cast(length); +} + + +CompressingIOS::CompressingIOS(std::ostream & ostr) + : buf(ostr) +{ + poco_ios_init(&buf); +} + + +CompressingStreamBuf * CompressingIOS::rdbuf() +{ + return &buf; +} + + +CompressedOutputStream::CompressedOutputStream(std::ostream & ostr) + : CompressingIOS(ostr), + std::ostream(&buf) +{ +} + + +int CompressedOutputStream::close() +{ + return buf.close(); +} + + +} diff --git a/dbms/src/IO/tests/compressed_buffer.cpp b/dbms/src/IO/tests/compressed_buffer.cpp new file mode 100644 index 00000000000..6a249ffa438 --- /dev/null +++ b/dbms/src/IO/tests/compressed_buffer.cpp @@ -0,0 +1,112 @@ +#include + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +int main(int argc, char ** argv) +{ + try + { + size_t n = 10000000; + Poco::Stopwatch stopwatch; + + { + std::ofstream ostr("test1"); + DB::WriteBufferFromOStream buf(ostr); + DB::CompressedWriteBuffer compressed_buf(buf); + + stopwatch.restart(); + for (size_t i = 0; i < n; ++i) + { + DB::writeIntText(i, compressed_buf); + DB::writeChar('\t', compressed_buf); + } + stopwatch.stop(); + std::cout << "Writing done (1). Elapsed: " << static_cast(stopwatch.elapsed()) / 1000000 << std::endl; + } + + { + std::ofstream ostr("test2"); + DB::CompressedOutputStream compressed_ostr(ostr); + DB::WriteBufferFromOStream compressed_buf(compressed_ostr); + + stopwatch.restart(); + for (size_t i = 0; i < n; ++i) + { + DB::writeIntText(i, compressed_buf); + DB::writeChar('\t', compressed_buf); + } + stopwatch.stop(); + std::cout << "Writing done (2). Elapsed: " << static_cast(stopwatch.elapsed()) / 1000000 << std::endl; + } + + { + std::ifstream istr("test1"); + DB::ReadBufferFromIStream buf(istr); + DB::CompressedReadBuffer compressed_buf(buf); + std::string s; + + stopwatch.restart(); + for (size_t i = 0; i < n; ++i) + { + size_t x; + DB::readIntText(x, compressed_buf); + compressed_buf.ignore(); + + if (x != i) + { + std::stringstream s; + s << "Failed!, read: " << x << ", expected: " << i; + throw DB::Exception(s.str()); + } + } + stopwatch.stop(); + std::cout << "Reading done (1). Elapsed: " << static_cast(stopwatch.elapsed()) / 1000000 << std::endl; + } + + { + std::ifstream istr("test2"); + DB::CompressedInputStream compressed_istr(istr); + DB::ReadBufferFromIStream compressed_buf(compressed_istr); + std::string s; + + stopwatch.restart(); + for (size_t i = 0; i < n; ++i) + { + size_t x; + DB::readIntText(x, compressed_buf); + compressed_buf.ignore(); + + if (x != i) + { + std::stringstream s; + s << "Failed!, read: " << x << ", expected: " << i; + throw DB::Exception(s.str()); + } + } + stopwatch.stop(); + std::cout << "Reading done (2). Elapsed: " << static_cast(stopwatch.elapsed()) / 1000000 << std::endl; + } + } + catch (const DB::Exception & e) + { + std::cerr << e.what() << ", " << e.message() << std::endl; + return 1; + } + + return 0; +} diff --git a/utils/compressor/test.sh b/utils/compressor/test.sh index 117c90a4919..672ba0dc180 100755 --- a/utils/compressor/test.sh +++ b/utils/compressor/test.sh @@ -1,5 +1,5 @@ #!/bin/sh -./compressor < compressor > compressor.snp -./compressor -d < compressor.snp > compressor2 +./compressor < compressor > compressor.qlz +./compressor -d < compressor.qlz > compressor2 cmp compressor compressor2 && echo "Ok." || echo "Fail."