ISSUES-838 add lz4、none、zstd codec

This commit is contained in:
zhang2014 2018-10-11 10:57:48 +08:00
parent 9eca78e764
commit 6d0c4eaf89
32 changed files with 1156 additions and 95 deletions

View File

@ -72,6 +72,7 @@ add_headers_and_sources(dbms src/Storages/Kafka)
add_headers_and_sources(dbms src/Storages/MergeTree)
add_headers_and_sources(dbms src/Client)
add_headers_and_sources(dbms src/Formats)
add_headers_and_sources(dbms src/Compression)
list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON})

View File

@ -13,3 +13,4 @@ add_subdirectory (AggregateFunctions)
add_subdirectory (Client)
add_subdirectory (TableFunctions)
add_subdirectory (Formats)
add_subdirectory (Compression)

View File

@ -402,6 +402,8 @@ namespace ErrorCodes
extern const int SYSTEM_ERROR = 425;
extern const int NULL_POINTER_DEREFERENCE = 426;
extern const int CANNOT_COMPILE_REGEXP = 427;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE = 428;
extern const int UNKNOWN_CODEC = 429;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

View File

@ -0,0 +1,57 @@
//#include <Compression/CompressionCodecDelta.h>
//#include <IO/CompressedStream.h>
//#include <Compression/CompressionFactory.h>
//
//
//namespace DB
//{
//
//char CompressionCodecDelta::getMethodByte()
//{
// return static_cast<char>(CompressionMethodByte::LZ4);
//}
//
//void CompressionCodecDelta::getCodecDesc(String & codec_desc)
//{
// codec_desc = "DELTA";
//}
//
//size_t CompressionCodecDelta::compress(char * source, size_t source_size, char * dest)
//{
// /// TODO: use SIMD
// return LZ4_compress_default(source, dest, source_size, LZ4_COMPRESSBOUND(source_size));
//}
//
//size_t CompressionCodecDelta::decompress(char * source, size_t source_size, char * dest, size_t size_decompressed)
//{
// LZ4::decompress(source, dest, source_size, size_decompressed, lz4_stat);
// return size_decompressed;
//}
//
//void registerCodecLZ4(CompressionCodecFactory & factory)
//{
// factory.registerCompressionCodec("DELTA", static_cast<char>(CompressionMethodByte::DELTA), [&](ASTPtr & parameters)
// {
// int width = 1;
//
// if (arguments && !arguments->children.empty())
// {
// const auto children = arguments->children;
// const ASTIdentifier * identifier = static_cast<const ASTIdentifier *>(children[0].get());
//
// String delta_type = identifier->name;
// if (delta_type == "Int8" || delta_type == "UInt8")
// width = 1;
// else if (delta_type == "Int16" || delta_type == "UInt16")
// width = 2;
// else if (delta_type == "Int32" || delta_type == "UInt32")
// width = 4;
// else if (delta_type == "Int64" || delta_type == "UInt64")
// width = 8;
// }
//
// return std::make_shared<CompressionCodecDelta>(width);
// });
//}
//
//}

View File

@ -0,0 +1,26 @@
//#pragma once
//
//#include <IO/WriteBuffer.h>
//#include <Compression/ICompressionCodec.h>
//#include <IO/BufferWithOwnMemory.h>
//#include <Parsers/StringRange.h>
//
//namespace DB
//{
//
//class CompressionCodecDelta : public ICompressionCodec
//{
//public:
// char getMethodByte() override;
//
// void getCodecDesc(String & codec_desc) override;
//
// size_t compress(char * source, size_t source_size, char * dest) override;
//
// size_t getCompressedReserveSize(size_t uncompressed_size) override;
//
// size_t decompress(char * source, size_t source_size, char * dest, size_t decompressed_size) override;
//
//};
//
//}

View File

@ -0,0 +1,46 @@
#include <Compression/CompressionCodecLZ4.h>
#include <lz4.h>
#include <lz4hc.h>
#include <IO/CompressedStream.h>
#include <Compression/CompressionFactory.h>
#include <IO/LZ4_decompress_faster.h>
#include "CompressionCodecLZ4.h"
namespace DB
{
char CompressionCodecLZ4::getMethodByte()
{
return static_cast<char>(CompressionMethodByte::LZ4);
}
void CompressionCodecLZ4::getCodecDesc(String & codec_desc)
{
codec_desc = "LZ4";
}
size_t CompressionCodecLZ4::getCompressedReserveSize(size_t uncompressed_size)
{
return LZ4_COMPRESSBOUND(uncompressed_size);
}
size_t CompressionCodecLZ4::compress(char * source, size_t source_size, char * dest)
{
return LZ4_compress_default(source, dest, source_size, LZ4_COMPRESSBOUND(source_size));
}
size_t CompressionCodecLZ4::decompress(char * source, size_t source_size, char * dest, size_t size_decompressed)
{
LZ4::decompress(source, dest, source_size, size_decompressed, lz4_stat);
return size_decompressed;
}
void registerCodecLZ4(CompressionCodecFactory & factory)
{
factory.registerSimpleCompressionCodec("LZ4", static_cast<char>(CompressionMethodByte::LZ4), [&](){
return std::make_shared<CompressionCodecLZ4>();
});
}
}

View File

@ -0,0 +1,29 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <Compression/ICompressionCodec.h>
#include <IO/BufferWithOwnMemory.h>
#include <Parsers/StringRange.h>
#include <IO/LZ4_decompress_faster.h>
namespace DB
{
class CompressionCodecLZ4 : public ICompressionCodec
{
public:
char getMethodByte() override;
void getCodecDesc(String & codec_desc) override;
size_t compress(char * source, size_t source_size, char * dest) override;
size_t getCompressedReserveSize(size_t uncompressed_size) override;
size_t decompress(char * source, size_t source_size, char * dest, size_t decompressed_size) override;
private:
LZ4::PerformanceStatistics lz4_stat;
};
}

View File

