diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index ee18904589c..f8afa0ee96e 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -46,6 +46,8 @@ namespace ErrorCodes TOO_LESS_ARGUMENTS_FOR_FUNCTION, UNKNOWN_ELEMENT_IN_AST, CANNOT_PARSE_DATE, + TOO_LARGE_SIZE_COMPRESSED, + CHECKSUM_DOESNT_MATCH, }; } diff --git a/dbms/include/DB/IO/CompressedReadBuffer.h b/dbms/include/DB/IO/CompressedReadBuffer.h index cc0fbbf072f..9a2db41e8c6 100644 --- a/dbms/include/DB/IO/CompressedReadBuffer.h +++ b/dbms/include/DB/IO/CompressedReadBuffer.h @@ -2,8 +2,8 @@ #define DBMS_COMMON_COMPRESSED_READBUFFER_H #include -#include +#include #include #include @@ -12,6 +12,9 @@ #include +#define DBMS_COMPRESSED_READ_BUFFER_MAX_COMPRESSED_SIZE 0x40000000ULL /// 1GB + + namespace DB { @@ -34,10 +37,16 @@ public: { if (in.eof()) return false; + + uint128 checksum; + in.readStrict(reinterpret_cast(&checksum), sizeof(checksum)); in.readStrict(&compressed_buffer[0], QUICKLZ_HEADER_SIZE); size_t size_compressed = qlz_size_compressed(&compressed_buffer[0]); + if (size_compressed > DBMS_COMPRESSED_READ_BUFFER_MAX_COMPRESSED_SIZE) + throw Exception("Too large size_compressed. Most likely corrupted data.", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED); + size_t size_decompressed = qlz_size_decompressed(&compressed_buffer[0]); compressed_buffer.resize(size_compressed); @@ -45,6 +54,9 @@ public: in.readStrict(&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed - QUICKLZ_HEADER_SIZE); + if (checksum != CityHash128(&compressed_buffer[0], size_compressed)) + throw Exception("Checksum doesnt match: corrupted data.", ErrorCodes::CHECKSUM_DOESNT_MATCH); + qlz_decompress(&compressed_buffer[0], &internal_buffer[0], scratch); pos = working_buffer.begin(); diff --git a/dbms/include/DB/IO/CompressedWriteBuffer.h b/dbms/include/DB/IO/CompressedWriteBuffer.h index 0e0a5019f03..b54ea9638e9 100644 --- a/dbms/include/DB/IO/CompressedWriteBuffer.h +++ b/dbms/include/DB/IO/CompressedWriteBuffer.h @@ -1,6 +1,9 @@ #ifndef DBMS_COMMON_COMPRESSED_WRITEBUFFER_H #define DBMS_COMMON_COMPRESSED_WRITEBUFFER_H +#include + +#include #include #include @@ -15,7 +18,7 @@ class CompressedWriteBuffer : public WriteBuffer private: WriteBuffer & out; - char compressed_buffer[DBMS_COMPRESSING_STREAM_BUFFER_SIZE + QUICKLZ_ADDITIONAL_SPACE]; + std::vector compressed_buffer; char scratch[QLZ_SCRATCH_COMPRESS]; size_t compressed_bytes; @@ -25,13 +28,19 @@ public: void next() { + size_t uncompressed_size = pos - working_buffer.begin(); + compressed_buffer.resize(uncompressed_size + QUICKLZ_ADDITIONAL_SPACE); + size_t compressed_size = qlz_compress( working_buffer.begin(), - compressed_buffer, - pos - working_buffer.begin(), + &compressed_buffer[0], + uncompressed_size, scratch); - out.write(compressed_buffer, compressed_size); + uint128 checksum = CityHash128(&compressed_buffer[0], compressed_size); + out.write(reinterpret_cast(&checksum), sizeof(checksum)); + + out.write(&compressed_buffer[0], compressed_size); pos = working_buffer.begin(); compressed_bytes += compressed_size; } diff --git a/dbms/src/IO/tests/compressed_buffer.cpp b/dbms/src/IO/tests/compressed_buffer.cpp index 6a249ffa438..b332c3cda08 100644 --- a/dbms/src/IO/tests/compressed_buffer.cpp +++ b/dbms/src/IO/tests/compressed_buffer.cpp @@ -21,7 +21,7 @@ int main(int argc, char ** argv) { try { - size_t n = 10000000; + size_t n = 100000000; Poco::Stopwatch stopwatch; {