diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index ee18904589c..ac8f3dcf89b 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -46,6 +46,9 @@ namespace ErrorCodes TOO_LESS_ARGUMENTS_FOR_FUNCTION, UNKNOWN_ELEMENT_IN_AST, CANNOT_PARSE_DATE, + TOO_SMALL_SIZE_COMPRESSED, + TOO_LARGE_SIZE_COMPRESSED, + CANNOT_DECOMPRESS_CORRUPTED_DATA, }; } diff --git a/dbms/include/DB/IO/CompressedInputStream.h b/dbms/include/DB/IO/CompressedInputStream.h deleted file mode 100644 index 34f0659d38c..00000000000 --- a/dbms/include/DB/IO/CompressedInputStream.h +++ /dev/null @@ -1,74 +0,0 @@ -#ifndef DBMS_COMMON_COMPRESSED_INPUT_STREAM_H -#define DBMS_COMMON_COMPRESSED_INPUT_STREAM_H - -#include -#include -#include - -#include - -#include - - -namespace DB -{ - - -/** Аналогично Poco::InflatingStreamBuf, но используется библиотека QuickLZ, - * а также поддерживается только istream. - */ -class DecompressingStreamBuf : public Poco::BufferedStreamBuf -{ -public: - DecompressingStreamBuf(std::istream & istr); - - /** прочитать целиком один сжатый блок данных; - */ - void getChunk(std::vector & res); - -protected: - int readFromDevice(char * buffer, std::streamsize length); - -private: - size_t pos_in_buffer; - std::istream * p_istr; - std::vector uncompressed_buffer; - std::vector compressed_buffer; - std::vector scratch; - - /** Читает и разжимает следующий кусок сжатых данных. */ - void readCompressedChunk(); -}; - - -/** Базовый класс для CompressedInputStream; содержит DecompressingStreamBuf - */ -class DecompressingIOS : public virtual std::ios -{ -public: - DecompressingIOS(std::istream & istr); - DecompressingStreamBuf * rdbuf(); - -protected: - DecompressingStreamBuf buf; -}; - - -/** Разжимает данные, сжатые с помощью алгоритма QuickLZ. - */ -class CompressedInputStream : public DecompressingIOS, public std::istream -{ -public: - CompressedInputStream(std::istream & istr); - int close(); - - /** прочитать целиком один сжатый блок данных - */ - void getChunk(std::vector & res); -}; - - -} - - -#endif diff --git a/dbms/include/DB/IO/CompressedOutputStream.h b/dbms/include/DB/IO/CompressedOutputStream.h deleted file mode 100644 index fb673020dfd..00000000000 --- a/dbms/include/DB/IO/CompressedOutputStream.h +++ /dev/null @@ -1,66 +0,0 @@ -#ifndef DBMS_COMMON_COMPRESSED_OUTPUT_STREAM_H -#define DBMS_COMMON_COMPRESSED_OUTPUT_STREAM_H - -#include -#include -#include - -#include - -#include - -#include - - -namespace DB -{ - - -/** Аналогично Poco::DeflatingStreamBuf, но используется библиотека QuickLZ, - * а также поддерживается только ostream. - */ -class CompressingStreamBuf : public Poco::BufferedStreamBuf -{ -public: - CompressingStreamBuf(std::ostream & ostr); - ~CompressingStreamBuf(); - int close(); - -protected: - int writeToDevice(const char * buffer, std::streamsize length); - -private: - std::ostream * p_ostr; - std::vector compressed_buffer; - std::vector scratch; -}; - - -/** Базовый класс для CompressedOutputStream; содержит CompressingStreamBuf - */ -class CompressingIOS : public virtual std::ios -{ -public: - CompressingIOS(std::ostream & ostr); - CompressingStreamBuf * rdbuf(); - -protected: - CompressingStreamBuf buf; -}; - - -/** Сжимает всё с помощью алгоритма QuickLZ блоками не более DBMS_COMPRESSING_STREAM_BUFFER_SIZE. - * Для записи последнего блока, следует вызвать метод close(). - */ -class CompressedOutputStream : public CompressingIOS, public std::ostream -{ -public: - CompressedOutputStream(std::ostream & ostr); - int close(); -}; - - -} - - -#endif diff --git a/dbms/include/DB/IO/CompressedReadBuffer.h b/dbms/include/DB/IO/CompressedReadBuffer.h index 7d10e334a62..cb373eeddac 100644 --- a/dbms/include/DB/IO/CompressedReadBuffer.h +++ b/dbms/include/DB/IO/CompressedReadBuffer.h @@ -4,12 +4,16 @@ #include #include -#include +#include #include #include #include -#include +#include + + +/// Если сжатый кусок больше 1GB - значит ошибка +#define DB_COMPRESSED_BUFFER_MAX_COMPRESSED_SIZE 0x40000000 namespace DB @@ -21,54 +25,39 @@ private: ReadBuffer & in; std::vector compressed_buffer; - std::vector decompressed_buffer; - char scratch[QLZ_SCRATCH_DECOMPRESS]; - - size_t pos_in_buffer; public: CompressedReadBuffer(ReadBuffer & in_) - : in(in_), - compressed_buffer(QUICKLZ_HEADER_SIZE), - pos_in_buffer(0) + : in(in_) { } - /** Читает и разжимает следующий кусок сжатых данных. */ - void readCompressedChunk() - { - in.readStrict(&compressed_buffer[0], QUICKLZ_HEADER_SIZE); - - size_t size_compressed = qlz_size_compressed(&compressed_buffer[0]); - size_t size_decompressed = qlz_size_decompressed(&compressed_buffer[0]); - - compressed_buffer.resize(size_compressed); - decompressed_buffer.resize(size_decompressed); - - in.readStrict(&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed - QUICKLZ_HEADER_SIZE); - - qlz_decompress(&compressed_buffer[0], &decompressed_buffer[0], scratch); - - pos_in_buffer = 0; - } - bool next() { - if (pos_in_buffer == decompressed_buffer.size()) - { - if (in.eof()) - return false; + if (in.eof()) + return false; - readCompressedChunk(); - } - - size_t bytes_to_copy = std::min(decompressed_buffer.size() - pos_in_buffer, - static_cast(DEFAULT_READ_BUFFER_SIZE)); - std::memcpy(working_buffer.begin(), &decompressed_buffer[pos_in_buffer], bytes_to_copy); + size_t size_compressed = 0; + readVarUInt(size_compressed, in); + if (size_compressed == 0) + throw Exception("Too small size_compressed", ErrorCodes::TOO_SMALL_SIZE_COMPRESSED); + if (size_compressed > DB_COMPRESSED_BUFFER_MAX_COMPRESSED_SIZE) + throw Exception("Too large size_compressed", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED); - pos_in_buffer += bytes_to_copy; + compressed_buffer.resize(size_compressed); + in.readStrict(&compressed_buffer[0], size_compressed); + + size_t size_decompressed = 0; + if (!snappy::GetUncompressedLength(&compressed_buffer[0], size_compressed, &size_decompressed)) + throw Exception("Cannot decompress corrupted data", ErrorCodes::CANNOT_DECOMPRESS_CORRUPTED_DATA); + + internal_buffer.resize(size_decompressed); + + if (!snappy::RawUncompress(&compressed_buffer[0], size_compressed, &internal_buffer[0])) + throw Exception("Cannot decompress corrupted data", ErrorCodes::CANNOT_DECOMPRESS_CORRUPTED_DATA); + + working_buffer = Buffer(working_buffer.begin(), working_buffer.begin() + size_decompressed); pos = working_buffer.begin(); - working_buffer = Buffer(working_buffer.begin(), working_buffer.begin() + bytes_to_copy); return true; } @@ -76,4 +65,6 @@ public: } +#undef DB_COMPRESSED_BUFFER_MAX_COMPRESSED_SIZE + #endif diff --git a/dbms/include/DB/IO/CompressedStream.h b/dbms/include/DB/IO/CompressedStream.h deleted file mode 100644 index 1c1e0e55eff..00000000000 --- a/dbms/include/DB/IO/CompressedStream.h +++ /dev/null @@ -1,11 +0,0 @@ -#ifndef DBMS_COMMON_COMPRESSING_STREAM_DEFINES_H -#define DBMS_COMMON_COMPRESSING_STREAM_DEFINES_H - -/** Общие для CompressingStream.h и DecompressingStream.h дефайны */ - -#define DBMS_STREAM_BUFFER_SIZE 4096 -#define DBMS_COMPRESSING_STREAM_BUFFER_SIZE 1048576 -#define QUICKLZ_ADDITIONAL_SPACE 400 -#define QUICKLZ_HEADER_SIZE 9 - -#endif diff --git a/dbms/include/DB/IO/CompressedWriteBuffer.h b/dbms/include/DB/IO/CompressedWriteBuffer.h index 0e0a5019f03..f1f70a343f9 100644 --- a/dbms/include/DB/IO/CompressedWriteBuffer.h +++ b/dbms/include/DB/IO/CompressedWriteBuffer.h @@ -1,10 +1,10 @@ #ifndef DBMS_COMMON_COMPRESSED_WRITEBUFFER_H #define DBMS_COMMON_COMPRESSED_WRITEBUFFER_H -#include +#include #include -#include +#include namespace DB @@ -15,9 +15,7 @@ class CompressedWriteBuffer : public WriteBuffer private: WriteBuffer & out; - char compressed_buffer[DBMS_COMPRESSING_STREAM_BUFFER_SIZE + QUICKLZ_ADDITIONAL_SPACE]; - char scratch[QLZ_SCRATCH_COMPRESS]; - + std::vector compressed_buffer; size_t compressed_bytes; public: @@ -25,15 +23,18 @@ public: void next() { - size_t compressed_size = qlz_compress( + compressed_buffer.resize(snappy::MaxCompressedLength(pos - working_buffer.begin())); + size_t compressed_size = 0; + snappy::RawCompress( working_buffer.begin(), - compressed_buffer, pos - working_buffer.begin(), - scratch); + &compressed_buffer[0], + &compressed_size); - out.write(compressed_buffer, compressed_size); + DB::writeVarUInt(compressed_size, out); + out.write(&compressed_buffer[0], compressed_size); pos = working_buffer.begin(); - compressed_bytes += compressed_size; + compressed_bytes += compressed_size + DB::getLengthOfVarUInt(compressed_size); } /// Объём данных, которые были сжаты diff --git a/dbms/src/IO/CompressedInputStream.cpp b/dbms/src/IO/CompressedInputStream.cpp deleted file mode 100644 index 5a8edfe8e2c..00000000000 --- a/dbms/src/IO/CompressedInputStream.cpp +++ /dev/null @@ -1,116 +0,0 @@ -#include - -#include - -#include - - -namespace DB -{ - - -DecompressingStreamBuf::DecompressingStreamBuf(std::istream & istr) - : Poco::BufferedStreamBuf(DBMS_STREAM_BUFFER_SIZE, std::ios::in), - pos_in_buffer(0), - p_istr(&istr), - compressed_buffer(QUICKLZ_HEADER_SIZE), - scratch(QLZ_SCRATCH_DECOMPRESS) -{ -} - - -void DecompressingStreamBuf::getChunk(std::vector & res) -{ - readCompressedChunk(); - pos_in_buffer = uncompressed_buffer.size(); - res.resize(pos_in_buffer); - memcpy(&res[0], &uncompressed_buffer[0], pos_in_buffer); -} - - -void DecompressingStreamBuf::readCompressedChunk() -{ - /// прочитаем заголовок - p_istr->read(&compressed_buffer[0], QUICKLZ_HEADER_SIZE); - - if (!p_istr->good()) - return; - - size_t size_compressed = qlz_size_compressed(&compressed_buffer[0]); - size_t size_decompressed = qlz_size_decompressed(&compressed_buffer[0]); - - compressed_buffer.resize(size_compressed); - uncompressed_buffer.resize(size_decompressed); - - /// считаем остаток сжатого блока - - p_istr->read(&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed - QUICKLZ_HEADER_SIZE); - - if (!p_istr->good()) - return; - - /// разжимаем блок - qlz_decompress(&compressed_buffer[0], &uncompressed_buffer[0], &scratch[0]); -} - - -int DecompressingStreamBuf::readFromDevice(char * buffer, std::streamsize length) -{ - if (length == 0 || !p_istr) - return 0; - - size_t bytes_processed = 0; - - while (bytes_processed < static_cast(length)) - { - if (pos_in_buffer == uncompressed_buffer.size()) - { - readCompressedChunk(); - pos_in_buffer = 0; - - if (!p_istr->good()) - { - p_istr = 0; - return bytes_processed; - } - } - - size_t bytes_to_copy = std::min( - uncompressed_buffer.size() - pos_in_buffer, - static_cast(length) - bytes_processed); - - memcpy(buffer + bytes_processed, &uncompressed_buffer[pos_in_buffer], bytes_to_copy); - pos_in_buffer += bytes_to_copy; - bytes_processed += bytes_to_copy; - } - - return static_cast(length); -} - - -DecompressingIOS::DecompressingIOS(std::istream & istr) - : buf(istr) -{ - poco_ios_init(&buf); -} - - -DecompressingStreamBuf * DecompressingIOS::rdbuf() -{ - return &buf; -} - - -CompressedInputStream::CompressedInputStream(std::istream & istr) - : DecompressingIOS(istr), - std::istream(&buf) -{ -} - - -void CompressedInputStream::getChunk(std::vector & res) -{ - buf.getChunk(res); -} - -} diff --git a/dbms/src/IO/CompressedOutputStream.cpp b/dbms/src/IO/CompressedOutputStream.cpp deleted file mode 100644 index 9877dbcac65..00000000000 --- a/dbms/src/IO/CompressedOutputStream.cpp +++ /dev/null @@ -1,74 +0,0 @@ -#include - -#include - - -namespace DB -{ - - -CompressingStreamBuf::CompressingStreamBuf(std::ostream & ostr) - : Poco::BufferedStreamBuf(DBMS_COMPRESSING_STREAM_BUFFER_SIZE, std::ios::out), - p_ostr(&ostr), - compressed_buffer(DBMS_COMPRESSING_STREAM_BUFFER_SIZE + QUICKLZ_ADDITIONAL_SPACE), - scratch(QLZ_SCRATCH_COMPRESS) -{ -} - - -CompressingStreamBuf::~CompressingStreamBuf() -{ - close(); -} - - -int CompressingStreamBuf::close() -{ - sync(); - return 0; -} - - -int CompressingStreamBuf::writeToDevice(const char * buffer, std::streamsize length) -{ - if (length == 0 || !p_ostr) - return 0; - - size_t compressed_size = qlz_compress( - buffer, - &compressed_buffer[0], - length, - &scratch[0]); - - p_ostr->write(&compressed_buffer[0], compressed_size); - return static_cast(length); -} - - -CompressingIOS::CompressingIOS(std::ostream & ostr) - : buf(ostr) -{ - poco_ios_init(&buf); -} - - -CompressingStreamBuf * CompressingIOS::rdbuf() -{ - return &buf; -} - - -CompressedOutputStream::CompressedOutputStream(std::ostream & ostr) - : CompressingIOS(ostr), - std::ostream(&buf) -{ -} - - -int CompressedOutputStream::close() -{ - return buf.close(); -} - - -} diff --git a/dbms/src/IO/tests/compressed_buffer.cpp b/dbms/src/IO/tests/compressed_buffer.cpp deleted file mode 100644 index 6a249ffa438..00000000000 --- a/dbms/src/IO/tests/compressed_buffer.cpp +++ /dev/null @@ -1,112 +0,0 @@ -#include - -#include -#include -#include - -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include - - -int main(int argc, char ** argv) -{ - try - { - size_t n = 10000000; - Poco::Stopwatch stopwatch; - - { - std::ofstream ostr("test1"); - DB::WriteBufferFromOStream buf(ostr); - DB::CompressedWriteBuffer compressed_buf(buf); - - stopwatch.restart(); - for (size_t i = 0; i < n; ++i) - { - DB::writeIntText(i, compressed_buf); - DB::writeChar('\t', compressed_buf); - } - stopwatch.stop(); - std::cout << "Writing done (1). Elapsed: " << static_cast(stopwatch.elapsed()) / 1000000 << std::endl; - } - - { - std::ofstream ostr("test2"); - DB::CompressedOutputStream compressed_ostr(ostr); - DB::WriteBufferFromOStream compressed_buf(compressed_ostr); - - stopwatch.restart(); - for (size_t i = 0; i < n; ++i) - { - DB::writeIntText(i, compressed_buf); - DB::writeChar('\t', compressed_buf); - } - stopwatch.stop(); - std::cout << "Writing done (2). Elapsed: " << static_cast(stopwatch.elapsed()) / 1000000 << std::endl; - } - - { - std::ifstream istr("test1"); - DB::ReadBufferFromIStream buf(istr); - DB::CompressedReadBuffer compressed_buf(buf); - std::string s; - - stopwatch.restart(); - for (size_t i = 0; i < n; ++i) - { - size_t x; - DB::readIntText(x, compressed_buf); - compressed_buf.ignore(); - - if (x != i) - { - std::stringstream s; - s << "Failed!, read: " << x << ", expected: " << i; - throw DB::Exception(s.str()); - } - } - stopwatch.stop(); - std::cout << "Reading done (1). Elapsed: " << static_cast(stopwatch.elapsed()) / 1000000 << std::endl; - } - - { - std::ifstream istr("test2"); - DB::CompressedInputStream compressed_istr(istr); - DB::ReadBufferFromIStream compressed_buf(compressed_istr); - std::string s; - - stopwatch.restart(); - for (size_t i = 0; i < n; ++i) - { - size_t x; - DB::readIntText(x, compressed_buf); - compressed_buf.ignore(); - - if (x != i) - { - std::stringstream s; - s << "Failed!, read: " << x << ", expected: " << i; - throw DB::Exception(s.str()); - } - } - stopwatch.stop(); - std::cout << "Reading done (2). Elapsed: " << static_cast(stopwatch.elapsed()) / 1000000 << std::endl; - } - } - catch (const DB::Exception & e) - { - std::cerr << e.what() << ", " << e.message() << std::endl; - return 1; - } - - return 0; -} diff --git a/utils/compressor/test.sh b/utils/compressor/test.sh index 672ba0dc180..117c90a4919 100755 --- a/utils/compressor/test.sh +++ b/utils/compressor/test.sh @@ -1,5 +1,5 @@ #!/bin/sh -./compressor < compressor > compressor.qlz -./compressor -d < compressor.qlz > compressor2 +./compressor < compressor > compressor.snp +./compressor -d < compressor.snp > compressor2 cmp compressor compressor2 && echo "Ok." || echo "Fail."