Merge pull request #8361 from ClickHouse/immutable_parts

More correct alters for non replicated MergeTree
This commit is contained in:
alesapin 2019-12-25 22:08:20 +03:00 committed by GitHub
commit fe6a63a12a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 133 additions and 134 deletions

View File

@ -339,6 +339,8 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
affected_materialized.emplace(mat_column);
}
/// Just to be sure, that we don't change type
/// after update expression execution.
const auto & update_expr = kv.second;
auto updated_column = makeASTFunction("CAST",
makeASTFunction("if",

View File

@ -65,8 +65,9 @@ private:
/// Each stage has output_columns that contain columns that are changed at the end of that stage
/// plus columns needed for the next mutations.
///
/// First stage is special: it can contain only DELETEs and is executed using InterpreterSelectQuery
/// to take advantage of table indexes (if there are any).
/// First stage is special: it can contain only filters and is executed using InterpreterSelectQuery
/// to take advantage of table indexes (if there are any). It's necessary because all mutations have
/// `WHERE clause` part.
struct Stage
{
@ -83,7 +84,7 @@ private:
/// A chain of actions needed to execute this stage.
/// First steps calculate filter columns for DELETEs (in the same order as in `filter_column_names`),
/// then there is (possibly) an UPDATE stage, and finally a projection stage.
/// then there is (possibly) an UPDATE step, and finally a projection step.
ExpressionActionsChain expressions_chain;
Names filter_column_names;
};

View File

@ -109,7 +109,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
if (ast_col_decl.comment)
{
const auto & ast_comment = ast_col_decl.comment->as<ASTLiteral &>();
command.comment = ast_comment.value.get<String>();
command.comment.emplace(ast_comment.value.get<String>());
}
if (ast_col_decl.ttl)
@ -225,7 +225,9 @@ void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescri
column.default_desc.kind = default_kind;
column.default_desc.expression = default_expression;
}
column.comment = comment;
if (comment)
column.comment = *comment;
column.codec = codec;
column.ttl = ttl;
@ -251,19 +253,22 @@ void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescri
column.codec = codec;
}
if (!isMutable())
{
column.comment = comment;
return;
}
if (comment)
column.comment = *comment;
if (ttl)
column.ttl = ttl;
column.type = data_type;
if (data_type)
column.type = data_type;
column.default_desc.kind = default_kind;
column.default_desc.expression = default_expression;
/// User specified default expression or changed
/// datatype. We have to replace default.
if (default_expression || data_type)
{
column.default_desc.kind = default_kind;
column.default_desc.expression = default_expression;
}
});
}
else if (type == MODIFY_ORDER_BY)
@ -279,7 +284,7 @@ void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescri
}
else if (type == COMMENT_COLUMN)
{
columns_description.modify(column_name, [&](ColumnDescription & column) { column.comment = comment; });
columns_description.modify(column_name, [&](ColumnDescription & column) { column.comment = *comment; });
}
else if (type == ADD_INDEX)
{
@ -390,13 +395,15 @@ void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescri
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
}
bool AlterCommand::isMutable() const
bool AlterCommand::isModifyingData() const
{
if (type == COMMENT_COLUMN || type == MODIFY_SETTING)
return false;
/// Possible change data representation on disk
if (type == MODIFY_COLUMN)
return data_type.get() || default_expression;
return true;
return data_type != nullptr;
return type == ADD_COLUMN /// We need to change columns.txt in each part for MergeTree
|| type == DROP_COLUMN /// We need to change columns.txt in each part for MergeTree
|| type == DROP_INDEX; /// We need to remove file from filesystem for MergeTree
}
bool AlterCommand::isSettingsAlter() const
@ -666,11 +673,11 @@ void AlterCommands::applyForSettingsOnly(SettingsChanges & changes) const
changes = std::move(out_changes);
}
bool AlterCommands::isMutable() const
bool AlterCommands::isModifyingData() const
{
for (const auto & param : *this)
{
if (param.isMutable())
if (param.isModifyingData())
return true;
}

View File

@ -47,7 +47,7 @@ struct AlterCommand
ColumnDefaultKind default_kind{};
ASTPtr default_expression{};
String comment;
std::optional<String> comment;
/// For ADD - after which column to add a new one. If an empty string, add to the end. To add to the beginning now it is impossible.
String after_column;
@ -102,8 +102,11 @@ struct AlterCommand
ConstraintsDescription & constraints_description, ASTPtr & order_by_ast,
ASTPtr & primary_key_ast, ASTPtr & ttl_table_ast, SettingsChanges & changes) const;
/// Checks that not only metadata touched by that command
bool isMutable() const;
/// Checks that alter query changes data. For MergeTree:
/// * column files (data and marks)
/// * each part meta (columns.txt)
/// in each part on disk (it's not lightweight alter).
bool isModifyingData() const;
/// checks that only settings changed by alter
bool isSettingsAlter() const;
@ -124,7 +127,7 @@ public:
void applyForSettingsOnly(SettingsChanges & changes) const;
void validate(const IStorage & table, const Context & context);
bool isMutable() const;
bool isModifyingData() const;
bool isSettingsAlter() const;
};

