Better selection of Merges with TTL

This commit is contained in:
alesapin 2020-09-02 11:18:50 +03:00
parent dce57976c8
commit b08056fa8c
15 changed files with 145 additions and 27 deletions

View File

@ -21,6 +21,7 @@ MergeListElement::MergeListElement(const std::string & database_, const std::str
, result_data_version{future_part.part_info.getDataVersion()} , result_data_version{future_part.part_info.getDataVersion()}
, num_parts{future_part.parts.size()} , num_parts{future_part.parts.size()}
, thread_id{getThreadId()} , thread_id{getThreadId()}
, merge_type{toString(future_part.merge_type)}
{ {
for (const auto & source_part : future_part.parts) for (const auto & source_part : future_part.parts)
{ {
@ -70,6 +71,7 @@ MergeInfo MergeListElement::getInfo() const
res.columns_written = columns_written.load(std::memory_order_relaxed); res.columns_written = columns_written.load(std::memory_order_relaxed);
res.memory_usage = memory_tracker.get(); res.memory_usage = memory_tracker.get();
res.thread_id = thread_id; res.thread_id = thread_id;
res.merge_type = merge_type;
for (const auto & source_part_name : source_part_names) for (const auto & source_part_name : source_part_names)
res.source_part_names.emplace_back(source_part_name); res.source_part_names.emplace_back(source_part_name);

View File

@ -45,6 +45,7 @@ struct MergeInfo
UInt64 columns_written; UInt64 columns_written;
UInt64 memory_usage; UInt64 memory_usage;
UInt64 thread_id; UInt64 thread_id;
std::string merge_type;
}; };
struct FutureMergedMutatedPart; struct FutureMergedMutatedPart;
@ -88,6 +89,7 @@ struct MergeListElement : boost::noncopyable
UInt64 thread_id; UInt64 thread_id;
const std::string merge_type;
MergeListElement(const std::string & database, const std::string & table, const FutureMergedMutatedPart & future_part); MergeListElement(const std::string & database, const std::string & table, const FutureMergedMutatedPart & future_part);

View File

@ -158,15 +158,15 @@ MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, si
} }
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge() UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(MergeType merge_type)
{ {
size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed); size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);
return getMaxSourcePartsSizeForMerge(background_pool_size, busy_threads_in_pool == 0 ? 0 : busy_threads_in_pool - 1); /// 1 is current thread return getMaxSourcePartsSizeForMerge(background_pool_size, busy_threads_in_pool == 0 ? 0 : busy_threads_in_pool - 1, merge_type); /// 1 is current thread
} }
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used) UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used, MergeType merge_type)
{ {
if (pool_used > pool_size) if (pool_used > pool_size)
throw Exception("Logical error: invalid arguments passed to getMaxSourcePartsSize: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR); throw Exception("Logical error: invalid arguments passed to getMaxSourcePartsSize: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR);
@ -178,14 +178,21 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_siz
/// One entry is probably the entry where this function is executed. /// One entry is probably the entry where this function is executed.
/// This will protect from bad settings. /// This will protect from bad settings.
size_t lowering_setting;
if (merge_type == MergeType::TTL_DELETE)
lowering_setting = data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge_with_ttl;
else
lowering_setting = data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge;
UInt64 max_size = 0; UInt64 max_size = 0;
if (pool_used <= 1 || free_entries >= data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge) if (pool_used <= 1 || free_entries >= lowering_setting)
max_size = data_settings->max_bytes_to_merge_at_max_space_in_pool; max_size = data_settings->max_bytes_to_merge_at_max_space_in_pool;
else else
max_size = interpolateExponential( max_size = interpolateExponential(
data_settings->max_bytes_to_merge_at_min_space_in_pool, data_settings->max_bytes_to_merge_at_min_space_in_pool,
data_settings->max_bytes_to_merge_at_max_space_in_pool, data_settings->max_bytes_to_merge_at_max_space_in_pool,
static_cast<double>(free_entries) / data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge); static_cast<double>(free_entries) / lowering_setting);
return std::min(max_size, static_cast<UInt64>(data.getStoragePolicy()->getMaxUnreservedFreeSpace() / DISK_USAGE_COEFFICIENT_TO_SELECT)); return std::min(max_size, static_cast<UInt64>(data.getStoragePolicy()->getMaxUnreservedFreeSpace() / DISK_USAGE_COEFFICIENT_TO_SELECT));
} }
@ -213,6 +220,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
bool aggressive, bool aggressive,
size_t max_total_size_to_merge, size_t max_total_size_to_merge,
const AllowedMergingPredicate & can_merge_callback, const AllowedMergingPredicate & can_merge_callback,
size_t max_total_size_to_merge_with_ttl,
String * out_disable_reason) String * out_disable_reason)
{ {
MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector(); MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector();
@ -284,7 +292,9 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
current_time, current_time,
data_settings->merge_with_ttl_timeout, data_settings->merge_with_ttl_timeout,
data_settings->ttl_only_drop_parts); data_settings->ttl_only_drop_parts);
parts_to_merge = merge_selector.select(partitions, max_total_size_to_merge);
parts_to_merge = merge_selector.select(partitions, max_total_size_to_merge_with_ttl);
future_part.merge_type = MergeType::TTL_DELETE;
} }
if (parts_to_merge.empty()) if (parts_to_merge.empty())
@ -306,6 +316,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
*out_disable_reason = "There is no need to merge parts according to merge selector algorithm"; *out_disable_reason = "There is no need to merge parts according to merge selector algorithm";
return false; return false;
} }
future_part.merge_type = MergeType::NORMAL;
} }
MergeTreeData::DataPartsVector parts; MergeTreeData::DataPartsVector parts;
@ -385,6 +396,12 @@ bool MergeTreeDataMergerMutator::selectAllPartsToMergeWithinPartition(
LOG_DEBUG(log, "Selected {} parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name); LOG_DEBUG(log, "Selected {} parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name);
future_part.assign(std::move(parts)); future_part.assign(std::move(parts));
if (final)
future_part.merge_type = MergeType::FINAL;
else
future_part.merge_type = MergeType::NORMAL;
available_disk_space -= required_disk_space; available_disk_space -= required_disk_space;
return true; return true;
} }
@ -634,6 +651,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
new_data_part->partition.assign(future_part.getPartition()); new_data_part->partition.assign(future_part.getPartition());
new_data_part->is_temp = true; new_data_part->is_temp = true;
if (future_part.merge_type == MergeType::TTL_DELETE && ttl_merges_blocker.isCancelled())
throw Exception("Cancelled merging parts with expired TTL", ErrorCodes::ABORTED);
bool need_remove_expired_values = false; bool need_remove_expired_values = false;
for (const auto & part : parts) for (const auto & part : parts)
new_data_part->ttl_infos.update(part->ttl_infos); new_data_part->ttl_infos.update(part->ttl_infos);

