From bc59e473e8be7ff4cc34b44c3f188cb6bb66466e Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 13 Jan 2020 19:39:20 +0300 Subject: [PATCH] Some thoughts on non blocking alter --- .../src/Interpreters/MutationsInterpreter.cpp | 4 + dbms/src/Storages/AlterCommands.cpp | 143 +++++- dbms/src/Storages/AlterCommands.h | 11 + dbms/src/Storages/MergeTree/MergeTreeData.cpp | 1 + .../MergeTree/MergeTreeDataMergerMutator.cpp | 18 +- .../MergeTree/ReplicatedMergeTreeLogEntry.cpp | 18 + .../MergeTree/ReplicatedMergeTreeLogEntry.h | 2 + .../MergeTree/ReplicatedMergeTreeQueue.cpp | 20 + .../MergeTree/ReplicatedMergeTreeQueue.h | 2 + dbms/src/Storages/MutationCommands.h | 8 +- .../Storages/StorageReplicatedMergeTree.cpp | 438 ++++++++++-------- .../src/Storages/StorageReplicatedMergeTree.h | 6 + 12 files changed, 461 insertions(+), 210 deletions(-) diff --git a/dbms/src/Interpreters/MutationsInterpreter.cpp b/dbms/src/Interpreters/MutationsInterpreter.cpp index 8ff10e92dee..4a585cf424f 100644 --- a/dbms/src/Interpreters/MutationsInterpreter.cpp +++ b/dbms/src/Interpreters/MutationsInterpreter.cpp @@ -381,6 +381,10 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run) const auto required_columns = syntax_result->requiredSourceColumns(); affected_indices_columns.insert(std::cbegin(required_columns), std::cend(required_columns)); } + else if (command.type == MutationCommand::CAST) + { + stages.back().column_to_updated.emplace(command.column_name, makeASTFunction("CAST", command.column_name, command.type_name)); + } else throw Exception("Unknown mutation command type: " + DB::toString(command.type), ErrorCodes::UNKNOWN_MUTATION_COMMAND); } diff --git a/dbms/src/Storages/AlterCommands.cpp b/dbms/src/Storages/AlterCommands.cpp index fc7bf608b17..58e21caedd6 100644 --- a/dbms/src/Storages/AlterCommands.cpp +++ b/dbms/src/Storages/AlterCommands.cpp @@ -1,24 +1,30 @@ -#include -#include +#include +#include +#include +#include +#include #include +#include #include #include #include -#include -#include #include -#include -#include -#include -#include -#include -#include +#include +#include #include #include -#include +#include #include +#include +#include +#include +#include +#include +#include +#include +#include #include -#include + #include @@ -43,6 +49,7 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ if (command_ast->type == ASTAlterCommand::ADD_COLUMN) { AlterCommand command; + command.ast = command_ast; command.type = AlterCommand::ADD_COLUMN; const auto & ast_col_decl = command_ast->col_decl->as(); @@ -83,6 +90,7 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ throw Exception("\"ALTER TABLE table CLEAR COLUMN column\" queries are not supported yet. Use \"CLEAR COLUMN column IN PARTITION\".", ErrorCodes::NOT_IMPLEMENTED); AlterCommand command; + command.ast = command_ast; command.type = AlterCommand::DROP_COLUMN; command.column_name = getIdentifierName(command_ast->column); command.if_exists = command_ast->if_exists; @@ -91,6 +99,7 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ else if (command_ast->type == ASTAlterCommand::MODIFY_COLUMN) { AlterCommand command; + command.ast = command_ast; command.type = AlterCommand::MODIFY_COLUMN; const auto & ast_col_decl = command_ast->col_decl->as(); @@ -126,6 +135,7 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ else if (command_ast->type == ASTAlterCommand::COMMENT_COLUMN) { AlterCommand command; + command.ast = command_ast; command.type = COMMENT_COLUMN; command.column_name = getIdentifierName(command_ast->column); const auto & ast_comment = command_ast->comment->as(); @@ -136,6 +146,7 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ else if (command_ast->type == ASTAlterCommand::MODIFY_ORDER_BY) { AlterCommand command; + command.ast = command_ast; command.type = AlterCommand::MODIFY_ORDER_BY; command.order_by = command_ast->order_by; return command; @@ -143,6 +154,7 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ else if (command_ast->type == ASTAlterCommand::ADD_INDEX) { AlterCommand command; + command.ast = command_ast; command.index_decl = command_ast->index_decl; command.type = AlterCommand::ADD_INDEX; @@ -160,6 +172,7 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ else if (command_ast->type == ASTAlterCommand::ADD_CONSTRAINT) { AlterCommand command; + command.ast = command_ast; command.constraint_decl = command_ast->constraint_decl; command.type = AlterCommand::ADD_CONSTRAINT; @@ -177,6 +190,7 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ throw Exception("\"ALTER TABLE table CLEAR COLUMN column\" queries are not supported yet. Use \"CLEAR COLUMN column IN PARTITION\".", ErrorCodes::NOT_IMPLEMENTED); AlterCommand command; + command.ast = command_ast; command.if_exists = command_ast->if_exists; command.type = AlterCommand::DROP_CONSTRAINT; command.constraint_name = command_ast->constraint->as().name; @@ -189,6 +203,7 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ throw Exception("\"ALTER TABLE table CLEAR INDEX index\" queries are not supported yet. Use \"CLEAR INDEX index IN PARTITION\".", ErrorCodes::NOT_IMPLEMENTED); AlterCommand command; + command.ast = command_ast; command.type = AlterCommand::DROP_INDEX; command.index_name = command_ast->index->as().name; command.if_exists = command_ast->if_exists; @@ -198,6 +213,7 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ else if (command_ast->type == ASTAlterCommand::MODIFY_TTL) { AlterCommand command; + command.ast = command_ast; command.type = AlterCommand::MODIFY_TTL; command.ttl = command_ast->ttl; return command; @@ -205,6 +221,7 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ else if (command_ast->type == ASTAlterCommand::MODIFY_SETTING) { AlterCommand command; + command.ast = command_ast; command.type = AlterCommand::MODIFY_SETTING; command.settings_changes = command_ast->settings_changes->as().changes; return command; @@ -423,6 +440,76 @@ bool AlterCommand::isSettingsAlter() const return type == MODIFY_SETTING; } +namespace +{ + +/// If true, then in order to ALTER the type of the column from the type from to the type to +/// we don't need to rewrite the data, we only need to update metadata and columns.txt in part directories. +/// The function works for Arrays and Nullables of the same structure. +bool isMetadataOnlyConversion(const IDataType * from, const IDataType * to) +{ + if (from->getName() == to->getName()) + return true; + + static const std::unordered_multimap ALLOWED_CONVERSIONS = + { + { typeid(DataTypeEnum8), typeid(DataTypeEnum8) }, + { typeid(DataTypeEnum8), typeid(DataTypeInt8) }, + { typeid(DataTypeEnum16), typeid(DataTypeEnum16) }, + { typeid(DataTypeEnum16), typeid(DataTypeInt16) }, + { typeid(DataTypeDateTime), typeid(DataTypeUInt32) }, + { typeid(DataTypeUInt32), typeid(DataTypeDateTime) }, + { typeid(DataTypeDate), typeid(DataTypeUInt16) }, + { typeid(DataTypeUInt16), typeid(DataTypeDate) }, + }; + + while (true) + { + auto it_range = ALLOWED_CONVERSIONS.equal_range(typeid(*from)); + for (auto it = it_range.first; it != it_range.second; ++it) + { + if (it->second == typeid(*to)) + return true; + } + + const auto * arr_from = typeid_cast(from); + const auto * arr_to = typeid_cast(to); + if (arr_from && arr_to) + { + from = arr_from->getNestedType().get(); + to = arr_to->getNestedType().get(); + continue; + } + + const auto * nullable_from = typeid_cast(from); + const auto * nullable_to = typeid_cast(to); + if (nullable_from && nullable_to) + { + from = nullable_from->getNestedType().get(); + to = nullable_to->getNestedType().get(); + continue; + } + + return false; + } +} + +} + + +bool AlterCommand::isRequireMutationStage(const StorageInMemoryMetadata & metadata) const +{ + if (type != MODIFY_COLUMN || data_type == nullptr) + return false; + + for (const auto & column : metadata.columns.getAllPhysical()) + { + if (column.name == column_name && !isMetadataOnlyConversion(column.type, data_type)) + return true; + } + return false; +} + bool AlterCommand::isCommentAlter() const { if (type == COMMENT_COLUMN) @@ -440,6 +527,21 @@ bool AlterCommand::isCommentAlter() const return false; } +std::optional AlterCommand::tryConvertToMutationCommand(const StorageInMemoryMetadata & metadata) const +{ + if (!isRequireMutationStage(metadata)) + return {}; + + MutationCommand result; + + result.type = MutationCommand::Type::CAST; + result.column_name = column_name; + result.data_type = data_type; + result.predicate = nullptr; + result.ast = ast; + return result; +} + String alterTypeToString(const AlterCommand::Type type) { @@ -635,6 +737,12 @@ void AlterCommands::prepare(const StorageInMemoryMetadata & metadata, const Cont command->default_expression = makeASTFunction("CAST", command->default_expression->clone(), std::make_shared(explicit_type->getName())); + + //TODO(alesap) + //command->ast = std::make_shared(); + //command->type = ASTAlterCommand::MODIFY_COLUMN; + //command->col_decl = std::make_shared(); + //command->col_decl->name = column.name; } } else @@ -725,4 +833,15 @@ bool AlterCommands::isCommentAlter() const { return std::all_of(begin(), end(), [](const AlterCommand & c) { return c.isCommentAlter(); }); } + + +MutationCommands getMutationCommands(const StorageInMemoryMetadata & metadata) const +{ + MutationCommands result; + for (const auto & alter_cmd : *this) + if (auto mutation_cmd = alter_cmd.tryConvertToMutationCommand(metadata); mutation_cmd) + result.push_back(*mutation_cmd); + return result; +} + } diff --git a/dbms/src/Storages/AlterCommands.h b/dbms/src/Storages/AlterCommands.h index e547752fa09..fbe29800b34 100644 --- a/dbms/src/Storages/AlterCommands.h +++ b/dbms/src/Storages/AlterCommands.h @@ -4,6 +4,7 @@ #include #include #include +#include #include @@ -18,6 +19,8 @@ class ASTAlterCommand; /// Adding Nested columns is not expanded to add individual columns. struct AlterCommand { + ASTPtr ast; /// The AST of the whole command + enum Type { ADD_COLUMN, @@ -96,11 +99,15 @@ struct AlterCommand /// in each part on disk (it's not lightweight alter). bool isModifyingData() const; + bool isRequireMutationStage(const StorageInMemoryMetadata & metadata) const; + /// Checks that only settings changed by alter bool isSettingsAlter() const; /// Checks that only comment changed by alter bool isCommentAlter() const; + + std::optional tryConvertToMutationCommand(const StorageInMemoryMetadata & metadata) const; }; /// Return string representation of AlterCommand::Type @@ -136,6 +143,10 @@ public: /// At least one command modify comments. bool isCommentAlter() const; + + MutationCommands getMutationCommands(const StorageInMemoryMetadata & metadata) const; }; + +MutationCommands extractMutationCommandsFromAlterCommands(const StorageInMemoryMetadata & metadata, AlterCommands & commands); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index a978c72caad..a9fb732e81d 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -1784,6 +1784,7 @@ void MergeTreeData::alterDataPart( new_checksums.files[it.second] = add_checksums.files[it.first]; } + /// NOTE(alesap) Don't miss this /// Write the checksums to the temporary file. if (!part->checksums.empty()) { diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 05db73ce215..3295c863ef9 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -945,16 +945,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor context_for_reading.getSettingsRef().max_threads = 1; std::vector commands_for_part; - std::copy_if( - std::cbegin(commands), std::cend(commands), - std::back_inserter(commands_for_part), - [&] (const MutationCommand & command) - { - return command.partition == nullptr || - future_part.parts[0]->info.partition_id == data.getPartitionIDFromQuery( - command.partition, context_for_reading); - }); - + for (const auto & command : commands) + { + if (command.partition == nullptr || future_part.parts[0]->info.partition_id == data.getPartitionIDFromQuery( + command.partition, context_for_reading)) + commands_for_part.emplace_back(command); + } if (!isStorageTouchedByMutations(storage_from_source_part, commands_for_part, context_for_reading)) { @@ -1061,7 +1057,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor indices_recalc_syntax, context).getActions(false); /// We can update only one column, but some skip idx expression may depend on several - /// columns (c1 + c2 * c3). It works because in stream was created with help of + /// columns (c1 + c2 * c3). It works because this stream was created with help of /// MutationsInterpreter which knows about skip indices and stream 'in' already has /// all required columns. /// TODO move this logic to single place. diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp index 29878cc064e..0599a35905d 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp @@ -65,6 +65,12 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const << new_part_name; break; + case FINISH_ALTER: /// Just make local /metadata and /columns consistent with global + out << "alter\n"; + for (const String & s : source_parts) + out << s << '\n'; + out << "finish"; + break; default: throw Exception("Unknown log entry type: " + DB::toString(type), ErrorCodes::LOGICAL_ERROR); } @@ -152,6 +158,18 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in) >> new_part_name; source_parts.push_back(source_part); } + else if (type_str == "alter") + { + type = FINISH_ALTER; + while (!in.eof()) + { + String s; + in >> s >> "\n"; + if (s == "finish") + break; + source_parts.push_back(s); + } + } in >> "\n"; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h index ca8c9315fa9..7d63c20ce9b 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h @@ -37,6 +37,7 @@ struct ReplicatedMergeTreeLogEntryData CLEAR_INDEX, /// Drop specific index from specified partition. REPLACE_RANGE, /// Drop certain range of partitions and replace them by new ones MUTATE_PART, /// Apply one or several mutations to the part. + FINISH_ALTER, /// Apply one or several alter modifications to part }; static String typeToString(Type type) @@ -50,6 +51,7 @@ struct ReplicatedMergeTreeLogEntryData case ReplicatedMergeTreeLogEntryData::CLEAR_INDEX: return "CLEAR_INDEX"; case ReplicatedMergeTreeLogEntryData::REPLACE_RANGE: return "REPLACE_RANGE"; case ReplicatedMergeTreeLogEntryData::MUTATE_PART: return "MUTATE_PART"; + case ReplicatedMergeTreeLogEntryData::FINISH_ALTER: return "FINISH_ALTER"; default: throw Exception("Unknown log entry type: " + DB::toString(type), ErrorCodes::LOGICAL_ERROR); } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 7fd08788704..41e1d408b52 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -557,6 +557,11 @@ static Names getPartNamesToMutate( } +Names getPartNamesToMutate(ReplicatedMergeTreeMutationEntry & entry) const +{ + return getPartNamesToMutate(entry, current_parts); +} + void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback) { std::lock_guard lock(update_mutations_mutex); @@ -1001,6 +1006,21 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( } } + if (entry.type == LogEntry::FINISH_ALTER) + { + for (const auto & name : entry.source_parts) + { + if (future_parts.count(name)) + { + String reason = "Not altering storage because part " + name + + " is not ready yet (log entry for that part is being processed)."; + LOG_TRACE(log, reason); + out_postpone_reason = reason; + return false; + } + } + } + return true; } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 4e199068667..6ed2f1889ed 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -339,6 +339,8 @@ public: /// Adds a subscriber SubscriberHandler addSubscriber(SubscriberCallBack && callback); + Names getPartNamesToMutate(ReplicatedMergeTreeMutationEntry & entry) const; + struct Status { UInt32 future_parts; diff --git a/dbms/src/Storages/MutationCommands.h b/dbms/src/Storages/MutationCommands.h index 96ebd30f254..ce84c08e631 100644 --- a/dbms/src/Storages/MutationCommands.h +++ b/dbms/src/Storages/MutationCommands.h @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -25,7 +26,8 @@ struct MutationCommand EMPTY, /// Not used. DELETE, UPDATE, - MATERIALIZE_INDEX + MATERIALIZE_INDEX, + CAST /// for ALTER MODIFY column }; Type type = EMPTY; @@ -40,6 +42,10 @@ struct MutationCommand String index_name; ASTPtr partition; + /// For cast + String column_name; + DataTypePtr data_type; + static std::optional parse(ASTAlterCommand * command); }; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 67b9cbd5ca4..0fa52b28ba1 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -977,6 +977,10 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry) { do_fetch = !tryExecutePartMutation(entry); } + else if (entry.type == LogEntry::FINISH_ALTER) + { + tryFinishAlter(entry); + } else { throw Exception("Unexpected log entry type: " + toString(static_cast(entry.type)), ErrorCodes::LOGICAL_ERROR); @@ -1152,6 +1156,72 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry) } +bool StorageReplicatedMergeTree::tryFinishAlter(const StorageReplicatedMergeTree::LogEntry & entry) +{ + auto zookeeper = getZooKeeper(); + + String columns_path = zookeeper_path + "/columns"; + auto columns_znode = zookeeper->get(columns_path); + if (!columns_znode.exists) + throw Exception(columns_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE); + int32_t columns_version = columns_znode.stat.version; + + String metadata_path = zookeeper_path + "/metadata"; + auto metadata_znode = zookeeper->get(metadata_path); + if (!metadata_znode.exists) + throw Exception(metadata_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE); + int32_t metadata_version = metadata_znode.stat.version; + + const bool changed_columns_version = (columns_version != storage.columns_version); + const bool changed_metadata_version = (metadata_version != storage.metadata_version); + + if (!(changed_columns_version || changed_metadata_version)) + return; + + const String & columns_str = columns_znode.contents; + auto columns_in_zk = ColumnsDescription::parse(columns_str); + + const String & metadata_str = metadata_znode.contents; + auto metadata_in_zk = ReplicatedMergeTreeTableMetadata::parse(metadata_str); + auto metadata_diff = ReplicatedMergeTreeTableMetadata(storage).checkAndFindDiff(metadata_in_zk, /* allow_alter = */ true); + + MergeTreeData::DataParts parts; + + /// If metadata nodes have changed, we will update table structure locally. + if (changed_columns_version || changed_metadata_version) + { + LOG_INFO(log, "Version of metadata nodes in ZooKeeper changed. Waiting for structure write lock."); + + auto table_lock = storage.lockExclusively(RWLockImpl::NO_QUERY); + + if (columns_in_zk == storage.getColumns() && metadata_diff.empty()) + { + LOG_INFO( + log, + "Metadata nodes changed in ZooKeeper, but their contents didn't change. " + "Most probably it is a cyclic ALTER."); + } + else + { + LOG_INFO(log, "Metadata changed in ZooKeeper. Applying changes locally."); + + storage.setTableStructure(std::move(columns_in_zk), metadata_diff); + + LOG_INFO(log, "Applied changes to the metadata of the table."); + } + + columns_version = columns_version; + metadata_version = metadata_version; + + recalculateColumnSizes(); + /// Update metadata ZK nodes for a specific replica. + if (changed_columns_version) + zookeeper->set(replica_path + "/columns", columns_str); + if (changed_metadata_version) + zookeeper->set(replica_path + "/metadata", metadata_str); + } +} + bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedMergeTree::LogEntry & entry) { const String & source_part_name = entry.source_parts.at(0); @@ -3199,6 +3269,7 @@ void StorageReplicatedMergeTree::alter( const String current_database_name = getDatabaseName(); const String current_table_name = getTableName(); + auto maybe_mutation_commands = params.getMutationCommands(getInMemoryMetadata()); /// We cannot check this alter commands with method isModifyingData() /// because ReplicatedMergeTree stores both columns and metadata for @@ -3218,31 +3289,6 @@ void StorageReplicatedMergeTree::alter( return; } - /// Alter is done by modifying the metadata nodes in ZK that are shared between all replicas - /// (/columns, /metadata). We set contents of the shared nodes to the new values and wait while - /// replicas asynchronously apply changes (see ReplicatedMergeTreeAlterThread.cpp) and modify - /// their respective replica metadata nodes (/replicas//columns, /replicas//metadata). - - struct ChangedNode - { - ChangedNode(const String & table_path_, String name_, String new_value_) - : table_path(table_path_), name(std::move(name_)), shared_path(table_path + "/" + name) - , new_value(std::move(new_value_)) - {} - - const String & table_path; - String name; - - String shared_path; - - String getReplicaPath(const String & replica) const - { - return table_path + "/replicas/" + replica + "/" + name; - } - - String new_value; - int32_t new_version = -1; /// Initialization is to suppress (useless) false positive warning found by cppcheck. - }; auto ast_to_str = [](ASTPtr query) -> String { @@ -3251,9 +3297,6 @@ void StorageReplicatedMergeTree::alter( return queryToString(query); }; - /// /columns and /metadata nodes - std::vector changed_nodes; - { /// Just to read current structure. Alter will be done in separate thread. auto table_lock = lockStructureForShare(false, query_context.getCurrentQueryId()); @@ -3309,202 +3352,219 @@ void StorageReplicatedMergeTree::alter( table_lock_holder.release(); - /// Wait until all replicas will apply ALTER. + ReplicatedMergeTreeLogEntryData entry; + entry.type = LogEntry::FINISH_ALTER; + entry.source_replica = replica_name; - for (const auto & node : changed_nodes) + if (maybe_mutation_commands) { - Coordination::Stat stat; - /// Subscribe to change of shared ZK metadata nodes, to finish waiting if someone will do another ALTER. - if (!getZooKeeper()->exists(node.shared_path, &stat, alter_query_event)) - throw Exception(node.shared_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE); - - if (stat.version != node.new_version) - { - LOG_WARNING(log, node.shared_path + " changed before this ALTER finished; " + - "overlapping ALTER-s are fine but use caution with nontransitive changes"); - return; - } + ReplicatedMergeTreeMutationEntry entry = mutateImpl(*maybe_mutation_commands, context); + entry.source_parts = queue.getPartNamesToMutate(entry); } - Strings replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas"); + entry.new_part_name = new_part_name; + entry.create_time = time(nullptr); - std::set inactive_replicas; - std::set timed_out_replicas; + zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential); - time_t replication_alter_columns_timeout = query_context.getSettingsRef().replication_alter_columns_timeout; + waitForAllReplicasToProcessLogEntry(entry); - /// This code is quite similar with waitMutationToFinishOnReplicas - /// but contains more complicated details (versions manipulations, multiple nodes, etc.). - /// It will be removed soon in favor of alter-modify implementation on top of mutations. - /// TODO (alesap) - for (const String & replica : replicas) - { - LOG_DEBUG(log, "Waiting for " << replica << " to apply changes"); + ///// Wait until all replicas will apply ALTER. - while (!partial_shutdown_called) - { - auto zookeeper = getZooKeeper(); + //for (const auto & node : changed_nodes) + //{ + // Coordination::Stat stat; + // /// Subscribe to change of shared ZK metadata nodes, to finish waiting if someone will do another ALTER. + // if (!getZooKeeper()->exists(node.shared_path, &stat, alter_query_event)) + // throw Exception(node.shared_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE); - /// Replica could be inactive. - if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active")) - { - LOG_WARNING(log, "Replica " << replica << " is not active during ALTER query." - " ALTER will be done asynchronously when replica becomes active."); + // if (stat.version != node.new_version) + // { + // LOG_WARNING(log, node.shared_path + " changed before this ALTER finished; " + + // "overlapping ALTER-s are fine but use caution with nontransitive changes"); + // return; + // } + //} - inactive_replicas.emplace(replica); - break; - } + //Strings replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas"); - struct ReplicaNode - { - explicit ReplicaNode(String path_) : path(std::move(path_)) {} + //std::set inactive_replicas; + //std::set timed_out_replicas; - String path; - String value; - int32_t version = -1; - }; + //time_t replication_alter_columns_timeout = query_context.getSettingsRef().replication_alter_columns_timeout; - std::vector replica_nodes; - for (const auto & node : changed_nodes) - replica_nodes.emplace_back(node.getReplicaPath(replica)); + ///// This code is quite similar with waitMutationToFinishOnReplicas + ///// but contains more complicated details (versions manipulations, multiple nodes, etc.). + ///// It will be removed soon in favor of alter-modify implementation on top of mutations. + ///// TODO (alesap) + //for (const String & replica : replicas) + //{ + // LOG_DEBUG(log, "Waiting for " << replica << " to apply changes"); - bool replica_was_removed = false; - for (auto & node : replica_nodes) - { - Coordination::Stat stat; + // while (!partial_shutdown_called) + // { + // auto zookeeper = getZooKeeper(); - /// Replica could has been removed. - if (!zookeeper->tryGet(node.path, node.value, &stat)) - { - LOG_WARNING(log, replica << " was removed"); - replica_was_removed = true; - break; - } + // /// Replica could be inactive. + // if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active")) + // { + // LOG_WARNING(log, "Replica " << replica << " is not active during ALTER query." + // " ALTER will be done asynchronously when replica becomes active."); - node.version = stat.version; - } + // inactive_replicas.emplace(replica); + // break; + // } - if (replica_was_removed) - break; + // struct ReplicaNode + // { + // explicit ReplicaNode(String path_) : path(std::move(path_)) {} - bool alter_was_applied = true; - for (size_t i = 0; i < replica_nodes.size(); ++i) - { - if (replica_nodes[i].value != changed_nodes[i].new_value) - { - alter_was_applied = false; - break; - } - } + // String path; + // String value; + // int32_t version = -1; + // }; - /// The ALTER has been successfully applied. - if (alter_was_applied) - break; + // std::vector replica_nodes; + // for (const auto & node : changed_nodes) + // replica_nodes.emplace_back(node.getReplicaPath(replica)); - for (const auto & node : changed_nodes) - { - Coordination::Stat stat; - if (!zookeeper->exists(node.shared_path, &stat)) - throw Exception(node.shared_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE); + // bool replica_was_removed = false; + // for (auto & node : replica_nodes) + // { + // Coordination::Stat stat; - if (stat.version != node.new_version) - { - LOG_WARNING(log, node.shared_path + " changed before this ALTER finished; " - "overlapping ALTER-s are fine but use caution with nontransitive changes"); - return; - } - } + // /// Replica could has been removed. + // if (!zookeeper->tryGet(node.path, node.value, &stat)) + // { + // LOG_WARNING(log, replica << " was removed"); + // replica_was_removed = true; + // break; + // } - bool replica_nodes_changed_concurrently = false; - for (const auto & replica_node : replica_nodes) - { - Coordination::Stat stat; - if (!zookeeper->exists(replica_node.path, &stat, alter_query_event)) - { - LOG_WARNING(log, replica << " was removed"); - replica_was_removed = true; - break; - } + // node.version = stat.version; + // } - if (stat.version != replica_node.version) - { - replica_nodes_changed_concurrently = true; - break; - } - } + // if (replica_was_removed) + // break; - if (replica_was_removed) - break; + // bool alter_was_applied = true; + // for (size_t i = 0; i < replica_nodes.size(); ++i) + // { + // if (replica_nodes[i].value != changed_nodes[i].new_value) + // { + // alter_was_applied = false; + // break; + // } + // } - if (replica_nodes_changed_concurrently) - continue; + // /// The ALTER has been successfully applied. + // if (alter_was_applied) + // break; - /// alter_query_event subscribed with zookeeper watch callback to /repliacs/{replica}/metadata - /// and /replicas/{replica}/columns nodes for current relica + shared nodes /columns and /metadata, - /// which is common for all replicas. If changes happen with this nodes (delete, set and create) - /// than event will be notified and wait will be interrupted. - /// - /// ReplicatedMergeTreeAlterThread responsible for local /replicas/{replica}/metadata and - /// /replicas/{replica}/columns changes. Shared /columns and /metadata nodes can be changed by *newer* - /// concurrent alter from other replica. First of all it will update shared nodes and we will have no - /// ability to identify, that our *current* alter finshed. So we cannot do anything better than just - /// return from *current* alter with success result. - if (!replication_alter_columns_timeout) - { - alter_query_event->wait(); - /// Everything is fine. - } - else if (alter_query_event->tryWait(replication_alter_columns_timeout * 1000)) - { - /// Everything is fine. - } - else - { - LOG_WARNING(log, "Timeout when waiting for replica " << replica << " to apply ALTER." - " ALTER will be done asynchronously."); + // for (const auto & node : changed_nodes) + // { + // Coordination::Stat stat; + // if (!zookeeper->exists(node.shared_path, &stat)) + // throw Exception(node.shared_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE); - timed_out_replicas.emplace(replica); - break; - } - } + // if (stat.version != node.new_version) + // { + // LOG_WARNING(log, node.shared_path + " changed before this ALTER finished; " + // "overlapping ALTER-s are fine but use caution with nontransitive changes"); + // return; + // } + // } - if (partial_shutdown_called) - throw Exception("Alter is not finished because table shutdown was called. Alter will be done after table restart.", - ErrorCodes::UNFINISHED); + // bool replica_nodes_changed_concurrently = false; + // for (const auto & replica_node : replica_nodes) + // { + // Coordination::Stat stat; + // if (!zookeeper->exists(replica_node.path, &stat, alter_query_event)) + // { + // LOG_WARNING(log, replica << " was removed"); + // replica_was_removed = true; + // break; + // } - if (!inactive_replicas.empty() || !timed_out_replicas.empty()) - { - std::stringstream exception_message; - exception_message << "Alter is not finished because"; + // if (stat.version != replica_node.version) + // { + // replica_nodes_changed_concurrently = true; + // break; + // } + // } - if (!inactive_replicas.empty()) - { - exception_message << " some replicas are inactive right now"; + // if (replica_was_removed) + // break; - for (auto it = inactive_replicas.begin(); it != inactive_replicas.end(); ++it) - exception_message << (it == inactive_replicas.begin() ? ": " : ", ") << *it; - } + // if (replica_nodes_changed_concurrently) + // continue; - if (!timed_out_replicas.empty() && !inactive_replicas.empty()) - exception_message << " and"; + // /// alter_query_event subscribed with zookeeper watch callback to /repliacs/{replica}/metadata + // /// and /replicas/{replica}/columns nodes for current relica + shared nodes /columns and /metadata, + // /// which is common for all replicas. If changes happen with this nodes (delete, set and create) + // /// than event will be notified and wait will be interrupted. + // /// + // /// ReplicatedMergeTreeAlterThread responsible for local /replicas/{replica}/metadata and + // /// /replicas/{replica}/columns changes. Shared /columns and /metadata nodes can be changed by *newer* + // /// concurrent alter from other replica. First of all it will update shared nodes and we will have no + // /// ability to identify, that our *current* alter finshed. So we cannot do anything better than just + // /// return from *current* alter with success result. + // if (!replication_alter_columns_timeout) + // { + // alter_query_event->wait(); + // /// Everything is fine. + // } + // else if (alter_query_event->tryWait(replication_alter_columns_timeout * 1000)) + // { + // /// Everything is fine. + // } + // else + // { + // LOG_WARNING(log, "Timeout when waiting for replica " << replica << " to apply ALTER." + // " ALTER will be done asynchronously."); - if (!timed_out_replicas.empty()) - { - exception_message << " timeout when waiting for some replicas"; + // timed_out_replicas.emplace(replica); + // break; + // } + // } - for (auto it = timed_out_replicas.begin(); it != timed_out_replicas.end(); ++it) - exception_message << (it == timed_out_replicas.begin() ? ": " : ", ") << *it; + // if (partial_shutdown_called) + // throw Exception("Alter is not finished because table shutdown was called. Alter will be done after table restart.", + // ErrorCodes::UNFINISHED); - exception_message << " (replication_alter_columns_timeout = " << replication_alter_columns_timeout << ")"; - } + // if (!inactive_replicas.empty() || !timed_out_replicas.empty()) + // { + // std::stringstream exception_message; + // exception_message << "Alter is not finished because"; - exception_message << ". Alter will be done asynchronously."; + // if (!inactive_replicas.empty()) + // { + // exception_message << " some replicas are inactive right now"; - throw Exception(exception_message.str(), ErrorCodes::UNFINISHED); - } - } + // for (auto it = inactive_replicas.begin(); it != inactive_replicas.end(); ++it) + // exception_message << (it == inactive_replicas.begin() ? ": " : ", ") << *it; + // } - LOG_DEBUG(log, "ALTER finished"); + // if (!timed_out_replicas.empty() && !inactive_replicas.empty()) + // exception_message << " and"; + + // if (!timed_out_replicas.empty()) + // { + // exception_message << " timeout when waiting for some replicas"; + + // for (auto it = timed_out_replicas.begin(); it != timed_out_replicas.end(); ++it) + // exception_message << (it == timed_out_replicas.begin() ? ": " : ", ") << *it; + + // exception_message << " (replication_alter_columns_timeout = " << replication_alter_columns_timeout << ")"; + // } + + // exception_message << ". Alter will be done asynchronously."; + + // throw Exception(exception_message.str(), ErrorCodes::UNFINISHED); + // } + //} + + //LOG_DEBUG(log, "ALTER finished"); } void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context) @@ -4457,6 +4517,11 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const Context & query_context) +{ + mutateImpl(commands, context); +} + +StorageReplicatedMergeTree::mutateImpl(const MutationCommands & commands, const Context & query_context) { /// Overview of the mutation algorithm. /// @@ -4572,6 +4637,7 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const waitMutationToFinishOnReplicas(replicas, entry.znode_name); } + return entry; } std::vector StorageReplicatedMergeTree::getMutationsStatus() const diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index 60c2ea0b870..0ccdf046892 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -110,6 +110,7 @@ public: void alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context) override; void mutate(const MutationCommands & commands, const Context & context) override; + ReplicatedMergeTreeMutationEntry mutateImpl(const MutationCommands & commands, const Context & context); std::vector getMutationsStatus() const override; CancellationCode killMutation(const String & mutation_id) override; @@ -382,6 +383,8 @@ private: /// Do the merge or recommend to make the fetch instead of the merge bool tryExecuteMerge(const LogEntry & entry); + bool tryFinishAlter(const LogEntry & entry); + bool tryExecutePartMutation(const LogEntry & entry); @@ -431,6 +434,9 @@ private: /// Checks if some mutations are done and marks them as done. void mutationsFinalizingTask(); + /// finish alter after all heavy processes finished + void finishAlter(); + /** Write the selected parts to merge into the log, * Call when merge_selecting_mutex is locked. * Returns false if any part is not in ZK.