View File

@ -402,7 +402,7 @@ void IStorage::alter(
const Context & context,
TableStructureWriteLockHolder & table_lock_holder)
{
if (params.isMutable())
if (params.isModifyingData())
throw Exception("Method alter supports only change comment of column for storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
const String database_name = getDatabaseName();

View File

@ -1342,11 +1342,11 @@ void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & c
"before using data skipping indices.", ErrorCodes::BAD_ARGUMENTS);
/// Set of columns that shouldn't be altered.
NameSet columns_alter_forbidden;
NameSet columns_alter_type_forbidden;
/// Primary key columns can be ALTERed only if they are used in the key as-is
/// (and not as a part of some expression) and if the ALTER only affects column metadata.
NameSet columns_alter_metadata_only;
NameSet columns_alter_type_metadata_only;
if (partition_key_expr)
{
@ -1354,13 +1354,13 @@ void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & c
/// TODO: in some cases (e.g. adding an Enum value) a partition key column can still be ALTERed.
/// We should allow it.
for (const String & col : partition_key_expr->getRequiredColumns())
columns_alter_forbidden.insert(col);
columns_alter_type_forbidden.insert(col);
}
for (const auto & index : skip_indices)
{
for (const String & col : index->expr->getRequiredColumns())
columns_alter_forbidden.insert(col);
columns_alter_type_forbidden.insert(col);
}
if (sorting_key_expr)
@ -1368,17 +1368,16 @@ void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & c
for (const ExpressionAction & action : sorting_key_expr->getActions())
{
auto action_columns = action.getNeededColumns();
columns_alter_forbidden.insert(action_columns.begin(), action_columns.end());
columns_alter_type_forbidden.insert(action_columns.begin(), action_columns.end());
}
for (const String & col : sorting_key_expr->getRequiredColumns())
columns_alter_metadata_only.insert(col);
columns_alter_type_metadata_only.insert(col);
/// We don't process sample_by_ast separately because it must be among the primary key columns
/// and we don't process primary_key_expr separately because it is a prefix of sorting_key_expr.
}
if (!merging_params.sign_column.empty())
columns_alter_forbidden.insert(merging_params.sign_column);
columns_alter_type_forbidden.insert(merging_params.sign_column);
std::map<String, const IDataType *> old_types;
for (const auto & column : getColumns().getAllPhysical())
@ -1386,34 +1385,26 @@ void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & c
for (const AlterCommand & command : commands)
{
if (!command.isMutable())
if (command.type == AlterCommand::MODIFY_ORDER_BY && !is_custom_partitioned)
{
continue;
}
if (columns_alter_forbidden.count(command.column_name))
throw Exception("Trying to ALTER key column " + command.column_name, ErrorCodes::ILLEGAL_COLUMN);
if (columns_alter_metadata_only.count(command.column_name))
{
if (command.type == AlterCommand::MODIFY_COLUMN)
{
auto it = old_types.find(command.column_name);
if (it != old_types.end() && isMetadataOnlyConversion(it->second, command.data_type.get()))
continue;
}
throw Exception(
"ALTER of key column " + command.column_name + " must be metadata-only",
ErrorCodes::ILLEGAL_COLUMN);
"ALTER MODIFY ORDER BY is not supported for default-partitioned tables created with the old syntax",
ErrorCodes::BAD_ARGUMENTS);
}
if (command.type == AlterCommand::MODIFY_ORDER_BY)
else if (command.isModifyingData())
{
if (!is_custom_partitioned)
throw Exception(
"ALTER MODIFY ORDER BY is not supported for default-partitioned tables created with the old syntax",
ErrorCodes::BAD_ARGUMENTS);
if (columns_alter_type_forbidden.count(command.column_name))
throw Exception("Trying to ALTER key column " + command.column_name, ErrorCodes::ILLEGAL_COLUMN);
if (columns_alter_type_metadata_only.count(command.column_name))
{
if (command.type == AlterCommand::MODIFY_COLUMN)
{
auto it = old_types.find(command.column_name);
if (it == old_types.end() || !isMetadataOnlyConversion(it->second, command.data_type.get()))
throw Exception("ALTER of key column " + command.column_name + " must be metadata-only", ErrorCodes::ILLEGAL_COLUMN);
}
}
}
}
@ -1425,17 +1416,20 @@ void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & c
for (const auto & setting : new_changes)
checkSettingCanBeChanged(setting.name);
/// Check that type conversions are possible.
ExpressionActionsPtr unused_expression;
NameToNameMap unused_map;
bool unused_bool;
createConvertExpression(nullptr, getColumns().getAllPhysical(), new_columns.getAllPhysical(),
getIndices().indices, new_indices.indices, unused_expression, unused_map, unused_bool);
if (commands.isModifyingData())
{
/// Check that type conversions are possible.
ExpressionActionsPtr unused_expression;
NameToNameMap unused_map;
bool unused_bool;
createConvertExpression(nullptr, getColumns().getAllPhysical(), new_columns.getAllPhysical(),
getIndices().indices, new_indices.indices, unused_expression, unused_map, unused_bool);
}
}
void MergeTreeData::createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns,
const IndicesASTs & old_indices, const IndicesASTs & new_indices, ExpressionActionsPtr & out_expression,
NameToNameMap & out_rename_map, bool & out_force_update_metadata) const
void MergeTreeData::createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns,
const NamesAndTypesList & new_columns, const IndicesASTs & old_indices, const IndicesASTs & new_indices,
ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map, bool & out_force_update_metadata) const
{
const auto settings = getSettings();
out_expression = nullptr;
@ -1457,7 +1451,7 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
/// Remove old indices
std::set<String> new_indices_set;
std::unordered_set<String> new_indices_set;
for (const auto & index_decl : new_indices)
new_indices_set.emplace(index_decl->as<ASTIndexDeclaration &>().name);
for (const auto & index_decl : old_indices)
@ -1465,8 +1459,8 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
const auto & index = index_decl->as<ASTIndexDeclaration &>();
if (!new_indices_set.count(index.name))
{
out_rename_map["skp_idx_" + index.name + ".idx"] = "";
out_rename_map["skp_idx_" + index.name + part_mrk_file_extension] = "";
out_rename_map["skp_idx_" + index.name + ".idx"] = ""; /// drop this file
out_rename_map["skp_idx_" + index.name + part_mrk_file_extension] = ""; /// and this one
}
}
@ -1494,8 +1488,8 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
/// Delete files if they are no longer shared with another column.
if (--stream_counts[file_name] == 0)
{
out_rename_map[file_name + ".bin"] = "";
out_rename_map[file_name + part_mrk_file_extension] = "";
out_rename_map[file_name + ".bin"] = ""; /// drop this file
out_rename_map[file_name + part_mrk_file_extension] = ""; /// and this one
}
}, {});
}
@ -1847,7 +1841,7 @@ void MergeTreeData::AlterDataPartTransaction::commit()
mutable_part.checksums = new_checksums;
mutable_part.columns = new_columns;
/// 3) Delete the old files.
/// 3) Delete the old files and drop required columns (DROP COLUMN)
for (const auto & from_to : rename_map)
{
String name = from_to.second.empty() ? from_to.first : from_to.second;

View File

@ -234,7 +234,7 @@ public:
const NamesAndTypesList & getNewColumns() const { return new_columns; }
const DataPart::Checksums & getNewChecksums() const { return new_checksums; }
AlterDataPartTransaction(DataPartPtr data_part_) : data_part(data_part_), alter_lock(data_part->alter_mutex) {}
AlterDataPartTransaction(DataPartPtr data_part_) : data_part(data_part_) {}
const DataPartPtr & getDataPart() const { return data_part; }
bool isValid() const;
@ -244,9 +244,7 @@ public:
bool valid = true;
//don't interchange order of data_part & alter_lock
DataPartPtr data_part;
DataPartsLock alter_lock;
DataPart::Checksums new_checksums;
NamesAndTypesList new_columns;

View File

@ -227,16 +227,6 @@ struct MergeTreeDataPart
*/
mutable std::shared_mutex columns_lock;
/** It is taken for the whole time ALTER a part: from the beginning of the recording of the temporary files to their renaming to permanent.
* It is taken with unlocked `columns_lock`.
*
* NOTE: "You can" do without this mutex if you could turn ReadRWLock into WriteRWLock without removing the lock.
* This transformation is impossible, because it would create a deadlock, if you do it from two threads at once.
* Taking this mutex means that we want to lock columns_lock on read with intention then, not
* unblocking, block it for writing.
*/
mutable std::mutex alter_mutex;
MergeTreeIndexGranularityInfo index_granularity_info;
~MergeTreeDataPart();

View File

@ -249,34 +249,10 @@ void StorageMergeTree::alter(
const String current_database_name = getDatabaseName();
const String current_table_name = getTableName();
if (!params.isMutable())
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
auto new_columns = getColumns();
auto new_indices = getIndices();
auto new_constraints = getConstraints();
ASTPtr new_order_by_ast = order_by_ast;
ASTPtr new_primary_key_ast = primary_key_ast;
ASTPtr new_ttl_table_ast = ttl_table_ast;
SettingsChanges new_changes;
params.apply(new_columns, new_indices, new_constraints, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast, new_changes);
changeSettings(new_changes, table_lock_holder);
IDatabase::ASTModifier settings_modifier = getSettingsModifier(new_changes);
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, settings_modifier);
setColumns(std::move(new_columns));
return;
}
/// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time.
/// Also block moves, because they can replace part with old state
auto merge_blocker = merger_mutator.merges_blocker.cancel();
auto moves_blocked = parts_mover.moves_blocker.cancel();
lockNewDataStructureExclusively(table_lock_holder, context.getCurrentQueryId());
checkAlter(params, context);
auto new_columns = getColumns();
auto new_indices = getIndices();
auto new_constraints = getConstraints();
@ -284,13 +260,11 @@ void StorageMergeTree::alter(
ASTPtr new_primary_key_ast = primary_key_ast;
ASTPtr new_ttl_table_ast = ttl_table_ast;
SettingsChanges new_changes;
params.apply(new_columns, new_indices, new_constraints, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast, new_changes);
auto transactions = prepareAlterTransactions(new_columns, new_indices, context);
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
IDatabase::ASTModifier storage_modifier = [&] (IAST & ast)
/// Modifier for storage AST in /metadata/storage_db/storage.sql
IDatabase::ASTModifier storage_modifier = [&](IAST & ast)
{
auto & storage_ast = ast.as<ASTStorage &>();
@ -310,24 +284,51 @@ void StorageMergeTree::alter(
}
};
changeSettings(new_changes, table_lock_holder);
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, storage_modifier);
/// Reinitialize primary key because primary key column types might have changed.
setProperties(new_order_by_ast, new_primary_key_ast, new_columns, new_indices, new_constraints);
setTTLExpressions(new_columns.getColumnTTLs(), new_ttl_table_ast);
for (auto & transaction : transactions)
/// Update metdata in memory
auto update_metadata = [&]()
{
transaction->commit();
transaction.reset();
}
changeSettings(new_changes, table_lock_holder);
/// Reinitialize primary key because primary key column types might have changed.
setProperties(new_order_by_ast, new_primary_key_ast, new_columns, new_indices, new_constraints);
/// Columns sizes could be changed
recalculateColumnSizes();
setTTLExpressions(new_columns.getColumnTTLs(), new_ttl_table_ast);
};
/// This alter can be performed at metadata level only
if (!params.isModifyingData())
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, storage_modifier);
update_metadata();
}
else
{
/// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time.
/// Also block moves, because they can replace part with old state
auto merge_blocker = merger_mutator.merges_blocker.cancel();
auto moves_blocked = parts_mover.moves_blocker.cancel();
auto transactions = prepareAlterTransactions(new_columns, new_indices, context);
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, storage_modifier);
update_metadata();
for (auto & transaction : transactions)
{
transaction->commit();
transaction.reset();
}
/// Columns sizes could be changed
recalculateColumnSizes();
}
}
@ -453,7 +454,6 @@ void StorageMergeTree::mutate(const MutationCommands & commands, const Context &
auto check = [version, this]() { return isMutationDone(version); };
std::unique_lock lock(mutation_wait_mutex);
mutation_wait_event.wait(lock, check);
}
}

View File

@ -3231,6 +3231,10 @@ void StorageReplicatedMergeTree::alter(
const String current_database_name = getDatabaseName();
const String current_table_name = getTableName();
/// We cannot check this alter commands with method isModifyingData()
/// because ReplicatedMergeTree stores both columns and metadata for
/// each replica. So we have to wait AlterThread even with lightweight
/// metadata alter.
if (params.isSettingsAlter())
{
/// We don't replicate storage_settings_ptr ALTER. It's local operation.