View File

@ -6,6 +6,7 @@
#include <functional> #include <functional>
#include <Common/ActionBlocker.h> #include <Common/ActionBlocker.h>
#include <Storages/MergeTree/TTLMergeSelector.h> #include <Storages/MergeTree/TTLMergeSelector.h>
#include <Storages/MergeTree/MergeType.h>
namespace DB namespace DB
@ -22,6 +23,7 @@ struct FutureMergedMutatedPart
MergeTreeDataPartType type; MergeTreeDataPartType type;
MergeTreePartInfo part_info; MergeTreePartInfo part_info;
MergeTreeData::DataPartsVector parts; MergeTreeData::DataPartsVector parts;
MergeType merge_type = MergeType::NORMAL;
const MergeTreePartition & getPartition() const { return parts.front()->partition; } const MergeTreePartition & getPartition() const { return parts.front()->partition; }
@ -57,12 +59,12 @@ public:
/** Get maximum total size of parts to do merge, at current moment of time. /** Get maximum total size of parts to do merge, at current moment of time.
* It depends on number of free threads in background_pool and amount of free space in disk. * It depends on number of free threads in background_pool and amount of free space in disk.
*/ */
UInt64 getMaxSourcePartsSizeForMerge(); UInt64 getMaxSourcePartsSizeForMerge(MergeType merge_type);
/** For explicitly passed size of pool and number of used tasks. /** For explicitly passed size of pool and number of used tasks.
* This method could be used to calculate threshold depending on number of tasks in replication queue. * This method could be used to calculate threshold depending on number of tasks in replication queue.
*/ */
UInt64 getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used); UInt64 getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used, MergeType merge_type);
/** Get maximum total size of parts to do mutation, at current moment of time. /** Get maximum total size of parts to do mutation, at current moment of time.
* It depends only on amount of free space in disk. * It depends only on amount of free space in disk.
@ -81,6 +83,7 @@ public:
bool aggressive, bool aggressive,
size_t max_total_size_to_merge, size_t max_total_size_to_merge,
const AllowedMergingPredicate & can_merge, const AllowedMergingPredicate & can_merge,
size_t max_total_size_to_merge_with_ttl,
String * out_disable_reason = nullptr); String * out_disable_reason = nullptr);
/** Select all the parts in the specified partition for merge, if possible. /** Select all the parts in the specified partition for merge, if possible.

View File

@ -33,8 +33,10 @@ struct Settings;
M(UInt64, max_bytes_to_merge_at_min_space_in_pool, 1024 * 1024, "Maximum in total size of parts to merge, when there are minimum free threads in background pool (or entries in replication queue).", 0) \ M(UInt64, max_bytes_to_merge_at_min_space_in_pool, 1024 * 1024, "Maximum in total size of parts to merge, when there are minimum free threads in background pool (or entries in replication queue).", 0) \
M(UInt64, max_replicated_merges_in_queue, 16, "How many tasks of merging and mutating parts are allowed simultaneously in ReplicatedMergeTree queue.", 0) \ M(UInt64, max_replicated_merges_in_queue, 16, "How many tasks of merging and mutating parts are allowed simultaneously in ReplicatedMergeTree queue.", 0) \
M(UInt64, max_replicated_mutations_in_queue, 8, "How many tasks of mutating parts are allowed simultaneously in ReplicatedMergeTree queue.", 0) \ M(UInt64, max_replicated_mutations_in_queue, 8, "How many tasks of mutating parts are allowed simultaneously in ReplicatedMergeTree queue.", 0) \
M(UInt64, max_replicated_merges_with_ttl_in_queue, 1, "How many tasks of mutating parts are allowed simultaneously in ReplicatedMergeTree queue.", 0) \
M(UInt64, number_of_free_entries_in_pool_to_lower_max_size_of_merge, 8, "When there is less than specified number of free entries in pool (or replicated queue), start to lower maximum size of merge to process (or to put in queue). This is to allow small merges to process - not filling the pool with long running merges.", 0) \ M(UInt64, number_of_free_entries_in_pool_to_lower_max_size_of_merge, 8, "When there is less than specified number of free entries in pool (or replicated queue), start to lower maximum size of merge to process (or to put in queue). This is to allow small merges to process - not filling the pool with long running merges.", 0) \
M(UInt64, number_of_free_entries_in_pool_to_execute_mutation, 10, "When there is less than specified number of free entries in pool, do not execute part mutations. This is to leave free threads for regular merges and avoid \"Too many parts\"", 0) \ M(UInt64, number_of_free_entries_in_pool_to_execute_mutation, 10, "When there is less than specified number of free entries in pool, do not execute part mutations. This is to leave free threads for regular merges and avoid \"Too many parts\"", 0) \
M(UInt64, number_of_free_entries_in_pool_to_lower_max_size_of_merge_with_ttl, 14, "When there is less than specified number of free entries in pool (or replicated queue), start to lower maximum size of merge to process (or to put in queue). This is to allow small merges to process - not filling the pool with long running merges.", 0) \
M(Seconds, old_parts_lifetime, 8 * 60, "How many seconds to keep obsolete parts.", 0) \ M(Seconds, old_parts_lifetime, 8 * 60, "How many seconds to keep obsolete parts.", 0) \
M(Seconds, temporary_directories_lifetime, 86400, "How many seconds to keep tmp_-directories.", 0) \ M(Seconds, temporary_directories_lifetime, 86400, "How many seconds to keep tmp_-directories.", 0) \
M(Seconds, lock_acquire_timeout_for_background_operations, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "For background operations like merges, mutations etc. How many seconds before failing to acquire table locks.", 0) \ M(Seconds, lock_acquire_timeout_for_background_operations, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "For background operations like merges, mutations etc. How many seconds before failing to acquire table locks.", 0) \
@ -83,7 +85,7 @@ struct Settings;
M(UInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024, "Minimal amount of bytes to enable O_DIRECT in merge (0 - disabled).", 0) \ M(UInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024, "Minimal amount of bytes to enable O_DIRECT in merge (0 - disabled).", 0) \
M(UInt64, index_granularity_bytes, 10 * 1024 * 1024, "Approximate amount of bytes in single granule (0 - disabled).", 0) \ M(UInt64, index_granularity_bytes, 10 * 1024 * 1024, "Approximate amount of bytes in single granule (0 - disabled).", 0) \
M(UInt64, min_index_granularity_bytes, 1024, "Minimum amount of bytes in single granule.", 1024) \ M(UInt64, min_index_granularity_bytes, 1024, "Minimum amount of bytes in single granule.", 1024) \
M(Int64, merge_with_ttl_timeout, 3600 * 24, "Minimal time in seconds, when merge with TTL can be repeated.", 0) \ M(Int64, merge_with_ttl_timeout, 0, "Minimal time in seconds, when merge with TTL can be repeated.", 0) \
M(Bool, ttl_only_drop_parts, false, "Only drop altogether the expired parts and not partially prune them.", 0) \ M(Bool, ttl_only_drop_parts, false, "Only drop altogether the expired parts and not partially prune them.", 0) \
M(Bool, write_final_mark, 1, "Write final mark after end of column (0 - disabled, do nothing if index_granularity_bytes=0)", 0) \ M(Bool, write_final_mark, 1, "Write final mark after end of column (0 - disabled, do nothing if index_granularity_bytes=0)", 0) \
M(Bool, enable_mixed_granularity_parts, 1, "Enable parts with adaptive and non adaptive granularity", 0) \ M(Bool, enable_mixed_granularity_parts, 1, "Enable parts with adaptive and non adaptive granularity", 0) \

View File

@ -0,0 +1,27 @@
#include <Storages/MergeTree/MergeType.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
String toString(MergeType merge_type)
{
switch (merge_type)
{
case MergeType::NORMAL:
return "NORMAL";
case MergeType::FINAL:
return "FINAL";
case MergeType::TTL_DELETE:
return "TTL_DELETE";
}
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unknown MergeType {}", static_cast<UInt64>(merge_type));
}
}

View File

@ -0,0 +1,17 @@
#pragma once
#include <Core/Types.h>
namespace DB
{
enum class MergeType
{
NORMAL,
FINAL,
TTL_DELETE,
};
String toString(MergeType merge_type);
}

View File

@ -36,6 +36,8 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
out << s << '\n'; out << s << '\n';
out << "into\n" << new_part_name; out << "into\n" << new_part_name;
out << "\ndeduplicate: " << deduplicate; out << "\ndeduplicate: " << deduplicate;
if (merge_type != MergeType::NORMAL)
out <<"\nmerge_type: " << static_cast<UInt64>(merge_type);
break; break;
case DROP_RANGE: case DROP_RANGE:
@ -149,7 +151,18 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
} }
in >> new_part_name; in >> new_part_name;
if (format_version >= 4) if (format_version >= 4)
{
in >> "\ndeduplicate: " >> deduplicate; in >> "\ndeduplicate: " >> deduplicate;
in >> "\n";
if (in.eof())
trailing_newline_found = true;
else if (checkString("merge_type: ", in))
{
UInt64 value;
in >> value;
merge_type = static_cast<MergeType>(value);
}
}
} }
else if (type_str == "drop" || type_str == "detach") else if (type_str == "drop" || type_str == "detach")
{ {

View File

@ -5,6 +5,7 @@
#include <Core/Types.h> #include <Core/Types.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Storages/MergeTree/MergeTreeDataPartType.h> #include <Storages/MergeTree/MergeTreeDataPartType.h>
#include <Storages/MergeTree/MergeType.h>
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
@ -79,6 +80,7 @@ struct ReplicatedMergeTreeLogEntryData
Strings source_parts; Strings source_parts;
bool deduplicate = false; /// Do deduplicate on merge bool deduplicate = false; /// Do deduplicate on merge
MergeType merge_type = MergeType::NORMAL;
String column_name; String column_name;
String index_name; String index_name;

View File

@ -1061,7 +1061,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
return false; return false;
} }
UInt64 max_source_parts_size = entry.type == LogEntry::MERGE_PARTS ? merger_mutator.getMaxSourcePartsSizeForMerge() UInt64 max_source_parts_size = entry.type == LogEntry::MERGE_PARTS ? merger_mutator.getMaxSourcePartsSizeForMerge(entry.merge_type)
: merger_mutator.getMaxSourcePartSizeForMutation(); : merger_mutator.getMaxSourcePartSizeForMutation();
/** If there are enough free threads in background pool to do large merges (maximal size of merge is allowed), /** If there are enough free threads in background pool to do large merges (maximal size of merge is allowed),
* then ignore value returned by getMaxSourcePartsSizeForMerge() and execute merge of any size, * then ignore value returned by getMaxSourcePartsSizeForMerge() and execute merge of any size,
@ -1312,21 +1312,26 @@ bool ReplicatedMergeTreeQueue::processEntry(
} }
std::pair<size_t, size_t> ReplicatedMergeTreeQueue::countMergesAndPartMutations() const ReplicatedMergeTreeQueue::OperationsInQueue ReplicatedMergeTreeQueue::countMergesAndPartMutations() const
{ {
std::lock_guard lock(state_mutex); std::lock_guard lock(state_mutex);
size_t count_merges = 0; size_t count_merges = 0;
size_t count_mutations = 0; size_t count_mutations = 0;
size_t count_merges_with_ttl = 0;
for (const auto & entry : queue) for (const auto & entry : queue)
{ {
if (entry->type == ReplicatedMergeTreeLogEntry::MERGE_PARTS) if (entry->type == ReplicatedMergeTreeLogEntry::MERGE_PARTS)
{
++count_merges; ++count_merges;
if (entry->merge_type == MergeType::TTL_DELETE)
++count_merges_with_ttl;
}
else if (entry->type == ReplicatedMergeTreeLogEntry::MUTATE_PART) else if (entry->type == ReplicatedMergeTreeLogEntry::MUTATE_PART)
++count_mutations; ++count_mutations;
} }
return std::make_pair(count_merges, count_mutations); return OperationsInQueue{count_merges, count_mutations, count_merges_with_ttl};
} }

View File

@ -46,6 +46,13 @@ private:
} }
}; };
struct OperationsInQueue
{
size_t merges = 0;
size_t mutations = 0;
size_t merges_with_ttl = 0;
};
/// To calculate min_unprocessed_insert_time, max_processed_insert_time, for which the replica lag is calculated. /// To calculate min_unprocessed_insert_time, max_processed_insert_time, for which the replica lag is calculated.
using InsertsByTime = std::set<LogEntryPtr, ByTime>; using InsertsByTime = std::set<LogEntryPtr, ByTime>;
@ -325,7 +332,7 @@ public:
bool processEntry(std::function<zkutil::ZooKeeperPtr()> get_zookeeper, LogEntryPtr & entry, const std::function<bool(LogEntryPtr &)> func); bool processEntry(std::function<zkutil::ZooKeeperPtr()> get_zookeeper, LogEntryPtr & entry, const std::function<bool(LogEntryPtr &)> func);
/// Count the number of merges and mutations of single parts in the queue. /// Count the number of merges and mutations of single parts in the queue.
std::pair<size_t, size_t> countMergesAndPartMutations() const; OperationsInQueue countMergesAndPartMutations() const;
/// Count the total number of active mutations. /// Count the total number of active mutations.
size_t countMutations() const; size_t countMutations() const;

View File

@ -650,9 +650,14 @@ bool StorageMergeTree::merge(
if (partition_id.empty()) if (partition_id.empty())
{ {
UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge(); UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge(MergeType::NORMAL);
UInt64 max_source_parts_size_with_ttl = 0;
if (!aggressive)
max_source_parts_size_with_ttl = merger_mutator.getMaxSourcePartsSizeForMerge(MergeType::TTL_DELETE);
if (max_source_parts_size > 0) if (max_source_parts_size > 0)
selected = merger_mutator.selectPartsToMerge(future_part, aggressive, max_source_parts_size, can_merge, out_disable_reason); selected = merger_mutator.selectPartsToMerge(future_part, aggressive, max_source_parts_size, can_merge, max_source_parts_size_with_ttl, out_disable_reason);
else if (out_disable_reason) else if (out_disable_reason)
*out_disable_reason = "Current value of max_source_parts_size is zero"; *out_disable_reason = "Current value of max_source_parts_size is zero";
} }
@ -724,6 +729,7 @@ bool StorageMergeTree::merge(
try try
{ {
std::cerr << "FUTURE PART MERGE TYPE:" << toString(future_part.merge_type) << std::endl;
new_part = merger_mutator.mergePartsToTemporaryPart( new_part = merger_mutator.mergePartsToTemporaryPart(
future_part, metadata_snapshot, *merge_entry, table_lock_holder, time(nullptr), future_part, metadata_snapshot, *merge_entry, table_lock_holder, time(nullptr),
merging_tagger->reserved_space, deduplicate); merging_tagger->reserved_space, deduplicate);

View File

@ -2514,31 +2514,38 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
/// and in the same time, many small parts could be created and won't be merged. /// and in the same time, many small parts could be created and won't be merged.
auto merges_and_mutations_queued = queue.countMergesAndPartMutations(); auto merges_and_mutations_queued = queue.countMergesAndPartMutations();
size_t merges_and_mutations_sum = merges_and_mutations_queued.first + merges_and_mutations_queued.second; size_t merges_and_mutations_sum = merges_and_mutations_queued.merges + merges_and_mutations_queued.mutations;
if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue) if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue)
{ {
LOG_TRACE(log, "Number of queued merges ({}) and part mutations ({})" LOG_TRACE(log, "Number of queued merges ({}) and part mutations ({})"
" is greater than max_replicated_merges_in_queue ({}), so won't select new parts to merge or mutate.", " is greater than max_replicated_merges_in_queue ({}), so won't select new parts to merge or mutate.",
merges_and_mutations_queued.first, merges_and_mutations_queued.merges,
merges_and_mutations_queued.second, merges_and_mutations_queued.mutations,
storage_settings_ptr->max_replicated_merges_in_queue); storage_settings_ptr->max_replicated_merges_in_queue);
} }
else else
{ {
UInt64 max_source_parts_size_for_merge = merger_mutator.getMaxSourcePartsSizeForMerge( UInt64 max_source_parts_size_for_merge = merger_mutator.getMaxSourcePartsSizeForMerge(
storage_settings_ptr->max_replicated_merges_in_queue, merges_and_mutations_sum); storage_settings_ptr->max_replicated_merges_in_queue, merges_and_mutations_sum, MergeType::NORMAL);
UInt64 max_source_parts_size_for_merge_with_ttl = 0;
if (merges_and_mutations_queued.merges_with_ttl < storage_settings_ptr->max_replicated_merges_with_ttl_in_queue)
max_source_parts_size_for_merge_with_ttl = merger_mutator.getMaxSourcePartsSizeForMerge(
storage_settings_ptr->max_replicated_merges_in_queue, merges_and_mutations_sum, MergeType::TTL_DELETE);
UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation(); UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation();
FutureMergedMutatedPart future_merged_part; FutureMergedMutatedPart future_merged_part;
if (max_source_parts_size_for_merge > 0 && if (max_source_parts_size_for_merge > 0 &&
merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, merge_pred, nullptr)) merger_mutator.selectPartsToMerge(future_merged_part, false,
max_source_parts_size_for_merge, merge_pred, max_source_parts_size_for_merge_with_ttl, nullptr))
{ {
create_result = createLogEntryToMergeParts(zookeeper, future_merged_part.parts, create_result = createLogEntryToMergeParts(zookeeper, future_merged_part.parts,
future_merged_part.name, future_merged_part.type, deduplicate, nullptr, merge_pred.getVersion()); future_merged_part.name, future_merged_part.type, deduplicate, nullptr, merge_pred.getVersion(), future_merged_part.merge_type);
} }
/// If there are many mutations in queue, it may happen, that we cannot enqueue enough merges to merge all new parts /// If there are many mutations in queue, it may happen, that we cannot enqueue enough merges to merge all new parts
else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0 else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0
&& merges_and_mutations_queued.second < storage_settings_ptr->max_replicated_mutations_in_queue) && merges_and_mutations_queued.mutations < storage_settings_ptr->max_replicated_mutations_in_queue)
{ {
/// Choose a part to mutate. /// Choose a part to mutate.
DataPartsVector data_parts = getDataPartsVector(); DataPartsVector data_parts = getDataPartsVector();
@ -2617,7 +2624,8 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
const MergeTreeDataPartType & merged_part_type, const MergeTreeDataPartType & merged_part_type,
bool deduplicate, bool deduplicate,
ReplicatedMergeTreeLogEntryData * out_log_entry, ReplicatedMergeTreeLogEntryData * out_log_entry,
int32_t log_version) int32_t log_version,
MergeType merge_type)
{ {
std::vector<std::future<Coordination::ExistsResponse>> exists_futures; std::vector<std::future<Coordination::ExistsResponse>> exists_futures;
exists_futures.reserve(parts.size()); exists_futures.reserve(parts.size());
@ -2649,6 +2657,7 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
entry.source_replica = replica_name; entry.source_replica = replica_name;
entry.new_part_name = merged_name; entry.new_part_name = merged_name;
entry.new_part_type = merged_part_type; entry.new_part_type = merged_part_type;
entry.merge_type = merge_type;
entry.deduplicate = deduplicate; entry.deduplicate = deduplicate;
entry.create_time = time(nullptr); entry.create_time = time(nullptr);
@ -3584,7 +3593,7 @@ bool StorageReplicatedMergeTree::optimize(
CreateMergeEntryResult create_result = createLogEntryToMergeParts( CreateMergeEntryResult create_result = createLogEntryToMergeParts(
zookeeper, future_merged_part.parts, zookeeper, future_merged_part.parts,
future_merged_part.name, future_merged_part.type, deduplicate, future_merged_part.name, future_merged_part.type, deduplicate,
&merge_entry, can_merge.getVersion()); &merge_entry, can_merge.getVersion(), future_merged_part.merge_type);
if (create_result == CreateMergeEntryResult::MissingPart) if (create_result == CreateMergeEntryResult::MissingPart)
return handle_noop("Can't create merge queue node in ZooKeeper, because some parts are missing"); return handle_noop("Can't create merge queue node in ZooKeeper, because some parts are missing");
@ -3614,7 +3623,7 @@ bool StorageReplicatedMergeTree::optimize(
if (!partition) if (!partition)
{ {
selected = merger_mutator.selectPartsToMerge( selected = merger_mutator.selectPartsToMerge(
future_merged_part, true, storage_settings_ptr->max_bytes_to_merge_at_max_space_in_pool, can_merge, &disable_reason); future_merged_part, true, storage_settings_ptr->max_bytes_to_merge_at_max_space_in_pool, can_merge, 0, &disable_reason);
} }
else else
{ {
@ -3639,7 +3648,7 @@ bool StorageReplicatedMergeTree::optimize(
CreateMergeEntryResult create_result = createLogEntryToMergeParts( CreateMergeEntryResult create_result = createLogEntryToMergeParts(
zookeeper, future_merged_part.parts, zookeeper, future_merged_part.parts,
future_merged_part.name, future_merged_part.type, deduplicate, future_merged_part.name, future_merged_part.type, deduplicate,
&merge_entry, can_merge.getVersion()); &merge_entry, can_merge.getVersion(), future_merged_part.merge_type);
if (create_result == CreateMergeEntryResult::MissingPart) if (create_result == CreateMergeEntryResult::MissingPart)
return handle_noop("Can't create merge queue node in ZooKeeper, because some parts are missing"); return handle_noop("Can't create merge queue node in ZooKeeper, because some parts are missing");

View File

@ -450,7 +450,8 @@ private:
const MergeTreeDataPartType & merged_part_type, const MergeTreeDataPartType & merged_part_type,
bool deduplicate, bool deduplicate,
ReplicatedMergeTreeLogEntryData * out_log_entry, ReplicatedMergeTreeLogEntryData * out_log_entry,
int32_t log_version); int32_t log_version,
MergeType merge_type);
CreateMergeEntryResult createLogEntryToMutatePart( CreateMergeEntryResult createLogEntryToMutatePart(
const IMergeTreeDataPart & part, const IMergeTreeDataPart & part,

View File

@ -30,6 +30,7 @@ NamesAndTypesList StorageSystemMerges::getNamesAndTypes()
{"columns_written", std::make_shared<DataTypeUInt64>()}, {"columns_written", std::make_shared<DataTypeUInt64>()},
{"memory_usage", std::make_shared<DataTypeUInt64>()}, {"memory_usage", std::make_shared<DataTypeUInt64>()},
{"thread_id", std::make_shared<DataTypeUInt64>()}, {"thread_id", std::make_shared<DataTypeUInt64>()},
{"merge_type", std::make_shared<DataTypeString>()},
}; };
} }
@ -65,6 +66,7 @@ void StorageSystemMerges::fillData(MutableColumns & res_columns, const Context &
res_columns[i++]->insert(merge.columns_written); res_columns[i++]->insert(merge.columns_written);
res_columns[i++]->insert(merge.memory_usage); res_columns[i++]->insert(merge.memory_usage);
res_columns[i++]->insert(merge.thread_id); res_columns[i++]->insert(merge.thread_id);
res_columns[i++]->insert(merge.merge_type);
} }
} }