DROP PARTITION now works with mutated parts [#CLICKHOUSE-3747]

This commit is contained in:
Alexey Zatelepin 2018-05-14 17:51:33 +03:00
parent fdb33d8f3c
commit 2b80fbf972
3 changed files with 28 additions and 15 deletions

View File

@ -763,14 +763,14 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
} }
Int64 ReplicatedMergeTreeQueue::getCurrentMutationVersion( Int64 ReplicatedMergeTreeQueue::getCurrentMutationVersionImpl(
const MergeTreePartInfo & part_info, std::lock_guard<std::mutex> & /* target_state_lock */) const const String & partition_id, Int64 data_version, std::lock_guard<std::mutex> & /* target_state_lock */) const
{ {
auto in_partition = mutations_by_partition.find(part_info.partition_id); auto in_partition = mutations_by_partition.find(partition_id);
if (in_partition == mutations_by_partition.end()) if (in_partition == mutations_by_partition.end())
return 0; return 0;
auto it = in_partition->second.upper_bound(part_info.getDataVersion()); auto it = in_partition->second.upper_bound(data_version);
if (it == in_partition->second.begin()) if (it == in_partition->second.begin())
return 0; return 0;
@ -779,6 +779,13 @@ Int64 ReplicatedMergeTreeQueue::getCurrentMutationVersion(
} }
Int64 ReplicatedMergeTreeQueue::getCurrentMutationVersion(const String & partition_id, Int64 data_version) const
{
std::lock_guard lock(target_state_mutex);
return getCurrentMutationVersionImpl(partition_id, data_version, lock);
}
ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(ReplicatedMergeTreeQueue::LogEntryPtr & entry, ReplicatedMergeTreeQueue & queue) ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(ReplicatedMergeTreeQueue::LogEntryPtr & entry, ReplicatedMergeTreeQueue & queue)
: entry(entry), queue(queue) : entry(entry), queue(queue)
{ {
@ -1206,8 +1213,10 @@ bool ReplicatedMergeTreeMergePredicate::operator()(
} }
} }
Int64 left_mutation_ver = queue.getCurrentMutationVersion(left->info, target_state_lock); Int64 left_mutation_ver = queue.getCurrentMutationVersionImpl(
Int64 right_mutation_ver = queue.getCurrentMutationVersion(right->info, target_state_lock); left->info.partition_id, left->info.getDataVersion(), target_state_lock);
Int64 right_mutation_ver = queue.getCurrentMutationVersionImpl(
left->info.partition_id, right->info.getDataVersion(), target_state_lock);
if (left_mutation_ver != right_mutation_ver) if (left_mutation_ver != right_mutation_ver)
{ {
if (out_reason) if (out_reason)
@ -1263,7 +1272,7 @@ std::optional<Int64> ReplicatedMergeTreeMergePredicate::getDesiredMutationVersio
if (in_partition == queue.mutations_by_partition.end()) if (in_partition == queue.mutations_by_partition.end())
return {}; return {};
Int64 current_version = queue.getCurrentMutationVersion(part->info, lock); Int64 current_version = queue.getCurrentMutationVersionImpl(part->info.partition_id, part->info.getDataVersion(), lock);
Int64 max_version = in_partition->second.rbegin()->first; Int64 max_version = in_partition->second.rbegin()->first;
if (current_version >= max_version) if (current_version >= max_version)
return {}; return {};

View File

@ -117,11 +117,7 @@ private:
MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data, MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data,
std::lock_guard<std::mutex> & queue_lock) const; std::lock_guard<std::mutex> & queue_lock) const;
/// Return the version (block number) of the last mutation that we don't need to apply to the part Int64 getCurrentMutationVersionImpl(const String & partition_id, Int64 data_version, std::lock_guard<std::mutex> & /* target_state_lock */) const;
/// (either this mutation was already applied or the part was created after the mutation).
/// If there is no such mutation or it has already been executed and deleted, return 0.
/// Call under the target_state_mutex.
Int64 getCurrentMutationVersion(const MergeTreePartInfo & part_info, std::lock_guard<std::mutex> & /* target_state_lock */) const;
/** Check that part isn't in currently generating parts and isn't covered by them. /** Check that part isn't in currently generating parts and isn't covered by them.
* Should be called under queue_mutex. * Should be called under queue_mutex.
@ -225,6 +221,12 @@ public:
ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper); ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper);
/// Return the version (block number) of the last mutation that we don't need to apply to the part
/// with getDataVersion() == data_version. (Either this mutation was already applied or the part
/// was created after the mutation).
/// If there is no such mutation or it has already been executed and deleted, return 0.
Int64 getCurrentMutationVersion(const String & partition_id, Int64 data_version) const;
MutationCommands getMutationCommands(const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version) const; MutationCommands getMutationCommands(const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version) const;
/// Prohibit merges in the specified range. /// Prohibit merges in the specified range.

View File

@ -2743,10 +2743,10 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
/// The name of an imaginary part covering all possible parts in the specified partition with numbers in the range from zero to specified right bound. /// The name of an imaginary part covering all possible parts in the specified partition with numbers in the range from zero to specified right bound.
static String getFakePartNameCoveringPartRange( static String getFakePartNameCoveringPartRange(
MergeTreeDataFormatVersion format_version, const String & partition_id, UInt64 left, UInt64 right) MergeTreeDataFormatVersion format_version, const String & partition_id, UInt64 left, UInt64 right, Int64 mutation_version)
{ {
/// Artificial high level is choosen, to make this part "covering" all parts inside. /// Artificial high level is choosen, to make this part "covering" all parts inside.
MergeTreePartInfo part_info(partition_id, left, right, 999999999); MergeTreePartInfo part_info(partition_id, left, right, 999999999, mutation_version);
if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING) if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{ {
/// The date range is all month long. /// The date range is all month long.
@ -2775,11 +2775,13 @@ String StorageReplicatedMergeTree::getFakePartNameCoveringAllPartsInPartition(
* to guarantee this invariant. * to guarantee this invariant.
*/ */
Int64 right; Int64 right;
Int64 mutation_version;
{ {
auto zookeeper = getZooKeeper(); auto zookeeper = getZooKeeper();
AbandonableLockInZooKeeper block_number_lock = allocateBlockNumber(partition_id, zookeeper); AbandonableLockInZooKeeper block_number_lock = allocateBlockNumber(partition_id, zookeeper);
right = block_number_lock.getNumber(); right = block_number_lock.getNumber();
mutation_version = queue.getCurrentMutationVersion(partition_id, right);
block_number_lock.unlock(); block_number_lock.unlock();
} }
@ -2793,7 +2795,7 @@ String StorageReplicatedMergeTree::getFakePartNameCoveringAllPartsInPartition(
*out_min_block = left; *out_min_block = left;
if (out_max_block) if (out_max_block)
*out_max_block = right; *out_max_block = right;
return getFakePartNameCoveringPartRange(data.format_version, partition_id, left, right); return getFakePartNameCoveringPartRange(data.format_version, partition_id, left, right, mutation_version);
} }