This commit is contained in:
alesapin 2020-02-13 17:48:38 +03:00
parent e1bc499ea8
commit 8c160c7905
5 changed files with 159 additions and 30 deletions

View File

@ -482,6 +482,7 @@ namespace ErrorCodes
extern const int UNKNOWN_ACCESS_TYPE = 508;
extern const int INVALID_GRANT = 509;
extern const int CACHE_DICTIONARY_UPDATE_FAIL = 510;
extern const int CANNOT_ASSIGN_ALTER = 512;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -255,7 +255,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
log_entry.toString(),
zkutil::CreateMode::PersistentSequential));
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/columns", storage.getMetadataVersion()));
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/metadata", storage.getMetadataVersion()));
ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/block_numbers/" + part->info.partition_id, "", -1));
/// Deletes the information that the block number is used for writing.
@ -311,7 +311,6 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
transaction.commit();
storage.merge_selecting_task->schedule();
LOG_DEBUG(log, "COMMITED INSERT WITH VERSION:" << storage.getMetadataVersion() << " of part " << part_name);
/// Lock nodes have been already deleted, do not delete them in destructor
block_number_lock->assumeUnlocked();
}

View File

@ -1045,7 +1045,10 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
if (!alter_sequence.canExecuteMetaAlter(entry.alter_version, state_lock))
{
out_postpone_reason = "Alter is not started, because more old alter is executing right now";
int head_alter = alter_sequence.getHeadAlterVersion(state_lock);
out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version)
+ " because another alter " + std::to_string(head_alter)
+ " must be executed before";
return false;
}
}
@ -1054,7 +1057,15 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
if (!alter_sequence.canExecuteDataAlter(entry.alter_version, state_lock))
{
out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version) + " because metadata alter is not finished yet";
int head_alter = alter_sequence.getHeadAlterVersion(state_lock);
if (head_alter == entry.alter_version)
out_postpone_reason = "Cannot execute alter data with version: "
+ std::to_string(entry.alter_version) + " because metadata still not altered";
else
out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version)
+ " because another alter " + std::to_string(head_alter) + " must be executed before";
return false;
}
}

View File

@ -17,31 +17,34 @@ namespace ErrorCodes
class AlterSequence
{
private:
struct AlterInQueue
struct AlterState
{
bool metadata_finished = false;
bool data_finished = false;
AlterInQueue() = default;
AlterState() = default;
AlterInQueue(bool metadata_finished_, bool data_finished_)
AlterState(bool metadata_finished_, bool data_finished_)
: metadata_finished(metadata_finished_)
, data_finished(data_finished_)
{
}
};
std::map<int, AlterInQueue> queue_state;
std::map<int, AlterState> queue_state;
public:
bool empty() const {
return queue_state.empty();
int getHeadAlterVersion(std::lock_guard<std::mutex> & /*state_lock*/) const
{
if (!queue_state.empty())
return queue_state.begin()->first;
return -1;
}
void addMutationForAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/)
{
if (!queue_state.count(alter_version))
queue_state.emplace(alter_version, AlterInQueue(true, false));
queue_state.emplace(alter_version, AlterState{true, false});
else
queue_state[alter_version].data_finished = false;
}
@ -49,7 +52,7 @@ public:
void addMetadataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/)
{
if (!queue_state.count(alter_version))
queue_state.emplace(alter_version, AlterInQueue(false, true));
queue_state.emplace(alter_version, AlterState{false, true});
else
queue_state[alter_version].metadata_finished = false;
}
@ -80,14 +83,19 @@ public:
bool canExecuteDataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const
{
if (!queue_state.count(alter_version))
return true;
return queue_state.at(alter_version).metadata_finished;
}
bool canExecuteMetaAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const
{
return queue_state.empty() || queue_state.begin()->first == alter_version;
if (queue_state.empty())
return true;
return queue_state.begin()->first == alter_version;
}
};

View File

