mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-18 21:51:57 +00:00
cancel merges on drop partition
This commit is contained in:
parent
4913b18532
commit
76156af5cc
@ -13,7 +13,7 @@ MergeListElement::MergeListElement(const StorageID & table_id_, const FutureMerg
|
|||||||
, partition_id{future_part.part_info.partition_id}
|
, partition_id{future_part.part_info.partition_id}
|
||||||
, result_part_name{future_part.name}
|
, result_part_name{future_part.name}
|
||||||
, result_part_path{future_part.path}
|
, result_part_path{future_part.path}
|
||||||
, result_data_version{future_part.part_info.getDataVersion()}
|
, result_part_info{future_part.part_info}
|
||||||
, num_parts{future_part.parts.size()}
|
, num_parts{future_part.parts.size()}
|
||||||
, thread_id{getThreadId()}
|
, thread_id{getThreadId()}
|
||||||
, merge_type{future_part.merge_type}
|
, merge_type{future_part.merge_type}
|
||||||
@ -32,7 +32,7 @@ MergeListElement::MergeListElement(const StorageID & table_id_, const FutureMerg
|
|||||||
if (!future_part.parts.empty())
|
if (!future_part.parts.empty())
|
||||||
{
|
{
|
||||||
source_data_version = future_part.parts[0]->info.getDataVersion();
|
source_data_version = future_part.parts[0]->info.getDataVersion();
|
||||||
is_mutation = (result_data_version != source_data_version);
|
is_mutation = (result_part_info.getDataVersion() != source_data_version);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Each merge is executed into separate background processing pool thread
|
/// Each merge is executed into separate background processing pool thread
|
||||||
|
@ -7,6 +7,7 @@
|
|||||||
#include <Common/MemoryTracker.h>
|
#include <Common/MemoryTracker.h>
|
||||||
#include <Storages/MergeTree/MergeType.h>
|
#include <Storages/MergeTree/MergeType.h>
|
||||||
#include <Storages/MergeTree/MergeAlgorithm.h>
|
#include <Storages/MergeTree/MergeAlgorithm.h>
|
||||||
|
#include <Storages/MergeTree/MergeTreePartInfo.h>
|
||||||
#include <Storages/MergeTree/BackgroundProcessList.h>
|
#include <Storages/MergeTree/BackgroundProcessList.h>
|
||||||
#include <Interpreters/StorageID.h>
|
#include <Interpreters/StorageID.h>
|
||||||
#include <boost/noncopyable.hpp>
|
#include <boost/noncopyable.hpp>
|
||||||
@ -60,7 +61,7 @@ struct MergeListElement : boost::noncopyable
|
|||||||
|
|
||||||
const std::string result_part_name;
|
const std::string result_part_name;
|
||||||
const std::string result_part_path;
|
const std::string result_part_path;
|
||||||
Int64 result_data_version{};
|
MergeTreePartInfo result_part_info;
|
||||||
bool is_mutation{};
|
bool is_mutation{};
|
||||||
|
|
||||||
UInt64 num_parts{};
|
UInt64 num_parts{};
|
||||||
@ -130,7 +131,18 @@ public:
|
|||||||
if ((partition_id.empty() || merge_element.partition_id == partition_id)
|
if ((partition_id.empty() || merge_element.partition_id == partition_id)
|
||||||
&& merge_element.table_id == table_id
|
&& merge_element.table_id == table_id
|
||||||
&& merge_element.source_data_version < mutation_version
|
&& merge_element.source_data_version < mutation_version
|
||||||
&& merge_element.result_data_version >= mutation_version)
|
&& merge_element.result_part_info.getDataVersion() >= mutation_version)
|
||||||
|
merge_element.is_cancelled = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void cancelInPartition(const StorageID & table_id, const String & partition_id, Int64 delimiting_block_number)
|
||||||
|
{
|
||||||
|
for (auto & merge_element : entries)
|
||||||
|
{
|
||||||
|
if (merge_element.table_id == table_id
|
||||||
|
&& merge_element.partition_id == partition_id
|
||||||
|
&& merge_element.result_part_info.min_block < delimiting_block_number)
|
||||||
merge_element.is_cancelled = true;
|
merge_element.is_cancelled = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -951,8 +951,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
|
|||||||
size_t rows_written = 0;
|
size_t rows_written = 0;
|
||||||
const size_t initial_reservation = space_reservation ? space_reservation->getSize() : 0;
|
const size_t initial_reservation = space_reservation ? space_reservation->getSize() : 0;
|
||||||
|
|
||||||
auto is_cancelled = [&]() { return merges_blocker.isCancelled()
|
auto is_cancelled = [&]()
|
||||||
|| (need_remove_expired_values && ttl_merges_blocker.isCancelled()); };
|
{
|
||||||
|
return merges_blocker.isCancelled()
|
||||||
|
|| (need_remove_expired_values && ttl_merges_blocker.isCancelled())
|
||||||
|
|| merge_entry->is_cancelled.load(std::memory_order_relaxed);
|
||||||
|
};
|
||||||
|
|
||||||
Block block;
|
Block block;
|
||||||
while (!is_cancelled() && (block = merged_stream->read()))
|
while (!is_cancelled() && (block = merged_stream->read()))
|
||||||
|
@ -2194,6 +2194,7 @@ bool StorageReplicatedMergeTree::executeFetchShared(
|
|||||||
void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
|
void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
|
||||||
{
|
{
|
||||||
auto drop_range_info = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
|
auto drop_range_info = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
|
||||||
|
getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range_info.partition_id, drop_range_info.max_block);
|
||||||
queue.removePartProducingOpsInRange(getZooKeeper(), drop_range_info, entry);
|
queue.removePartProducingOpsInRange(getZooKeeper(), drop_range_info, entry);
|
||||||
|
|
||||||
if (entry.detach)
|
if (entry.detach)
|
||||||
@ -2253,9 +2254,14 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
|||||||
bool replace = !LogEntry::ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range);
|
bool replace = !LogEntry::ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range);
|
||||||
|
|
||||||
if (replace)
|
if (replace)
|
||||||
|
{
|
||||||
|
getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range.partition_id, drop_range.max_block);
|
||||||
queue.removePartProducingOpsInRange(getZooKeeper(), drop_range, entry);
|
queue.removePartProducingOpsInRange(getZooKeeper(), drop_range, entry);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
|
{
|
||||||
drop_range = {};
|
drop_range = {};
|
||||||
|
}
|
||||||
|
|
||||||
struct PartDescription
|
struct PartDescription
|
||||||
{
|
{
|
||||||
@ -7094,6 +7100,8 @@ bool StorageReplicatedMergeTree::dropAllPartsInPartition(
|
|||||||
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.front()).path_created;
|
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses.front()).path_created;
|
||||||
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
|
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
|
||||||
|
|
||||||
|
getContext()->getMergeList().cancelInPartition(getStorageID(), partition_id, drop_range_info.max_block);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user