Some thoughts on non blocking alter

This commit is contained in:
alesapin 2020-01-13 19:39:20 +03:00
parent 4b9acaaa90
commit bc59e473e8
12 changed files with 461 additions and 210 deletions

View File

@ -381,6 +381,10 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
const auto required_columns = syntax_result->requiredSourceColumns();
affected_indices_columns.insert(std::cbegin(required_columns), std::cend(required_columns));
}
else if (command.type == MutationCommand::CAST)
{
stages.back().column_to_updated.emplace(command.column_name, makeASTFunction("CAST", command.column_name, command.type_name));
}
else
throw Exception("Unknown mutation command type: " + DB::toString<int>(command.type), ErrorCodes::UNKNOWN_MUTATION_COMMAND);
}

View File

@ -1,24 +1,30 @@
#include <Storages/AlterCommands.h>
#include <Storages/IStorage.h>
#include <Compression/CompressionFactory.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/NestedUtils.h>
#include <Interpreters/Context.h>
#include <Interpreters/SyntaxAnalyzer.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/ASTConstraintDeclaration.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/SyntaxAnalyzer.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTColumnDeclaration.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTConstraintDeclaration.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSetQuery.h>
#include <Storages/AlterCommands.h>
#include <Storages/IStorage.h>
#include <Common/typeid_cast.h>
#include <Compression/CompressionFactory.h>
#include <Parsers/queryToString.h>
@ -43,6 +49,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
if (command_ast->type == ASTAlterCommand::ADD_COLUMN)
{
AlterCommand command;
command.ast = command_ast;
command.type = AlterCommand::ADD_COLUMN;
const auto & ast_col_decl = command_ast->col_decl->as<ASTColumnDeclaration &>();
@ -83,6 +90,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
throw Exception("\"ALTER TABLE table CLEAR COLUMN column\" queries are not supported yet. Use \"CLEAR COLUMN column IN PARTITION\".", ErrorCodes::NOT_IMPLEMENTED);
AlterCommand command;
command.ast = command_ast;
command.type = AlterCommand::DROP_COLUMN;
command.column_name = getIdentifierName(command_ast->column);
command.if_exists = command_ast->if_exists;
@ -91,6 +99,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
else if (command_ast->type == ASTAlterCommand::MODIFY_COLUMN)
{
AlterCommand command;
command.ast = command_ast;
command.type = AlterCommand::MODIFY_COLUMN;
const auto & ast_col_decl = command_ast->col_decl->as<ASTColumnDeclaration &>();
@ -126,6 +135,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
else if (command_ast->type == ASTAlterCommand::COMMENT_COLUMN)
{
AlterCommand command;
command.ast = command_ast;
command.type = COMMENT_COLUMN;
command.column_name = getIdentifierName(command_ast->column);
const auto & ast_comment = command_ast->comment->as<ASTLiteral &>();
@ -136,6 +146,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
else if (command_ast->type == ASTAlterCommand::MODIFY_ORDER_BY)
{
AlterCommand command;
command.ast = command_ast;
command.type = AlterCommand::MODIFY_ORDER_BY;
command.order_by = command_ast->order_by;
return command;
@ -143,6 +154,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
else if (command_ast->type == ASTAlterCommand::ADD_INDEX)
{
AlterCommand command;
command.ast = command_ast;
command.index_decl = command_ast->index_decl;
command.type = AlterCommand::ADD_INDEX;
@ -160,6 +172,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
else if (command_ast->type == ASTAlterCommand::ADD_CONSTRAINT)
{
AlterCommand command;
command.ast = command_ast;
command.constraint_decl = command_ast->constraint_decl;
command.type = AlterCommand::ADD_CONSTRAINT;
@ -177,6 +190,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
throw Exception("\"ALTER TABLE table CLEAR COLUMN column\" queries are not supported yet. Use \"CLEAR COLUMN column IN PARTITION\".", ErrorCodes::NOT_IMPLEMENTED);
AlterCommand command;
command.ast = command_ast;
command.if_exists = command_ast->if_exists;
command.type = AlterCommand::DROP_CONSTRAINT;
command.constraint_name = command_ast->constraint->as<ASTIdentifier &>().name;
@ -189,6 +203,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
throw Exception("\"ALTER TABLE table CLEAR INDEX index\" queries are not supported yet. Use \"CLEAR INDEX index IN PARTITION\".", ErrorCodes::NOT_IMPLEMENTED);
AlterCommand command;
command.ast = command_ast;
command.type = AlterCommand::DROP_INDEX;
command.index_name = command_ast->index->as<ASTIdentifier &>().name;
command.if_exists = command_ast->if_exists;
@ -198,6 +213,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
else if (command_ast->type == ASTAlterCommand::MODIFY_TTL)
{
AlterCommand command;
command.ast = command_ast;
command.type = AlterCommand::MODIFY_TTL;
command.ttl = command_ast->ttl;
return command;
@ -205,6 +221,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
else if (command_ast->type == ASTAlterCommand::MODIFY_SETTING)
{
AlterCommand command;
command.ast = command_ast;
command.type = AlterCommand::MODIFY_SETTING;
command.settings_changes = command_ast->settings_changes->as<ASTSetQuery &>().changes;
return command;
@ -423,6 +440,76 @@ bool AlterCommand::isSettingsAlter() const
return type == MODIFY_SETTING;
}
namespace
{
/// If true, then in order to ALTER the type of the column from the type from to the type to
/// we don't need to rewrite the data, we only need to update metadata and columns.txt in part directories.
/// The function works for Arrays and Nullables of the same structure.
bool isMetadataOnlyConversion(const IDataType * from, const IDataType * to)
{
if (from->getName() == to->getName())
return true;
static const std::unordered_multimap<std::type_index, const std::type_info &> ALLOWED_CONVERSIONS =
{
{ typeid(DataTypeEnum8), typeid(DataTypeEnum8) },
{ typeid(DataTypeEnum8), typeid(DataTypeInt8) },
{ typeid(DataTypeEnum16), typeid(DataTypeEnum16) },
{ typeid(DataTypeEnum16), typeid(DataTypeInt16) },
{ typeid(DataTypeDateTime), typeid(DataTypeUInt32) },
{ typeid(DataTypeUInt32), typeid(DataTypeDateTime) },
{ typeid(DataTypeDate), typeid(DataTypeUInt16) },
{ typeid(DataTypeUInt16), typeid(DataTypeDate) },
};
while (true)
{
auto it_range = ALLOWED_CONVERSIONS.equal_range(typeid(*from));
for (auto it = it_range.first; it != it_range.second; ++it)
{
if (it->second == typeid(*to))
return true;
}
const auto * arr_from = typeid_cast<const DataTypeArray *>(from);
const auto * arr_to = typeid_cast<const DataTypeArray *>(to);
if (arr_from && arr_to)
{
from = arr_from->getNestedType().get();
to = arr_to->getNestedType().get();
continue;
}
const auto * nullable_from = typeid_cast<const DataTypeNullable *>(from);
const auto * nullable_to = typeid_cast<const DataTypeNullable *>(to);
if (nullable_from && nullable_to)
{
from = nullable_from->getNestedType().get();
to = nullable_to->getNestedType().get();
continue;
}
return false;
}
}
}
bool AlterCommand::isRequireMutationStage(const StorageInMemoryMetadata & metadata) const
{
if (type != MODIFY_COLUMN || data_type == nullptr)
return false;
for (const auto & column : metadata.columns.getAllPhysical())
{
if (column.name == column_name && !isMetadataOnlyConversion(column.type, data_type))
return true;
}
return false;
}
bool AlterCommand::isCommentAlter() const
{
if (type == COMMENT_COLUMN)
@ -440,6 +527,21 @@ bool AlterCommand::isCommentAlter() const
return false;
}
std::optional<MutationCommand> AlterCommand::tryConvertToMutationCommand(const StorageInMemoryMetadata & metadata) const
{
if (!isRequireMutationStage(metadata))
return {};
MutationCommand result;
result.type = MutationCommand::Type::CAST;
result.column_name = column_name;
result.data_type = data_type;
result.predicate = nullptr;
result.ast = ast;
return result;
}
String alterTypeToString(const AlterCommand::Type type)
{
@ -635,6 +737,12 @@ void AlterCommands::prepare(const StorageInMemoryMetadata & metadata, const Cont
command->default_expression = makeASTFunction("CAST",
command->default_expression->clone(),
std::make_shared<ASTLiteral>(explicit_type->getName()));
//TODO(alesap)
//command->ast = std::make_shared<ASTAlterCommand>();
//command->type = ASTAlterCommand::MODIFY_COLUMN;
//command->col_decl = std::make_shared<ASTColumnDeclaration>();
//command->col_decl->name = column.name;
}
}
else
@ -725,4 +833,15 @@ bool AlterCommands::isCommentAlter() const
{
return std::all_of(begin(), end(), [](const AlterCommand & c) { return c.isCommentAlter(); });
}
MutationCommands getMutationCommands(const StorageInMemoryMetadata & metadata) const
{
MutationCommands result;
for (const auto & alter_cmd : *this)
if (auto mutation_cmd = alter_cmd.tryConvertToMutationCommand(metadata); mutation_cmd)
result.push_back(*mutation_cmd);
return result;
}
}

View File

@ -4,6 +4,7 @@
#include <Core/NamesAndTypes.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/MutationCommands.h>
#include <Common/SettingsChanges.h>
@ -18,6 +19,8 @@ class ASTAlterCommand;
/// Adding Nested columns is not expanded to add individual columns.
struct AlterCommand
{
ASTPtr ast; /// The AST of the whole command
enum Type
{
ADD_COLUMN,
@ -96,11 +99,15 @@ struct AlterCommand
/// in each part on disk (it's not lightweight alter).
bool isModifyingData() const;
bool isRequireMutationStage(const StorageInMemoryMetadata & metadata) const;
/// Checks that only settings changed by alter
bool isSettingsAlter() const;
/// Checks that only comment changed by alter
bool isCommentAlter() const;
std::optional<MutationCommand> tryConvertToMutationCommand(const StorageInMemoryMetadata & metadata) const;
};
/// Return string representation of AlterCommand::Type
@ -136,6 +143,10 @@ public:
/// At least one command modify comments.
bool isCommentAlter() const;
MutationCommands getMutationCommands(const StorageInMemoryMetadata & metadata) const;
};
MutationCommands extractMutationCommandsFromAlterCommands(const StorageInMemoryMetadata & metadata, AlterCommands & commands);
}

View File

@ -1784,6 +1784,7 @@ void MergeTreeData::alterDataPart(
new_checksums.files[it.second] = add_checksums.files[it.first];
}
/// NOTE(alesap) Don't miss this
/// Write the checksums to the temporary file.
if (!part->checksums.empty())
{

View File

@ -945,16 +945,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
context_for_reading.getSettingsRef().max_threads = 1;
std::vector<MutationCommand> commands_for_part;
std::copy_if(
std::cbegin(commands), std::cend(commands),
std::back_inserter(commands_for_part),
[&] (const MutationCommand & command)
{
return command.partition == nullptr ||
future_part.parts[0]->info.partition_id == data.getPartitionIDFromQuery(
command.partition, context_for_reading);
});
for (const auto & command : commands)
{
if (command.partition == nullptr || future_part.parts[0]->info.partition_id == data.getPartitionIDFromQuery(
command.partition, context_for_reading))
commands_for_part.emplace_back(command);
}
if (!isStorageTouchedByMutations(storage_from_source_part, commands_for_part, context_for_reading))
{
@ -1061,7 +1057,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
indices_recalc_syntax, context).getActions(false);
/// We can update only one column, but some skip idx expression may depend on several
/// columns (c1 + c2 * c3). It works because in stream was created with help of
/// columns (c1 + c2 * c3). It works because this stream was created with help of
/// MutationsInterpreter which knows about skip indices and stream 'in' already has
/// all required columns.
/// TODO move this logic to single place.

View File

@ -65,6 +65,12 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
<< new_part_name;
break;
case FINISH_ALTER: /// Just make local /metadata and /columns consistent with global
out << "alter\n";
for (const String & s : source_parts)
out << s << '\n';
out << "finish";
break;
default:
throw Exception("Unknown log entry type: " + DB::toString<int>(type), ErrorCodes::LOGICAL_ERROR);
}
@ -152,6 +158,18 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
>> new_part_name;
source_parts.push_back(source_part);
}
else if (type_str == "alter")
{
type = FINISH_ALTER;
while (!in.eof())
{
String s;
in >> s >> "\n";
if (s == "finish")
break;
source_parts.push_back(s);
}
}
in >> "\n";

View File

@ -37,6 +37,7 @@ struct ReplicatedMergeTreeLogEntryData
CLEAR_INDEX, /// Drop specific index from specified partition.
REPLACE_RANGE, /// Drop certain range of partitions and replace them by new ones
MUTATE_PART, /// Apply one or several mutations to the part.
FINISH_ALTER, /// Apply one or several alter modifications to part
};
static String typeToString(Type type)
@ -50,6 +51,7 @@ struct ReplicatedMergeTreeLogEntryData
case ReplicatedMergeTreeLogEntryData::CLEAR_INDEX: return "CLEAR_INDEX";
case ReplicatedMergeTreeLogEntryData::REPLACE_RANGE: return "REPLACE_RANGE";
case ReplicatedMergeTreeLogEntryData::MUTATE_PART: return "MUTATE_PART";
case ReplicatedMergeTreeLogEntryData::FINISH_ALTER: return "FINISH_ALTER";
default:
throw Exception("Unknown log entry type: " + DB::toString<int>(type), ErrorCodes::LOGICAL_ERROR);
}

View File

@ -557,6 +557,11 @@ static Names getPartNamesToMutate(
}
Names getPartNamesToMutate(ReplicatedMergeTreeMutationEntry & entry) const
{
return getPartNamesToMutate(entry, current_parts);
}
void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback)
{
std::lock_guard lock(update_mutations_mutex);
@ -1001,6 +1006,21 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
}
}
if (entry.type == LogEntry::FINISH_ALTER)
{
for (const auto & name : entry.source_parts)
{
if (future_parts.count(name))
{
String reason = "Not altering storage because part " + name
+ " is not ready yet (log entry for that part is being processed).";
LOG_TRACE(log, reason);
out_postpone_reason = reason;
return false;
}
}
}
return true;
}

