At least something works

This commit is contained in:
alesapin 2020-05-19 12:54:56 +03:00
parent 1ccd4fb978
commit 70e5553204
7 changed files with 182 additions and 37 deletions

View File

@ -1,5 +1,6 @@
#include "MutationsInterpreter.h"
#include <Parsers/queryToString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Interpreters/InDepthNodeVisitor.h>
@ -635,6 +636,7 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> &
}
select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression));
}
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "QUERY:" << queryToString(select));
return select;
}

View File

@ -594,7 +594,7 @@ bool AlterCommand::isCommentAlter() const
return false;
}
std::optional<MutationCommand> AlterCommand::tryConvertToMutationCommand(const StorageInMemoryMetadata & metadata) const
std::optional<MutationCommand> AlterCommand::tryConvertToMutationCommand(StorageInMemoryMetadata & metadata) const
{
if (!isRequireMutationStage(metadata))
return {};
@ -637,6 +637,7 @@ std::optional<MutationCommand> AlterCommand::tryConvertToMutationCommand(const S
}
result.ast = ast->clone();
apply(metadata);
return result;
}
@ -733,6 +734,7 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, const Con
auto all_columns = metadata.columns;
/// Default expression for all added/modified columns
ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
NameSet modified_columns, renamed_columns;
for (size_t i = 0; i < size(); ++i)
{
const auto & command = (*this)[i];
@ -740,7 +742,7 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, const Con
const auto & column_name = command.column_name;
if (command.type == AlterCommand::ADD_COLUMN)
{
if (metadata.columns.has(column_name) || metadata.columns.hasNested(column_name))
if (all_columns.has(column_name) || all_columns.hasNested(column_name))
{
if (!command.if_not_exists)
throw Exception{"Cannot add column " + backQuote(column_name) + ": column with this name already exists",
@ -757,7 +759,7 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, const Con
}
else if (command.type == AlterCommand::MODIFY_COLUMN)
{
if (!metadata.columns.has(column_name))
if (!all_columns.has(column_name))
{
if (!command.if_exists)
throw Exception{"Wrong column name. Cannot find column " + backQuote(column_name) + " to modify",
@ -765,18 +767,23 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, const Con
else
continue;
}
if (renamed_columns.count(column_name))
throw Exception{"Cannot rename and modify the same column " + backQuote(column_name) + " in a single ALTER query",
ErrorCodes::NOT_IMPLEMENTED};
modified_columns.emplace(column_name);
}
else if (command.type == AlterCommand::DROP_COLUMN)
{
if (metadata.columns.has(command.column_name) || metadata.columns.hasNested(command.column_name))
if (all_columns.has(command.column_name) || all_columns.hasNested(command.column_name))
{
for (const ColumnDescription & column : metadata.columns)
for (const ColumnDescription & column : all_columns)
{
const auto & default_expression = column.default_desc.expression;
if (default_expression)
{
ASTPtr query = default_expression->clone();
auto syntax_result = SyntaxAnalyzer(context).analyze(query, metadata.columns.getAll());
auto syntax_result = SyntaxAnalyzer(context).analyze(query, all_columns.getAll());
const auto actions = ExpressionAnalyzer(query, syntax_result, context).getActions(true);
const auto required_columns = actions->getRequiredColumns();
@ -786,6 +793,7 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, const Con
ErrorCodes::ILLEGAL_COLUMN);
}
}
all_columns.remove(command.column_name);
}
else if (!command.if_exists)
throw Exception(
@ -794,7 +802,7 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, const Con
}
else if (command.type == AlterCommand::COMMENT_COLUMN)
{
if (!metadata.columns.has(command.column_name))
if (!all_columns.has(command.column_name))
{
if (!command.if_exists)
throw Exception{"Wrong column name. Cannot find column " + backQuote(command.column_name) + " to comment",
@ -842,6 +850,10 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, const Con
throw Exception{"Cannot rename to " + backQuote(command.rename_to) + ": column with this name already exists",
ErrorCodes::DUPLICATE_COLUMN};
if (modified_columns.count(column_name))
throw Exception{"Cannot rename and modify the same column " + backQuote(column_name) + " in a single ALTER query",
ErrorCodes::NOT_IMPLEMENTED};
String from_nested_table_name = Nested::extractTableName(command.column_name);
String to_nested_table_name = Nested::extractTableName(command.rename_to);
bool from_nested = from_nested_table_name != command.column_name;
@ -855,6 +867,8 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, const Con
else if (!from_nested && !to_nested)
{
all_columns.rename(command.column_name, command.rename_to);
renamed_columns.emplace(command.column_name);
renamed_columns.emplace(command.rename_to);
}
else
{
@ -886,9 +900,9 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, const Con
default_expr_list->children.emplace_back(setAlias(command.default_expression->clone(), tmp_column_name));
}
} /// if we change data type for column with default
else if (metadata.columns.has(column_name) && command.data_type)
else if (all_columns.has(column_name) && command.data_type)
{
auto column_in_table = metadata.columns.get(column_name);
auto column_in_table = all_columns.get(column_name);
/// Column doesn't have a default, nothing to check
if (!column_in_table.default_desc.expression)
continue;
@ -931,7 +945,7 @@ bool AlterCommands::isCommentAlter() const
}
MutationCommands AlterCommands::getMutationCommands(const StorageInMemoryMetadata & metadata) const
MutationCommands AlterCommands::getMutationCommands(StorageInMemoryMetadata metadata) const
{
MutationCommands result;
for (const auto & alter_cmd : *this)

View File

@ -121,7 +121,7 @@ struct AlterCommand
/// If possible, convert alter command to mutation command. In other case
/// return empty optional. Some storages may execute mutations after
/// metadata changes.
std::optional<MutationCommand> tryConvertToMutationCommand(const StorageInMemoryMetadata & metadata) const;
std::optional<MutationCommand> tryConvertToMutationCommand(StorageInMemoryMetadata & metadata) const;
};
/// Return string representation of AlterCommand::Type
@ -162,7 +162,7 @@ public:
/// Return mutation commands which some storages may execute as part of
/// alter. If alter can be performed is pure metadata update, than result is
/// empty.
MutationCommands getMutationCommands(const StorageInMemoryMetadata & metadata) const;
MutationCommands getMutationCommands(StorageInMemoryMetadata metadata) const;
};
}

View File

@ -407,7 +407,7 @@ MergeTreeData::DataPartsVector MergeTreeDataMergerMutator::selectAllPartsFromPar
/// PK columns are sorted and merged, ordinary columns are gathered using info from merge step
static void extractMergingAndGatheringColumns(
const NamesAndTypesList & all_columns,
const NamesAndTypesList & storage_columns,
const ExpressionActionsPtr & sorting_key_expr,
const MergeTreeIndices & indexes,
const MergeTreeData::MergingParams & merging_params,
@ -437,11 +437,11 @@ static void extractMergingAndGatheringColumns(
/// Force to merge at least one column in case of empty key
if (key_columns.empty())
key_columns.emplace(all_columns.front().name);
key_columns.emplace(storage_columns.front().name);
/// TODO: also force "summing" and "aggregating" columns to make Horizontal merge only for such columns
for (const auto & column : all_columns)
for (const auto & column : storage_columns)
{
if (key_columns.count(column.name))
{
@ -600,14 +600,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
MergeTreeData::DataPart::ColumnToSize merged_column_to_size;
Names all_column_names = data.getColumns().getNamesOfPhysical();
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
NamesAndTypesList storage_columns = data.getColumns().getAllPhysical();
const auto data_settings = data.getSettings();
NamesAndTypesList gathering_columns;
NamesAndTypesList merging_columns;
Names gathering_column_names, merging_column_names;
extractMergingAndGatheringColumns(
all_columns, data.sorting_key_expr, data.skip_indices,
storage_columns, data.sorting_key_expr, data.skip_indices,
data.merging_params, gathering_columns, gathering_column_names, merging_columns, merging_column_names);
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(
@ -617,7 +617,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
disk,
TMP_PREFIX + future_part.name);
new_data_part->setColumns(all_columns);
new_data_part->setColumns(storage_columns);
new_data_part->partition.assign(future_part.getPartition());
new_data_part->is_temp = true;
@ -669,7 +669,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
}
else
{
merging_columns = all_columns;
merging_columns = storage_columns;
merging_column_names = all_column_names;
gathering_columns.clear();
gathering_column_names.clear();
@ -959,7 +959,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
if (merge_alg != MergeAlgorithm::Vertical)
to.writeSuffixAndFinalizePart(new_data_part);
else
to.writeSuffixAndFinalizePart(new_data_part, &all_columns, &checksums_gathered_columns);
to.writeSuffixAndFinalizePart(new_data_part, &storage_columns, &checksums_gathered_columns);
return new_data_part;
}
@ -1013,12 +1013,16 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
const auto data_settings = data.getSettings();
MutationCommands for_interpreter, for_file_renames;
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "COMMANDS FOR PART:" << commands_for_part.size());
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "TOTAL COLUMNS:" << commands.size());
splitMutationCommands(source_part, commands_for_part, for_interpreter, for_file_renames);
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "COMMANDS FOR INTERPRETER:" << for_interpreter.size());
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "COMMANDS FOR FILE RENAMES:" << for_file_renames.size());
UInt64 watch_prev_elapsed = 0;
MergeStageProgress stage_progress(1.0);
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
NamesAndTypesList storage_columns = data.getColumns().getAllPhysical();
if (!for_interpreter.empty())
{
@ -1026,6 +1030,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
in = interpreter->execute(table_lock_holder);
updated_header = interpreter->getUpdatedHeader();
in->setProgressCallback(MergeProgressCallback(merge_entry, watch_prev_elapsed, stage_progress));
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "UPDATED HEADER:" << updated_header.dumpStructure());
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "STREAM HEADER:" << in->getHeader().dumpStructure());
}
auto new_data_part = data.createPart(
@ -1036,7 +1043,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
/// It shouldn't be changed by mutation.
new_data_part->index_granularity_info = source_part->index_granularity_info;
new_data_part->setColumns(getColumnsForNewDataPart(source_part, updated_header, all_columns, for_file_renames));
new_data_part->setColumns(getColumnsForNewDataPart(source_part, updated_header, storage_columns, for_file_renames));
new_data_part->partition.assign(source_part->partition);
auto disk = new_data_part->disk;
@ -1113,7 +1120,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
disk->createHardLink(it->path(), destination);
}
merge_entry->columns_written = all_columns.size() - updated_header.columns();
merge_entry->columns_written = storage_columns.size() - updated_header.columns();
new_data_part->checksums = source_part->checksums;
@ -1258,6 +1265,18 @@ void MergeTreeDataMergerMutator::splitMutationCommands(
NameSet removed_columns_from_compact_part;
NameSet already_changed_columns;
bool is_compact_part = isCompactPart(part);
ColumnsDescription part_columns(part->getColumns());
NameSet modified_columns;
for (const auto & command : commands)
{
if (command.type == MutationCommand::Type::DELETE || command.type == MutationCommand::Type::UPDATE
|| command.type == MutationCommand::Type::MATERIALIZE_INDEX || command.type == MutationCommand::Type::MATERIALIZE_TTL || command.type == MutationCommand::Type::READ_COLUMN)
{
modified_columns.emplace(command.column_name);
}
}
for (const auto & command : commands)
{
if (command.type == MutationCommand::Type::DELETE
@ -1269,28 +1288,30 @@ void MergeTreeDataMergerMutator::splitMutationCommands(
for (const auto & [column_name, expr] : command.column_to_update_expression)
already_changed_columns.emplace(column_name);
}
else if (command.type == MutationCommand::Type::READ_COLUMN)
else if (command.type == MutationCommand::Type::READ_COLUMN && part_columns.has(command.column_name))
{
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "READ COLUMN:" << command.column_name);
/// If we don't have this column in source part, than we don't
/// need to materialize it
if (part->getColumns().contains(command.column_name))
if (part_columns.has(command.column_name))
{
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "PART HAS COLUMN:" << command.column_name);
for_interpreter.push_back(command);
if (!command.column_name.empty())
already_changed_columns.emplace(command.column_name);
already_changed_columns.emplace(command.column_name);
}
else
for_file_renames.push_back(command);
}
else if (is_compact_part && command.type == MutationCommand::Type::DROP_COLUMN)
else if (is_compact_part && command.type == MutationCommand::Type::DROP_COLUMN && part_columns.has(command.column_name))
{
removed_columns_from_compact_part.emplace(command.column_name);
for_file_renames.push_back(command);
part_columns.remove(command.column_name);
}
else if (command.type == MutationCommand::Type::RENAME_COLUMN)
else if (command.type == MutationCommand::Type::RENAME_COLUMN && part_columns.has(command.column_name))
{
if (is_compact_part)
if (is_compact_part || modified_columns.count(command.rename_to))
{
for_interpreter.push_back(
{
@ -1298,11 +1319,15 @@ void MergeTreeDataMergerMutator::splitMutationCommands(
.column_name = command.rename_to,
});
already_changed_columns.emplace(command.column_name);
part_columns.rename(command.column_name, command.rename_to);
}
else
{
part_columns.rename(command.column_name, command.rename_to);
for_file_renames.push_back(command);
}
}
else
else if (part_columns.has(command.column_name))
{
for_file_renames.push_back(command);
}
@ -1429,9 +1454,12 @@ NameSet MergeTreeDataMergerMutator::collectFilesToSkip(
NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart(
MergeTreeData::DataPartPtr source_part,
const Block & updated_header,
NamesAndTypesList all_columns,
NamesAndTypesList storage_columns,
const MutationCommands & commands_for_removes)
{
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "COLUMNS FOr NEW PART START:" << storage_columns.toString());
if (isCompactPart(source_part))
return updated_header.getNamesAndTypesList();
NameSet removed_columns;
NameToNameMap renamed_columns;
for (const auto & command : commands_for_removes)
@ -1439,12 +1467,17 @@ NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart(
if (command.type == MutationCommand::DROP_COLUMN)
removed_columns.insert(command.column_name);
if (command.type == MutationCommand::RENAME_COLUMN)
{
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "RENAME FROM:" << command.column_name << " TO:" << command.rename_to);
renamed_columns.emplace(command.rename_to, command.column_name);
}
}
Names source_column_names = source_part->getColumns().getNames();
NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end());
for (auto it = all_columns.begin(); it != all_columns.end();)
NameSet columns_that_was_renamed;
for (auto it = storage_columns.begin(); it != storage_columns.end();)
{
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "LOOKING AT:" << it->name);
if (updated_header.has(it->name))
{
auto updated_type = updated_header.getByName(it->name).type;
@ -1461,9 +1494,14 @@ NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart(
++it;
}
else
it = all_columns.erase(it);
{
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "ERASE:" << it->name);
it = storage_columns.erase(it);
}
}
return all_columns;
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "COLUMNS FOr NEW PART FINISH:" << storage_columns.toString());
return storage_columns;
}
MergeTreeIndices MergeTreeDataMergerMutator::getIndicesForNewDataPart(

View File

@ -153,11 +153,11 @@ private:
/// Because we will generate new versions of them after we perform mutation.
static NameSet collectFilesToSkip(const Block & updated_header, const std::set<MergeTreeIndexPtr> & indices_to_recalc, const String & mrk_extension);
/// Get the columns list of the resulting part in the same order as all_columns.
/// Get the columns list of the resulting part in the same order as storage_columns.
static NamesAndTypesList getColumnsForNewDataPart(
MergeTreeData::DataPartPtr source_part,
const Block & updated_header,
NamesAndTypesList all_columns,
NamesAndTypesList storage_columns,
const MutationCommands & commands_for_removes);
/// Get skip indcies, that should exists in the resulting data part.

View File

@ -0,0 +1,24 @@
CREATE TABLE default.rename_table_multiple\n(\n `key` Int32, \n `value1_string` String, \n `value2` Int32\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS index_granularity = 8192
key value1_string value2
1 2 3
CREATE TABLE default.rename_table_multiple\n(\n `key` Int32, \n `value1_string` String, \n `value2_old` Int32, \n `value2` Int64 DEFAULT 7\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS index_granularity = 8192
key value1_string value2_old value2
1 2 3 7
4 5 6 7
CREATE TABLE default.rename_table_multiple\n(\n `key` Int32, \n `value1_string` String, \n `value2_old` Int64 DEFAULT 7\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS index_granularity = 8192
key value1_string value2_old
1 2 7
4 5 7
7 8 10
CREATE TABLE default.rename_table_multiple_compact\n(\n `key` Int32, \n `value1_string` String, \n `value2` Int32\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_rows_for_wide_part = 100000, index_granularity = 8192
key value1_string value2
1 2 3
CREATE TABLE default.rename_table_multiple_compact\n(\n `key` Int32, \n `value1_string` String, \n `value2_old` Int32, \n `value2` Int64 DEFAULT 7\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_rows_for_wide_part = 100000, index_granularity = 8192
key value1_string value2_old value2
1 2 3 7
4 5 6 7
CREATE TABLE default.rename_table_multiple_compact\n(\n `key` Int32, \n `value1_string` String, \n `value2_old` Int64 DEFAULT 7\n)\nENGINE = MergeTree\nORDER BY tuple()\nSETTINGS min_rows_for_wide_part = 100000, index_granularity = 8192
key value1_string value2_old
1 2 7
4 5 7
7 8 10

View File

@ -0,0 +1,67 @@
DROP TABLE IF EXISTS rename_table_multiple;
CREATE TABLE rename_table_multiple (key Int32, value1 String, value2 Int32) ENGINE = MergeTree ORDER BY tuple();
INSERT INTO rename_table_multiple VALUES (1, 2, 3);
ALTER TABLE rename_table_multiple RENAME COLUMN value1 TO value1_string, MODIFY COLUMN value1_string String; --{serverError 48}
ALTER TABLE rename_table_multiple MODIFY COLUMN value1 String, RENAME COLUMN value1 to value1_string; --{serverError 48}
ALTER TABLE rename_table_multiple RENAME COLUMN value1 TO value1_string;
ALTER TABLE rename_table_multiple MODIFY COLUMN value1_string String;
SHOW CREATE TABLE rename_table_multiple;
SELECT * FROM rename_table_multiple FORMAT TSVWithNames;
INSERT INTO rename_table_multiple VALUES (4, '5', 6);
ALTER TABLE rename_table_multiple RENAME COLUMN value2 TO value2_old, ADD COLUMN value2 Int64 DEFAULT 7;
SHOW CREATE TABLE rename_table_multiple;
SELECT * FROM rename_table_multiple ORDER BY key FORMAT TSVWithNames;
INSERT INTO rename_table_multiple VALUES (7, '8', 9, 10);
ALTER TABLE rename_table_multiple DROP COLUMN value2_old, RENAME COLUMN value2 TO value2_old;
SHOW CREATE TABLE rename_table_multiple;
SELECT * FROM rename_table_multiple ORDER BY key FORMAT TSVWithNames;
DROP TABLE IF EXISTS rename_table_multiple;
DROP TABLE IF EXISTS rename_table_multiple_compact;
CREATE TABLE rename_table_multiple_compact (key Int32, value1 String, value2 Int32) ENGINE = MergeTree ORDER BY tuple() SETTINGS min_rows_for_wide_part = 100000;
INSERT INTO rename_table_multiple_compact VALUES (1, 2, 3);
ALTER TABLE rename_table_multiple_compact RENAME COLUMN value1 TO value1_string, MODIFY COLUMN value1_string String; --{serverError 48}
ALTER TABLE rename_table_multiple_compact MODIFY COLUMN value1 String, RENAME COLUMN value1 to value1_string; --{serverError 48}
ALTER TABLE rename_table_multiple_compact RENAME COLUMN value1 TO value1_string;
ALTER TABLE rename_table_multiple_compact MODIFY COLUMN value1_string String;
SHOW CREATE TABLE rename_table_multiple_compact;
SELECT * FROM rename_table_multiple_compact FORMAT TSVWithNames;
INSERT INTO rename_table_multiple_compact VALUES (4, '5', 6);
ALTER TABLE rename_table_multiple_compact RENAME COLUMN value2 TO value2_old, ADD COLUMN value2 Int64 DEFAULT 7;
SHOW CREATE TABLE rename_table_multiple_compact;
SELECT * FROM rename_table_multiple_compact ORDER BY key FORMAT TSVWithNames;
INSERT INTO rename_table_multiple_compact VALUES (7, '8', 9, 10);
ALTER TABLE rename_table_multiple_compact DROP COLUMN value2_old, RENAME COLUMN value2 TO value2_old;
SHOW CREATE TABLE rename_table_multiple_compact;
SELECT * FROM rename_table_multiple_compact ORDER BY key FORMAT TSVWithNames;
DROP TABLE IF EXISTS rename_table_multiple_compact;