Better comments

alesapin 2020-09-22 15:49:55 +03:00
parent 96d06c6ae6
commit fd394f699c
7 changed files with 59 additions and 30 deletions

View File

@@ -6,6 +6,7 @@
#include <IO/ReadBuffer.h>
#include <Parsers/queryToString.h>
#include <Compression/CompressionCodecMultiple.h>
#include <Compression/CompressionCodecNone.h>
#include <IO/WriteHelpers.h>
#include <boost/algorithm/string/join.hpp>
@@ -67,6 +68,7 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
bool has_none = false;
std::optional<size_t> generic_compression_codec_pos;
bool can_substitute_codec_arguments = true;
for (size_t i = 0; i < func->arguments->children.size(); ++i)
{
const auto & inner_codec_ast = func->arguments->children[i];
@@ -101,10 +103,19 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
{
if (column_type)
{
CompressionCodecPtr prev_codec;
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path, const IDataType & substream_type)
{
if (IDataType::isSpecialCompressionAllowed(substream_path))
{
result_codec = getImpl(codec_family_name, codec_arguments, &substream_type);
/// The case of a column Tuple, which is compressed with a codec that depends on the data type, like Delta.
/// We cannot substitute parameters for such codecs.
if (prev_codec && prev_codec->getHash() != result_codec->getHash())
can_substitute_codec_arguments = false;
prev_codec = result_codec;
}
};
IDataType::SubstreamPath stream_path;
@@ -158,10 +169,24 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
" (Note: you can enable setting 'allow_suspicious_codecs' to skip this check).", ErrorCodes::BAD_ARGUMENTS);
}
std::shared_ptr<ASTFunction> result = std::make_shared<ASTFunction>();
result->name = "CODEC";
result->arguments = codecs_descriptions;
return result;
/// For columns with nested types like Tuple(UInt32, UInt64) we
/// obviously cannot substitute parameters for codecs which depend on
/// the data type, because Delta(4) is suitable for the first element
/// while Delta(8) suits the second. So we should leave the codec
/// description as is and deduce the parameters in the get method for
/// each subtype separately. For all other types it's better to
/// substitute parameters, for better readability and backward
/// compatibility.
if (can_substitute_codec_arguments)
{
std::shared_ptr<ASTFunction> result = std::make_shared<ASTFunction>();
result->name = "CODEC";
result->arguments = codecs_descriptions;
return result;
}
else
{
return ast;
}
}
throw Exception("Unknown codec family: " + queryToString(ast), ErrorCodes::UNKNOWN_CODEC);
@@ -212,7 +237,7 @@ CompressionCodecPtr CompressionCodecFactory::get(const ASTPtr & ast, const IData
else if (codecs.size() > 1)
return std::make_shared<CompressionCodecMultiple>(codecs);
else
return nullptr;
return std::make_shared<CompressionCodecNone>();
}
throw Exception("Unexpected AST structure for compression codec: " + queryToString(ast), ErrorCodes::UNEXPECTED_AST_STRUCTURE);

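For illustration, a minimal self-contained sketch of the substitution rule above (a toy substreamValueSizes helper and hard-coded types, not the ClickHouse API): when the substreams of a column would deduce different Delta widths, the stored description keeps the bare Delta; when they agree, the concrete parameter is substituted.

// Toy model (hypothetical helper, not the ClickHouse factory): why parameter
// substitution is skipped when substreams disagree on deduced codec parameters.
#include <cstddef>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

// Hypothetical: the value width Delta would deduce for each substream of a column.
std::vector<size_t> substreamValueSizes(const std::string & type)
{
    if (type == "Tuple(UInt32, UInt64)")
        return {4, 8};   // two substreams with different widths
    if (type == "UInt32")
        return {4};      // single substream
    return {1};
}

// Codec description to store in metadata: a concrete Delta(N) when all substreams
// agree, otherwise the bare "Delta" whose parameter is deduced later per substream.
std::string preprocessedDeltaDescription(const std::string & type)
{
    std::optional<size_t> prev;
    bool can_substitute = true;
    for (size_t width : substreamValueSizes(type))
    {
        if (prev && *prev != width)
            can_substitute = false;   // substreams disagree -> keep description as is
        prev = width;
    }
    return can_substitute ? "Delta(" + std::to_string(*prev) + ")" : "Delta";
}

int main()
{
    std::cout << preprocessedDeltaDescription("UInt32") << '\n';                 // Delta(4)
    std::cout << preprocessedDeltaDescription("Tuple(UInt32, UInt64)") << '\n';  // Delta
}
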
View File

@@ -39,6 +39,8 @@ public:
/// Validates the codec AST specified by the user and parses the codec description (substitutes default parameters)
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const IDataType * column_type, bool sanity_check) const;
/// Just a wrapper for the previous method.
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check) const
{
return validateCodecAndGetPreprocessedAST(ast, column_type.get(), sanity_check);
@@ -51,8 +53,14 @@ public:
/// information about the type to improve inner settings, but every codec should
/// be able to work without information about the type. Also the AST can contain
/// a codec which can be an alias to the current default codec, which can be changed
/// in runtime.
/// in runtime. If only_generic is true then the method will filter out all
/// codecs with isGenericCompression() == false from the result. If nothing
/// is found, nullptr will be returned. It's useful for auxiliary parts of
/// complex columns like Nullable, Array and so on. If all codecs are
/// non-generic and only_generic = true, then the codec NONE will be returned.
CompressionCodecPtr get(const ASTPtr & ast, const IDataType * column_type, CompressionCodecPtr current_default = nullptr, bool only_generic = false) const;
/// Just a wrapper for the previous method.
CompressionCodecPtr get(const ASTPtr & ast, const DataTypePtr & column_type, CompressionCodecPtr current_default = nullptr, bool only_generic = false) const
{
return get(ast, column_type.get(), current_default, only_generic);

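A minimal sketch of the only_generic behaviour described in the comment above (a hypothetical Codec struct and selectCodecs function, not the real ICompressionCodec or factory interfaces): type-dependent codecs are dropped for auxiliary substreams, and NONE is used when nothing generic remains.

// Toy sketch: filter out non-generic codecs for auxiliary substreams.
#include <iostream>
#include <string>
#include <vector>

struct Codec
{
    std::string name;
    bool generic;   // stands in for isGenericCompression()
};

std::vector<Codec> selectCodecs(const std::vector<Codec> & requested, bool only_generic)
{
    std::vector<Codec> result;
    for (const auto & codec : requested)
        if (!only_generic || codec.generic)
            result.push_back(codec);
    if (result.empty())
        result.push_back({"NONE", true});   // everything was non-generic -> fall back to NONE
    return result;
}

int main()
{
    // User asked for CODEC(Delta, LZ4); for a null-mask substream only LZ4 survives.
    for (const auto & codec : selectCodecs({{"Delta", false}, {"LZ4", true}}, /*only_generic=*/true))
        std::cout << codec.name << '\n';
    // User asked for CODEC(Delta) only; the auxiliary substream falls back to NONE.
    for (const auto & codec : selectCodecs({{"Delta", false}}, /*only_generic=*/true))
        std::cout << codec.name << '\n';
}
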
View File

@@ -104,7 +104,8 @@ public:
using SubstreamPath = std::vector<Substream>;
using StreamCallback = std::function<void(const SubstreamPath &, const IDataType & substream_type)>;
using StreamCallback = std::function<void(const SubstreamPath &, const IDataType &)>;
virtual void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
{
callback(path, *this);
@@ -442,6 +443,9 @@ public:
static String getFileNameForStream(const String & column_name, const SubstreamPath & path);
/// Returns true if the substream path supports special compression methods like codec Delta.
/// For all other substreams (like ArraySizes, NullMasks, etc.) we use only
/// generic compression codecs like LZ4.
static bool isSpecialCompressionAllowed(const SubstreamPath & path);
private:
friend class DataTypeFactory;

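A toy sketch of the rule behind isSpecialCompressionAllowed (hypothetical Substream type and enumeration helper, not the real IDataType interface): only regular data substreams may use type-dependent codecs, while auxiliary streams such as null masks or array sizes may not.

// Toy sketch: special codecs are allowed only on paths made of regular data substreams.
#include <functional>
#include <iostream>
#include <string>
#include <vector>

enum class SubstreamKind { Regular, NullMap, ArraySizes };

struct Substream
{
    std::string name;
    SubstreamKind kind;
};
using SubstreamPath = std::vector<Substream>;

bool isSpecialCompressionAllowed(const SubstreamPath & path)
{
    for (const auto & elem : path)
        if (elem.kind != SubstreamKind::Regular)
            return false;   // any auxiliary stream in the path disables special codecs
    return true;
}

// Hypothetical enumeration of the substreams of a Nullable(Int64) column.
void enumerateNullableInt64(const std::function<void(const SubstreamPath &)> & callback)
{
    callback({{"null_map", SubstreamKind::NullMap}});   // the null mask
    callback({{"values", SubstreamKind::Regular}});     // the Int64 values themselves
}

int main()
{
    enumerateNullableInt64([](const SubstreamPath & path)
    {
        std::cout << path.back().name << ": special codecs "
                  << (isSpecialCompressionAllowed(path) ? "allowed" : "not allowed") << '\n';
    });
}
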
View File

@@ -45,17 +45,13 @@ void MergeTreeDataPartWriterCompact::addStreams(const String & name, const IData
return;
CompressionCodecPtr compression_codec;
if (IDataType::isSpecialCompressionAllowed(substream_path))
{
compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, &substream_type, default_codec);
}
else
{
compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, nullptr, default_codec, true);
}
if (compression_codec == nullptr)
compression_codec = CompressionCodecFactory::instance().getDefaultCodec();
/// If we can use a special codec, then just get it
if (IDataType::isSpecialCompressionAllowed(substream_path))
compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, &substream_type, default_codec);
else /// otherwise get only generic codecs and don't use info about the data type
compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, nullptr, default_codec, true);
UInt64 codec_id = compression_codec->getHash();
auto & stream = streams_by_codec[codec_id];
if (!stream)

View File

@@ -54,6 +54,7 @@ private:
std::unique_ptr<WriteBufferFromFileBase> plain_file;
HashingWriteBuffer plain_hashing;
/// Compressed stream which allows writing with a codec.
struct CompressedStream
{
CompressedWriteBuffer compressed_buf;
@@ -66,10 +67,11 @@ private:
using CompressedStreamPtr = std::shared_ptr<CompressedStream>;
/// Create compressed stream for every different codec.
/// Create a compressed stream for every distinct codec. All streams write to
/// a single file on disk.
std::unordered_map<UInt64, CompressedStreamPtr> streams_by_codec;
/// For better performance save pointer to stream by every column.
/// Stream for each column's substream path (see addStreams).
std::unordered_map<String, CompressedStreamPtr> compressed_streams;
/// marks -> marks_file

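A toy sketch of how the two maps cooperate (a hypothetical CompressedStream stand-in, not the real writer classes): streams_by_codec keeps one compressed stream per distinct codec hash, and compressed_streams lets every substream path point at the matching stream, so substreams sharing a codec reuse the same buffer.

// Toy sketch: deduplicate compressed streams by codec hash and index them per substream.
#include <cstdint>
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>

struct CompressedStream
{
    std::string codec_name;   // stands in for CompressedWriteBuffer and its codec
};
using CompressedStreamPtr = std::shared_ptr<CompressedStream>;

int main()
{
    std::unordered_map<uint64_t, CompressedStreamPtr> streams_by_codec;      // codec hash -> stream
    std::unordered_map<std::string, CompressedStreamPtr> compressed_streams; // substream path -> stream

    auto addStream = [&](const std::string & substream, uint64_t codec_hash, const std::string & codec_name)
    {
        auto & stream = streams_by_codec[codec_hash];
        if (!stream)
            stream = std::make_shared<CompressedStream>(CompressedStream{codec_name});
        compressed_streams[substream] = stream;
    };

    // Two substreams sharing LZ4 reuse one stream; the Delta substream gets its own.
    addStream("col.null_map", /*codec hash*/ 1, "LZ4");
    addStream("other_col", 1, "LZ4");
    addStream("col.values", 2, "Delta(8), LZ4");

    std::cout << "distinct streams: " << streams_by_codec.size() << '\n';    // 2
    std::cout << "substream entries: " << compressed_streams.size() << '\n'; // 3
}
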
View File

@@ -46,17 +46,11 @@ void MergeTreeDataPartWriterWide::addStreams(
return;
CompressionCodecPtr compression_codec;
/// If we can use a special codec, then just get it
if (IDataType::isSpecialCompressionAllowed(substream_path))
{
compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, &substream_type, default_codec);
}
else
{
else /// otherwise get only generic codecs and don't use info about the data type
compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, nullptr, default_codec, true);
}
if (compression_codec == nullptr)
compression_codec = CompressionCodecFactory::instance().getDefaultCodec();
column_streams[stream_name] = std::make_unique<Stream>(
stream_name,

View File

@@ -3,18 +3,18 @@
CREATE TABLE default.columns_with_multiple_streams\n(\n `field0` Nullable(Int64) CODEC(Delta(2), LZ4),\n `field1` Nullable(UInt8) CODEC(Delta(8), LZ4),\n `field2` Array(Array(Int64)) CODEC(Delta(8), LZ4),\n `field3` Tuple(UInt32, Array(UInt64)) CODEC(T64, Default)\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0, index_granularity = 8192
1 1 [[1]] (1,[1])
2 2 [[2]] (2,[2])
CREATE TABLE default.columns_with_multiple_streams\n(\n `field0` Nullable(Int64) CODEC(Delta(2), LZ4),\n `field1` Nullable(UInt8) CODEC(Delta(8), LZ4),\n `field2` Array(Array(Int64)) CODEC(Delta(8), LZ4),\n `field3` Tuple(UInt32, Array(UInt64)) CODEC(Delta(8), Default)\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0, index_granularity = 8192
CREATE TABLE default.columns_with_multiple_streams\n(\n `field0` Nullable(Int64) CODEC(Delta(2), LZ4),\n `field1` Nullable(UInt8) CODEC(Delta(8), LZ4),\n `field2` Array(Array(Int64)) CODEC(Delta(8), LZ4),\n `field3` Tuple(UInt32, Array(UInt64)) CODEC(Delta, Default)\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0, index_granularity = 8192
1 1 [[1]] (1,[1])
2 2 [[2]] (2,[2])
3 3 [[3]] (3,[3])
1 1 [[1]] (1,[1])
1 1 [[1]] (1,[1])
CREATE TABLE default.columns_with_multiple_streams_compact\n(\n `field0` Nullable(Int64) CODEC(Delta(2), LZ4),\n `field1` Nullable(UInt8) CODEC(Delta(8), LZ4),\n `field2` Array(Array(Int64)) CODEC(Delta(8), LZ4),\n `field3` Tuple(UInt32, Array(UInt64)) CODEC(Delta(8), Default)\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_rows_for_wide_part = 100000, min_bytes_for_wide_part = 100000, index_granularity = 8192
CREATE TABLE default.columns_with_multiple_streams_compact\n(\n `field0` Nullable(Int64) CODEC(Delta(2), LZ4),\n `field1` Nullable(UInt8) CODEC(Delta(8), LZ4),\n `field2` Array(Array(Int64)) CODEC(Delta(8), LZ4),\n `field3` Tuple(UInt32, Array(UInt64)) CODEC(Delta, Default)\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_rows_for_wide_part = 100000, min_bytes_for_wide_part = 100000, index_granularity = 8192
1 1 [[1]] (1,[1])
2 2 [[2]] (2,[2])
1 1 [[1]] (1,[1])
2 2 [[2]] (2,[2])
CREATE TABLE default.columns_with_multiple_streams_compact\n(\n `field0` Nullable(Int64) CODEC(Delta(2), LZ4),\n `field1` Nullable(UInt8) CODEC(Delta(8), LZ4),\n `field2` Array(Array(Int64)) CODEC(Delta(8), LZ4),\n `field3` Tuple(UInt32, Array(UInt64)) CODEC(Delta(8), Default)\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_rows_for_wide_part = 100000, min_bytes_for_wide_part = 100000, index_granularity = 8192
CREATE TABLE default.columns_with_multiple_streams_compact\n(\n `field0` Nullable(Int64) CODEC(Delta(2), LZ4),\n `field1` Nullable(UInt8) CODEC(Delta(8), LZ4),\n `field2` Array(Array(Int64)) CODEC(Delta(8), LZ4),\n `field3` Tuple(UInt32, Array(UInt64)) CODEC(Delta, Default)\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_rows_for_wide_part = 100000, min_bytes_for_wide_part = 100000, index_granularity = 8192
1 1 [[1]] (1,[1])
2 2 [[2]] (2,[2])
3 3 [[3]] (3,[3])