@ -113,6 +113,7 @@ namespace ErrorCodes
extern const int KEEPER_EXCEPTION;
extern const int ALL_REPLICAS_LOST;
extern const int REPLICA_STATUS_CHANGED;
extern const int CANNOT_ASSIGN_ALTER;
}
namespace ActionLocks
@ -999,7 +1000,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
}
else if (entry.type == LogEntry::ALTER_METADATA)
{
executeMetadataAlter(entry);
return executeMetadataAlter(entry);
}
else
{
@ -3246,7 +3247,7 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer
setTableStructure(std::move(columns_from_entry), metadata_diff);
metadata_version = entry.alter_version;
LOG_INFO(log, "Applied changes to the metadata of the table. Setting metadata version:" << metadata_version);
LOG_INFO(log, "Applied changes to the metadata of the table. Current metadata version: " << metadata_version);
}
recalculateColumnSizes();
@ -3277,7 +3278,6 @@ void StorageReplicatedMergeTree::alter(
return;
}
auto ast_to_str = [](ASTPtr query) -> String {
if (!query)
return "";
@ -3289,22 +3289,17 @@ void StorageReplicatedMergeTree::alter(
ReplicatedMergeTreeLogEntryData entry;
std::optional<String> mutation_znode;
{
Coordination::Requests ops;
while (true) {
/// We can safely read structure, because we guarded with alter_intention_lock
if (is_readonly)
throw Exception("Can't ALTER readonly table", ErrorCodes::TABLE_IS_READ_ONLY);
Coordination::Stat metadata_stat;
String metadata_in_zk = zookeeper->get(zookeeper_path + "/metadata", &metadata_stat);
StorageInMemoryMetadata metadata = getInMemoryMetadata();
params.apply(metadata);
String new_columns_str = metadata.columns.toString();
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/columns", new_columns_str, metadata_version));
ReplicatedMergeTreeTableMetadata new_metadata(*this);
if (ast_to_str(metadata.order_by_ast) != ast_to_str(order_by_ast))
@ -3321,8 +3316,14 @@ void StorageReplicatedMergeTree::alter(
if (new_constraints_str != getConstraints().toString())
new_metadata.constraints = new_constraints_str;
Coordination::Requests ops;
String new_metadata_str = new_metadata.toString();
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/metadata", new_metadata_str, metadata_stat.version));
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/metadata", new_metadata_str, metadata_version));
String new_columns_str = metadata.columns.toString();
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/columns", new_columns_str, -1));
{
lockStructureExclusively(table_lock_holder, query_context.getCurrentQueryId());
@ -3356,7 +3357,7 @@ void StorageReplicatedMergeTree::alter(
ReplicatedMergeTreeMutationEntry mutation_entry;
mutation_entry.source_replica = replica_name;
mutation_entry.commands = maybe_mutation_commands;
mutation_entry.alter_version = metadata_version + 1;
mutation_entry.alter_version = metadata_version + 1;
Coordination::Stat mutations_stat;
zookeeper->get(mutations_path, &mutations_stat);
@ -3400,21 +3401,33 @@ void StorageReplicatedMergeTree::alter(
queue.pullLogsToQueue(zookeeper);
if (entry.have_mutation)
{
String alter_path = dynamic_cast<const Coordination::CreateResponse &>(*results[results.size() - 3 - partitions_count]).path_created;
/// Record in replication /log
String alter_path = dynamic_cast<const Coordination::CreateResponse &>(*results[2]).path_created;
entry.znode_name = alter_path.substr(alter_path.find_last_of('/') + 1);
/// Record in /mutations
String mutation_path = dynamic_cast<const Coordination::CreateResponse &>(*results.back()).path_created;
mutation_znode = mutation_path.substr(mutation_path.find_last_of('/') + 1);
}
else
{
/// Record in replication /log
String alter_path = dynamic_cast<const Coordination::CreateResponse &>(*results.back()).path_created;
entry.znode_name = alter_path.substr(alter_path.find_last_of('/') + 1);
}
break;
}
else if (rc == Coordination::ZBADVERSION)
{
if (dynamic_cast<const Coordination::SetResponse &>(*results[0]).error)
throw Exception("Metadata on replica is not up to date with common metadata in Zookeeper. Cannot alter", ErrorCodes::CANNOT_ASSIGN_ALTER);
LOG_TRACE(log, "We have version conflict with inserts because of concurrent inserts. Will try to assign alter one more time.");
continue;
}
else
{
throw Coordination::Exception("Cannot alter", rc);
throw Coordination::Exception("Alter cannot be assigned because of Zookeeper error", rc);
}
}
@ -4400,11 +4413,108 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const
void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const Context & query_context)
{
auto zookeeper = getZooKeeper();
Coordination::Requests requests;
/// Overview of the mutation algorithm.
///
/// When the client executes a mutation, this method is called. It acquires block numbers in all
/// partitions, saves them in the mutation entry and writes the mutation entry to a new ZK node in
/// the /mutations folder. This block numbers are needed to determine which parts should be mutated and
/// which shouldn't (parts inserted after the mutation will have the block number higher than the
/// block number acquired by the mutation in that partition and so will not be mutatied).
/// This block number is called "mutation version" in that partition.
///
/// Mutation versions are acquired atomically in all partitions, so the case when an insert in some
/// partition has the block number higher than the mutation version but the following insert into another
/// partition acquires the block number lower than the mutation version in that partition is impossible.
/// Another important invariant: mutation entries appear in /mutations in the order of their mutation
/// versions (in any partition). This means that mutations form a sequence and we can execute them in
/// the order of their mutation versions and not worry that some mutation with the smaller version
/// will suddenly appear.
///
/// During mutations individual parts are immutable - when we want to change the contents of a part
/// we prepare the new part and add it to MergeTreeData (the original part gets replaced). The fact that
/// we have mutated the part is recorded in the part->info.mutation field of MergeTreePartInfo.
/// The relation with the original part is preserved because the new part covers the same block range
/// as the original one.
///
/// We then can for each part determine its "mutation version": the version of the last mutation in
/// the mutation sequence that we regard as already applied to that part. All mutations with the greater
/// version number will still need to be applied to that part.
///
/// Execution of mutations is done asynchronously. All replicas watch the /mutations directory and
/// load new mutation entries as they appear (see mutationsUpdatingTask()). Next we need to determine
/// how to mutate individual parts consistently with part merges. This is done by the leader replica
/// (see mergeSelectingTask() and class ReplicatedMergeTreeMergePredicate for details). Important
/// invariants here are that a) all source parts for a single merge must have the same mutation version
/// and b) any part can be mutated only once or merged only once (e.g. once we have decided to mutate
/// a part then we need to execute that mutation and can assign merges only to the new part and not to the
/// original part). Multiple consecutive mutations can be executed at once (without writing the
/// intermediate result to a part).
///
/// Leader replica records its decisions to the replication log (/log directory in ZK) in the form of
/// MUTATE_PART entries and all replicas then execute them in the background pool
/// (see tryExecutePartMutation() function). When a replica encounters a MUTATE_PART command, it is
/// guaranteed that the corresponding mutation entry is already loaded (when we pull entries from
/// replication log into the replica queue, we also load mutation entries). Note that just as with merges
/// the replica can decide not to do the mutation locally and fetch the mutated part from another replica
/// instead.
///
/// Mutations of individual parts are in fact pretty similar to merges, e.g. their assignment and execution
/// is governed by the same storage_settings. TODO: support a single "merge-mutation" operation when the data
/// read from the the source parts is first mutated on the fly to some uniform mutation version and then
/// merged to a resulting part.
///
/// After all needed parts are mutated (i.e. all active parts have the mutation version greater than
/// the version of this mutation), the mutation is considered done and can be deleted.
ReplicatedMergeTreeMutationEntry entry;
entry.source_replica = replica_name;
entry.commands = commands;
String mutations_path = zookeeper_path + "/mutations";
/// Update the mutations_path node when creating the mutation and check its version to ensure that
/// nodes for mutations are created in the same order as the corresponding block numbers.
/// Should work well if the number of concurrent mutation requests is small.
while (true)
{
auto zookeeper = getZooKeeper();
Coordination::Stat mutations_stat;
zookeeper->get(mutations_path, &mutations_stat);
EphemeralLocksInAllPartitions block_number_locks(
zookeeper_path + "/block_numbers", "block-", zookeeper_path + "/temp", *zookeeper);
for (const auto & lock : block_number_locks.getLocks())
entry.block_numbers[lock.partition_id] = lock.number;
entry.create_time = time(nullptr);
Coordination::Requests requests;
requests.emplace_back(zkutil::makeSetRequest(mutations_path, String(), mutations_stat.version));
requests.emplace_back(zkutil::makeCreateRequest(
mutations_path + "/", entry.toString(), zkutil::CreateMode::PersistentSequential));
Coordination::Responses responses;
int32_t rc = zookeeper->tryMulti(requests, responses);
if (rc == Coordination::ZOK)
{
const String & path_created =
dynamic_cast<const Coordination::CreateResponse *>(responses[1].get())->path_created;
entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
LOG_TRACE(log, "Created mutation with ID " << entry.znode_name);
break;
}
else if (rc == Coordination::ZBADVERSION)
{
LOG_TRACE(log, "Version conflict when trying to create a mutation node, retrying...");
continue;
}
else
throw Coordination::Exception("Unable to create a mutation znode", rc);
}
ReplicatedMergeTreeMutationEntry entry = prepareMutationEntry(zookeeper, commands, requests);
mutateImpl(zookeeper, requests, entry);
waitMutation(entry.znode_name, query_context.getSettingsRef().mutations_sync);
}