This commit is contained in:
alesapin 2020-02-13 19:16:09 +03:00
parent 8c160c7905
commit 78d42142cf
9 changed files with 102 additions and 169 deletions

View File

@ -483,6 +483,7 @@ namespace ErrorCodes
extern const int INVALID_GRANT = 509;
extern const int CACHE_DICTIONARY_UPDATE_FAIL = 510;
extern const int CANNOT_ASSIGN_ALTER = 512;
extern const int CONCURRENT_ALTER_IS_PROCESSING = 513;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -29,6 +29,7 @@ namespace ErrorCodes
extern const int KEEPER_EXCEPTION;
extern const int TIMEOUT_EXCEEDED;
extern const int NO_ACTIVE_REPLICAS;
extern const int CONCURRENT_ALTER_IS_PROCESSING;
}
@ -255,7 +256,14 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
log_entry.toString(),
zkutil::CreateMode::PersistentSequential));
/// We check that our metadata_version is the same as the version of the shared /metadata node.
/// Otherwise we may commit parts which nobody will alter.
///
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/metadata", storage.getMetadataVersion()));
/// We update the version of the block_numbers/partition node to register the fact of a new insert.
/// If we want to be sure that no inserts happened in some period of time, then we can read the
/// versions of all partition nodes inside block_numbers and then make check requests in a zookeeper transaction.
ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/block_numbers/" + part->info.partition_id, "", -1));
/// Deletes the information that the block number is used for writing.
@ -347,6 +355,11 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
throw Exception("Another quorum insert has been already started", ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
}
else if (multi_code == Coordination::ZBADVERSION)
{
transaction.rollback();
throw Exception("Current metadata version is not consistent with version in zookeeper. Concurrent alter of metadata is processing now, client must retry", ErrorCodes::CONCURRENT_ALTER_IS_PROCESSING);
}
else
{
/// NOTE: We could be here if the node with the quorum existed, but was quickly removed.

View File

@ -107,12 +107,18 @@ struct ReplicatedMergeTreeLogEntryData
std::shared_ptr<ReplaceRangeEntry> replace_range_entry;
//TODO(alesap)
int alter_version;
bool have_mutation;
/// ALTER METADATA and MUTATE PART command
String columns_str;
String metadata_str;
/// Version of metadata which will be set after this alter
/// Also present in MUTATE_PART command, to track mutations
/// required for complete alter execution.
int alter_version; /// May be equal to -1, if it's normal mutation, not metadata update.
/// only ALTER METADATA command
bool have_mutation; /// If this alter requires additional mutation step, for data update
String columns_str; /// New columns data corresponding to alter_version
String metadata_str; /// New metadata corresponding to alter_version
/// Returns a set of parts that will appear after executing the entry + parts to block
/// selection of merges. These parts are added to queue.virtual_parts.

View File

@ -13,6 +13,11 @@ namespace DB
class ReadBuffer;
class WriteBuffer;
/// Mutation entry in /mutations path in zookeeper. This record contains information about blocks
/// in partitions. We will mutate all parts with left number less than these numbers.
///
/// These entries are processed separately from the main replication /log, and produce other entries
/// -- MUTATE_PART.
struct ReplicatedMergeTreeMutationEntry
{
void writeText(WriteBuffer & out) const;
@ -21,14 +26,24 @@ struct ReplicatedMergeTreeMutationEntry
String toString() const;
static ReplicatedMergeTreeMutationEntry parse(const String & str, String znode_name);
/// Name of znode (mutation-xxxxxxx)
String znode_name;
/// Create time of znode
time_t create_time = 0;
/// Replica which initiated mutation
String source_replica;
/// Acquired block numbers
/// partition_id -> block_number
std::map<String, Int64> block_numbers;
/// Mutation commands which will be given to MUTATE_PART entries
MutationCommands commands;
/// Version of metadata. Not equal to -1 only if this mutation
/// was created by ALTER MODIFY/DROP queries.
int alter_version = -1;
};

View File

@ -150,7 +150,7 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
}
}
if (entry->type == LogEntry::ALTER_METADATA)
alter_sequence.addMetadataAlter(entry->alter_version, state_lock);
alter_chain.addMetadataAlter(entry->alter_version, state_lock);
}
@ -226,7 +226,7 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
}
if (entry->type == LogEntry::ALTER_METADATA)
alter_sequence.finishMetadataAlter(entry->alter_version, entry->have_mutation, state_lock);
alter_chain.finishMetadataAlter(entry->alter_version, entry->have_mutation, state_lock);
}
else
{
@ -696,7 +696,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
some_mutations_are_probably_done = true;
if (entry->alter_version != -1)
alter_sequence.addMutationForAlter(entry->alter_version, state_lock);
alter_chain.addMutationForAlter(entry->alter_version, state_lock);
}
}
@ -1043,10 +1043,10 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (entry.type == LogEntry::ALTER_METADATA)
{
if (!alter_sequence.canExecuteMetaAlter(entry.alter_version, state_lock))
if (!alter_chain.canExecuteMetaAlter(entry.alter_version, state_lock))
{
int head_alter = alter_sequence.getHeadAlterVersion(state_lock);
out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version)
int head_alter = alter_chain.getHeadAlterVersion(state_lock);
out_postpone_reason = "Cannot execute alter metadata with version: " + std::to_string(entry.alter_version)
+ " because another alter " + std::to_string(head_alter)
+ " must be executed before";
return false;
@ -1055,9 +1055,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (entry.type == LogEntry::MUTATE_PART && entry.alter_version != -1)
{
if (!alter_sequence.canExecuteDataAlter(entry.alter_version, state_lock))
if (!alter_chain.canExecuteDataAlter(entry.alter_version, state_lock))
{
int head_alter = alter_sequence.getHeadAlterVersion(state_lock);
int head_alter = alter_chain.getHeadAlterVersion(state_lock);
if (head_alter == entry.alter_version)
out_postpone_reason = "Cannot execute alter data with version: "
+ std::to_string(entry.alter_version) + " because metadata still not altered";
@ -1352,7 +1352,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
LOG_INFO(log, "Seems like we jumped over mutation " << znode << " when downloaded part with bigger mutation number."
<< " It's OK, tasks for rest parts will be skipped, but probably a lot of mutations were executed concurrently on different replicas.");
mutation.parts_to_do.clear();
alter_sequence.finishDataAlter(mutation.entry->alter_version, lock);
alter_chain.finishDataAlter(mutation.entry->alter_version, lock);
}
}
else if (mutation.parts_to_do.size() == 0)
@ -1393,7 +1393,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
if (entry->alter_version != -1)
{
LOG_TRACE(log, "Finishing data alter with version " << entry->alter_version << " for entry " << entry->znode_name);
alter_sequence.finishDataAlter(entry->alter_version, lock);
alter_chain.finishDataAlter(entry->alter_version, lock);
}
}
}