@ -0,0 +1,93 @@
#include <Compression/CompressionCodecMultiple.h>
#include <IO/CompressedStream.h>
#include <common/unaligned.h>
#include <Compression/CompressionFactory.h>
namespace DB
{
CompressionCodecMultiple::CompressionCodecMultiple(Codecs codecs)
: codecs(codecs)
{
for (size_t idx = 0; idx < codecs.size(); idx++)
{
if (idx != 0)
codec_desc = codec_desc + ',';
const auto codec = codecs[idx];
String inner_codec_desc;
codec->getCodecDesc(inner_codec_desc);
codec_desc = codec_desc + inner_codec_desc;
}
}
char CompressionCodecMultiple::getMethodByte()
{
return static_cast<char>(CompressionMethodByte::Multiple);
}
void CompressionCodecMultiple::getCodecDesc(String & codec_desc_)
{
codec_desc_ = codec_desc;
}
size_t CompressionCodecMultiple::getCompressedReserveSize(size_t uncompressed_size)
{
for (auto & codec : codecs)
uncompressed_size = codec->getCompressedReserveSize(uncompressed_size);
return sizeof(UInt8) + codecs.size() + uncompressed_size;
}
size_t CompressionCodecMultiple::compress(char * source, size_t source_size, char * dest)
{
static constexpr size_t without_method_header_size = sizeof(UInt32) + sizeof(UInt32);
PODArray<char> compressed_buf;
PODArray<char> uncompressed_buf(source_size);
uncompressed_buf.insert(source, source + source_size);
dest[0] = static_cast<char>(codecs.size());
for (size_t idx = 0; idx < codecs.size(); ++idx)
{
const auto codec = codecs[idx];
dest[idx + 1] = codec->getMethodByte();
compressed_buf.resize(without_method_header_size + codec->getCompressedReserveSize(source_size));
size_t size_compressed = without_method_header_size;
size_compressed += codec->compress(&uncompressed_buf[0], source_size, &compressed_buf[without_method_header_size]);
UInt32 compressed_size_32 = size_compressed;
UInt32 uncompressed_size_32 = source_size;
unalignedStore(&compressed_buf[0], compressed_size_32);
unalignedStore(&compressed_buf[4], uncompressed_size_32);
uncompressed_buf.swap(compressed_buf);
source_size = size_compressed;
}
memcpy(&dest[codecs.size() + 1], &compressed_buf[0], source_size);
return source_size;
}
size_t CompressionCodecMultiple::decompress(char * source, size_t source_size, char * dest, size_t decompressed_size)
{
UInt8 compression_methods_size = source[0];
PODArray<char> compressed_buf;
PODArray<char> uncompressed_buf;
compressed_buf.insert(&source[compression_methods_size + 1], source_size - (compression_methods_size + 1));
for (size_t idx = 0; idx < compression_methods_size; ++idx)
{
UInt8 compression_method = source[idx + 1];
const auto codec = CompressionCodecFactory::instance().get(compression_method);
codec->decompress(&compressed_buf[8], 0, uncompressed_buf.data(), 0);
uncompressed_buf.swap(compressed_buf);
}
memcpy(dest, uncompressed_buf.data(), decompressed_size);
return decompressed_size;
}
}

View File

@ -0,0 +1,29 @@
#pragma once
#include <Compression/ICompressionCodec.h>
namespace DB
{
class CompressionCodecMultiple final : public ICompressionCodec
{
public:
CompressionCodecMultiple(Codecs codecs);
char getMethodByte() override;
void getCodecDesc(String & codec_desc) override;
size_t compress(char * source, size_t source_size, char * dest) override;
size_t getCompressedReserveSize(size_t uncompressed_size) override;
size_t decompress(char *source, size_t source_size, char *dest, size_t decompressed_size) override;
private:
Codecs codecs;
String codec_desc;
};
}

View File

@ -0,0 +1,38 @@
#include <Compression/CompressionCodecNone.h>
#include <IO/CompressedStream.h>
#include <Compression/CompressionFactory.h>
namespace DB
{
char CompressionCodecNone::getMethodByte()
{
return static_cast<char>(CompressionMethodByte::NONE);
}
void CompressionCodecNone::getCodecDesc(String & codec_desc)
{
codec_desc = "NONE";
}
size_t CompressionCodecNone::compress(char * source, size_t source_size, char * dest)
{
memcpy(dest, source, source_size);
return source_size;
}
size_t CompressionCodecNone::decompress(char *source, size_t /*source_size*/, char *dest, size_t size_decompressed)
{
memcpy(dest, source, size_decompressed);
return size_decompressed;
}
void registerCodecNone(CompressionCodecFactory & factory)
{
factory.registerSimpleCompressionCodec("NONE", static_cast<char>(CompressionMethodByte::NONE), [&](){
return std::make_shared<CompressionCodecNone>();
});
}
}

View File

@ -0,0 +1,23 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <Compression/ICompressionCodec.h>
#include <IO/BufferWithOwnMemory.h>
#include <Parsers/StringRange.h>
namespace DB
{
class CompressionCodecNone : public ICompressionCodec
{
public:
char getMethodByte() override;
void getCodecDesc(String & codec_desc) override;
size_t compress(char * source, size_t source_size, char * compressed_buf) override;
size_t decompress(char *source, size_t source_size, char *dest, size_t decompressed_size) override;
};
}

View File

@ -0,0 +1,78 @@
#include <Compression/CompressionCodecZSTD.h>
#include <IO/CompressedStream.h>
#include <Compression/CompressionFactory.h>
#include <zstd.h>
#include <Core/Field.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTLiteral.h>
#include <Common/typeid_cast.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
}
char CompressionCodecZSTD::getMethodByte()
{
return static_cast<char>(CompressionMethodByte::ZSTD);
}
void CompressionCodecZSTD::getCodecDesc(String & codec_desc)
{
codec_desc = "ZSTD";
}
size_t CompressionCodecZSTD::getCompressedReserveSize(size_t uncompressed_size)
{
return ZSTD_compressBound(uncompressed_size);
}
size_t CompressionCodecZSTD::compress(char * source, size_t source_size, char * dest)
{
size_t compressed_size = ZSTD_compress(dest, ZSTD_compressBound(source_size), source, source_size, level);
if (ZSTD_isError(compressed_size))
throw Exception("Cannot compress block with ZSTD: " + std::string(ZSTD_getErrorName(compressed_size)), ErrorCodes::CANNOT_COMPRESS);
return compressed_size;
}
size_t CompressionCodecZSTD::decompress(char * source, size_t source_size, char * dest, size_t size_decompressed)
{
size_t res = ZSTD_decompress(dest, size_decompressed, source, source_size);
// compressed_buffer + COMPRESSED_BLOCK_HEADER_SIZE, size_compressed_without_checksum - COMPRESSED_BLOCK_HEADER_SIZE);
if (ZSTD_isError(res))
throw Exception("Cannot ZSTD_decompress: " + std::string(ZSTD_getErrorName(res)), ErrorCodes::CANNOT_DECOMPRESS);
return size_decompressed;
}
CompressionCodecZSTD::CompressionCodecZSTD(int level)
:level(level)
{
}
void registerCodecZSTD(CompressionCodecFactory & factory)
{
UInt8 method_code = static_cast<char>(CompressionMethodByte::ZSTD);
factory.registerCompressionCodec("ZSTD", method_code, [&](const ASTPtr & arguments) -> CompressionCodecPtr
{
int level = 0;
if (arguments && !arguments->children.empty())
{
const auto children = arguments->children;
const ASTLiteral * literal = static_cast<const ASTLiteral *>(children[0].get());
level = literal->value.safeGet<UInt64>();
}
return std::make_shared<CompressionCodecZSTD>(level);
});
}
}

View File

@ -0,0 +1,30 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <Compression/ICompressionCodec.h>
#include <IO/BufferWithOwnMemory.h>
#include <Parsers/StringRange.h>
namespace DB
{
class CompressionCodecZSTD : public ICompressionCodec
{
public:
CompressionCodecZSTD(int level);
char getMethodByte() override;
void getCodecDesc(String & codec_desc) override;
size_t compress(char * source, size_t source_size, char * dest) override;
size_t getCompressedReserveSize(size_t uncompressed_size) override;
size_t decompress(char *source, size_t source_size, char *dest, size_t decompressed_size) override;
private:
int level;
};
}

