More correct separation between metadata and data alters for non replicated MergeTree.

This commit is contained in:
alesapin 2019-12-23 19:44:50 +03:00
parent cfeaea2627
commit 25ecc38615
6 changed files with 86 additions and 76 deletions

View File

@ -109,7 +109,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
if (ast_col_decl.comment)
{
const auto & ast_comment = ast_col_decl.comment->as<ASTLiteral &>();
command.comment = ast_comment.value.get<String>();
command.comment.emplace(ast_comment.value.get<String>());
}
if (ast_col_decl.ttl)
@ -225,7 +225,7 @@ void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescri
column.default_desc.kind = default_kind;
column.default_desc.expression = default_expression;
}
column.comment = comment;
column.comment = *comment;
column.codec = codec;
column.ttl = ttl;
@ -251,11 +251,8 @@ void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescri
column.codec = codec;
}
if (!isMutable())
{
column.comment = comment;
return;
}
if (comment)
column.comment = *comment;
if (ttl)
column.ttl = ttl;
@ -279,7 +276,7 @@ void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescri
}
else if (type == COMMENT_COLUMN)
{
columns_description.modify(column_name, [&](ColumnDescription & column) { column.comment = comment; });
columns_description.modify(column_name, [&](ColumnDescription & column) { column.comment = *comment; });
}
else if (type == ADD_INDEX)
{
@ -390,13 +387,15 @@ void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescri
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
}
bool AlterCommand::isMutable() const
bool AlterCommand::isModifyingData() const
{
if (type == COMMENT_COLUMN || type == MODIFY_SETTING)
return false;
/// Change binary representation on disk
if (type == MODIFY_COLUMN)
return data_type.get() || default_expression;
return true;
return type == ADD_COLUMN /// We need to change columns.txt in each part
|| type == DROP_COLUMN /// We need to change columns.txt in each part
|| type == DROP_INDEX; /// We need to remove file from filesystem
}
bool AlterCommand::isSettingsAlter() const
@ -666,11 +665,11 @@ void AlterCommands::applyForSettingsOnly(SettingsChanges & changes) const
changes = std::move(out_changes);
}
bool AlterCommands::isMutable() const
bool AlterCommands::isModifyingData() const
{
for (const auto & param : *this)
{
if (param.isMutable())
if (param.isModifyingData())
return true;
}

View File

@ -47,7 +47,7 @@ struct AlterCommand
ColumnDefaultKind default_kind{};
ASTPtr default_expression{};
String comment;
std::optional<String> comment;
/// For ADD - after which column to add a new one. If an empty string, add to the end. To add to the beginning now it is impossible.
String after_column;
@ -102,8 +102,11 @@ struct AlterCommand
ConstraintsDescription & constraints_description, ASTPtr & order_by_ast,
ASTPtr & primary_key_ast, ASTPtr & ttl_table_ast, SettingsChanges & changes) const;
/// Checks that not only metadata touched by that command
bool isMutable() const;
/// Checks that alter query changes data. For MergeTree:
/// * column files (data and marks)
/// * each part meta (columns.txt)
/// in each part on disk (it's not lightweight alter).
bool isModifyingData() const;
/// checks that only settings changed by alter
bool isSettingsAlter() const;
@ -124,7 +127,7 @@ public:
void applyForSettingsOnly(SettingsChanges & changes) const;
void validate(const IStorage & table, const Context & context);
bool isMutable() const;
bool isModifyingData() const;
bool isSettingsAlter() const;
};

View File

