Merge pull request #4054 from yandex/compression_codec_alter

Compression codec alter
This commit is contained in:
alesapin 2019-01-15 16:18:33 +03:00 committed by GitHub
commit 4714bb5f68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 160 additions and 2 deletions

View File

@ -5,6 +5,7 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/hex.h>
#include <sstream>
namespace DB
@ -20,14 +21,16 @@ extern const int CORRUPTED_DATA;
CompressionCodecMultiple::CompressionCodecMultiple(Codecs codecs)
: codecs(codecs)
{
std::ostringstream ss;
for (size_t idx = 0; idx < codecs.size(); idx++)
{
if (idx != 0)
codec_desc = codec_desc + ',';
ss << ',' << ' ';
const auto codec = codecs[idx];
codec_desc = codec_desc + codec->getCodecDesc();
ss << codec->getCodecDesc();
}
codec_desc = ss.str();
}
UInt8 CompressionCodecMultiple::getMethodByte() const

View File

@ -14,6 +14,7 @@
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTColumnDeclaration.h>
#include <Common/typeid_cast.h>
#include <Compression/CompressionFactory.h>
namespace DB
@ -30,6 +31,7 @@ namespace ErrorCodes
std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_ast)
{
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
const CompressionCodecFactory & compression_codec_factory = CompressionCodecFactory::instance();
if (command_ast->type == ASTAlterCommand::ADD_COLUMN)
{
@ -49,6 +51,9 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
command.default_expression = ast_col_decl.default_expression;
}
if (ast_col_decl.codec)
command.codec = compression_codec_factory.get(ast_col_decl.codec);
if (command_ast->column)
command.after_column = *getIdentifierName(command_ast->column);
@ -86,6 +91,9 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
command.default_expression = ast_col_decl.default_expression;
}
if (ast_col_decl.codec)
command.codec = compression_codec_factory.get(ast_col_decl.codec);
if (ast_col_decl.comment)
{
const auto & ast_comment = typeid_cast<ASTLiteral &>(*ast_col_decl.comment);
@ -168,6 +176,9 @@ void AlterCommand::apply(ColumnsDescription & columns_description, ASTPtr & orde
if (default_expression)
columns_description.defaults.emplace(column_name, ColumnDefault{default_kind, default_expression});
if (codec)
columns_description.codecs.emplace(column_name, codec);
/// Slow, because each time a list is copied
columns_description.ordinary = Nested::flatten(columns_description.ordinary);
}
@ -200,6 +211,9 @@ void AlterCommand::apply(ColumnsDescription & columns_description, ASTPtr & orde
}
else if (type == MODIFY_COLUMN)
{
if (codec)
columns_description.codecs[column_name] = codec;
if (!is_mutable())
{
auto & comments = columns_description.comments;

View File

@ -55,6 +55,9 @@ struct AlterCommand
/// indicates that this command should not be applied, for example in case of if_exists=true and column doesn't exist.
bool ignore = false;
/// For ADD and MODIFY
CompressionCodecPtr codec;
AlterCommand() = default;
AlterCommand(const Type type, const String & column_name, const DataTypePtr & data_type,
const ColumnDefaultKind default_kind, const ASTPtr & default_expression,

View File

@ -41,6 +41,7 @@ StorageSystemColumns::StorageSystemColumns(const std::string & name_)
{ "is_in_sorting_key", std::make_shared<DataTypeUInt8>() },
{ "is_in_primary_key", std::make_shared<DataTypeUInt8>() },
{ "is_in_sampling_key", std::make_shared<DataTypeUInt8>() },
{ "compression_codec", std::make_shared<DataTypeString>() },
}));
}
@ -86,6 +87,7 @@ protected:
NamesAndTypesList columns;
ColumnDefaults column_defaults;
ColumnComments column_comments;
ColumnCodecs column_codecs;
Names cols_required_for_partition_key;
Names cols_required_for_sorting_key;
Names cols_required_for_primary_key;
@ -114,6 +116,7 @@ protected:
}
columns = storage->getColumns().getAll();
column_codecs = storage->getColumns().codecs;
column_defaults = storage->getColumns().defaults;
column_comments = storage->getColumns().comments;
@ -219,6 +222,20 @@ protected:
res_columns[res_index++]->insert(find_in_vector(cols_required_for_sampling));
}
{
const auto it = column_codecs.find(column.name);
if (it == std::end(column_codecs))
{
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
}
else
{
if (columns_mask[src_index++])
res_columns[res_index++]->insert("CODEC(" + it->second->getCodecDesc() + ")");
}
}
++rows_count;
}
}

View File

@ -0,0 +1,33 @@
2018-01-01 1
2018-01-01 2
CODEC(ZSTD)
2018-01-01 1 default_value
2018-01-01 2 default_value
2018-01-01 3 3
2018-01-01 4 4
CODEC(NONE)
2018-01-01 1 default_value
2018-01-01 2 default_value
2018-01-01 3 3
2018-01-01 4 4
2018-01-01 5 5
2018-01-01 6 6
2018-01-01 1 default_value
2018-01-01 2 default_value
2018-01-01 3 3
2018-01-01 4 4
2018-01-01 5 5
2018-01-01 6 6
CODEC(ZSTD, LZ4HC, LZ4, LZ4, NONE)
2018-01-01 1 default_value
2018-01-01 2 default_value
2018-01-01 3 3
2018-01-01 4 4
2018-01-01 5 5
2018-01-01 6 6
2018-01-01 7 7
2018-01-01 8 8
CODEC(ZSTD, LZ4HC, LZ4, LZ4, NONE)
CODEC(NONE, LZ4, LZ4HC, ZSTD)
2
1