View File

@ -9,7 +9,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumAddedParts.h>
#include <Storages/MergeTree/ReplicatedQueueAlterState.h>
#include <Storages/MergeTree/ReplicatedQueueAlterChain.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Core/BackgroundSchedulePool.h>
@ -132,7 +132,7 @@ private:
/// Provides only one simultaneous call to pullLogsToQueue.
std::mutex pull_logs_to_queue_mutex;
AlterSequence alter_sequence;
ReplicatedQueueAlterChain alter_chain;
/// List of subscribers
/// A subscriber callback is called when an entry queue is deleted

View File

@ -14,7 +14,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
class AlterSequence
class ReplicatedQueueAlterChain
{
private:
struct AlterState

View File

@ -3261,7 +3261,6 @@ void StorageReplicatedMergeTree::alter(
{
assertNotReadonly();
auto maybe_mutation_commands = params.getMutationCommands(getInMemoryMetadata());
auto table_id = getStorageID();
if (params.isSettingsAlter())
@ -3286,78 +3285,86 @@ void StorageReplicatedMergeTree::alter(
auto zookeeper = getZooKeeper();
ReplicatedMergeTreeLogEntryData entry;
std::optional<ReplicatedMergeTreeLogEntryData> alter_entry;
std::optional<String> mutation_znode;
while (true) {
/// Clear nodes from previous iteration
alter_entry.emplace();
mutation_znode.emplace();
/// We can safely read structure, because we guarded with alter_intention_lock
if (is_readonly)
throw Exception("Can't ALTER readonly table", ErrorCodes::TABLE_IS_READ_ONLY);
StorageInMemoryMetadata metadata = getInMemoryMetadata();
params.apply(metadata);
StorageInMemoryMetadata current_metadata = getInMemoryMetadata();
StorageInMemoryMetadata future_metadata = current_metadata;
params.apply(future_metadata);
ReplicatedMergeTreeTableMetadata future_metadata_in_zk(*this);
if (ast_to_str(future_metadata.order_by_ast) != ast_to_str(current_metadata.order_by_ast))
future_metadata_in_zk.sorting_key = serializeAST(*extractKeyExpressionList(future_metadata.order_by_ast));
ReplicatedMergeTreeTableMetadata new_metadata(*this);
if (ast_to_str(metadata.order_by_ast) != ast_to_str(order_by_ast))
new_metadata.sorting_key = serializeAST(*extractKeyExpressionList(metadata.order_by_ast));
if (ast_to_str(future_metadata.ttl_for_table_ast) != ast_to_str(current_metadata.ttl_for_table_ast))
future_metadata_in_zk.ttl_table = serializeAST(*future_metadata.ttl_for_table_ast);
if (ast_to_str(metadata.ttl_for_table_ast) != ast_to_str(ttl_table_ast))
new_metadata.ttl_table = serializeAST(*metadata.ttl_for_table_ast);
String new_indices_str = future_metadata.indices.toString();
if (new_indices_str != current_metadata.indices.toString())
future_metadata_in_zk.skip_indices = new_indices_str;
String new_indices_str = metadata.indices.toString();
if (new_indices_str != getIndices().toString())
new_metadata.skip_indices = new_indices_str;
String new_constraints_str = metadata.constraints.toString();
if (new_constraints_str != getConstraints().toString())
new_metadata.constraints = new_constraints_str;
/// Publish constraints to the shared metadata only when this ALTER actually changes them.
/// The baseline must be the current *constraints*, not the indices: comparing against the
/// indices string skips the update whenever the new constraints string happens to be equal
/// to it, so a real constraints change could be silently left out of the shared metadata.
String new_constraints_str = future_metadata.constraints.toString();
if (new_constraints_str != current_metadata.constraints.toString())
    future_metadata_in_zk.constraints = new_constraints_str;
Coordination::Requests ops;
String new_metadata_str = new_metadata.toString();
String new_metadata_str = future_metadata_in_zk.toString();
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/metadata", new_metadata_str, metadata_version));
String new_columns_str = metadata.columns.toString();
String new_columns_str = future_metadata.columns.toString();
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/columns", new_columns_str, -1));
if (ast_to_str(current_metadata.settings_ast) != ast_to_str(future_metadata.settings_ast))
{
lockStructureExclusively(table_lock_holder, query_context.getCurrentQueryId());
auto old_metadata = getInMemoryMetadata();
old_metadata.settings_ast = metadata.settings_ast;
changeSettings(metadata.settings_ast, table_lock_holder);
global_context.getDatabase(table_id.database_name)->alterTable(query_context, table_id.table_name, old_metadata);
/// Just change settings
current_metadata.settings_ast = future_metadata.settings_ast;
changeSettings(current_metadata.settings_ast, table_lock_holder);
global_context.getDatabase(table_id.database_name)->alterTable(query_context, table_id.table_name, current_metadata);
}
entry.type = LogEntry::ALTER_METADATA;
entry.source_replica = replica_name;
entry.metadata_str = new_metadata_str;
entry.columns_str = new_columns_str;
entry.alter_version = metadata_version + 1;
/// We can be sure that in case of a successful commit in zookeeper our
/// version will be incremented by 1, because we update with a version check.
int new_metadata_version = metadata_version + 1;
entry.create_time = time(nullptr);
if (!maybe_mutation_commands.empty())
entry.have_mutation = true;
else
entry.have_mutation = false;
alter_entry->type = LogEntry::ALTER_METADATA;
alter_entry->source_replica = replica_name;
alter_entry->metadata_str = new_metadata_str;
alter_entry->columns_str = new_columns_str;
alter_entry->alter_version = new_metadata_version;
alter_entry->create_time = time(nullptr);
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
auto maybe_mutation_commands = params.getMutationCommands(current_metadata);
alter_entry->have_mutation = !maybe_mutation_commands.empty();
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", alter_entry->toString(), zkutil::CreateMode::PersistentSequential));
std::optional<EphemeralLocksInAllPartitions> lock_holder;
size_t partitions_count = 0;
if (!maybe_mutation_commands.empty())
/// Now we will prepare the mutations record
if (alter_entry->have_mutation)
{
String mutations_path = zookeeper_path + "/mutations";
ReplicatedMergeTreeMutationEntry mutation_entry;
mutation_entry.source_replica = replica_name;
mutation_entry.commands = maybe_mutation_commands;
mutation_entry.alter_version = metadata_version + 1;
mutation_entry.alter_version = new_metadata_version;
Coordination::Stat mutations_stat;
zookeeper->get(mutations_path, &mutations_stat);
@ -3382,7 +3389,6 @@ void StorageReplicatedMergeTree::alter(
{
mutation_entry.block_numbers[lock.partition_id] = lock.number;
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/block_numbers/" + lock.partition_id, partition_versions[lock.partition_id]));
partitions_count++;
}
mutation_entry.create_time = time(nullptr);
@ -3390,7 +3396,6 @@ void StorageReplicatedMergeTree::alter(
ops.emplace_back(zkutil::makeSetRequest(mutations_path, String(), mutations_stat.version));
ops.emplace_back(
zkutil::makeCreateRequest(mutations_path + "/", mutation_entry.toString(), zkutil::CreateMode::PersistentSequential));
}
Coordination::Responses results;
@ -3399,11 +3404,11 @@ void StorageReplicatedMergeTree::alter(
if (rc == Coordination::ZOK)
{
queue.pullLogsToQueue(zookeeper);
if (entry.have_mutation)
if (alter_entry->have_mutation)
{
/// Record in replication /log
String alter_path = dynamic_cast<const Coordination::CreateResponse &>(*results[2]).path_created;
entry.znode_name = alter_path.substr(alter_path.find_last_of('/') + 1);
alter_entry->znode_name = alter_path.substr(alter_path.find_last_of('/') + 1);
/// Record in /mutations
String mutation_path = dynamic_cast<const Coordination::CreateResponse &>(*results.back()).path_created;
@ -3413,7 +3418,7 @@ void StorageReplicatedMergeTree::alter(
{
/// Record in replication /log
String alter_path = dynamic_cast<const Coordination::CreateResponse &>(*results.back()).path_created;
entry.znode_name = alter_path.substr(alter_path.find_last_of('/') + 1);
alter_entry->znode_name = alter_path.substr(alter_path.find_last_of('/') + 1);
}
break;
}
@ -3437,9 +3442,9 @@ void StorageReplicatedMergeTree::alter(
std::vector<String> unwaited;
if (query_context.getSettingsRef().replication_alter_partitions_sync == 2)
unwaited = waitForAllReplicasToProcessLogEntry(entry, false);
unwaited = waitForAllReplicasToProcessLogEntry(*alter_entry, false);
else if (query_context.getSettingsRef().replication_alter_partitions_sync == 1)
waitForReplicaToProcessLogEntry(replica_name, entry);
waitForReplicaToProcessLogEntry(replica_name, *alter_entry);
if (!unwaited.empty())
throw Exception("Some replicas doesn't finish metadata alter", ErrorCodes::UNFINISHED);
@ -4534,111 +4539,6 @@ void StorageReplicatedMergeTree::waitMutation(const String & znode_name, size_t
}
}
/// Builds a mutation entry for `commands` and appends to `requests` the ZooKeeper operations
/// that publish it under <zookeeper_path>/mutations.
///
/// The returned entry carries this replica's name as the source, the given `alter_version`,
/// and one block number per partition taken from the locks returned by
/// EphemeralLocksInAllPartitions. Two requests are appended, in this order:
///   1. a set on the /mutations node guarded by the node version read at the start of the call;
///   2. a sequential create of the serialized entry under /mutations/.
///
/// NOTE(review): the partition locks object is a local of this function, while `requests`
/// are executed later by the caller -- confirm the locks do not need to outlive this call.
ReplicatedMergeTreeMutationEntry StorageReplicatedMergeTree::prepareMutationEntry(
    zkutil::ZooKeeperPtr zookeeper, const MutationCommands & commands, Coordination::Requests & requests, int alter_version) const
{
    const String mutations_root = zookeeper_path + "/mutations";

    ReplicatedMergeTreeMutationEntry mutation_entry;
    mutation_entry.source_replica = replica_name;
    mutation_entry.commands = commands;
    mutation_entry.alter_version = alter_version;

    /// Remember the current version of the /mutations node; it guards the set request below.
    Coordination::Stat root_stat;
    zookeeper->get(mutations_root, &root_stat);

    /// Take a block number in every partition and record it as partition_id -> block_number.
    EphemeralLocksInAllPartitions partition_locks(zookeeper_path + "/block_numbers", "block-", zookeeper_path + "/temp", *zookeeper);
    for (const auto & partition_lock : partition_locks.getLocks())
        mutation_entry.block_numbers[partition_lock.partition_id] = partition_lock.number;

    mutation_entry.create_time = time(nullptr);

    requests.emplace_back(zkutil::makeSetRequest(mutations_root, String(), root_stat.version));
    requests.emplace_back(zkutil::makeCreateRequest(mutations_root + "/", mutation_entry.toString(), zkutil::CreateMode::PersistentSequential));

    return mutation_entry;
}
/// Executes the prepared `requests` as a single ZooKeeper multi-operation and, on success,
/// stores the name of the created mutation znode into `entry.znode_name`.
///
/// The created path is taken from the second response (index 1) of the multi-operation.
/// Throws Coordination::Exception if the multi-operation fails; a version conflict
/// gets a dedicated message asking the client to retry.
void StorageReplicatedMergeTree::mutateImpl(
    zkutil::ZooKeeperPtr zookeeper,
    const Coordination::Requests & requests,
    ReplicatedMergeTreeMutationEntry & entry)
{
    /// Overview of the mutation algorithm.
    ///
    /// When the client executes a mutation, this method is called. It acquires block numbers in all
    /// partitions, saves them in the mutation entry and writes the mutation entry to a new ZK node in
    /// the /mutations folder. These block numbers are needed to determine which parts should be mutated and
    /// which shouldn't (parts inserted after the mutation will have the block number higher than the
    /// block number acquired by the mutation in that partition and so will not be mutated).
    /// This block number is called "mutation version" in that partition.
    ///
    /// Mutation versions are acquired atomically in all partitions, so the case when an insert in some
    /// partition has the block number higher than the mutation version but the following insert into another
    /// partition acquires the block number lower than the mutation version in that partition is impossible.
    /// Another important invariant: mutation entries appear in /mutations in the order of their mutation
    /// versions (in any partition). This means that mutations form a sequence and we can execute them in
    /// the order of their mutation versions and not worry that some mutation with the smaller version
    /// will suddenly appear.
    ///
    /// During mutations individual parts are immutable - when we want to change the contents of a part
    /// we prepare the new part and add it to MergeTreeData (the original part gets replaced). The fact that
    /// we have mutated the part is recorded in the part->info.mutation field of MergeTreePartInfo.
    /// The relation with the original part is preserved because the new part covers the same block range
    /// as the original one.
    ///
    /// We then can for each part determine its "mutation version": the version of the last mutation in
    /// the mutation sequence that we regard as already applied to that part. All mutations with the greater
    /// version number will still need to be applied to that part.
    ///
    /// Execution of mutations is done asynchronously. All replicas watch the /mutations directory and
    /// load new mutation entries as they appear (see mutationsUpdatingTask()). Next we need to determine
    /// how to mutate individual parts consistently with part merges. This is done by the leader replica
    /// (see mergeSelectingTask() and class ReplicatedMergeTreeMergePredicate for details). Important
    /// invariants here are that a) all source parts for a single merge must have the same mutation version
    /// and b) any part can be mutated only once or merged only once (e.g. once we have decided to mutate
    /// a part then we need to execute that mutation and can assign merges only to the new part and not to the
    /// original part). Multiple consecutive mutations can be executed at once (without writing the
    /// intermediate result to a part).
    ///
    /// Leader replica records its decisions to the replication log (/log directory in ZK) in the form of
    /// MUTATE_PART entries and all replicas then execute them in the background pool
    /// (see tryExecutePartMutation() function). When a replica encounters a MUTATE_PART command, it is
    /// guaranteed that the corresponding mutation entry is already loaded (when we pull entries from
    /// replication log into the replica queue, we also load mutation entries). Note that just as with merges
    /// the replica can decide not to do the mutation locally and fetch the mutated part from another replica
    /// instead.
    ///
    /// Mutations of individual parts are in fact pretty similar to merges, e.g. their assignment and execution
    /// is governed by the same storage_settings. TODO: support a single "merge-mutation" operation when the data
    /// read from the source parts is first mutated on the fly to some uniform mutation version and then
    /// merged to a resulting part.
    ///
    /// After all needed parts are mutated (i.e. all active parts have the mutation version greater than
    /// the version of this mutation), the mutation is considered done and can be deleted.

    Coordination::Responses multi_responses;
    int32_t code = zookeeper->tryMulti(requests, multi_responses);

    if (code == Coordination::ZBADVERSION)
    {
        /// This error means that a parallel mutation is being created in the mutations queue (/mutations) right now.
        /// (NOTE: concurrent mutations execution is OK, but here we have a case with concurrent mutation intention from client)
        /// We could retry this error ourselves, but mutations are not designed for highly concurrent execution.
        /// So, if the client is sure about what it is doing, then it should retry.
        throw Coordination::Exception("Parallel mutation is creating right now. Client should retry.", code);
    }

    if (code != Coordination::ZOK)
        throw Coordination::Exception("Unable to create a mutation znode", code);

    /// Success: the second response belongs to the create request of the mutation znode.
    const auto * create_response = dynamic_cast<const Coordination::CreateResponse *>(multi_responses[1].get());
    const String & created_path = create_response->path_created;
    entry.znode_name = created_path.substr(created_path.find_last_of('/') + 1);
    LOG_TRACE(log, "Created mutation with ID " << entry.znode_name);
}
std::vector<MergeTreeMutationStatus> StorageReplicatedMergeTree::getMutationsStatus() const
{
return queue.getMutationsStatus();

View File

@ -109,8 +109,6 @@ public:
void alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context) override;
void mutate(const MutationCommands & commands, const Context & context) override;
ReplicatedMergeTreeMutationEntry prepareMutationEntry(zkutil::ZooKeeperPtr zk, const MutationCommands & commands, Coordination::Requests & requests, int alter_version = -1) const;
void mutateImpl(zkutil::ZooKeeperPtr zookeeper, const Coordination::Requests & requests, ReplicatedMergeTreeMutationEntry & entry);
void waitMutation(const String & znode_name, size_t mutation_sync) const;
std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
CancellationCode killMutation(const String & mutation_id) override;