@ -402,7 +402,7 @@ void IStorage::alter(
const Context & context,
TableStructureWriteLockHolder & table_lock_holder)
{
if (params.isMutable())
if (params.isModifyingData())
throw Exception("Method alter supports only change comment of column for storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
const String database_name = getDatabaseName();

View File

@ -1386,7 +1386,7 @@ void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & c
for (const AlterCommand & command : commands)
{
if (!command.isMutable())
if (!command.isModifyingData())
{
continue;
}
@ -1433,9 +1433,9 @@ void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & c
getIndices().indices, new_indices.indices, unused_expression, unused_map, unused_bool);
}
void MergeTreeData::createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns,
const IndicesASTs & old_indices, const IndicesASTs & new_indices, ExpressionActionsPtr & out_expression,
NameToNameMap & out_rename_map, bool & out_force_update_metadata) const
void MergeTreeData::createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns,
const NamesAndTypesList & new_columns, const IndicesASTs & old_indices, const IndicesASTs & new_indices,
ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map, bool & out_force_update_metadata) const
{
const auto settings = getSettings();
out_expression = nullptr;
@ -1457,7 +1457,7 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
/// Remove old indices
std::set<String> new_indices_set;
std::unordered_set<String> new_indices_set;
for (const auto & index_decl : new_indices)
new_indices_set.emplace(index_decl->as<ASTIndexDeclaration &>().name);
for (const auto & index_decl : old_indices)
@ -1465,8 +1465,8 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
const auto & index = index_decl->as<ASTIndexDeclaration &>();
if (!new_indices_set.count(index.name))
{
out_rename_map["skp_idx_" + index.name + ".idx"] = "";
out_rename_map["skp_idx_" + index.name + part_mrk_file_extension] = "";
out_rename_map["skp_idx_" + index.name + ".idx"] = ""; /// drop this file
out_rename_map["skp_idx_" + index.name + part_mrk_file_extension] = ""; /// and this one
}
}
@ -1494,8 +1494,8 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
/// Delete files if they are no longer shared with another column.
if (--stream_counts[file_name] == 0)
{
out_rename_map[file_name + ".bin"] = "";
out_rename_map[file_name + part_mrk_file_extension] = "";
out_rename_map[file_name + ".bin"] = ""; /// drop this file
out_rename_map[file_name + part_mrk_file_extension] = ""; /// and this one
}
}, {});
}
@ -1847,7 +1847,7 @@ void MergeTreeData::AlterDataPartTransaction::commit()
mutable_part.checksums = new_checksums;
mutable_part.columns = new_columns;
/// 3) Delete the old files.
/// 3) Delete the old files and drop required columns (DROP COLUMN)
for (const auto & from_to : rename_map)
{
String name = from_to.second.empty() ? from_to.first : from_to.second;

View File

@ -249,34 +249,10 @@ void StorageMergeTree::alter(
const String current_database_name = getDatabaseName();
const String current_table_name = getTableName();
if (!params.isMutable())
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
auto new_columns = getColumns();
auto new_indices = getIndices();
auto new_constraints = getConstraints();
ASTPtr new_order_by_ast = order_by_ast;
ASTPtr new_primary_key_ast = primary_key_ast;
ASTPtr new_ttl_table_ast = ttl_table_ast;
SettingsChanges new_changes;
params.apply(new_columns, new_indices, new_constraints, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast, new_changes);
changeSettings(new_changes, table_lock_holder);
IDatabase::ASTModifier settings_modifier = getSettingsModifier(new_changes);
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, settings_modifier);
setColumns(std::move(new_columns));
return;
}
/// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time.
/// Also block moves, because they can replace part with old state
auto merge_blocker = merger_mutator.merges_blocker.cancel();
auto moves_blocked = parts_mover.moves_blocker.cancel();
lockNewDataStructureExclusively(table_lock_holder, context.getCurrentQueryId());
checkAlter(params, context);
auto new_columns = getColumns();
auto new_indices = getIndices();
auto new_constraints = getConstraints();
@ -284,13 +260,11 @@ void StorageMergeTree::alter(
ASTPtr new_primary_key_ast = primary_key_ast;
ASTPtr new_ttl_table_ast = ttl_table_ast;
SettingsChanges new_changes;
params.apply(new_columns, new_indices, new_constraints, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast, new_changes);
auto transactions = prepareAlterTransactions(new_columns, new_indices, context);
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
IDatabase::ASTModifier storage_modifier = [&] (IAST & ast)
/// Modifier for storage AST in /metadata/storage_db/storage.sql
IDatabase::ASTModifier storage_modifier = [&](IAST & ast)
{
auto & storage_ast = ast.as<ASTStorage &>();
@ -310,24 +284,54 @@ void StorageMergeTree::alter(
}
};
changeSettings(new_changes, table_lock_holder);
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, storage_modifier);
/// Reinitialize primary key because primary key column types might have changed.
setProperties(new_order_by_ast, new_primary_key_ast, new_columns, new_indices, new_constraints);
setTTLExpressions(new_columns.getColumnTTLs(), new_ttl_table_ast);
for (auto & transaction : transactions)
/// Update metdata in memory
auto update_metadata = [&]()
{
transaction->commit();
transaction.reset();
}
changeSettings(new_changes, table_lock_holder);
/// Reinitialize primary key because primary key column types might have changed.
setProperties(new_order_by_ast, new_primary_key_ast, new_columns, new_indices, new_constraints);
/// Columns sizes could be changed
recalculateColumnSizes();
setTTLExpressions(new_columns.getColumnTTLs(), new_ttl_table_ast);
};
/// This alter can be performed at metadata level only
if (!params.isModifyingData())
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
params.apply(new_columns, new_indices, new_constraints, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast, new_changes);
IDatabase::ASTModifier settings_modifier = getSettingsModifier(new_changes);
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, storage_modifier);
update_metadata();
}
else
{
/// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time.
/// Also block moves, because they can replace part with old state
auto merge_blocker = merger_mutator.merges_blocker.cancel();
auto moves_blocked = parts_mover.moves_blocker.cancel();
auto transactions = prepareAlterTransactions(new_columns, new_indices, context);
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, storage_modifier);
update_metadata();
for (auto & transaction : transactions)
{
transaction->commit();
transaction.reset();
}
/// Columns sizes could be changed
recalculateColumnSizes();
}
}

View File

@ -3158,6 +3158,10 @@ void StorageReplicatedMergeTree::alter(
const String current_database_name = getDatabaseName();
const String current_table_name = getTableName();
/// We cannot check this alter commands with method isModifyingData()
/// because ReplicatedMergeTree stores both columns and metadata for
/// each replica. So we have to wait AlterThread even with lightweight
/// metadata alter.
if (params.isSettingsAlter())
{
/// We don't replicate storage_settings_ptr ALTER. It's local operation.