change syntax to per-column settings

Signed-off-by: Duc Canh Le <duccanh.le@ahrefs.com>
This commit is contained in:
Duc Canh Le 2023-09-28 09:46:37 +00:00
parent 00c2a3d2c2
commit bb9521542d
8 changed files with 88 additions and 57 deletions

View File

@ -10,6 +10,8 @@
#include <Common/randomSeed.h>
#include <Common/atomicRename.h>
#include <Common/logger_useful.h>
#include <Parsers/ASTSetQuery.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <base/hex.h>
#include <Core/Defines.h>
@ -436,12 +438,12 @@ ASTPtr InterpreterCreateQuery::formatColumns(const ColumnsDescription & columns)
column_declaration->children.push_back(column_declaration->codec);
}
if (column.compress_block_sizes.first || column.compress_block_sizes.second)
if (!column.settings.empty())
{
Tuple value;
value.push_back(column.compress_block_sizes.first);
value.push_back(column.compress_block_sizes.second);
column_declaration->compress_block_sizes = std::make_shared<ASTLiteral>(Field(value));
auto per_column_settings = std::make_shared<ASTSetQuery>();
per_column_settings->is_standalone = false;
per_column_settings->changes = column.settings;
column_declaration->per_column_settings = std::move(per_column_settings);
}
if (column.ttl)
@ -646,20 +648,17 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
col_decl.codec, column.type, sanity_check_compression_codecs, allow_experimental_codecs, enable_deflate_qpl_codec);
}
if (col_decl.compress_block_sizes)
{
auto sizes = col_decl.compress_block_sizes->as<ASTLiteral &>().value.safeGet<Tuple>();
if (sizes.size() != 2 || sizes[0].getType() != Field::Types::UInt64 || sizes[0].getType() != Field::Types::UInt64)
throw Exception(ErrorCodes::SYNTAX_ERROR, "Column {}: COMPRESSION BLOCK must be tuple of two unsigned integer", column.name);
column.compress_block_sizes.first = sizes[0].safeGet<UInt64>();
column.compress_block_sizes.second = sizes[1].safeGet<UInt64>();
if (column.compress_block_sizes.first > column.compress_block_sizes.second)
throw Exception(ErrorCodes::SYNTAX_ERROR, "Column {}: min compress block size must smaller than max compress block size", column.name);
}
if (col_decl.ttl)
column.ttl = col_decl.ttl;
if (col_decl.per_column_settings)
{
column.settings = col_decl.per_column_settings->as<ASTSetQuery &>().changes;
/// Sanity check here, assume mergetree
MergeTreeSettings dummy;
dummy.applyChanges(column.settings);
}
res.add(std::move(column));
}

View File

@ -39,10 +39,10 @@ ASTPtr ASTColumnDeclaration::clone() const
res->children.push_back(res->codec);
}
if (compress_block_sizes)
if (per_column_settings)
{
res->compress_block_sizes = compress_block_sizes->clone();
res->children.push_back(res->compress_block_sizes);
res->per_column_settings = per_column_settings->clone();
res->children.push_back(res->per_column_settings);
}
if (ttl)
@ -105,12 +105,6 @@ void ASTColumnDeclaration::formatImpl(const FormatSettings & settings, FormatSta
codec->formatImpl(settings, state, frame);
}
if (compress_block_sizes)
{
settings.ostr << ' ' << (settings.hilite ? hilite_keyword : "") << "COMPRESS BLOCK" << (settings.hilite ? hilite_none : "") << ' ';
compress_block_sizes->formatImpl(settings, state, frame);
}
if (ttl)
{
settings.ostr << ' ' << (settings.hilite ? hilite_keyword : "") << "TTL" << (settings.hilite ? hilite_none : "") << ' ';
@ -122,6 +116,13 @@ void ASTColumnDeclaration::formatImpl(const FormatSettings & settings, FormatSta
settings.ostr << ' ' << (settings.hilite ? hilite_keyword : "") << "COLLATE" << (settings.hilite ? hilite_none : "") << ' ';
collation->formatImpl(settings, state, frame);
}
if (per_column_settings)
{
settings.ostr << ' ' << (settings.hilite ? hilite_keyword : "") << "SETTINGS" << (settings.hilite ? hilite_none : "") << ' ' << '(';
per_column_settings->formatImpl(settings, state, frame);
settings.ostr << ')';
}
}
}

View File

@ -20,8 +20,8 @@ public:
ASTPtr comment;
ASTPtr codec;
ASTPtr ttl;
ASTPtr compress_block_sizes;
ASTPtr collation;
ASTPtr per_column_settings;
bool primary_key_specifier = false;
String getID(char delim) const override { return "ColumnDeclaration" + (delim + name); }

View File