View File

@ -0,0 +1,88 @@
SET send_logs_level = 'none';
DROP TABLE IF EXISTS test.alter_compression_codec;
CREATE TABLE test.alter_compression_codec (
somedate Date CODEC(LZ4),
id UInt64 CODEC(NONE)
) ENGINE = MergeTree() PARTITION BY somedate ORDER BY id;
INSERT INTO test.alter_compression_codec VALUES('2018-01-01', 1);
INSERT INTO test.alter_compression_codec VALUES('2018-01-01', 2);
SELECT * FROM test.alter_compression_codec ORDER BY id;
ALTER TABLE test.alter_compression_codec ADD COLUMN alter_column String DEFAULT 'default_value' CODEC(ZSTD);
SELECT compression_codec FROM system.columns WHERE database = 'test' AND table = 'alter_compression_codec' AND name = 'alter_column';
INSERT INTO test.alter_compression_codec VALUES('2018-01-01', 3, '3');
INSERT INTO test.alter_compression_codec VALUES('2018-01-01', 4, '4');
SELECT * FROM test.alter_compression_codec ORDER BY id;
ALTER TABLE test.alter_compression_codec MODIFY COLUMN alter_column CODEC(NONE);
SELECT compression_codec FROM system.columns WHERE database = 'test' AND table = 'alter_compression_codec' AND name = 'alter_column';
INSERT INTO test.alter_compression_codec VALUES('2018-01-01', 5, '5');
INSERT INTO test.alter_compression_codec VALUES('2018-01-01', 6, '6');
SELECT * FROM test.alter_compression_codec ORDER BY id;
OPTIMIZE TABLE test.alter_compression_codec FINAL;
SELECT * FROM test.alter_compression_codec ORDER BY id;
ALTER TABLE test.alter_compression_codec MODIFY COLUMN alter_column CODEC(ZSTD, LZ4HC, LZ4, LZ4, NONE);
SELECT compression_codec FROM system.columns WHERE database = 'test' AND table = 'alter_compression_codec' AND name = 'alter_column';
INSERT INTO test.alter_compression_codec VALUES('2018-01-01', 7, '7');
INSERT INTO test.alter_compression_codec VALUES('2018-01-01', 8, '8');
OPTIMIZE TABLE test.alter_compression_codec FINAL;
SELECT * FROM test.alter_compression_codec ORDER BY id;
ALTER TABLE test.alter_compression_codec MODIFY COLUMN alter_column FixedString(100);
SELECT compression_codec FROM system.columns WHERE database = 'test' AND table = 'alter_compression_codec' AND name = 'alter_column';
DROP TABLE IF EXISTS test.alter_compression_codec;
DROP TABLE IF EXISTS test.alter_bad_codec;
CREATE TABLE test.alter_bad_codec (
somedate Date CODEC(LZ4),
id UInt64 CODEC(NONE)
) ENGINE = MergeTree() ORDER BY tuple();
ALTER TABLE test.alter_bad_codec ADD COLUMN alter_column DateTime DEFAULT '2019-01-01 00:00:00' CODEC(gbdgkjsdh); -- { serverError 432 }
ALTER TABLE test.alter_bad_codec ADD COLUMN alter_column DateTime DEFAULT '2019-01-01 00:00:00' CODEC(ZSTD(100)); -- { serverError 433 }
DROP TABLE IF EXISTS test.alter_bad_codec;
DROP TABLE IF EXISTS test.large_alter_table;
DROP TABLE IF EXISTS test.store_of_hash;
CREATE TABLE test.large_alter_table (
somedate Date CODEC(ZSTD, ZSTD, ZSTD(12), LZ4HC(12)),
id UInt64 CODEC(LZ4, ZSTD, NONE, LZ4HC),
data String CODEC(ZSTD(2), LZ4HC, NONE, LZ4, LZ4)
) ENGINE = MergeTree() PARTITION BY somedate ORDER BY id SETTINGS index_granularity = 2;
INSERT INTO test.large_alter_table SELECT toDate('2019-01-01'), number, toString(number + rand()) FROM system.numbers LIMIT 300000;
CREATE TABLE test.store_of_hash (hash UInt64) ENGINE = Memory();
INSERT INTO test.store_of_hash SELECT sum(cityHash64(*)) FROM test.large_alter_table;
ALTER TABLE test.large_alter_table MODIFY COLUMN data CODEC(NONE, LZ4, LZ4HC, ZSTD);
OPTIMIZE TABLE test.large_alter_table;
SELECT compression_codec FROM system.columns WHERE database = 'test' AND table = 'large_alter_table' AND name = 'data';
DETACH TABLE test.large_alter_table;
ATTACH TABLE test.large_alter_table;
INSERT INTO test.store_of_hash SELECT sum(cityHash64(*)) FROM test.large_alter_table;
SELECT COUNT(hash) FROM test.store_of_hash;
SELECT COUNT(DISTINCT hash) FROM test.store_of_hash;
DROP TABLE IF EXISTS test.large_alter_table;
DROP TABLE IF EXISTS test.store_of_hash;