mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
DB: unified compressed buffers and compressed iostreams [#CONV-2546].
This commit is contained in:
parent
ea1a7d9192
commit
a6ec731e4c
@ -19,7 +19,7 @@ class BufferWithOwnMemory : public Base
|
|||||||
protected:
|
protected:
|
||||||
std::vector<char> memory;
|
std::vector<char> memory;
|
||||||
public:
|
public:
|
||||||
BufferWithOwnMemory(size_t size = DBMS_DEFAULT_BUFFER_SIZE) : Base(NULL, size), memory(size)
|
BufferWithOwnMemory(size_t size = DBMS_DEFAULT_BUFFER_SIZE) : Base(NULL, 0), memory(size)
|
||||||
{
|
{
|
||||||
Base::set(&memory[0], size);
|
Base::set(&memory[0], size);
|
||||||
}
|
}
|
||||||
|
@ -13,9 +13,6 @@
|
|||||||
#include <DB/IO/CompressedStream.h>
|
#include <DB/IO/CompressedStream.h>
|
||||||
|
|
||||||
|
|
||||||
#define DBMS_COMPRESSED_READ_BUFFER_MAX_COMPRESSED_SIZE 0x40000000ULL /// 1GB
|
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -38,7 +35,7 @@ private:
|
|||||||
in.readStrict(&compressed_buffer[0], QUICKLZ_HEADER_SIZE);
|
in.readStrict(&compressed_buffer[0], QUICKLZ_HEADER_SIZE);
|
||||||
|
|
||||||
size_t size_compressed = qlz_size_compressed(&compressed_buffer[0]);
|
size_t size_compressed = qlz_size_compressed(&compressed_buffer[0]);
|
||||||
if (size_compressed > DBMS_COMPRESSED_READ_BUFFER_MAX_COMPRESSED_SIZE)
|
if (size_compressed > DBMS_MAX_COMPRESSED_SIZE)
|
||||||
throw Exception("Too large size_compressed. Most likely corrupted data.", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
|
throw Exception("Too large size_compressed. Most likely corrupted data.", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
|
||||||
|
|
||||||
size_t size_decompressed = qlz_size_decompressed(&compressed_buffer[0]);
|
size_t size_decompressed = qlz_size_decompressed(&compressed_buffer[0]);
|
||||||
|
@ -3,6 +3,7 @@
|
|||||||
|
|
||||||
/** Общие для CompressingStream.h и DecompressingStream.h дефайны */
|
/** Общие для CompressingStream.h и DecompressingStream.h дефайны */
|
||||||
|
|
||||||
|
#define DBMS_MAX_COMPRESSED_SIZE 0x40000000ULL /// 1GB
|
||||||
#define DBMS_STREAM_BUFFER_SIZE 4096
|
#define DBMS_STREAM_BUFFER_SIZE 4096
|
||||||
#define DBMS_COMPRESSING_STREAM_BUFFER_SIZE 1048576
|
#define DBMS_COMPRESSING_STREAM_BUFFER_SIZE 1048576
|
||||||
#define QUICKLZ_ADDITIONAL_SPACE 400
|
#define QUICKLZ_ADDITIONAL_SPACE 400
|
||||||
|
@ -1,7 +1,10 @@
|
|||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
|
|
||||||
|
#include <city.h>
|
||||||
#include <quicklz/quicklz_level1.h>
|
#include <quicklz/quicklz_level1.h>
|
||||||
|
|
||||||
|
#include <DB/Core/ErrorCodes.h>
|
||||||
|
#include <DB/Core/Exception.h>
|
||||||
#include <DB/IO/CompressedInputStream.h>
|
#include <DB/IO/CompressedInputStream.h>
|
||||||
|
|
||||||
|
|
||||||
@ -30,13 +33,25 @@ void DecompressingStreamBuf::getChunk(std::vector<char> & res)
|
|||||||
|
|
||||||
void DecompressingStreamBuf::readCompressedChunk()
|
void DecompressingStreamBuf::readCompressedChunk()
|
||||||
{
|
{
|
||||||
|
/// прочитаем чексумму
|
||||||
|
uint128 checksum;
|
||||||
|
p_istr->read(reinterpret_cast<char *>(&checksum), sizeof(checksum));
|
||||||
|
|
||||||
|
if (p_istr->eof())
|
||||||
|
return;
|
||||||
|
if (!p_istr->good())
|
||||||
|
throw Exception("Cannot read all data.", ErrorCodes::CANNOT_READ_ALL_DATA);
|
||||||
|
|
||||||
/// прочитаем заголовок
|
/// прочитаем заголовок
|
||||||
p_istr->read(&compressed_buffer[0], QUICKLZ_HEADER_SIZE);
|
p_istr->read(&compressed_buffer[0], QUICKLZ_HEADER_SIZE);
|
||||||
|
|
||||||
if (!p_istr->good())
|
if (!p_istr->good())
|
||||||
return;
|
throw Exception("Cannot read all data.", ErrorCodes::CANNOT_READ_ALL_DATA);
|
||||||
|
|
||||||
size_t size_compressed = qlz_size_compressed(&compressed_buffer[0]);
|
size_t size_compressed = qlz_size_compressed(&compressed_buffer[0]);
|
||||||
|
if (size_compressed > DBMS_MAX_COMPRESSED_SIZE)
|
||||||
|
throw Exception("Too large size_compressed. Most likely corrupted data.", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
|
||||||
|
|
||||||
size_t size_decompressed = qlz_size_decompressed(&compressed_buffer[0]);
|
size_t size_decompressed = qlz_size_decompressed(&compressed_buffer[0]);
|
||||||
|
|
||||||
compressed_buffer.resize(size_compressed);
|
compressed_buffer.resize(size_compressed);
|
||||||
@ -47,7 +62,10 @@ void DecompressingStreamBuf::readCompressedChunk()
|
|||||||
p_istr->read(&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed - QUICKLZ_HEADER_SIZE);
|
p_istr->read(&compressed_buffer[QUICKLZ_HEADER_SIZE], size_compressed - QUICKLZ_HEADER_SIZE);
|
||||||
|
|
||||||
if (!p_istr->good())
|
if (!p_istr->good())
|
||||||
return;
|
throw Exception("Cannot read all data.", ErrorCodes::CANNOT_READ_ALL_DATA);
|
||||||
|
|
||||||
|
if (checksum != CityHash128(&compressed_buffer[0], size_compressed))
|
||||||
|
throw Exception("Checksum doesnt match: corrupted data.", ErrorCodes::CHECKSUM_DOESNT_MATCH);
|
||||||
|
|
||||||
/// разжимаем блок
|
/// разжимаем блок
|
||||||
qlz_decompress(&compressed_buffer[0], &uncompressed_buffer[0], &scratch[0]);
|
qlz_decompress(&compressed_buffer[0], &uncompressed_buffer[0], &scratch[0]);
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
|
|
||||||
|
#include <city.h>
|
||||||
|
|
||||||
#include <DB/IO/CompressedOutputStream.h>
|
#include <DB/IO/CompressedOutputStream.h>
|
||||||
|
|
||||||
|
|
||||||
@ -40,6 +42,8 @@ int CompressingStreamBuf::writeToDevice(const char * buffer, std::streamsize len
|
|||||||
length,
|
length,
|
||||||
&scratch[0]);
|
&scratch[0]);
|
||||||
|
|
||||||
|
uint128 checksum = CityHash128(&compressed_buffer[0], compressed_size);
|
||||||
|
p_ostr->write(reinterpret_cast<const char *>(&checksum), sizeof(checksum));
|
||||||
p_ostr->write(&compressed_buffer[0], compressed_size);
|
p_ostr->write(&compressed_buffer[0], compressed_size);
|
||||||
return static_cast<int>(length);
|
return static_cast<int>(length);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user