diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index 6f64b56b4ee..46174c6b331 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -483,6 +483,7 @@ namespace ErrorCodes extern const int INVALID_GRANT = 509; extern const int CACHE_DICTIONARY_UPDATE_FAIL = 510; extern const int CANNOT_ASSIGN_ALTER = 512; + extern const int CONCURRENT_ALTER_IS_PROCESSING = 513; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp index 1e22b88ab7f..ce5b5212316 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp @@ -29,6 +29,7 @@ namespace ErrorCodes extern const int KEEPER_EXCEPTION; extern const int TIMEOUT_EXCEEDED; extern const int NO_ACTIVE_REPLICAS; + extern const int CONCURRENT_ALTER_IS_PROCESSING; } @@ -255,7 +256,14 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo log_entry.toString(), zkutil::CreateMode::PersistentSequential)); + /// We check metadata_version has the same version as shared node. + /// In other case we may have parts, which nobody will alter. + /// ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/metadata", storage.getMetadataVersion())); + + /// We update version of block_number/partition node to register fact of new insert. + /// If we want to be sure, that no inserts happend in some period of time, than we can receive + /// version of all partition nodes inside block numbers and then make check requirests in zookeeper transaction. ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/block_numbers/" + part->info.partition_id, "", -1)); /// Deletes the information that the block number is used for writing. @@ -347,6 +355,11 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo throw Exception("Another quorum insert has been already started", ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE); } + else if (multi_code == Coordination::ZBADVERSION) + { + transaction.rollback(); + throw Exception("Current metadata version is not consistent with version in zookeeper. Concurrent alter of metadata is processing now, client must retry", ErrorCodes::CONCURRENT_ALTER_IS_PROCESSING); + } else { /// NOTE: We could be here if the node with the quorum existed, but was quickly removed. diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h index 0284d3e99a6..ab96011f88a 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h @@ -107,12 +107,18 @@ struct ReplicatedMergeTreeLogEntryData std::shared_ptr replace_range_entry; - //TODO(alesap) - int alter_version; - bool have_mutation; + /// ALTER METADATA and MUTATE PART command - String columns_str; - String metadata_str; + /// Version of metadata which will be set after this alter + /// Also present in MUTATE_PART command, to track mutations + /// required for complete alter execution. + int alter_version; /// May be equal to -1, if it's normal mutation, not metadata update. + + /// only ALTER METADATA command + bool have_mutation; /// If this alter requires additional mutation step, for data update + + String columns_str; /// New columns data corresponding to alter_version + String metadata_str; /// New metadata corresponding to alter_version /// Returns a set of parts that will appear after executing the entry + parts to block /// selection of merges. These parts are added to queue.virtual_parts. diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h index eb5c1a5eab5..dbbf4f98a71 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h @@ -13,6 +13,11 @@ namespace DB class ReadBuffer; class WriteBuffer; +/// Mutation entry in /mutations path in zookeeper. This record contains information about blocks +/// in patitions. We will mutatate all parts with left number less than this numbers. +/// +/// These entries processed separately from main replication /log, and produce other entries +/// -- MUTATE_PART. struct ReplicatedMergeTreeMutationEntry { void writeText(WriteBuffer & out) const; @@ -21,14 +26,24 @@ struct ReplicatedMergeTreeMutationEntry String toString() const; static ReplicatedMergeTreeMutationEntry parse(const String & str, String znode_name); + /// Name of znode (mutation-xxxxxxx) String znode_name; + /// Create time of znode time_t create_time = 0; + + /// Replica which initiated mutation String source_replica; + /// Accuired numbers of blocks + /// partition_id -> block_number std::map block_numbers; + + /// Mutation commands which will give to MUTATE_PART entries MutationCommands commands; + /// Version of metadata. Not equal to -1 only if this mutation + /// was created by ALTER MODIFY/DROP queries. int alter_version = -1; }; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 419c525faa3..8009e78f12c 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -150,7 +150,7 @@ void ReplicatedMergeTreeQueue::insertUnlocked( } } if (entry->type == LogEntry::ALTER_METADATA) - alter_sequence.addMetadataAlter(entry->alter_version, state_lock); + alter_chain.addMetadataAlter(entry->alter_version, state_lock); } @@ -226,7 +226,7 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval( } if (entry->type == LogEntry::ALTER_METADATA) - alter_sequence.finishMetadataAlter(entry->alter_version, entry->have_mutation, state_lock); + alter_chain.finishMetadataAlter(entry->alter_version, entry->have_mutation, state_lock); } else { @@ -696,7 +696,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C some_mutations_are_probably_done = true; if (entry->alter_version != -1) - alter_sequence.addMutationForAlter(entry->alter_version, state_lock); + alter_chain.addMutationForAlter(entry->alter_version, state_lock); } } @@ -1043,10 +1043,10 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( if (entry.type == LogEntry::ALTER_METADATA) { - if (!alter_sequence.canExecuteMetaAlter(entry.alter_version, state_lock)) + if (!alter_chain.canExecuteMetaAlter(entry.alter_version, state_lock)) { - int head_alter = alter_sequence.getHeadAlterVersion(state_lock); - out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version) + int head_alter = alter_chain.getHeadAlterVersion(state_lock); + out_postpone_reason = "Cannot execute alter metadata with version: " + std::to_string(entry.alter_version) + " because another alter " + std::to_string(head_alter) + " must be executed before"; return false; @@ -1055,9 +1055,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( if (entry.type == LogEntry::MUTATE_PART && entry.alter_version != -1) { - if (!alter_sequence.canExecuteDataAlter(entry.alter_version, state_lock)) + if (!alter_chain.canExecuteDataAlter(entry.alter_version, state_lock)) { - int head_alter = alter_sequence.getHeadAlterVersion(state_lock); + int head_alter = alter_chain.getHeadAlterVersion(state_lock); if (head_alter == entry.alter_version) out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version) + " because metadata still not altered"; @@ -1352,7 +1352,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep LOG_INFO(log, "Seems like we jumped over mutation " << znode << " when downloaded part with bigger mutation number." << " It's OK, tasks for rest parts will be skipped, but probably a lot of mutations were executed concurrently on different replicas."); mutation.parts_to_do.clear(); - alter_sequence.finishDataAlter(mutation.entry->alter_version, lock); + alter_chain.finishDataAlter(mutation.entry->alter_version, lock); } } else if (mutation.parts_to_do.size() == 0) @@ -1393,7 +1393,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep if (entry->alter_version != -1) { LOG_TRACE(log, "Finishing data alter with version " << entry->alter_version << " for entry " << entry->znode_name); - alter_sequence.finishDataAlter(entry->alter_version, lock); + alter_chain.finishDataAlter(entry->alter_version, lock); } } } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 9ce9ba9d40c..3a9a93a8616 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -9,7 +9,7 @@ #include #include #include -#include +#include #include #include @@ -132,7 +132,7 @@ private: /// Provides only one simultaneous call to pullLogsToQueue. std::mutex pull_logs_to_queue_mutex; - AlterSequence alter_sequence; + ReplicatedQueueAlterChain alter_chain; /// List of subscribers /// A subscriber callback is called when an entry queue is deleted diff --git a/dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h b/dbms/src/Storages/MergeTree/ReplicatedQueueAlterChain.h similarity index 98% rename from dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h rename to dbms/src/Storages/MergeTree/ReplicatedQueueAlterChain.h index d09dbfb8b81..d8c917cac6c 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h +++ b/dbms/src/Storages/MergeTree/ReplicatedQueueAlterChain.h @@ -14,7 +14,7 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -class AlterSequence +class ReplicatedQueueAlterChain { private: struct AlterState diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 51c31365c02..3e8ff411ecd 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -3261,7 +3261,6 @@ void StorageReplicatedMergeTree::alter( { assertNotReadonly(); - auto maybe_mutation_commands = params.getMutationCommands(getInMemoryMetadata()); auto table_id = getStorageID(); if (params.isSettingsAlter()) @@ -3286,78 +3285,86 @@ void StorageReplicatedMergeTree::alter( auto zookeeper = getZooKeeper(); - ReplicatedMergeTreeLogEntryData entry; - + std::optional alter_entry; std::optional mutation_znode; + while (true) { + /// Clear nodes from previous iteration + alter_entry.emplace(); + mutation_znode.emplace(); + /// We can safely read structure, because we guarded with alter_intention_lock if (is_readonly) throw Exception("Can't ALTER readonly table", ErrorCodes::TABLE_IS_READ_ONLY); - StorageInMemoryMetadata metadata = getInMemoryMetadata(); - params.apply(metadata); + StorageInMemoryMetadata current_metadata = getInMemoryMetadata(); + StorageInMemoryMetadata future_metadata = current_metadata; + params.apply(future_metadata); + ReplicatedMergeTreeTableMetadata future_metadata_in_zk(*this); + if (ast_to_str(future_metadata.order_by_ast) != ast_to_str(current_metadata.order_by_ast)) + future_metadata_in_zk.sorting_key = serializeAST(*extractKeyExpressionList(future_metadata.order_by_ast)); - ReplicatedMergeTreeTableMetadata new_metadata(*this); - if (ast_to_str(metadata.order_by_ast) != ast_to_str(order_by_ast)) - new_metadata.sorting_key = serializeAST(*extractKeyExpressionList(metadata.order_by_ast)); + if (ast_to_str(future_metadata.ttl_for_table_ast) != ast_to_str(current_metadata.ttl_for_table_ast)) + future_metadata_in_zk.ttl_table = serializeAST(*future_metadata.ttl_for_table_ast); - if (ast_to_str(metadata.ttl_for_table_ast) != ast_to_str(ttl_table_ast)) - new_metadata.ttl_table = serializeAST(*metadata.ttl_for_table_ast); + String new_indices_str = future_metadata.indices.toString(); + if (new_indices_str != current_metadata.indices.toString()) + future_metadata_in_zk.skip_indices = new_indices_str; - String new_indices_str = metadata.indices.toString(); - if (new_indices_str != getIndices().toString()) - new_metadata.skip_indices = new_indices_str; - - String new_constraints_str = metadata.constraints.toString(); - if (new_constraints_str != getConstraints().toString()) - new_metadata.constraints = new_constraints_str; + String new_constraints_str = future_metadata.constraints.toString(); + if (new_constraints_str != current_metadata.indices.toString()) + future_metadata_in_zk.constraints = new_constraints_str; Coordination::Requests ops; - String new_metadata_str = new_metadata.toString(); + String new_metadata_str = future_metadata_in_zk.toString(); ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/metadata", new_metadata_str, metadata_version)); - String new_columns_str = metadata.columns.toString(); + String new_columns_str = future_metadata.columns.toString(); ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/columns", new_columns_str, -1)); + if (ast_to_str(current_metadata.settings_ast) != ast_to_str(future_metadata.settings_ast)) { lockStructureExclusively(table_lock_holder, query_context.getCurrentQueryId()); - auto old_metadata = getInMemoryMetadata(); - old_metadata.settings_ast = metadata.settings_ast; - changeSettings(metadata.settings_ast, table_lock_holder); - global_context.getDatabase(table_id.database_name)->alterTable(query_context, table_id.table_name, old_metadata); + /// Just change settings + current_metadata.settings_ast = future_metadata.settings_ast; + changeSettings(current_metadata.settings_ast, table_lock_holder); + global_context.getDatabase(table_id.database_name)->alterTable(query_context, table_id.table_name, current_metadata); } - entry.type = LogEntry::ALTER_METADATA; - entry.source_replica = replica_name; - entry.metadata_str = new_metadata_str; - entry.columns_str = new_columns_str; - entry.alter_version = metadata_version + 1; + /// We can be sure, that in case of successfull commit in zookeeper our + /// version will increments by 1. Because we update with version check. + int new_metadata_version = metadata_version + 1; - entry.create_time = time(nullptr); - if (!maybe_mutation_commands.empty()) - entry.have_mutation = true; - else - entry.have_mutation = false; + alter_entry->type = LogEntry::ALTER_METADATA; + alter_entry->source_replica = replica_name; + alter_entry->metadata_str = new_metadata_str; + alter_entry->columns_str = new_columns_str; + alter_entry->alter_version = new_metadata_version; + alter_entry->create_time = time(nullptr); - ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential)); + auto maybe_mutation_commands = params.getMutationCommands(current_metadata); + alter_entry->have_mutation = !maybe_mutation_commands.empty(); + + ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", alter_entry->toString(), zkutil::CreateMode::PersistentSequential)); std::optional lock_holder; - size_t partitions_count = 0; - if (!maybe_mutation_commands.empty()) + + /// No we will prepare mutations record + if (alter_entry->have_mutation) { String mutations_path = zookeeper_path + "/mutations"; ReplicatedMergeTreeMutationEntry mutation_entry; mutation_entry.source_replica = replica_name; mutation_entry.commands = maybe_mutation_commands; - mutation_entry.alter_version = metadata_version + 1; + mutation_entry.alter_version = new_metadata_version; Coordination::Stat mutations_stat; zookeeper->get(mutations_path, &mutations_stat); @@ -3382,7 +3389,6 @@ void StorageReplicatedMergeTree::alter( { mutation_entry.block_numbers[lock.partition_id] = lock.number; ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/block_numbers/" + lock.partition_id, partition_versions[lock.partition_id])); - partitions_count++; } mutation_entry.create_time = time(nullptr); @@ -3390,7 +3396,6 @@ void StorageReplicatedMergeTree::alter( ops.emplace_back(zkutil::makeSetRequest(mutations_path, String(), mutations_stat.version)); ops.emplace_back( zkutil::makeCreateRequest(mutations_path + "/", mutation_entry.toString(), zkutil::CreateMode::PersistentSequential)); - } Coordination::Responses results; @@ -3399,11 +3404,11 @@ void StorageReplicatedMergeTree::alter( if (rc == Coordination::ZOK) { queue.pullLogsToQueue(zookeeper); - if (entry.have_mutation) + if (alter_entry->have_mutation) { /// Record in replication /log String alter_path = dynamic_cast(*results[2]).path_created; - entry.znode_name = alter_path.substr(alter_path.find_last_of('/') + 1); + alter_entry->znode_name = alter_path.substr(alter_path.find_last_of('/') + 1); /// Record in /mutations String mutation_path = dynamic_cast(*results.back()).path_created; @@ -3413,7 +3418,7 @@ void StorageReplicatedMergeTree::alter( { /// Record in replication /log String alter_path = dynamic_cast(*results.back()).path_created; - entry.znode_name = alter_path.substr(alter_path.find_last_of('/') + 1); + alter_entry->znode_name = alter_path.substr(alter_path.find_last_of('/') + 1); } break; } @@ -3437,9 +3442,9 @@ void StorageReplicatedMergeTree::alter( std::vector unwaited; if (query_context.getSettingsRef().replication_alter_partitions_sync == 2) - unwaited = waitForAllReplicasToProcessLogEntry(entry, false); + unwaited = waitForAllReplicasToProcessLogEntry(*alter_entry, false); else if (query_context.getSettingsRef().replication_alter_partitions_sync == 1) - waitForReplicaToProcessLogEntry(replica_name, entry); + waitForReplicaToProcessLogEntry(replica_name, *alter_entry); if (!unwaited.empty()) throw Exception("Some replicas doesn't finish metadata alter", ErrorCodes::UNFINISHED); @@ -4534,111 +4539,6 @@ void StorageReplicatedMergeTree::waitMutation(const String & znode_name, size_t } } - -ReplicatedMergeTreeMutationEntry StorageReplicatedMergeTree::prepareMutationEntry( - zkutil::ZooKeeperPtr zookeeper, const MutationCommands & commands, Coordination::Requests & requests, int alter_version) const -{ - String mutations_path = zookeeper_path + "/mutations"; - - ReplicatedMergeTreeMutationEntry entry; - entry.source_replica = replica_name; - entry.commands = commands; - entry.alter_version = alter_version; - Coordination::Stat mutations_stat; - zookeeper->get(mutations_path, &mutations_stat); - - EphemeralLocksInAllPartitions block_number_locks(zookeeper_path + "/block_numbers", "block-", zookeeper_path + "/temp", *zookeeper); - - for (const auto & lock : block_number_locks.getLocks()) - entry.block_numbers[lock.partition_id] = lock.number; - - entry.create_time = time(nullptr); - - requests.emplace_back(zkutil::makeSetRequest(mutations_path, String(), mutations_stat.version)); - requests.emplace_back(zkutil::makeCreateRequest(mutations_path + "/", entry.toString(), zkutil::CreateMode::PersistentSequential)); - - return entry; -} - -void StorageReplicatedMergeTree::mutateImpl( - zkutil::ZooKeeperPtr zookeeper, - const Coordination::Requests & requests, - ReplicatedMergeTreeMutationEntry & entry) -{ - /// Overview of the mutation algorithm. - /// - /// When the client executes a mutation, this method is called. It acquires block numbers in all - /// partitions, saves them in the mutation entry and writes the mutation entry to a new ZK node in - /// the /mutations folder. This block numbers are needed to determine which parts should be mutated and - /// which shouldn't (parts inserted after the mutation will have the block number higher than the - /// block number acquired by the mutation in that partition and so will not be mutatied). - /// This block number is called "mutation version" in that partition. - /// - /// Mutation versions are acquired atomically in all partitions, so the case when an insert in some - /// partition has the block number higher than the mutation version but the following insert into another - /// partition acquires the block number lower than the mutation version in that partition is impossible. - /// Another important invariant: mutation entries appear in /mutations in the order of their mutation - /// versions (in any partition). This means that mutations form a sequence and we can execute them in - /// the order of their mutation versions and not worry that some mutation with the smaller version - /// will suddenly appear. - /// - /// During mutations individual parts are immutable - when we want to change the contents of a part - /// we prepare the new part and add it to MergeTreeData (the original part gets replaced). The fact that - /// we have mutated the part is recorded in the part->info.mutation field of MergeTreePartInfo. - /// The relation with the original part is preserved because the new part covers the same block range - /// as the original one. - /// - /// We then can for each part determine its "mutation version": the version of the last mutation in - /// the mutation sequence that we regard as already applied to that part. All mutations with the greater - /// version number will still need to be applied to that part. - /// - /// Execution of mutations is done asynchronously. All replicas watch the /mutations directory and - /// load new mutation entries as they appear (see mutationsUpdatingTask()). Next we need to determine - /// how to mutate individual parts consistently with part merges. This is done by the leader replica - /// (see mergeSelectingTask() and class ReplicatedMergeTreeMergePredicate for details). Important - /// invariants here are that a) all source parts for a single merge must have the same mutation version - /// and b) any part can be mutated only once or merged only once (e.g. once we have decided to mutate - /// a part then we need to execute that mutation and can assign merges only to the new part and not to the - /// original part). Multiple consecutive mutations can be executed at once (without writing the - /// intermediate result to a part). - /// - /// Leader replica records its decisions to the replication log (/log directory in ZK) in the form of - /// MUTATE_PART entries and all replicas then execute them in the background pool - /// (see tryExecutePartMutation() function). When a replica encounters a MUTATE_PART command, it is - /// guaranteed that the corresponding mutation entry is already loaded (when we pull entries from - /// replication log into the replica queue, we also load mutation entries). Note that just as with merges - /// the replica can decide not to do the mutation locally and fetch the mutated part from another replica - /// instead. - /// - /// Mutations of individual parts are in fact pretty similar to merges, e.g. their assignment and execution - /// is governed by the same storage_settings. TODO: support a single "merge-mutation" operation when the data - /// read from the the source parts is first mutated on the fly to some uniform mutation version and then - /// merged to a resulting part. - /// - /// After all needed parts are mutated (i.e. all active parts have the mutation version greater than - /// the version of this mutation), the mutation is considered done and can be deleted. - - - Coordination::Responses responses; - int32_t rc = zookeeper->tryMulti(requests, responses); - - if (rc == Coordination::ZOK) - { - const String & path_created = - dynamic_cast(responses[1].get())->path_created; - entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1); - LOG_TRACE(log, "Created mutation with ID " << entry.znode_name); - } - else if (rc == Coordination::ZBADVERSION) - /// This error mean, that parallel mutation is created in mutations queue (/mutations) right now. - /// (NOTE: concurrent mutations execution is OK, but here we have case with concurrent mutation intention from client) - /// We can retry this error by ourself, but mutations is not designed for highly concurrent execution. - /// So, if client sure that he do what he want, than he should retry. - throw Coordination::Exception("Parallel mutation is creating right now. Client should retry.", rc); - else - throw Coordination::Exception("Unable to create a mutation znode", rc); -} - std::vector StorageReplicatedMergeTree::getMutationsStatus() const { return queue.getMutationsStatus(); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index 1945924e84d..55b53e59836 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -109,8 +109,6 @@ public: void alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context) override; void mutate(const MutationCommands & commands, const Context & context) override; - ReplicatedMergeTreeMutationEntry prepareMutationEntry(zkutil::ZooKeeperPtr zk, const MutationCommands & commands, Coordination::Requests & requests, int alter_version = -1) const; - void mutateImpl(zkutil::ZooKeeperPtr zookeeper, const Coordination::Requests & requests, ReplicatedMergeTreeMutationEntry & entry); void waitMutation(const String & znode_name, size_t mutation_sync) const; std::vector getMutationsStatus() const override; CancellationCode killMutation(const String & mutation_id) override;