@ -10,6 +10,7 @@
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/IParserBase.h>
#include <Parsers/ParserDataType.h>
#include <Parsers/ParserSetQuery.h>
#include <Poco/String.h>
namespace DB
@ -136,14 +137,14 @@ bool IParserColumnDeclaration<NameParser>::parseImpl(Pos & pos, ASTPtr & node, E
ParserKeyword s_type{"TYPE"};
ParserKeyword s_collate{"COLLATE"};
ParserKeyword s_primary_key{"PRIMARY KEY"};
ParserKeyword s_compress_block{"COMPRESS BLOCK"};
ParserKeyword s_settings("SETTINGS");
ParserExpression expr_parser;
ParserStringLiteral string_literal_parser;
ParserLiteral literal_parser;
ParserCodec codec_parser;
ParserTupleOfLiterals compress_block_parser;
ParserCollation collation_parser;
ParserExpression expression_parser;
ParserSetQuery settings_parser(true);
/// mandatory column name
ASTPtr name;
@ -178,7 +179,7 @@ bool IParserColumnDeclaration<NameParser>::parseImpl(Pos & pos, ASTPtr & node, E
ASTPtr default_expression;
ASTPtr comment_expression;
ASTPtr codec_expression;
ASTPtr compress_block_sizes;
ASTPtr per_column_settings;
ASTPtr ttl_expression;
ASTPtr collation_expression;
bool primary_key_specifier = false;
@ -304,12 +305,6 @@ bool IParserColumnDeclaration<NameParser>::parseImpl(Pos & pos, ASTPtr & node, E
return false;
}
if (s_compress_block.ignore(pos, expected))
{
if (!compress_block_parser.parse(pos, compress_block_sizes, expected))
return false;
}
if (s_ttl.ignore(pos, expected))
{
if (!expression_parser.parse(pos, ttl_expression, expected))
@ -321,6 +316,26 @@ bool IParserColumnDeclaration<NameParser>::parseImpl(Pos & pos, ASTPtr & node, E
primary_key_specifier = true;
}
auto old_pos = pos;
if (s_settings.ignore(pos, expected))
{
ParserToken parser_opening_bracket(TokenType::OpeningRoundBracket);
if (parser_opening_bracket.ignore(pos, expected))
{
if (!settings_parser.parse(pos, per_column_settings, expected))
return false;
ParserToken parser_closing_bracket(TokenType::ClosingRoundBracket);
if (!parser_closing_bracket.ignore(pos, expected))
return false;
}
else
{
/// This could be settings in alter query
/// E.g: ALTER TABLE alter_enum_array MODIFY COLUMN x String SETTINGS mutations_sync=2;
pos = old_pos;
}
}
node = column_declaration;
if (type)
@ -351,10 +366,10 @@ bool IParserColumnDeclaration<NameParser>::parseImpl(Pos & pos, ASTPtr & node, E
column_declaration->children.push_back(std::move(codec_expression));
}
if (compress_block_sizes)
if (per_column_settings)
{
column_declaration->compress_block_sizes = compress_block_sizes;
column_declaration->children.push_back(std::move(compress_block_sizes));
column_declaration->per_column_settings = per_column_settings;
column_declaration->children.push_back(std::move(per_column_settings));
}
if (ttl_expression)

View File

@ -24,6 +24,7 @@
#include <Interpreters/Context.h>
#include <Storages/IStorage.h>
#include <Common/typeid_cast.h>
#include "Parsers/ASTSetQuery.h"
#include <Core/Defines.h>
#include <Compression/CompressionFactory.h>
#include <Interpreters/ExpressionAnalyzer.h>
@ -62,7 +63,7 @@ bool ColumnDescription::operator==(const ColumnDescription & other) const
&& default_desc == other.default_desc
&& comment == other.comment
&& ast_to_str(codec) == ast_to_str(other.codec)
&& compress_block_sizes == other.compress_block_sizes
&& settings == other.settings
&& ast_to_str(ttl) == ast_to_str(other.ttl);
}
@ -95,14 +96,15 @@ void ColumnDescription::writeText(WriteBuffer & buf) const
writeEscapedString(queryToString(codec), buf);
}
if (compress_block_sizes.first || compress_block_sizes.second)
if (!settings.empty())
{
writeChar('\t', buf);
DB::writeText("COMPRESSION BLOCK ", buf);
Tuple value;
value.push_back(compress_block_sizes.first);
value.push_back(compress_block_sizes.second);
writeEscapedString(queryToString(ASTLiteral(Field(value))), buf);
DB::writeText("SETTINGS ", buf);
DB::writeText("(", buf);
ASTSetQuery ast;
ast.changes = settings;
writeEscapedString(queryToString(ast), buf);
DB::writeText(")", buf);
}
if (ttl)
@ -149,12 +151,9 @@ void ColumnDescription::readText(ReadBuffer & buf)
if (col_ast->ttl)
ttl = col_ast->ttl;
if (col_ast->compress_block_sizes)
{
auto sizes = col_ast->compress_block_sizes->as<ASTLiteral &>().value.safeGet<Tuple>();
compress_block_sizes.first = sizes[0].safeGet<UInt64>();
compress_block_sizes.second = sizes[1].safeGet<UInt64>();
}
if (col_ast->per_column_settings)
settings = col_ast->per_column_settings->as<ASTSetQuery &>().changes;
}
else
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Cannot parse column description");