View File

@ -339,6 +339,8 @@ public:
/// Adds a subscriber
SubscriberHandler addSubscriber(SubscriberCallBack && callback);
Names getPartNamesToMutate(ReplicatedMergeTreeMutationEntry & entry) const;
struct Status
{
UInt32 future_parts;

View File

@ -2,6 +2,7 @@
#include <Parsers/ASTAlterQuery.h>
#include <Storages/IStorage_fwd.h>
#include <DataTypes/IDataType.h>
#include <optional>
#include <unordered_map>
@ -25,7 +26,8 @@ struct MutationCommand
EMPTY, /// Not used.
DELETE,
UPDATE,
MATERIALIZE_INDEX
MATERIALIZE_INDEX,
CAST /// for ALTER MODIFY column
};
Type type = EMPTY;
@ -40,6 +42,10 @@ struct MutationCommand
String index_name;
ASTPtr partition;
/// For cast
String column_name;
DataTypePtr data_type;
static std::optional<MutationCommand> parse(ASTAlterCommand * command);
};

View File

@ -977,6 +977,10 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
{
do_fetch = !tryExecutePartMutation(entry);
}
else if (entry.type == LogEntry::FINISH_ALTER)
{
tryFinishAlter(entry);
}
else
{
throw Exception("Unexpected log entry type: " + toString(static_cast<int>(entry.type)), ErrorCodes::LOGICAL_ERROR);
@ -1152,6 +1156,72 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
}
bool StorageReplicatedMergeTree::tryFinishAlter(const StorageReplicatedMergeTree::LogEntry & entry)
{
auto zookeeper = getZooKeeper();
String columns_path = zookeeper_path + "/columns";
auto columns_znode = zookeeper->get(columns_path);
if (!columns_znode.exists)
throw Exception(columns_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE);
int32_t columns_version = columns_znode.stat.version;
String metadata_path = zookeeper_path + "/metadata";
auto metadata_znode = zookeeper->get(metadata_path);
if (!metadata_znode.exists)
throw Exception(metadata_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE);
int32_t metadata_version = metadata_znode.stat.version;
const bool changed_columns_version = (columns_version != storage.columns_version);
const bool changed_metadata_version = (metadata_version != storage.metadata_version);
if (!(changed_columns_version || changed_metadata_version))
return;
const String & columns_str = columns_znode.contents;
auto columns_in_zk = ColumnsDescription::parse(columns_str);
const String & metadata_str = metadata_znode.contents;
auto metadata_in_zk = ReplicatedMergeTreeTableMetadata::parse(metadata_str);
auto metadata_diff = ReplicatedMergeTreeTableMetadata(storage).checkAndFindDiff(metadata_in_zk, /* allow_alter = */ true);
MergeTreeData::DataParts parts;
/// If metadata nodes have changed, we will update table structure locally.
if (changed_columns_version || changed_metadata_version)
{
LOG_INFO(log, "Version of metadata nodes in ZooKeeper changed. Waiting for structure write lock.");
auto table_lock = storage.lockExclusively(RWLockImpl::NO_QUERY);
if (columns_in_zk == storage.getColumns() && metadata_diff.empty())
{
LOG_INFO(
log,
"Metadata nodes changed in ZooKeeper, but their contents didn't change. "
"Most probably it is a cyclic ALTER.");
}
else
{
LOG_INFO(log, "Metadata changed in ZooKeeper. Applying changes locally.");
storage.setTableStructure(std::move(columns_in_zk), metadata_diff);
LOG_INFO(log, "Applied changes to the metadata of the table.");
}
columns_version = columns_version;
metadata_version = metadata_version;
recalculateColumnSizes();
/// Update metadata ZK nodes for a specific replica.
if (changed_columns_version)
zookeeper->set(replica_path + "/columns", columns_str);
if (changed_metadata_version)
zookeeper->set(replica_path + "/metadata", metadata_str);
}
}
bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedMergeTree::LogEntry & entry)
{
const String & source_part_name = entry.source_parts.at(0);
@ -3199,6 +3269,7 @@ void StorageReplicatedMergeTree::alter(
const String current_database_name = getDatabaseName();
const String current_table_name = getTableName();
auto maybe_mutation_commands = params.getMutationCommands(getInMemoryMetadata());
/// We cannot check this alter commands with method isModifyingData()
/// because ReplicatedMergeTree stores both columns and metadata for
@ -3218,31 +3289,6 @@ void StorageReplicatedMergeTree::alter(
return;
}
/// Alter is done by modifying the metadata nodes in ZK that are shared between all replicas
/// (/columns, /metadata). We set contents of the shared nodes to the new values and wait while
/// replicas asynchronously apply changes (see ReplicatedMergeTreeAlterThread.cpp) and modify
/// their respective replica metadata nodes (/replicas/<replica>/columns, /replicas/<replica>/metadata).
struct ChangedNode
{
ChangedNode(const String & table_path_, String name_, String new_value_)
: table_path(table_path_), name(std::move(name_)), shared_path(table_path + "/" + name)
, new_value(std::move(new_value_))
{}
const String & table_path;
String name;
String shared_path;
String getReplicaPath(const String & replica) const
{
return table_path + "/replicas/" + replica + "/" + name;
}
String new_value;
int32_t new_version = -1; /// Initialization is to suppress (useless) false positive warning found by cppcheck.
};
auto ast_to_str = [](ASTPtr query) -> String
{
@ -3251,9 +3297,6 @@ void StorageReplicatedMergeTree::alter(
return queryToString(query);
};
/// /columns and /metadata nodes
std::vector<ChangedNode> changed_nodes;
{
/// Just to read current structure. Alter will be done in separate thread.
auto table_lock = lockStructureForShare(false, query_context.getCurrentQueryId());
@ -3309,202 +3352,219 @@ void StorageReplicatedMergeTree::alter(
table_lock_holder.release();
/// Wait until all replicas will apply ALTER.
ReplicatedMergeTreeLogEntryData entry;
entry.type = LogEntry::FINISH_ALTER;
entry.source_replica = replica_name;
for (const auto & node : changed_nodes)
if (maybe_mutation_commands)
{
Coordination::Stat stat;
/// Subscribe to change of shared ZK metadata nodes, to finish waiting if someone will do another ALTER.
if (!getZooKeeper()->exists(node.shared_path, &stat, alter_query_event))
throw Exception(node.shared_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE);
if (stat.version != node.new_version)
{
LOG_WARNING(log, node.shared_path + " changed before this ALTER finished; " +
"overlapping ALTER-s are fine but use caution with nontransitive changes");
return;
}
ReplicatedMergeTreeMutationEntry entry = mutateImpl(*maybe_mutation_commands, context);
entry.source_parts = queue.getPartNamesToMutate(entry);
}
Strings replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
entry.new_part_name = new_part_name;
entry.create_time = time(nullptr);
std::set<String> inactive_replicas;
std::set<String> timed_out_replicas;
zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
time_t replication_alter_columns_timeout = query_context.getSettingsRef().replication_alter_columns_timeout;
waitForAllReplicasToProcessLogEntry(entry);
/// This code is quite similar with waitMutationToFinishOnReplicas
/// but contains more complicated details (versions manipulations, multiple nodes, etc.).
/// It will be removed soon in favor of alter-modify implementation on top of mutations.
/// TODO (alesap)
for (const String & replica : replicas)
{
LOG_DEBUG(log, "Waiting for " << replica << " to apply changes");
///// Wait until all replicas will apply ALTER.
while (!partial_shutdown_called)
{
auto zookeeper = getZooKeeper();
//for (const auto & node : changed_nodes)
//{
// Coordination::Stat stat;
// /// Subscribe to change of shared ZK metadata nodes, to finish waiting if someone will do another ALTER.
// if (!getZooKeeper()->exists(node.shared_path, &stat, alter_query_event))
// throw Exception(node.shared_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE);
/// Replica could be inactive.
if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
{
LOG_WARNING(log, "Replica " << replica << " is not active during ALTER query."
" ALTER will be done asynchronously when replica becomes active.");
// if (stat.version != node.new_version)
// {
// LOG_WARNING(log, node.shared_path + " changed before this ALTER finished; " +
// "overlapping ALTER-s are fine but use caution with nontransitive changes");
// return;
// }
//}
inactive_replicas.emplace(replica);
break;
}
//Strings replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
struct ReplicaNode
{
explicit ReplicaNode(String path_) : path(std::move(path_)) {}
//std::set<String> inactive_replicas;
//std::set<String> timed_out_replicas;
String path;
String value;
int32_t version = -1;
};
//time_t replication_alter_columns_timeout = query_context.getSettingsRef().replication_alter_columns_timeout;
std::vector<ReplicaNode> replica_nodes;
for (const auto & node : changed_nodes)
replica_nodes.emplace_back(node.getReplicaPath(replica));
///// This code is quite similar with waitMutationToFinishOnReplicas
///// but contains more complicated details (versions manipulations, multiple nodes, etc.).
///// It will be removed soon in favor of alter-modify implementation on top of mutations.
///// TODO (alesap)
//for (const String & replica : replicas)
//{
// LOG_DEBUG(log, "Waiting for " << replica << " to apply changes");
bool replica_was_removed = false;
for (auto & node : replica_nodes)
{
Coordination::Stat stat;
// while (!partial_shutdown_called)
// {
// auto zookeeper = getZooKeeper();
/// Replica could has been removed.
if (!zookeeper->tryGet(node.path, node.value, &stat))
{
LOG_WARNING(log, replica << " was removed");
replica_was_removed = true;
break;
}
// /// Replica could be inactive.
// if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
// {
// LOG_WARNING(log, "Replica " << replica << " is not active during ALTER query."
// " ALTER will be done asynchronously when replica becomes active.");
node.version = stat.version;
}
// inactive_replicas.emplace(replica);
// break;
// }
if (replica_was_removed)
break;
// struct ReplicaNode
// {
// explicit ReplicaNode(String path_) : path(std::move(path_)) {}
bool alter_was_applied = true;
for (size_t i = 0; i < replica_nodes.size(); ++i)
{
if (replica_nodes[i].value != changed_nodes[i].new_value)
{
alter_was_applied = false;
break;
}
}
// String path;
// String value;
// int32_t version = -1;
// };
/// The ALTER has been successfully applied.
if (alter_was_applied)
break;
// std::vector<ReplicaNode> replica_nodes;
// for (const auto & node : changed_nodes)
// replica_nodes.emplace_back(node.getReplicaPath(replica));
for (const auto & node : changed_nodes)
{
Coordination::Stat stat;
if (!zookeeper->exists(node.shared_path, &stat))
throw Exception(node.shared_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE);
// bool replica_was_removed = false;
// for (auto & node : replica_nodes)
// {
// Coordination::Stat stat;
if (stat.version != node.new_version)
{
LOG_WARNING(log, node.shared_path + " changed before this ALTER finished; "
"overlapping ALTER-s are fine but use caution with nontransitive changes");
return;
}
}
// /// Replica could has been removed.
// if (!zookeeper->tryGet(node.path, node.value, &stat))
// {
// LOG_WARNING(log, replica << " was removed");
// replica_was_removed = true;
// break;
// }
bool replica_nodes_changed_concurrently = false;
for (const auto & replica_node : replica_nodes)
{
Coordination::Stat stat;
if (!zookeeper->exists(replica_node.path, &stat, alter_query_event))
{
LOG_WARNING(log, replica << " was removed");
replica_was_removed = true;
break;
}
// node.version = stat.version;
// }
if (stat.version != replica_node.version)
{
replica_nodes_changed_concurrently = true;
break;
}
}
// if (replica_was_removed)
// break;
if (replica_was_removed)
break;
// bool alter_was_applied = true;
// for (size_t i = 0; i < replica_nodes.size(); ++i)
// {
// if (replica_nodes[i].value != changed_nodes[i].new_value)
// {
// alter_was_applied = false;
// break;
// }
// }
if (replica_nodes_changed_concurrently)
continue;
// /// The ALTER has been successfully applied.
// if (alter_was_applied)
// break;
/// alter_query_event subscribed with zookeeper watch callback to /repliacs/{replica}/metadata
/// and /replicas/{replica}/columns nodes for current relica + shared nodes /columns and /metadata,
/// which is common for all replicas. If changes happen with this nodes (delete, set and create)
/// than event will be notified and wait will be interrupted.
///
/// ReplicatedMergeTreeAlterThread responsible for local /replicas/{replica}/metadata and
/// /replicas/{replica}/columns changes. Shared /columns and /metadata nodes can be changed by *newer*
/// concurrent alter from other replica. First of all it will update shared nodes and we will have no
/// ability to identify, that our *current* alter finshed. So we cannot do anything better than just
/// return from *current* alter with success result.
if (!replication_alter_columns_timeout)
{
alter_query_event->wait();
/// Everything is fine.
}
else if (alter_query_event->tryWait(replication_alter_columns_timeout * 1000))
{
/// Everything is fine.
}
else
{
LOG_WARNING(log, "Timeout when waiting for replica " << replica << " to apply ALTER."
" ALTER will be done asynchronously.");
// for (const auto & node : changed_nodes)
// {
// Coordination::Stat stat;
// if (!zookeeper->exists(node.shared_path, &stat))
// throw Exception(node.shared_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE);
timed_out_replicas.emplace(replica);
break;
}
}
// if (stat.version != node.new_version)
// {
// LOG_WARNING(log, node.shared_path + " changed before this ALTER finished; "
// "overlapping ALTER-s are fine but use caution with nontransitive changes");
// return;
// }
// }
if (partial_shutdown_called)
throw Exception("Alter is not finished because table shutdown was called. Alter will be done after table restart.",
ErrorCodes::UNFINISHED);
// bool replica_nodes_changed_concurrently = false;
// for (const auto & replica_node : replica_nodes)
// {
// Coordination::Stat stat;
// if (!zookeeper->exists(replica_node.path, &stat, alter_query_event))
// {
// LOG_WARNING(log, replica << " was removed");
// replica_was_removed = true;
// break;
// }
if (!inactive_replicas.empty() || !timed_out_replicas.empty())
{
std::stringstream exception_message;
exception_message << "Alter is not finished because";
// if (stat.version != replica_node.version)
// {
// replica_nodes_changed_concurrently = true;
// break;
// }
// }
if (!inactive_replicas.empty())
{
exception_message << " some replicas are inactive right now";
// if (replica_was_removed)
// break;
for (auto it = inactive_replicas.begin(); it != inactive_replicas.end(); ++it)
exception_message << (it == inactive_replicas.begin() ? ": " : ", ") << *it;
}
// if (replica_nodes_changed_concurrently)
// continue;
if (!timed_out_replicas.empty() && !inactive_replicas.empty())
exception_message << " and";
// /// alter_query_event subscribed with zookeeper watch callback to /repliacs/{replica}/metadata
// /// and /replicas/{replica}/columns nodes for current relica + shared nodes /columns and /metadata,
// /// which is common for all replicas. If changes happen with this nodes (delete, set and create)
// /// than event will be notified and wait will be interrupted.
// ///
// /// ReplicatedMergeTreeAlterThread responsible for local /replicas/{replica}/metadata and
// /// /replicas/{replica}/columns changes. Shared /columns and /metadata nodes can be changed by *newer*
// /// concurrent alter from other replica. First of all it will update shared nodes and we will have no
// /// ability to identify, that our *current* alter finshed. So we cannot do anything better than just
// /// return from *current* alter with success result.
// if (!replication_alter_columns_timeout)
// {
// alter_query_event->wait();
// /// Everything is fine.
// }
// else if (alter_query_event->tryWait(replication_alter_columns_timeout * 1000))
// {
// /// Everything is fine.
// }
// else
// {
// LOG_WARNING(log, "Timeout when waiting for replica " << replica << " to apply ALTER."
// " ALTER will be done asynchronously.");
if (!timed_out_replicas.empty())
{
exception_message << " timeout when waiting for some replicas";
// timed_out_replicas.emplace(replica);
// break;
// }
// }
for (auto it = timed_out_replicas.begin(); it != timed_out_replicas.end(); ++it)
exception_message << (it == timed_out_replicas.begin() ? ": " : ", ") << *it;
// if (partial_shutdown_called)
// throw Exception("Alter is not finished because table shutdown was called. Alter will be done after table restart.",
// ErrorCodes::UNFINISHED);
exception_message << " (replication_alter_columns_timeout = " << replication_alter_columns_timeout << ")";
}
// if (!inactive_replicas.empty() || !timed_out_replicas.empty())
// {
// std::stringstream exception_message;
// exception_message << "Alter is not finished because";
exception_message << ". Alter will be done asynchronously.";
// if (!inactive_replicas.empty())
// {
// exception_message << " some replicas are inactive right now";
throw Exception(exception_message.str(), ErrorCodes::UNFINISHED);
}
}
// for (auto it = inactive_replicas.begin(); it != inactive_replicas.end(); ++it)
// exception_message << (it == inactive_replicas.begin() ? ": " : ", ") << *it;
// }
LOG_DEBUG(log, "ALTER finished");
// if (!timed_out_replicas.empty() && !inactive_replicas.empty())
// exception_message << " and";
// if (!timed_out_replicas.empty())
// {
// exception_message << " timeout when waiting for some replicas";
// for (auto it = timed_out_replicas.begin(); it != timed_out_replicas.end(); ++it)
// exception_message << (it == timed_out_replicas.begin() ? ": " : ", ") << *it;
// exception_message << " (replication_alter_columns_timeout = " << replication_alter_columns_timeout << ")";
// }
// exception_message << ". Alter will be done asynchronously.";
// throw Exception(exception_message.str(), ErrorCodes::UNFINISHED);
// }
//}
//LOG_DEBUG(log, "ALTER finished");
}
void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context)
@ -4457,6 +4517,11 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const
void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const Context & query_context)
{
mutateImpl(commands, context);
}
StorageReplicatedMergeTree::mutateImpl(const MutationCommands & commands, const Context & query_context)
{
/// Overview of the mutation algorithm.
///
@ -4572,6 +4637,7 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const
waitMutationToFinishOnReplicas(replicas, entry.znode_name);
}
return entry;
}
std::vector<MergeTreeMutationStatus> StorageReplicatedMergeTree::getMutationsStatus() const

View File

@ -110,6 +110,7 @@ public:
void alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context) override;
void mutate(const MutationCommands & commands, const Context & context) override;
ReplicatedMergeTreeMutationEntry mutateImpl(const MutationCommands & commands, const Context & context);
std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
CancellationCode killMutation(const String & mutation_id) override;
@ -382,6 +383,8 @@ private:
/// Do the merge or recommend to make the fetch instead of the merge
bool tryExecuteMerge(const LogEntry & entry);
bool tryFinishAlter(const LogEntry & entry);
bool tryExecutePartMutation(const LogEntry & entry);
@ -431,6 +434,9 @@ private:
/// Checks if some mutations are done and marks them as done.
void mutationsFinalizingTask();
/// finish alter after all heavy processes finished
void finishAlter();
/** Write the selected parts to merge into the log,
* Call when merge_selecting_mutex is locked.
* Returns false if any part is not in ZK.