View File

@ -0,0 +1,121 @@
#include <Compression/CompressionFactory.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Common/typeid_cast.h>
#include <Poco/String.h>
#include <IO/ReadBuffer.h>
#include <Parsers/queryToString.h>
#include <Compression/CompressionCodecMultiple.h>
#include <Compression/CompressionCodecLZ4.h>
#include <Compression/CompressionCodecNone.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_CODEC;
extern const int UNEXPECTED_AST_STRUCTURE;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int DATA_TYPE_CANNOT_HAVE_ARGUMENTS;
}
CompressionCodecPtr CompressionCodecFactory::getDefaultCodec() const
{
return default_codec;
}
CompressionCodecPtr CompressionCodecFactory::get(const ASTPtr & ast) const
{
if (const auto * func = typeid_cast<const ASTFunction *>(ast.get()))
{
if (func->parameters)
throw Exception("Compression codec cannot have multiple parenthesed parameters.", ErrorCodes::ILLEGAL_SYNTAX_FOR_CODEC_TYPE);
if (Poco::toLower(func->name) != "codec")
throw Exception("", ErrorCodes::UNKNOWN_CODEC);
Codecs codecs;
codecs.reserve(func->arguments->children.size());
for (const auto & inner_codec_ast : func->arguments->children)
{
if (const auto * family_name = typeid_cast<const ASTIdentifier *>(inner_codec_ast.get()))
codecs.emplace_back(getImpl(family_name->name, {}));
else if (const auto * ast_func = typeid_cast<const ASTFunction *>(inner_codec_ast.get()))
codecs.emplace_back(getImpl(ast_func->name, ast_func->arguments));
else
throw Exception("Unexpected AST element for compression codec.", ErrorCodes::UNEXPECTED_AST_STRUCTURE);
}
if (codecs.size() == 1)
return codecs.back();
else if (codecs.size() > 1)
return std::make_shared<CompressionCodecMultiple>(codecs);
}
throw Exception("Unknown codec family: " + queryToString(ast), ErrorCodes::UNKNOWN_CODEC);
}
CompressionCodecPtr CompressionCodecFactory::get(const UInt8 byte_code) const
{
const auto family_code_and_creator = family_code_with_codec.find(byte_code);
if (family_code_and_creator == family_code_with_codec.end())
throw Exception("Unknown codec family code : " + toString(byte_code), ErrorCodes::UNKNOWN_CODEC);
return family_code_and_creator->second({});
}
CompressionCodecPtr CompressionCodecFactory::getImpl(const String & family_name, const ASTPtr & arguments) const
{
const auto family_and_creator = family_name_with_codec.find(family_name);
if (family_and_creator == family_name_with_codec.end())
throw Exception("Unknown codec family: " + family_name, ErrorCodes::UNKNOWN_CODEC);
return family_and_creator->second(arguments);
}
void CompressionCodecFactory::registerCompressionCodec(const String & family_name, UInt8 byte_code, Creator creator)
{
if (creator == nullptr)
throw Exception("CompressionCodecFactory: the codec family " + family_name + " has been provided a null constructor",
ErrorCodes::LOGICAL_ERROR);
if (!family_name_with_codec.emplace(family_name, creator).second)
throw Exception("CompressionCodecFactory: the codec family name '" + family_name + "' is not unique", ErrorCodes::LOGICAL_ERROR);
if (!family_code_with_codec.emplace(byte_code, creator).second)
throw Exception("CompressionCodecFactory: the codec family name '" + family_name + "' is not unique", ErrorCodes::LOGICAL_ERROR);
}
void CompressionCodecFactory::registerSimpleCompressionCodec(const String & family_name, UInt8 byte_code,
std::function<CompressionCodecPtr()> creator)
{
registerCompressionCodec(family_name, byte_code, [family_name, creator](const ASTPtr & ast)
{
if (ast)
throw Exception("Compression codec " + family_name + " cannot have arguments", ErrorCodes::DATA_TYPE_CANNOT_HAVE_ARGUMENTS);
return creator();
});
}
void registerCodecLZ4(CompressionCodecFactory & factory);
void registerCodecNone(CompressionCodecFactory & factory);
void registerCodecZSTD(CompressionCodecFactory & factory);
//void registerCodecDelta(CompressionCodecFactory & factory);
CompressionCodecFactory::CompressionCodecFactory()
{
default_codec = std::make_shared<CompressionCodecLZ4>();
registerCodecLZ4(*this);
registerCodecNone(*this);
registerCodecZSTD(*this);
// registerCodecDelta(*this);
}
}

View File

@ -0,0 +1,56 @@
#pragma once
#include <memory>
#include <functional>
#include <unordered_map>
#include <ext/singleton.h>
#include <Common/IFactoryWithAliases.h>
#include <Compression/ICompressionCodec.h>
#include <IO/CompressedStream.h>
namespace DB
{
class ICompressionCodec;
using CompressionCodecPtr = std::shared_ptr<ICompressionCodec>;
class IAST;
using ASTPtr = std::shared_ptr<IAST>;
/** Creates a codec object by name of compression algorithm family and parameters.
*/
class CompressionCodecFactory final : public ext::singleton<CompressionCodecFactory>
{
protected:
using Creator = std::function<CompressionCodecPtr(const ASTPtr & parameters)>;
using SimpleCreator = std::function<CompressionCodecPtr()>;
using CompressionCodecsDictionary = std::unordered_map<String, Creator>;
using CompressionCodecsCodeDictionary = std::unordered_map<UInt8, Creator>;
public:
CompressionCodecPtr getDefaultCodec() const;
CompressionCodecPtr get(const ASTPtr & ast) const;
CompressionCodecPtr get(const UInt8 byte_code) const;
void registerCompressionCodec(const String & family_name, UInt8 byte_code, Creator creator);
void registerSimpleCompressionCodec(const String & family_name, UInt8 byte_code, SimpleCreator creator);
protected:
CompressionCodecPtr getImpl(const String & family_name, const ASTPtr & arguments) const;
private:
CompressionCodecsDictionary family_name_with_codec;
CompressionCodecsCodeDictionary family_code_with_codec;
CompressionCodecPtr default_codec;
CompressionCodecFactory();
friend class ext::singleton<CompressionCodecFactory>;
};
}

View File