View File

@ -7,6 +7,7 @@
#include <Core/NamesAndAliases.h>
#include <Interpreters/Context_fwd.h>
#include <Storages/ColumnDefault.h>
#include <Common/SettingsChanges.h>
#include <Common/Exception.h>
#include <boost/multi_index/member.hpp>
@ -82,7 +83,7 @@ struct ColumnDescription
ColumnDefault default_desc;
String comment;
ASTPtr codec;
std::pair<size_t, size_t> compress_block_sizes;
SettingsChanges settings;
ASTPtr ttl;
ColumnDescription() = default;

View File

@ -145,7 +145,10 @@ void MergeTreeDataPartWriterWide::addStreams(
const auto column_desc = metadata_snapshot->columns.tryGetColumnDescription(GetColumnsOptions(GetColumnsOptions::AllPhysical), column.getNameInStorage());
auto max_compress_block_size = column_desc ? column_desc->compress_block_sizes.second : 0;
UInt64 max_compress_block_size = 0;
if (column_desc)
if (const auto * value = column_desc->settings.tryGet("max_compress_block_size"))
max_compress_block_size = value->safeGet<UInt64>();
if (!max_compress_block_size)
max_compress_block_size = settings.max_compress_block_size;
@ -329,7 +332,10 @@ StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
{
StreamsWithMarks result;
const auto column_desc = metadata_snapshot->columns.tryGetColumnDescription(GetColumnsOptions(GetColumnsOptions::AllPhysical), column.getNameInStorage());
auto min_compress_block_size = column_desc ? column_desc->compress_block_sizes.first : 0;
UInt64 min_compress_block_size = 0;
if (column_desc)
if (const auto * value = column_desc->settings.tryGet("min_compress_block_size"))
min_compress_block_size = value->safeGet<UInt64>();
if (!min_compress_block_size)
min_compress_block_size = settings.min_compress_block_size;
data_part->getSerialization(column.name)->enumerateStreams([&] (const ISerialization::SubstreamPath & substream_path)

View File

@ -2,7 +2,7 @@
CREATE TABLE t
(
`id` UInt64 CODEC(ZSTD(1)),
`long_string` String CODEC(ZSTD(9, 24)) COMPRESS BLOCK (67108864, 67108864),
`long_string` String CODEC(ZSTD(9, 24)) SETTINGS (min_compress_block_size = 81920, max_compress_block_size = 163840),
`v1` String CODEC(ZSTD(1)),
`v2` UInt64 CODEC(ZSTD(1)),
`v3` Float32 CODEC(ZSTD(1)),
@ -21,8 +21,8 @@ SET allow_experimental_object_type = 1;
CREATE TABLE t2
(
`id` UInt64 CODEC(ZSTD(1)),
`tup` Tuple(UInt64, UInt64) CODEC(ZSTD(1)) COMPRESS BLOCK (1024, 8192),
`json` JSON CODEC(ZSTD(9, 24)) COMPRESS BLOCK (671088, 671088),
`tup` Tuple(UInt64, UInt64) CODEC(ZSTD(1)) SETTINGS (min_compress_block_size = 81920, max_compress_block_size = 163840),
`json` JSON CODEC(ZSTD(9, 24)) SETTINGS (min_compress_block_size = 81920, max_compress_block_size = 163840),
)
ENGINE = MergeTree
ORDER BY id
@ -30,3 +30,13 @@ SETTINGS min_bytes_for_wide_part = 1;
INSERT INTO TABLE t2 SELECT number, tuple(number, number), concat('{"key": ', toString(number), ' ,"value": ', toString(rand(number+1)), '}') FROM numbers(1000);
SELECT tup, json.key AS key FROM t2 ORDER BY key LIMIT 10;
CREATE TABLE t3
(
`id` UInt64 CODEC(ZSTD(1)),
`long_string` String CODEC(ZSTD(1)) SETTINGS (min_block_size = 81920, max_compress_block_size = 163840),
)
ENGINE = MergeTree
ORDER BY id
SETTINGS min_bytes_for_wide_part = 1; -- {serverError 115}