diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index 718fc0cbf89..6f64b56b4ee 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -482,6 +482,7 @@ namespace ErrorCodes extern const int UNKNOWN_ACCESS_TYPE = 508; extern const int INVALID_GRANT = 509; extern const int CACHE_DICTIONARY_UPDATE_FAIL = 510; + extern const int CANNOT_ASSIGN_ALTER = 512; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp index bf410011fef..1e22b88ab7f 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp @@ -255,7 +255,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo log_entry.toString(), zkutil::CreateMode::PersistentSequential)); - ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/columns", storage.getMetadataVersion())); + ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/metadata", storage.getMetadataVersion())); ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/block_numbers/" + part->info.partition_id, "", -1)); /// Deletes the information that the block number is used for writing. @@ -311,7 +311,6 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo transaction.commit(); storage.merge_selecting_task->schedule(); - LOG_DEBUG(log, "COMMITED INSERT WITH VERSION:" << storage.getMetadataVersion() << " of part " << part_name); /// Lock nodes have been already deleted, do not delete them in destructor block_number_lock->assumeUnlocked(); } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index ec34f219e63..419c525faa3 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1045,7 +1045,10 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( { if (!alter_sequence.canExecuteMetaAlter(entry.alter_version, state_lock)) { - out_postpone_reason = "Alter is not started, because more old alter is executing right now"; + int head_alter = alter_sequence.getHeadAlterVersion(state_lock); + out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version) + + " because another alter " + std::to_string(head_alter) + + " must be executed before"; return false; } } @@ -1054,7 +1057,15 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( { if (!alter_sequence.canExecuteDataAlter(entry.alter_version, state_lock)) { - out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version) + " because metadata alter is not finished yet"; + int head_alter = alter_sequence.getHeadAlterVersion(state_lock); + if (head_alter == entry.alter_version) + out_postpone_reason = "Cannot execute alter data with version: " + + std::to_string(entry.alter_version) + " because metadata still not altered"; + else + out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version) + + " because another alter " + std::to_string(head_alter) + " must be executed before"; + + return false; } } diff --git a/dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h b/dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h index a13b20a44a0..d09dbfb8b81 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h +++ b/dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h @@ -17,31 +17,34 @@ namespace ErrorCodes class AlterSequence { private: - struct AlterInQueue + struct AlterState { bool metadata_finished = false; bool data_finished = false; - AlterInQueue() = default; + AlterState() = default; - AlterInQueue(bool metadata_finished_, bool data_finished_) + AlterState(bool metadata_finished_, bool data_finished_) : metadata_finished(metadata_finished_) , data_finished(data_finished_) { } }; - std::map queue_state; + std::map queue_state; public: - bool empty() const { - return queue_state.empty(); + int getHeadAlterVersion(std::lock_guard & /*state_lock*/) const + { + if (!queue_state.empty()) + return queue_state.begin()->first; + return -1; } void addMutationForAlter(int alter_version, std::lock_guard & /*state_lock*/) { if (!queue_state.count(alter_version)) - queue_state.emplace(alter_version, AlterInQueue(true, false)); + queue_state.emplace(alter_version, AlterState{true, false}); else queue_state[alter_version].data_finished = false; } @@ -49,7 +52,7 @@ public: void addMetadataAlter(int alter_version, std::lock_guard & /*state_lock*/) { if (!queue_state.count(alter_version)) - queue_state.emplace(alter_version, AlterInQueue(false, true)); + queue_state.emplace(alter_version, AlterState{false, true}); else queue_state[alter_version].metadata_finished = false; } @@ -80,14 +83,19 @@ public: bool canExecuteDataAlter(int alter_version, std::lock_guard & /*state_lock*/) const { + if (!queue_state.count(alter_version)) return true; + return queue_state.at(alter_version).metadata_finished; } bool canExecuteMetaAlter(int alter_version, std::lock_guard & /*state_lock*/) const { - return queue_state.empty() || queue_state.begin()->first == alter_version; + if (queue_state.empty()) + return true; + + return queue_state.begin()->first == alter_version; } }; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 1f78d38e1a5..51c31365c02 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -113,6 +113,7 @@ namespace ErrorCodes extern const int KEEPER_EXCEPTION; extern const int ALL_REPLICAS_LOST; extern const int REPLICA_STATUS_CHANGED; + extern const int CANNOT_ASSIGN_ALTER; } namespace ActionLocks @@ -999,7 +1000,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry) } else if (entry.type == LogEntry::ALTER_METADATA) { - executeMetadataAlter(entry); + return executeMetadataAlter(entry); } else { @@ -3246,7 +3247,7 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer setTableStructure(std::move(columns_from_entry), metadata_diff); metadata_version = entry.alter_version; - LOG_INFO(log, "Applied changes to the metadata of the table. Setting metadata version:" << metadata_version); + LOG_INFO(log, "Applied changes to the metadata of the table. Current metadata version: " << metadata_version); } recalculateColumnSizes(); @@ -3277,7 +3278,6 @@ void StorageReplicatedMergeTree::alter( return; } - auto ast_to_str = [](ASTPtr query) -> String { if (!query) return ""; @@ -3289,22 +3289,17 @@ void StorageReplicatedMergeTree::alter( ReplicatedMergeTreeLogEntryData entry; std::optional mutation_znode; - { - Coordination::Requests ops; + while (true) { /// We can safely read structure, because we guarded with alter_intention_lock if (is_readonly) throw Exception("Can't ALTER readonly table", ErrorCodes::TABLE_IS_READ_ONLY); - Coordination::Stat metadata_stat; - String metadata_in_zk = zookeeper->get(zookeeper_path + "/metadata", &metadata_stat); StorageInMemoryMetadata metadata = getInMemoryMetadata(); params.apply(metadata); - String new_columns_str = metadata.columns.toString(); - ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/columns", new_columns_str, metadata_version)); ReplicatedMergeTreeTableMetadata new_metadata(*this); if (ast_to_str(metadata.order_by_ast) != ast_to_str(order_by_ast)) @@ -3321,8 +3316,14 @@ void StorageReplicatedMergeTree::alter( if (new_constraints_str != getConstraints().toString()) new_metadata.constraints = new_constraints_str; + Coordination::Requests ops; + String new_metadata_str = new_metadata.toString(); - ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/metadata", new_metadata_str, metadata_stat.version)); + ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/metadata", new_metadata_str, metadata_version)); + + String new_columns_str = metadata.columns.toString(); + ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/columns", new_columns_str, -1)); + { lockStructureExclusively(table_lock_holder, query_context.getCurrentQueryId()); @@ -3356,7 +3357,7 @@ void StorageReplicatedMergeTree::alter( ReplicatedMergeTreeMutationEntry mutation_entry; mutation_entry.source_replica = replica_name; mutation_entry.commands = maybe_mutation_commands; - mutation_entry.alter_version = metadata_version + 1; + mutation_entry.alter_version = metadata_version + 1; Coordination::Stat mutations_stat; zookeeper->get(mutations_path, &mutations_stat); @@ -3400,21 +3401,33 @@ void StorageReplicatedMergeTree::alter( queue.pullLogsToQueue(zookeeper); if (entry.have_mutation) { - String alter_path = dynamic_cast(*results[results.size() - 3 - partitions_count]).path_created; + /// Record in replication /log + String alter_path = dynamic_cast(*results[2]).path_created; entry.znode_name = alter_path.substr(alter_path.find_last_of('/') + 1); + /// Record in /mutations String mutation_path = dynamic_cast(*results.back()).path_created; mutation_znode = mutation_path.substr(mutation_path.find_last_of('/') + 1); } else { + /// Record in replication /log String alter_path = dynamic_cast(*results.back()).path_created; entry.znode_name = alter_path.substr(alter_path.find_last_of('/') + 1); } + break; + } + else if (rc == Coordination::ZBADVERSION) + { + if (dynamic_cast(*results[0]).error) + throw Exception("Metadata on replica is not up to date with common metadata in Zookeeper. Cannot alter", ErrorCodes::CANNOT_ASSIGN_ALTER); + + LOG_TRACE(log, "We have version conflict with inserts because of concurrent inserts. Will try to assign alter one more time."); + continue; } else { - throw Coordination::Exception("Cannot alter", rc); + throw Coordination::Exception("Alter cannot be assigned because of Zookeeper error", rc); } } @@ -4400,11 +4413,108 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const Context & query_context) { - auto zookeeper = getZooKeeper(); - Coordination::Requests requests; + /// Overview of the mutation algorithm. + /// + /// When the client executes a mutation, this method is called. It acquires block numbers in all + /// partitions, saves them in the mutation entry and writes the mutation entry to a new ZK node in + /// the /mutations folder. This block numbers are needed to determine which parts should be mutated and + /// which shouldn't (parts inserted after the mutation will have the block number higher than the + /// block number acquired by the mutation in that partition and so will not be mutatied). + /// This block number is called "mutation version" in that partition. + /// + /// Mutation versions are acquired atomically in all partitions, so the case when an insert in some + /// partition has the block number higher than the mutation version but the following insert into another + /// partition acquires the block number lower than the mutation version in that partition is impossible. + /// Another important invariant: mutation entries appear in /mutations in the order of their mutation + /// versions (in any partition). This means that mutations form a sequence and we can execute them in + /// the order of their mutation versions and not worry that some mutation with the smaller version + /// will suddenly appear. + /// + /// During mutations individual parts are immutable - when we want to change the contents of a part + /// we prepare the new part and add it to MergeTreeData (the original part gets replaced). The fact that + /// we have mutated the part is recorded in the part->info.mutation field of MergeTreePartInfo. + /// The relation with the original part is preserved because the new part covers the same block range + /// as the original one. + /// + /// We then can for each part determine its "mutation version": the version of the last mutation in + /// the mutation sequence that we regard as already applied to that part. All mutations with the greater + /// version number will still need to be applied to that part. + /// + /// Execution of mutations is done asynchronously. All replicas watch the /mutations directory and + /// load new mutation entries as they appear (see mutationsUpdatingTask()). Next we need to determine + /// how to mutate individual parts consistently with part merges. This is done by the leader replica + /// (see mergeSelectingTask() and class ReplicatedMergeTreeMergePredicate for details). Important + /// invariants here are that a) all source parts for a single merge must have the same mutation version + /// and b) any part can be mutated only once or merged only once (e.g. once we have decided to mutate + /// a part then we need to execute that mutation and can assign merges only to the new part and not to the + /// original part). Multiple consecutive mutations can be executed at once (without writing the + /// intermediate result to a part). + /// + /// Leader replica records its decisions to the replication log (/log directory in ZK) in the form of + /// MUTATE_PART entries and all replicas then execute them in the background pool + /// (see tryExecutePartMutation() function). When a replica encounters a MUTATE_PART command, it is + /// guaranteed that the corresponding mutation entry is already loaded (when we pull entries from + /// replication log into the replica queue, we also load mutation entries). Note that just as with merges + /// the replica can decide not to do the mutation locally and fetch the mutated part from another replica + /// instead. + /// + /// Mutations of individual parts are in fact pretty similar to merges, e.g. their assignment and execution + /// is governed by the same storage_settings. TODO: support a single "merge-mutation" operation when the data + /// read from the the source parts is first mutated on the fly to some uniform mutation version and then + /// merged to a resulting part. + /// + /// After all needed parts are mutated (i.e. all active parts have the mutation version greater than + /// the version of this mutation), the mutation is considered done and can be deleted. + + ReplicatedMergeTreeMutationEntry entry; + entry.source_replica = replica_name; + entry.commands = commands; + + String mutations_path = zookeeper_path + "/mutations"; + + /// Update the mutations_path node when creating the mutation and check its version to ensure that + /// nodes for mutations are created in the same order as the corresponding block numbers. + /// Should work well if the number of concurrent mutation requests is small. + while (true) + { + auto zookeeper = getZooKeeper(); + + Coordination::Stat mutations_stat; + zookeeper->get(mutations_path, &mutations_stat); + + EphemeralLocksInAllPartitions block_number_locks( + zookeeper_path + "/block_numbers", "block-", zookeeper_path + "/temp", *zookeeper); + + for (const auto & lock : block_number_locks.getLocks()) + entry.block_numbers[lock.partition_id] = lock.number; + + entry.create_time = time(nullptr); + + Coordination::Requests requests; + requests.emplace_back(zkutil::makeSetRequest(mutations_path, String(), mutations_stat.version)); + requests.emplace_back(zkutil::makeCreateRequest( + mutations_path + "/", entry.toString(), zkutil::CreateMode::PersistentSequential)); + + Coordination::Responses responses; + int32_t rc = zookeeper->tryMulti(requests, responses); + + if (rc == Coordination::ZOK) + { + const String & path_created = + dynamic_cast(responses[1].get())->path_created; + entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1); + LOG_TRACE(log, "Created mutation with ID " << entry.znode_name); + break; + } + else if (rc == Coordination::ZBADVERSION) + { + LOG_TRACE(log, "Version conflict when trying to create a mutation node, retrying..."); + continue; + } + else + throw Coordination::Exception("Unable to create a mutation znode", rc); + } - ReplicatedMergeTreeMutationEntry entry = prepareMutationEntry(zookeeper, commands, requests); - mutateImpl(zookeeper, requests, entry); waitMutation(entry.znode_name, query_context.getSettingsRef().mutations_sync); }