More comments and slightly better code

This commit is contained in:
alesapin 2020-02-17 19:33:05 +03:00
parent 6a02b99faf
commit 382f6ab720
6 changed files with 42 additions and 13 deletions

View File

@ -175,6 +175,12 @@ struct ReplicatedMergeTreeLogEntryData
/// The quorum value (for GET_PART) is a non-zero value when the quorum write is enabled.
size_t quorum = 0;
/// If this MUTATE_PART entry caused by alter(modify/drop) query.
bool isAlterMutation() const
{
return type == MUTATE_PART && alter_version != -1;
}
};

View File

@ -45,6 +45,8 @@ struct ReplicatedMergeTreeMutationEntry
/// Version of metadata. Not equal to -1 only if this mutation
/// was created by ALTER MODIFY/DROP queries.
int alter_version = -1;
bool isAlterMutation() const { return alter_version != -1; }
};
using ReplicatedMergeTreeMutationEntryPtr = std::shared_ptr<const ReplicatedMergeTreeMutationEntry>;

View File

@ -698,7 +698,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
}
/// otherwise it's already done
if (entry->alter_version != -1 && entry->znode_name > mutation_pointer)
if (entry->isAlterMutation() && entry->znode_name > mutation_pointer)
{
LOG_TRACE(log, "Adding mutation " << entry->znode_name << " with alter version " << entry->alter_version << " to the queue");
alter_chain.addMutationForAlter(entry->alter_version, state_lock);
@ -743,7 +743,7 @@ ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation(
mutations_by_partition.erase(partition_and_block_num.first);
}
if (entry->alter_version != -1)
if (entry->isAlterMutation())
{
LOG_DEBUG(log, "Removed alter " << entry->alter_version << " because mutation " + entry->znode_name + " were killed.");
alter_chain.finishDataAlter(entry->alter_version, state_lock);
@ -970,6 +970,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
MergeTreeData & data,
std::lock_guard<std::mutex> & state_lock) const
{
/// If our entry produce part which is alredy covered by
/// some other entry which is currently executing, then we can postpone this entry.
if (entry.type == LogEntry::MERGE_PARTS
|| entry.type == LogEntry::GET_PART
|| entry.type == LogEntry::MUTATE_PART)
@ -1053,6 +1055,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
}
}
/// Alters must be executed one by one. First metadata change, and after that data alter (MUTATE_PART entries with).
/// corresponding alter_version.
if (entry.type == LogEntry::ALTER_METADATA)
{
if (!alter_chain.canExecuteMetaAlter(entry.alter_version, state_lock))
@ -1065,7 +1069,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
}
}
if (entry.type == LogEntry::MUTATE_PART && entry.alter_version != -1)
/// If this MUTATE_PART is part of alter modify/drop query, than we have to execute them one by one
if (entry.isAlterMutation())
{
if (!alter_chain.canExecuteDataAlter(entry.alter_version, state_lock))
{
@ -1077,7 +1082,6 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version)
+ " because another alter " + std::to_string(head_alter) + " must be executed before";
return false;
}
}
@ -1402,7 +1406,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
{
LOG_TRACE(log, "Mutation " << entry->znode_name << " is done");
it->second.is_done = true;
if (entry->alter_version != -1)
if (entry->isAlterMutation())
{
LOG_TRACE(log, "Finishing data alter with version " << entry->alter_version << " for entry " << entry->znode_name);
alter_chain.finishDataAlter(entry->alter_version, lock);

View File

@ -277,7 +277,7 @@ public:
void updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback = {});
/// Remove a mutation from ZooKeeper and from the local set. Returns the removed entry or nullptr
/// if it could not be found.
/// if it could not be found. Called during KILL MUTATION query execution.
ReplicatedMergeTreeMutationEntryPtr removeMutation(zkutil::ZooKeeperPtr zookeeper, const String & mutation_id);
/** Remove the action from the queue with the parts covered by part_name (from ZK and from the RAM).
@ -395,8 +395,11 @@ public:
const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right,
String * out_reason = nullptr) const;
/// Return nonempty optional if the part can and should be mutated.
/// Returned mutation version number is always the biggest possible.
/// Return nonempty optional of desired mutation version and alter version.
/// If we have no alter (modify/drop) mutations in mutations queue, than we return biggest possible
/// mutation version (and -1 as alter version). In other case, we return biggest mutation version with
/// smallest alter version. This required, because we have to execute alter mutations sequentially and
/// don't glue them together. Alter is rare operation, so it shouldn't affect performance.
std::optional<std::pair<Int64, int>> getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const;
bool isMutationFinished(const ReplicatedMergeTreeMutationEntry & mutation) const;

View File

@ -976,10 +976,13 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
}
else if (entry.type == LogEntry::MERGE_PARTS)
{
/// Sometimes it's better to fetch merged part instead of merge
/// For example when we don't have all source parts for merge
do_fetch = !tryExecuteMerge(entry);
}
else if (entry.type == LogEntry::MUTATE_PART)
{
/// Sometimes it's better to fetch mutated part instead of merge
do_fetch = !tryExecutePartMutation(entry);
}
else if (entry.type == LogEntry::ALTER_METADATA)
@ -1284,6 +1287,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
{
/// Looking for covering part. After that entry.actual_new_part_name may be filled.
String replica = findReplicaHavingCoveringPart(entry, true);
const auto storage_settings_ptr = getSettings();
@ -3219,6 +3223,7 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer
zookeeper->multi(requests);
{
/// TODO (relax this lock)
auto table_lock = lockExclusively(RWLockImpl::NO_QUERY);
LOG_INFO(log, "Metadata changed in ZooKeeper. Applying changes locally.");
@ -3229,7 +3234,7 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer
LOG_INFO(log, "Applied changes to the metadata of the table. Current metadata version: " << metadata_version);
}
/// This transaction may not happen, but it's ok, because on the next retry we will eventually create this node
/// This transaction may not happen, but it's OK, because on the next retry we will eventually create/update this node
zookeeper->createOrUpdate(replica_path + "/metadata_version", std::to_string(metadata_version), zkutil::CreateMode::Persistent);
recalculateColumnSizes();
@ -3340,6 +3345,7 @@ void StorageReplicatedMergeTree::alter(
std::optional<EphemeralLocksInAllPartitions> lock_holder;
/// No we will prepare mutations record
/// This code pretty same with mutate() function but process results slightly differently
if (alter_entry->have_mutation)
{
String mutations_path = zookeeper_path + "/mutations";
@ -3371,17 +3377,17 @@ void StorageReplicatedMergeTree::alter(
{
if (alter_entry->have_mutation)
{
/// Record in replication /log
/// ALTER_METADATA record in replication /log
String alter_path = dynamic_cast<const Coordination::CreateResponse &>(*results[2]).path_created;
alter_entry->znode_name = alter_path.substr(alter_path.find_last_of('/') + 1);
/// Record in /mutations
/// ReplicatedMergeTreeMutationEntry record in /mutations
String mutation_path = dynamic_cast<const Coordination::CreateResponse &>(*results.back()).path_created;
mutation_znode = mutation_path.substr(mutation_path.find_last_of('/') + 1);
}
else
{
/// Record in replication /log
/// ALTER_METADATA record in replication /log
String alter_path = dynamic_cast<const Coordination::CreateResponse &>(*results.back()).path_created;
alter_entry->znode_name = alter_path.substr(alter_path.find_last_of('/') + 1);
}
@ -3392,7 +3398,6 @@ void StorageReplicatedMergeTree::alter(
if (dynamic_cast<const Coordination::SetResponse &>(*results[0]).error)
throw Exception("Metadata on replica is not up to date with common metadata in Zookeeper. Cannot alter", ErrorCodes::CANNOT_ASSIGN_ALTER);
LOG_TRACE(log, "We have version conflict with inserts because of concurrent inserts. Will try to assign alter one more time.");
continue;
}
else

View File

@ -366,11 +366,20 @@ private:
/// Do the merge or recommend to make the fetch instead of the merge
bool tryExecuteMerge(const LogEntry & entry);
/// Execute alter of table metadata. Set replica/metdata and replica/columns
/// nodes in zookeeper and also changes in memory metadata.
/// New metadata and columns values stored in entry.
bool executeMetadataAlter(const LogEntry & entry);
/// Execute MUTATE_PART entry. Part name and mutation commands
/// stored in entry. This function relies on MergerMutator class.
bool tryExecutePartMutation(const LogEntry & entry);
/// Fetch part from other replica (inserted or merged/mutated)
/// NOTE: Attention! First of all tries to find covering part on other replica
/// and set it into entry.actual_new_part_name. After that tries to fetch this new covering part.
/// If fetch was not successful, clears entry.actual_new_part_name.
bool executeFetch(LogEntry & entry);
void executeClearColumnOrIndexInPartition(const LogEntry & entry);