@ -0,0 +1,196 @@
#include <Compression/ICompressionCodec.h>
#include <IO/LZ4_decompress_faster.h>
#include <common/unaligned.h>
#include <IO/CompressedStream.h>
#include <Common/hex.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Common/typeid_cast.h>
#include <Compression/CompressionFactory.h>
#include <zstd.h>
namespace ProfileEvents
{
extern const Event ReadCompressedBytes;
extern const Event CompressedReadBufferBlocks;
extern const Event CompressedReadBufferBytes;
}
namespace DB
{
namespace ErrorCodes
{
extern const int CHECKSUM_DOESNT_MATCH;
extern const int TOO_LARGE_SIZE_COMPRESSED;
extern const int UNKNOWN_COMPRESSION_METHOD;
extern const int CANNOT_DECOMPRESS;
extern const int SEEK_POSITION_OUT_OF_BOUND;
}
CompressionCodecReadBufferPtr ICompressionCodec::liftCompressed(ReadBuffer & origin)
{
return std::make_shared<CompressionCodecReadBuffer>(origin);
}
CompressionCodecWriteBufferPtr ICompressionCodec::liftCompressed(WriteBuffer & origin)
{
return std::make_shared<CompressionCodecWriteBuffer>(*this, origin);
}
CompressionCodecReadBuffer::CompressionCodecReadBuffer(ReadBuffer & origin)
: origin(origin)
{
}
/// Read compressed data into compressed_buffer. Get size of decompressed data from block header. Checksum if need.
/// Returns number of compressed bytes read.
size_t CompressionCodecReadBuffer::readCompressedData(size_t & size_decompressed, size_t & size_compressed)
{
if (origin.eof())
return 0;
CityHash_v1_0_2::uint128 checksum;
origin.readStrict(reinterpret_cast<char *>(&checksum), sizeof(checksum));
own_compressed_buffer.resize(COMPRESSED_BLOCK_HEADER_SIZE);
origin.readStrict(own_compressed_buffer.data(), COMPRESSED_BLOCK_HEADER_SIZE);
size_compressed = unalignedLoad<UInt32>(&own_compressed_buffer[1]);
size_decompressed = unalignedLoad<UInt32>(&own_compressed_buffer[5]);
if (size_compressed > DBMS_MAX_COMPRESSED_SIZE)
throw Exception("Too large size_compressed: " + toString(size_compressed) + ". Most likely corrupted data.", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED);
ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed + sizeof(checksum));
/// Is whole compressed block located in 'origin' buffer?
if (origin.offset() >= COMPRESSED_BLOCK_HEADER_SIZE &&
origin.position() + size_compressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER - COMPRESSED_BLOCK_HEADER_SIZE <= origin.buffer().end())
{
origin.position() -= COMPRESSED_BLOCK_HEADER_SIZE;
compressed_buffer = origin.position();
origin.position() += size_compressed;
}
else
{
own_compressed_buffer.resize(size_compressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER);
compressed_buffer = own_compressed_buffer.data();
origin.readStrict(compressed_buffer + COMPRESSED_BLOCK_HEADER_SIZE, size_compressed - COMPRESSED_BLOCK_HEADER_SIZE);
}
auto checksum_calculated = CityHash_v1_0_2::CityHash128(compressed_buffer, size_compressed);
if (checksum != checksum_calculated)
throw Exception("Checksum doesn't match: corrupted data."
" Reference: " + getHexUIntLowercase(checksum.first) + getHexUIntLowercase(checksum.second)
+ ". Actual: " + getHexUIntLowercase(checksum_calculated.first) + getHexUIntLowercase(checksum_calculated.second)
+ ". Size of compressed block: " + toString(size_compressed) + ".",
ErrorCodes::CHECKSUM_DOESNT_MATCH);
return size_compressed + sizeof(checksum);
}
void CompressionCodecReadBuffer::decompress(char * to, size_t size_decompressed, size_t size_compressed_without_checksum)
{
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBlocks);
ProfileEvents::increment(ProfileEvents::CompressedReadBufferBytes, size_decompressed);
UInt8 current_method = compressed_buffer[0]; /// See CompressedWriteBuffer.h
if (current_method != method)
codec = CompressionCodecFactory::instance().get(method);
codec->decompress(compressed_buffer + COMPRESSED_BLOCK_HEADER_SIZE,
size_compressed_without_checksum - COMPRESSED_BLOCK_HEADER_SIZE, to, size_decompressed);
}
bool CompressionCodecReadBuffer::nextImpl()
{
size_t size_decompressed;
size_t size_compressed_without_checksum;
size_compressed = readCompressedData(size_decompressed, size_compressed_without_checksum);
if (!size_compressed)
return false;
memory.resize(size_decompressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER);
working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum);
return true;
}
void CompressionCodecReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
{
if (const auto file_in = dynamic_cast<ReadBufferFromFileBase *>(&origin))
{
if (size_compressed &&
offset_in_compressed_file == file_in->getPositionInFile() - size_compressed &&
offset_in_decompressed_block <= working_buffer.size())
{
bytes += offset();
pos = working_buffer.begin() + offset_in_decompressed_block;
/// `bytes` can overflow and get negative, but in `count()` everything will overflow back and get right.
bytes -= offset();
}
else
{
file_in->seek(offset_in_compressed_file);
bytes += offset();
nextImpl();
if (offset_in_decompressed_block > working_buffer.size())
throw Exception("Seek position is beyond the decompressed block"
" (pos: " + toString(offset_in_decompressed_block) + ", block size: " + toString(working_buffer.size()) + ")",
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
pos = working_buffer.begin() + offset_in_decompressed_block;
bytes -= offset();
}
}
else
throw Exception("CompressionCodec: cannot seek in non-file buffer", ErrorCodes::LOGICAL_ERROR);
}
CompressionCodecWriteBuffer::~CompressionCodecWriteBuffer()
{
try
{
next();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void CompressionCodecWriteBuffer::nextImpl()
{
if (!offset())
return;
static constexpr size_t header_size = 1 + sizeof(UInt32) + sizeof(UInt32);
size_t uncompressed_size = offset();
size_t compressed_reserve_size = compression_codec.getCompressedReserveSize(uncompressed_size);
compressed_buffer.resize(header_size + compressed_reserve_size);
compressed_buffer[0] = compression_codec.getMethodByte();
size_t compressed_size = header_size + compression_codec.compress(working_buffer.begin(), uncompressed_size, &compressed_buffer[header_size]);
UInt32 compressed_size_32 = compressed_size;
UInt32 uncompressed_size_32 = uncompressed_size;
unalignedStore(&compressed_buffer[1], compressed_size_32);
unalignedStore(&compressed_buffer[5], uncompressed_size_32);
CityHash_v1_0_2::uint128 checksum = CityHash_v1_0_2::CityHash128(compressed_buffer.data(), compressed_size);
out.write(reinterpret_cast<const char *>(&checksum), sizeof(checksum));
out.write(compressed_buffer.data(), compressed_size);
}
CompressionCodecWriteBuffer::CompressionCodecWriteBuffer(ICompressionCodec & compression_codec, WriteBuffer & out, size_t buf_size)
: BufferWithOwnMemory<WriteBuffer>(buf_size), out(out), compression_codec(compression_codec)
{
}
}

View File

@ -0,0 +1,92 @@
#pragma once
#include <memory>
#include <Core/Field.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <Common/PODArray.h>
#include <DataTypes/IDataType.h>
#include <boost/noncopyable.hpp>
#include <IO/UncompressedCache.h>
#include <IO/LZ4_decompress_faster.h>
namespace DB
{
class ICompressionCodec;
using CompressionCodecPtr = std::shared_ptr<ICompressionCodec>;
using Codecs = std::vector<CompressionCodecPtr>;
class CompressionCodecReadBuffer;
class CompressionCodecWriteBuffer;
using CompressionCodecReadBufferPtr = std::shared_ptr<CompressionCodecReadBuffer>;
using CompressionCodecWriteBufferPtr = std::shared_ptr<CompressionCodecWriteBuffer>;
class CompressionCodecWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
{
public:
CompressionCodecWriteBuffer(ICompressionCodec & compression_codec, WriteBuffer & out, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
~CompressionCodecWriteBuffer() override;
private:
void nextImpl() override;
private:
WriteBuffer & out;
ICompressionCodec & compression_codec;
PODArray<char> compressed_buffer;
};
class CompressionCodecReadBuffer : public BufferWithOwnMemory<ReadBuffer>
{
public:
size_t size_compressed = 0;
size_t size_decompressed = 0;
CompressionCodecReadBuffer(ReadBuffer & origin);
size_t readCompressedData(size_t & size_decompressed, size_t & size_compressed);
void decompress(char * to, size_t size_decompressed, size_t size_compressed_without_checksum);
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);
private:
ReadBuffer & origin;
char * compressed_buffer;
UInt8 method;
CompressionCodecPtr codec;
PODArray<char> own_compressed_buffer;
bool nextImpl() override;
};
/**
*
*/
class ICompressionCodec : private boost::noncopyable
{
public:
virtual ~ICompressionCodec() = default;
CompressionCodecReadBufferPtr liftCompressed(ReadBuffer & origin);
CompressionCodecWriteBufferPtr liftCompressed(WriteBuffer & origin);
virtual char getMethodByte() = 0;
virtual void getCodecDesc(String & codec_desc) = 0;
virtual size_t compress(char * source, size_t source_size, char * dest) = 0;
virtual size_t decompress(char * source, size_t source_size, char * dest, size_t decompressed_size) = 0;
virtual size_t getCompressedReserveSize(size_t uncompressed_size) { return uncompressed_size; }
};
}

View File

@ -3,6 +3,7 @@
#include <IO/WriteHelpers.h>
#include <IO/CompressedStream.h>
#include <IO/LZ4_decompress_faster.h>
#include "CachedCompressedReadBuffer.h"
namespace DB
@ -19,7 +20,7 @@ void CachedCompressedReadBuffer::initInput()
if (!file_in)
{
file_in = createReadBufferFromFileBase(path, estimated_size, aio_threshold, buf_size);
compressed_in = &*file_in;
in = codec->liftCompressed(*file_in);
if (profile_callback)
file_in->setProfileCallback(profile_callback, clock_type);
@ -30,7 +31,6 @@ void CachedCompressedReadBuffer::initInput()
bool CachedCompressedReadBuffer::nextImpl()
{
/// Let's check for the presence of a decompressed block in the cache, grab the ownership of this block, if it exists.
UInt128 key = cache->hash(path, file_pos);
owned_cell = cache->get(key);
@ -44,12 +44,15 @@ bool CachedCompressedReadBuffer::nextImpl()
size_t size_decompressed;
size_t size_compressed_without_checksum;
owned_cell->compressed_size = readCompressedData(size_decompressed, size_compressed_without_checksum);
owned_cell->compressed_size = in->readCompressedData(size_decompressed, size_compressed_without_checksum);
if (owned_cell->compressed_size)
{
owned_cell->data.resize(size_decompressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER);
decompress(owned_cell->data.data(), size_decompressed, size_compressed_without_checksum);
in->decompress(owned_cell->data.data(), size_decompressed, size_compressed_without_checksum);
in->buffer() = Buffer(owned_cell->data.data(), owned_cell->data.data() + owned_cell->data.size() - LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER);
in->decompress(owned_cell->data.data(), size_decompressed, size_compressed_without_checksum);
/// Put data into cache.
cache->set(key, owned_cell);
@ -71,14 +74,13 @@ bool CachedCompressedReadBuffer::nextImpl()
CachedCompressedReadBuffer::CachedCompressedReadBuffer(
const std::string & path_, UncompressedCache * cache_, size_t estimated_size_, size_t aio_threshold_,
size_t buf_size_)
: ReadBuffer(nullptr, 0), path(path_), cache(cache_), buf_size(buf_size_), estimated_size(estimated_size_),
const std::string & path_, UncompressedCache * cache_, const CompressionCodecPtr & codec,
size_t estimated_size_, size_t aio_threshold_, size_t buf_size_)
: ReadBuffer(nullptr, 0), path(path_), cache(cache_), codec(codec), buf_size(buf_size_), estimated_size(estimated_size_),
aio_threshold(aio_threshold_), file_pos(0)
{
}
void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
{
if (owned_cell &&

View File

@ -6,6 +6,7 @@
#include <IO/CompressedReadBufferBase.h>
#include <IO/UncompressedCache.h>
#include <port/clock.h>
#include <Compression/ICompressionCodec.h>
namespace DB
@ -18,15 +19,17 @@ namespace DB
* Disadvantages:
* - in case you need to read a lot of data in a row, but of them only a part is cached, you have to do seek-and.
*/
class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadBuffer
class CachedCompressedReadBuffer : public ReadBuffer
{
private:
const std::string path;
UncompressedCache * cache;
const CompressionCodecPtr & codec;
size_t buf_size;
size_t estimated_size;
size_t aio_threshold;
CompressionCodecReadBufferPtr in;
std::unique_ptr<ReadBufferFromFileBase> file_in;
size_t file_pos;
@ -42,8 +45,8 @@ private:
public:
CachedCompressedReadBuffer(
const std::string & path_, UncompressedCache * cache_, size_t estimated_size_, size_t aio_threshold_,
size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
const std::string & path_, UncompressedCache * cache_, const CompressionCodecPtr & codec,
size_t estimated_size_, size_t aio_threshold_, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);

View File

@ -47,6 +47,7 @@ enum class CompressionMethodByte : uint8_t
NONE = 0x02,
LZ4 = 0x82,
ZSTD = 0x90,
Multiple = 0x91,
};
}

View File

@ -44,6 +44,8 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Compression/CompressionFactory.h>
namespace DB
{
@ -170,14 +172,15 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
using ColumnsAndDefaults = std::pair<NamesAndTypesList, ColumnDefaults>;
using ParsedColumns = std::tuple<NamesAndTypesList, ColumnDefaults, ColumnComments>;
using ColumnsDeclarationAndModifiers = std::tuple<NamesAndTypesList, ColumnDefaults, ColumnCodecs, ColumnComments>;
/// AST to the list of columns with types. Columns of Nested type are expanded into a list of real columns.
static ParsedColumns parseColumns(const ASTExpressionList & column_list_ast, const Context & context)
static ColumnsDeclarationAndModifiers parseColumns(const ASTExpressionList & column_list_ast, const Context & context)
{
/// list of table columns in correct order
NamesAndTypesList columns{};
ColumnDefaults defaults{};
ColumnCodecs codecs{};
ColumnComments comments{};
/// Columns requiring type-deduction or default_expression type-check
@ -223,6 +226,12 @@ static ParsedColumns parseColumns(const ASTExpressionList & column_list_ast, con
default_expr_list->children.emplace_back(setAlias(col_decl.default_expression->clone(), col_decl.name));
}
if (col_decl.codec)
{
auto codec = CompressionCodecFactory::instance().get(col_decl.codec);
codecs.emplace(col_decl.name, codec);
}
if (col_decl.comment)
{
if (auto comment_str = typeid_cast<ASTLiteral &>(*col_decl.comment).value.get<String>(); !comment_str.empty())
@ -278,14 +287,14 @@ static ParsedColumns parseColumns(const ASTExpressionList & column_list_ast, con
}
}
return {Nested::flatten(columns), defaults, comments};
return {Nested::flatten(columns), defaults, codecs, comments};
}
static NamesAndTypesList removeAndReturnColumns(ColumnsAndDefaults & columns_and_defaults, const ColumnDefaultKind kind)
static NamesAndTypesList removeAndReturnColumns(ColumnsDeclarationAndModifiers & columns_declare, const ColumnDefaultKind kind)
{
auto & columns = columns_and_defaults.first;
auto & defaults = columns_and_defaults.second;
auto & columns = std::get<0>(columns_declare);
auto & defaults = std::get<1>(columns_declare);
NamesAndTypesList removed{};
@ -359,6 +368,18 @@ ASTPtr InterpreterCreateQuery::formatColumns(const ColumnsDescription & columns)
column_declaration->comment = std::make_shared<ASTLiteral>(Field(comments_it->second));
}
const auto ct = columns.codecs.find(column.name);
if (ct != std::end(columns.codecs))
{
String codec_desc;
ct->second->getCodecDesc(codec_desc);
codec_desc = "CODEC(" + codec_desc + ")";
auto pos = codec_desc.data();
const auto end = pos + codec_desc.size();
ParserIdentifierWithParameters codec_p;
column_declaration->codec = parseQuery(codec_p, pos, end, "column codec", 0);
}
columns_list->children.push_back(column_declaration_ptr);
}
@ -370,12 +391,12 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(const ASTExpres
ColumnsDescription res;
auto && parsed_columns = parseColumns(columns, context);
auto columns_and_defaults = std::make_pair(std::move(std::get<0>(parsed_columns)), std::move(std::get<1>(parsed_columns)));
res.materialized = removeAndReturnColumns(columns_and_defaults, ColumnDefaultKind::Materialized);
res.aliases = removeAndReturnColumns(columns_and_defaults, ColumnDefaultKind::Alias);
res.ordinary = std::move(columns_and_defaults.first);
res.defaults = std::move(columns_and_defaults.second);
res.comments = std::move(std::get<2>(parsed_columns));
res.ordinary = std::move(std::get<0>(parsed_columns));
res.defaults = std::move(std::get<1>(parsed_columns));
res.codecs = std::move(std::get<2>(parsed_columns));
res.comments = std::move(std::get<3>(parsed_columns));
res.aliases = removeAndReturnColumns(parsed_columns, ColumnDefaultKind::Alias);
res.materialized = removeAndReturnColumns(parsed_columns, ColumnDefaultKind::Materialized);
if (res.ordinary.size() + res.materialized.size() == 0)
throw Exception{"Cannot CREATE table without physical columns", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED};

View File

@ -15,6 +15,7 @@ public:
ASTPtr type;
String default_specifier;
ASTPtr default_expression;
ASTPtr codec;
ASTPtr comment;
String getID(char delim) const override { return "ColumnDeclaration" + (delim + name); }
@ -36,6 +37,12 @@ public:
res->children.push_back(res->default_expression);
}
if (codec)
{
res->codec=codec->clone();
res->children.push_back(res->codec);
}
if (comment)
{
res->comment = comment->clone();
@ -63,6 +70,12 @@ public:
default_expression->formatImpl(settings, state, frame);
}
if (codec)
{
settings.ostr << ' ';
codec->formatImpl(settings, state, frame);
}
if (comment)
{
settings.ostr << ' ' << (settings.hilite ? hilite_keyword : "") << "COMMENT" << (settings.hilite ? hilite_none : "") << ' ';

View File

@ -102,9 +102,22 @@ public:
}
protected:
using ASTDeclarePtr = std::shared_ptr<ASTColumnDeclaration>;
const char * getName() const { return "column declaration"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
bool isDeclareColumnType(Pos & pos, Expected & expected);
bool parseDeclarationCodec(Pos &pos, const ASTDeclarePtr &declaration, Expected &expected);
bool parseDefaultExpression(Pos &pos, const ASTDeclarePtr &declaration, Expected &expected);
bool parseDeclarationComment(Pos &pos, const ASTDeclarePtr &declaration, Expected &expected);
bool isDeclareColumnCodec(Pos & pos, Expected & expected);
bool require_type = true;
};
@ -114,78 +127,112 @@ using ParserCompoundColumnDeclaration = IParserColumnDeclaration<ParserCompoundI
template <typename NameParser>
bool IParserColumnDeclaration<NameParser>::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ASTPtr column_name;
ASTPtr column_type;
ASTPtr column_codec;
NameParser name_parser;
ParserIdentifierWithOptionalParameters type_parser;
ParserKeyword s_default{"DEFAULT"};
ParserKeyword s_materialized{"MATERIALIZED"};
ParserKeyword s_alias{"ALIAS"};
ParserKeyword s_comment{"COMMENT"};
ParserTernaryOperatorExpression expr_parser;
ParserStringLiteral string_literal_parser;
/// mandatory column name
ASTPtr name;
if (!name_parser.parse(pos, name, expected))
if (!name_parser.parse(pos, column_name, expected))
return false;
/** column name should be followed by type name if it
* is not immediately followed by {DEFAULT, MATERIALIZED, ALIAS, COMMENT}
*/
ASTPtr type;
String default_specifier;
ASTPtr default_expression;
ASTPtr comment_expression;
if (isDeclareColumnType(pos, expected)
&& !type_parser.parse(pos, column_type, expected))
return false;
if (!s_default.check_without_moving(pos, expected) &&
!s_materialized.check_without_moving(pos, expected) &&
!s_alias.check_without_moving(pos, expected) &&
!s_comment.check_without_moving(pos, expected))
const auto column_declaration = std::make_shared<ASTColumnDeclaration>();
if (!parseDefaultExpression(pos, column_declaration, expected)
|| !parseDeclarationCodec(pos, column_declaration, expected)
|| !parseDeclarationComment(pos, column_declaration, expected))
return false;
if (require_type && !column_type && column_declaration->default_expression)
return false;
if (column_type)
{
if (!type_parser.parse(pos, type, expected))
return false;
column_declaration->type = column_type;
column_declaration->children.push_back(std::move(column_type));
}
node = column_declaration;
return true;
}
template<typename NameParser>
bool IParserColumnDeclaration<NameParser>::isDeclareColumnType(Pos & pos, Expected & expected)
{
auto check_pos = pos;
return !ParserKeyword{"CODEC"}.check(check_pos, expected) &&
!ParserKeyword{"ALIAS"}.check(check_pos, expected) &&
!ParserKeyword{"COMMENT"}.check(check_pos, expected) &&
!ParserKeyword{"DEFAULT"}.check(check_pos, expected) &&
!ParserKeyword{"MATERIALIZED"}.check(check_pos, expected);
}
template<typename NameParser>
bool IParserColumnDeclaration<NameParser>::isDeclareColumnCodec(Pos & pos, Expected & expected)
{
auto check_pos = pos;
return ParserKeyword{"CODEC"}.check(check_pos, expected);
}
template<typename NameParser>
bool IParserColumnDeclaration<NameParser>::parseDeclarationCodec(Pos & pos, const ASTDeclarePtr & declaration, Expected & expected)
{
ParserKeyword s_codec{"CODEC"};
ParserIdentifierWithParameters codec_parser;
if (s_codec.ignore(pos, expected))
{
if (!codec_parser.parse(pos, declaration->codec, expected))
return false;
declaration->children.push_back(declaration->codec);
}
return true;
}
template<typename NameParser>
bool IParserColumnDeclaration<NameParser>::parseDefaultExpression(Pos & pos, const ASTDeclarePtr & declaration, Expected & expected)
{
ParserKeyword s_alias{"ALIAS"};
ParserKeyword s_default{"DEFAULT"};
ParserKeyword s_materialized{"MATERIALIZED"};
ParserTernaryOperatorExpression expr_parser;
Pos pos_before_specifier = pos;
if (s_default.ignore(pos, expected) || s_materialized.ignore(pos, expected) || s_alias.ignore(pos, expected))
{
default_specifier = Poco::toUpper(std::string{pos_before_specifier->begin, pos_before_specifier->end});
declaration->default_specifier = Poco::toUpper(std::string{pos_before_specifier->begin, pos_before_specifier->end});
/// should be followed by an expression
if (!expr_parser.parse(pos, default_expression, expected))
if (!expr_parser.parse(pos, declaration->default_expression, expected))
return false;
declaration->children.push_back(declaration->default_expression);
}
if (require_type && !type && !default_expression)
return false; /// reject column name without type
return true;
}
template<typename NameParser>
bool IParserColumnDeclaration<NameParser>::parseDeclarationComment(Pos & pos, const ASTDeclarePtr & declaration, Expected & expected)
{
ParserKeyword s_comment{"COMMENT"};
ParserStringLiteral string_literal_parser;
if (s_comment.ignore(pos, expected))
{
/// should be followed by a string literal
if (!string_literal_parser.parse(pos, comment_expression, expected))
if (!string_literal_parser.parse(pos, declaration->comment, expected))
return false;
}
const auto column_declaration = std::make_shared<ASTColumnDeclaration>();
node = column_declaration;
column_declaration->name = typeid_cast<ASTIdentifier &>(*name).name;
if (type)
{
column_declaration->type = type;
column_declaration->children.push_back(std::move(type));
}
if (default_expression)
{
column_declaration->default_specifier = default_specifier;
column_declaration->default_expression = default_expression;
column_declaration->children.push_back(std::move(default_expression));
}
if (comment_expression)
{
column_declaration->comment = comment_expression;
column_declaration->children.push_back(std::move(comment_expression));
declaration->children.push_back(declaration->comment);
}
return true;

View File

@ -0,0 +1,11 @@
#pragma once
#include <string>
#include <unordered_map>
#include <Parsers/IAST.h>
#include <Compression/ICompressionCodec.h>
namespace DB
{
using ColumnCodecs = std::unordered_map<std::string, CompressionCodecPtr>;
}

View File

@ -20,6 +20,7 @@
#include <ext/map.h>
#include <boost/range/join.hpp>
#include <Compression/CompressionFactory.h>
#include <optional>
@ -195,6 +196,18 @@ void parseColumn(ReadBufferFromString & buf, ColumnsDescription & result, const
assertChar('\n', buf);
}
CompressionCodecPtr ColumnsDescription::getCodec(const String & column_name, const CompressionSettings & /*compression_settings*/) const
{
const auto codec = codecs.find(column_name);
/// TODO get
if (codec == codecs.end())
return CompressionCodecFactory::instance().getDefaultCodec();
// return CompressionCodecFactory::instance().get(compression_settings.method, compression_settings.level);
return codec->second;
}
ColumnsDescription ColumnsDescription::parse(const String & str)
{
ReadBufferFromString buf{str};

View File

@ -4,6 +4,8 @@
#include <Core/Names.h>
#include <Storages/ColumnDefault.h>
#include <Core/Block.h>
#include <Storages/ColumnCodec.h>
#include <IO/CompressionSettings.h>
namespace DB
@ -18,6 +20,7 @@ struct ColumnsDescription
NamesAndTypesList materialized;
NamesAndTypesList aliases;
ColumnDefaults defaults;
ColumnCodecs codecs;
ColumnComments comments;
ColumnsDescription() = default;
@ -60,10 +63,12 @@ struct ColumnsDescription
bool hasPhysical(const String & column_name) const;
String toString() const;
CompressionCodecPtr getCodec(const String & column_name, const CompressionSettings & compression_settings) const;
static ColumnsDescription parse(const String & str);
static const ColumnsDescription * loadFromContext(const Context & context, const String & db, const String & table);
};

View File

@ -9,6 +9,7 @@
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Common/typeid_cast.h>
#include <Poco/File.h>
#include <IO/createReadBufferFromFileBase.h>
namespace DB
@ -49,8 +50,13 @@ MergeTreeReader::MergeTreeReader(const String & path,
if (!Poco::File(path).exists())
throw Exception("Part " + path + " is missing", ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);
const auto columns_desc = storage.getColumns();
for (const NameAndTypePair & column : columns)
addStreams(column.name, *column.type, all_mark_ranges, profile_callback, clock_type);
{
CompressionCodecPtr codec = columns_desc.getCodec(column.name, {});
addStreams(column.name, *column.type, codec, all_mark_ranges, profile_callback, clock_type);
}
}
catch (...)
{
@ -158,6 +164,7 @@ size_t MergeTreeReader::readRows(size_t from_mark, bool continue_reading, size_t
MergeTreeReader::Stream::Stream(
const String & path_prefix_, const String & extension_, size_t marks_count_,
const CompressionCodecPtr & codec,
const MarkRanges & all_mark_ranges,
MarkCache * mark_cache_, bool save_marks_in_cache_,
UncompressedCache * uncompressed_cache,
@ -229,8 +236,8 @@ MergeTreeReader::Stream::Stream(
/// Initialize the objects that shall be used to perform read operations.
if (uncompressed_cache)
{
auto buffer = std::make_unique<CachedCompressedReadBuffer>(
path_prefix + extension, uncompressed_cache, estimated_size, aio_threshold, buffer_size);
auto buffer = std::make_shared<CachedCompressedReadBuffer>(
path_prefix + extension, uncompressed_cache, codec, estimated_size, aio_threshold, buffer_size);
if (profile_callback)
buffer->setProfileCallback(profile_callback, clock_type);
@ -240,13 +247,13 @@ MergeTreeReader::Stream::Stream(
}
else
{
auto buffer = std::make_unique<CompressedReadBufferFromFile>(
path_prefix + extension, estimated_size, aio_threshold, buffer_size);
file_in = createReadBufferFromFileBase(path_prefix + extension, estimated_size, aio_threshold, buffer_size);
if (profile_callback)
buffer->setProfileCallback(profile_callback, clock_type);
file_in->setProfileCallback(profile_callback, clock_type);
non_cached_buffer = std::move(buffer);
const auto compressed_buffer = codec->liftCompressed(*file_in);
non_cached_buffer = compressed_buffer;
data_buffer = non_cached_buffer.get();
}
}
@ -354,8 +361,8 @@ void MergeTreeReader::Stream::seekToStart()
}
void MergeTreeReader::addStreams(const String & name, const IDataType & type, const MarkRanges & all_mark_ranges,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
void MergeTreeReader::addStreams(const String & name, const IDataType & type, const CompressionCodecPtr & codec,
const MarkRanges & all_mark_ranges, const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
{
IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path)
{
@ -374,7 +381,7 @@ void MergeTreeReader::addStreams(const String & name, const IDataType & type, co
streams.emplace(stream_name, std::make_unique<Stream>(
path + stream_name, DATA_FILE_EXTENSION, data_part->marks_count,
all_mark_ranges, mark_cache, save_marks_in_cache,
codec, all_mark_ranges, mark_cache, save_marks_in_cache,
uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type));
};

View File

@ -63,6 +63,7 @@ private:
public:
Stream(
const String & path_prefix_, const String & extension_, size_t marks_count_,
const CompressionCodecPtr & codec,
const MarkRanges & all_mark_ranges,
MarkCache * mark_cache, bool save_marks_in_cache,
UncompressedCache * uncompressed_cache,
@ -91,8 +92,9 @@ private:
bool save_marks_in_cache;
MarkCache::MappedPtr marks;
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer;
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;
std::unique_ptr<ReadBufferFromFileBase> file_in;
std::shared_ptr<CachedCompressedReadBuffer> cached_buffer;
std::shared_ptr<CompressionCodecReadBuffer> non_cached_buffer;
};
using FileStreams = std::map<std::string, std::unique_ptr<Stream>>;
@ -121,8 +123,8 @@ private:
size_t max_read_buffer_size;
size_t index_granularity;
void addStreams(const String & name, const IDataType & type, const MarkRanges & all_mark_ranges,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);
void addStreams(const String & name, const IDataType & type, const CompressionCodecPtr & codec,
const MarkRanges & all_mark_ranges, const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);
void readData(
const String & name, const IDataType & type, IColumn & column,

View File

@ -41,6 +41,7 @@ void IMergedBlockOutputStream::addStreams(
const String & path,
const String & name,
const IDataType & type,
const CompressionCodecPtr & codec,
size_t estimated_size,
bool skip_offsets)
{
@ -59,8 +60,8 @@ void IMergedBlockOutputStream::addStreams(
stream_name,
path + stream_name, DATA_FILE_EXTENSION,
path + stream_name, MARKS_FILE_EXTENSION,
codec,
max_compress_block_size,
compression_settings,
estimated_size,
aio_threshold);
};
@ -183,15 +184,15 @@ IMergedBlockOutputStream::ColumnStream::ColumnStream(
const std::string & data_file_extension_,
const std::string & marks_path,
const std::string & marks_file_extension_,
const CompressionCodecPtr & compression_codec,
size_t max_compress_block_size,
CompressionSettings compression_settings,
size_t estimated_size,
size_t aio_threshold) :
escaped_column_name(escaped_column_name_),
data_file_extension{data_file_extension_},
marks_file_extension{marks_file_extension_},
plain_file(createWriteBufferFromFileBase(data_path + data_file_extension, estimated_size, aio_threshold, max_compress_block_size)),
plain_hashing(*plain_file), compressed_buf(plain_hashing, compression_settings), compressed(compressed_buf),
plain_hashing(*plain_file), compressed_buf(compression_codec->liftCompressed(plain_hashing)), compressed(*compressed_buf.get()),
marks_file(marks_path + marks_file_extension, 4096, O_TRUNC | O_CREAT | O_WRONLY), marks(marks_file)
{
}
@ -239,7 +240,10 @@ MergedBlockOutputStream::MergedBlockOutputStream(
{
init();
for (const auto & it : columns_list)
addStreams(part_path, it.name, *it.type, 0, false);
{
const auto columns = storage.getColumns();
addStreams(part_path, it.name, *it.type, columns.getCodec(it.name, compression_settings), 0, false);
}
}
MergedBlockOutputStream::MergedBlockOutputStream(
@ -270,7 +274,10 @@ MergedBlockOutputStream::MergedBlockOutputStream(
}
for (const auto & it : columns_list)
addStreams(part_path, it.name, *it.type, total_size, false);
{
const auto columns = storage.getColumns();
addStreams(part_path, it.name, *it.type, columns.getCodec(it.name, compression_settings), total_size, false);
}
}
std::string MergedBlockOutputStream::getPartPath() const
@ -532,7 +539,8 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
{
const auto & col = block.safeGetByPosition(i);
addStreams(part_path, col.name, *col.type, 0, skip_offsets);
const auto columns = storage.getColumns();
addStreams(part_path, col.name, *col.type, columns.getCodec(col.name, compression_settings), 0, skip_offsets);
serialization_states.emplace_back(nullptr);
settings.getter = createStreamGetter(col.name, tmp_offset_columns, false);
col.type->serializeBinaryBulkStatePrefix(settings, serialization_states.back());

View File

@ -37,8 +37,8 @@ protected:
const std::string & data_file_extension_,
const std::string & marks_path,
const std::string & marks_file_extension_,
const CompressionCodecPtr & compression_codec,
size_t max_compress_block_size,
CompressionSettings compression_settings,
size_t estimated_size,
size_t aio_threshold);
@ -49,7 +49,7 @@ protected:
/// compressed -> compressed_buf -> plain_hashing -> plain_file
std::unique_ptr<WriteBufferFromFileBase> plain_file;
HashingWriteBuffer plain_hashing;
CompressedWriteBuffer compressed_buf;
WriteBufferPtr compressed_buf;
HashingWriteBuffer compressed;
/// marks -> marks_file
@ -65,7 +65,8 @@ protected:
using ColumnStreams = std::map<String, std::unique_ptr<ColumnStream>>;
void addStreams(const String & path, const String & name, const IDataType & type, size_t estimated_size, bool skip_offsets);
void addStreams(const String & path, const String & name, const IDataType & type,
const CompressionCodecPtr & codec, size_t estimated_size, bool skip_offsets);
IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets);

View File

@ -0,0 +1,9 @@
DROP TABLE IF EXISTS test.compression_codec;
CREATE TABLE test.compression_codec(day Date CODEC(ZSTD), its UInt32 CODEC(Delta(UInt32), LZ4HC(2)))
INSERT INTO test.compression_codec('2018-01-01', '')
SELECT * FROM test.compression_codec;
DROP TABLE IF EXISTS test.compression_codec;