dbms: added checksums in compressed data [#CONV-2546].

This commit is contained in:
Alexey Milovidov 2011-06-24 20:18:09 +00:00
parent 4b2f373163
commit 739361334f
4 changed files with 29 additions and 6 deletions

View File

@ -46,6 +46,8 @@ namespace ErrorCodes
TOO_LESS_ARGUMENTS_FOR_FUNCTION, TOO_LESS_ARGUMENTS_FOR_FUNCTION,
UNKNOWN_ELEMENT_IN_AST, UNKNOWN_ELEMENT_IN_AST,
CANNOT_PARSE_DATE, CANNOT_PARSE_DATE,
TOO_LARGE_SIZE_COMPRESSED,
CHECKSUM_DOESNT_MATCH,
}; };
} }

View File

@ -2,8 +2,8 @@
#define DBMS_COMMON_COMPRESSED_READBUFFER_H #define DBMS_COMMON_COMPRESSED_READBUFFER_H
#include <vector> #include <vector>
#include <algorithm>
#include <city.h>
#include <quicklz/quicklz_level1.h> #include <quicklz/quicklz_level1.h>
#include <DB/Core/Exception.h> #include <DB/Core/Exception.h>
@ -12,6 +12,9 @@
#include <DB/IO/CompressedStream.h> #include <DB/IO/CompressedStream.h>
#define DBMS_COMPRESSED_READ_BUFFER_MAX_COMPRESSED_SIZE 0x40000000ULL /// 1GB
namespace DB namespace DB
{ {
@ -34,10 +37,16 @@ public:
{ {
if (in.eof()) if (in.eof())
return false; return false;
uint128 checksum;
in.readStrict(reinterpret_cast<char *>(&checksum), sizeof(checksum));
in.readStrict(&compressed_buffer[0], QUICKLZ_HEADER_SIZE); in.readStrict(&compressed_buffer[0], QUICKLZ_HEADER_SIZE);
size_t size_compressed = qlz_size_compressed(&compressed_buffer[0]); size_t size_compressed = qlz_size_compressed(&compressed_buffer[0]);
if (size_compressed > DBMS_COMPRESSED_READ_BUFFER_MAX_COMPRESSED_SIZE)
throw Exception("Too large size_compressed. Most likely corrupted data.", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
size_t size_decompressed = qlz_size_decompressed(&compressed_buffer[0]); size_t size_decompressed = qlz_size_decompressed(&compressed_buffer[0]);
compressed_buffer.resize(size_compressed); compressed_buffer.resize(size_compressed);
@ -45,6 +54,9 @@ public:
in.readStrict(&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed - QUICKLZ_HEADER_SIZE); in.readStrict(&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed - QUICKLZ_HEADER_SIZE);
if (checksum != CityHash128(&compressed_buffer[0], size_compressed))
throw Exception("Checksum doesnt match: corrupted data.", ErrorCodes::CHECKSUM_DOESNT_MATCH);
qlz_decompress(&compressed_buffer[0], &internal_buffer[0], scratch); qlz_decompress(&compressed_buffer[0], &internal_buffer[0], scratch);
pos = working_buffer.begin(); pos = working_buffer.begin();

View File

@ -1,6 +1,9 @@
#ifndef DBMS_COMMON_COMPRESSED_WRITEBUFFER_H #ifndef DBMS_COMMON_COMPRESSED_WRITEBUFFER_H
#define DBMS_COMMON_COMPRESSED_WRITEBUFFER_H #define DBMS_COMMON_COMPRESSED_WRITEBUFFER_H
#include <vector>
#include <city.h>
#include <quicklz/quicklz_level1.h> #include <quicklz/quicklz_level1.h>
#include <DB/IO/WriteBuffer.h> #include <DB/IO/WriteBuffer.h>
@ -15,7 +18,7 @@ class CompressedWriteBuffer : public WriteBuffer
private: private:
WriteBuffer & out; WriteBuffer & out;
char compressed_buffer[DBMS_COMPRESSING_STREAM_BUFFER_SIZE + QUICKLZ_ADDITIONAL_SPACE]; std::vector<char> compressed_buffer;
char scratch[QLZ_SCRATCH_COMPRESS]; char scratch[QLZ_SCRATCH_COMPRESS];
size_t compressed_bytes; size_t compressed_bytes;
@ -25,13 +28,19 @@ public:
void next() void next()
{ {
size_t uncompressed_size = pos - working_buffer.begin();
compressed_buffer.resize(uncompressed_size + QUICKLZ_ADDITIONAL_SPACE);
size_t compressed_size = qlz_compress( size_t compressed_size = qlz_compress(
working_buffer.begin(), working_buffer.begin(),
compressed_buffer, &compressed_buffer[0],
pos - working_buffer.begin(), uncompressed_size,
scratch); scratch);
out.write(compressed_buffer, compressed_size); uint128 checksum = CityHash128(&compressed_buffer[0], compressed_size);
out.write(reinterpret_cast<const char *>(&checksum), sizeof(checksum));
out.write(&compressed_buffer[0], compressed_size);
pos = working_buffer.begin(); pos = working_buffer.begin();
compressed_bytes += compressed_size; compressed_bytes += compressed_size;
} }

View File

@ -21,7 +21,7 @@ int main(int argc, char ** argv)
{ {
try try
{ {
size_t n = 10000000; size_t n = 100000000;
Poco::Stopwatch stopwatch; Poco::Stopwatch stopwatch;
{ {