mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 00:52:02 +00:00
Some comments
This commit is contained in:
parent
61ecaebcb1
commit
82c56349a5
@ -3666,4 +3666,9 @@ NamesAndTypesList MergeTreeData::getVirtuals() const
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t MergeTreeData::getTotalMergesWithTTLInMergeList() const
|
||||||
|
{
|
||||||
|
return global_context.getMergeList().getExecutingMergesWithTTLCount();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -646,6 +646,11 @@ public:
|
|||||||
/// TTL rule.
|
/// TTL rule.
|
||||||
bool isPartInTTLDestination(const TTLDescription & ttl, const IMergeTreeDataPart & part) const;
|
bool isPartInTTLDestination(const TTLDescription & ttl, const IMergeTreeDataPart & part) const;
|
||||||
|
|
||||||
|
/// Get count of total merges with TTL in MergeList (system.merges) for all
|
||||||
|
/// tables (not only current table).
|
||||||
|
/// Method is cheap and doesn't require any locks.
|
||||||
|
size_t getTotalMergesWithTTLInMergeList() const;
|
||||||
|
|
||||||
using WriteAheadLogPtr = std::shared_ptr<MergeTreeWriteAheadLog>;
|
using WriteAheadLogPtr = std::shared_ptr<MergeTreeWriteAheadLog>;
|
||||||
WriteAheadLogPtr getWriteAheadLog();
|
WriteAheadLogPtr getWriteAheadLog();
|
||||||
|
|
||||||
|
@ -5,16 +5,24 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
|
/// Type of Merge. Used to control amount of different merges during merges
|
||||||
|
/// assignment. Also allows to apply special logic during merge process
|
||||||
|
/// (mergePartsToTemporaryPart). Stored in FutureMergedMutatedPart and
|
||||||
|
/// ReplicatedMergeTreeLogEntry.
|
||||||
|
///
|
||||||
|
/// Order is important, don't try to change it.
|
||||||
enum class MergeType
|
enum class MergeType
|
||||||
{
|
{
|
||||||
REGULAR,
|
REGULAR = 1,
|
||||||
TTL_DELETE,
|
TTL_DELETE = 2,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Check parsed merge_type from raw int and get enum value.
|
||||||
MergeType checkAndGetMergeType(UInt64 merge_type);
|
MergeType checkAndGetMergeType(UInt64 merge_type);
|
||||||
|
|
||||||
String toString(MergeType merge_type);
|
String toString(MergeType merge_type);
|
||||||
|
|
||||||
|
/// Check this merge assigned with TTL
|
||||||
bool isTTLMergeType(MergeType merge_type);
|
bool isTTLMergeType(MergeType merge_type);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1070,7 +1070,34 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
|
|||||||
* because the leader replica does not assign merges of greater size (except OPTIMIZE PARTITION and OPTIMIZE FINAL).
|
* because the leader replica does not assign merges of greater size (except OPTIMIZE PARTITION and OPTIMIZE FINAL).
|
||||||
*/
|
*/
|
||||||
const auto data_settings = data.getSettings();
|
const auto data_settings = data.getSettings();
|
||||||
bool ignore_max_size = (entry.type == LogEntry::MERGE_PARTS) && (max_source_parts_size == data_settings->max_bytes_to_merge_at_max_space_in_pool);
|
bool ignore_max_size = false;
|
||||||
|
if (entry.type == LogEntry::MERGE_PARTS)
|
||||||
|
{
|
||||||
|
ignore_max_size = max_source_parts_size == data_settings->max_bytes_to_merge_at_max_space_in_pool;
|
||||||
|
|
||||||
|
if (isTTLMergeType(entry.merge_type))
|
||||||
|
{
|
||||||
|
if (merger_mutator.ttl_merges_blocker.isCancelled())
|
||||||
|
{
|
||||||
|
String reason = "Not executing log entry for part " + entry.new_part_name + " because merges with TTL is cancelled now.";
|
||||||
|
LOG_DEBUG(log, reason);
|
||||||
|
out_postpone_reason = reason;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
size_t total_merges_with_ttl = data.getTotalMergesWithTTLInMergeList();
|
||||||
|
if (total_merges_with_ttl >= data_settings->max_number_of_merges_with_ttl_in_pool)
|
||||||
|
{
|
||||||
|
const char * format_str = "Not executing log entry for part {}"
|
||||||
|
" because {} merges with TTL already executing, maximum {}.";
|
||||||
|
LOG_DEBUG(log, format_str, entry.new_part_name, total_merges_with_ttl,
|
||||||
|
data_settings->max_number_of_merges_with_ttl_in_pool);
|
||||||
|
|
||||||
|
out_postpone_reason = fmt::format(format_str, entry.new_part_name, total_merges_with_ttl,
|
||||||
|
data_settings->max_number_of_merges_with_ttl_in_pool);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (!ignore_max_size && sum_parts_size_in_bytes > max_source_parts_size)
|
if (!ignore_max_size && sum_parts_size_in_bytes > max_source_parts_size)
|
||||||
{
|
{
|
||||||
|
@ -633,7 +633,7 @@ bool StorageMergeTree::merge(
|
|||||||
|
|
||||||
/// You must call destructor with unlocked `currently_processing_in_background_mutex`.
|
/// You must call destructor with unlocked `currently_processing_in_background_mutex`.
|
||||||
std::optional<CurrentlyMergingPartsTagger> merging_tagger;
|
std::optional<CurrentlyMergingPartsTagger> merging_tagger;
|
||||||
auto & merge_list = global_context.getMergeList();
|
MergeList::EntryPtr merge_entry;
|
||||||
|
|
||||||
{
|
{
|
||||||
std::unique_lock lock(currently_processing_in_background_mutex);
|
std::unique_lock lock(currently_processing_in_background_mutex);
|
||||||
@ -653,7 +653,7 @@ bool StorageMergeTree::merge(
|
|||||||
if (partition_id.empty())
|
if (partition_id.empty())
|
||||||
{
|
{
|
||||||
UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge();
|
UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge();
|
||||||
bool merge_with_ttl_allowed = merge_list.getExecutingMergesWithTTLCount() < data_settings->max_number_of_merges_with_ttl_in_pool;
|
bool merge_with_ttl_allowed = getTotalMergesWithTTLInMergeList() < data_settings->max_number_of_merges_with_ttl_in_pool;
|
||||||
|
|
||||||
/// TTL requirements is much more strict than for regular merge, so
|
/// TTL requirements is much more strict than for regular merge, so
|
||||||
/// if regular not possible, than merge with ttl is not also not
|
/// if regular not possible, than merge with ttl is not also not
|
||||||
@ -716,11 +716,10 @@ bool StorageMergeTree::merge(
|
|||||||
}
|
}
|
||||||
|
|
||||||
merging_tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, false);
|
merging_tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, false);
|
||||||
|
auto table_id = getStorageID();
|
||||||
|
merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto table_id = getStorageID();
|
|
||||||
MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part);
|
|
||||||
|
|
||||||
/// Logging
|
/// Logging
|
||||||
Stopwatch stopwatch;
|
Stopwatch stopwatch;
|
||||||
MutableDataPartPtr new_part;
|
MutableDataPartPtr new_part;
|
||||||
|
@ -1377,6 +1377,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
|
|||||||
+ backQuote(entry.new_part_name), ErrorCodes::BAD_DATA_PART_NAME);
|
+ backQuote(entry.new_part_name), ErrorCodes::BAD_DATA_PART_NAME);
|
||||||
}
|
}
|
||||||
future_merged_part.updatePath(*this, reserved_space);
|
future_merged_part.updatePath(*this, reserved_space);
|
||||||
|
future_merged_part.merge_type = entry.merge_type;
|
||||||
|
|
||||||
auto table_id = getStorageID();
|
auto table_id = getStorageID();
|
||||||
MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_merged_part);
|
MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_merged_part);
|
||||||
@ -2525,13 +2526,12 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
const auto & merge_list = global_context.getMergeList();
|
|
||||||
UInt64 max_source_parts_size_for_merge = merger_mutator.getMaxSourcePartsSizeForMerge(
|
UInt64 max_source_parts_size_for_merge = merger_mutator.getMaxSourcePartsSizeForMerge(
|
||||||
storage_settings_ptr->max_replicated_merges_in_queue, merges_and_mutations_sum);
|
storage_settings_ptr->max_replicated_merges_in_queue, merges_and_mutations_sum);
|
||||||
UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation();
|
UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation();
|
||||||
|
|
||||||
bool merge_with_ttl_allowed = merges_and_mutations_queued.merges_with_ttl < storage_settings_ptr->max_replicated_merges_with_ttl_in_queue &&
|
bool merge_with_ttl_allowed = merges_and_mutations_queued.merges_with_ttl < storage_settings_ptr->max_replicated_merges_with_ttl_in_queue &&
|
||||||
merge_list.getExecutingMergesWithTTLCount() < storage_settings_ptr->max_number_of_merges_with_ttl_in_pool;
|
getTotalMergesWithTTLInMergeList() < storage_settings_ptr->max_number_of_merges_with_ttl_in_pool;
|
||||||
|
|
||||||
FutureMergedMutatedPart future_merged_part;
|
FutureMergedMutatedPart future_merged_part;
|
||||||
if (max_source_parts_size_for_merge > 0 &&
|
if (max_source_parts_size_for_merge > 0 &&
|
||||||
|
@ -30,13 +30,6 @@ def count_ttl_merges_in_queue(node, table):
|
|||||||
return int(result.strip())
|
return int(result.strip())
|
||||||
|
|
||||||
|
|
||||||
def count_regular_merges_in_queue(node, table):
|
|
||||||
result = node.query("SELECT count() FROM system.replication_queue WHERE merge_type = 'REGULAR' and table = '{}'".format(table))
|
|
||||||
if not result:
|
|
||||||
return 0
|
|
||||||
return int(result.strip())
|
|
||||||
|
|
||||||
|
|
||||||
def count_ttl_merges_in_background_pool(node, table):
|
def count_ttl_merges_in_background_pool(node, table):
|
||||||
result = node.query("SELECT count() FROM system.merges WHERE merge_type = 'TTL_DELETE' and table = '{}'".format(table))
|
result = node.query("SELECT count() FROM system.merges WHERE merge_type = 'TTL_DELETE' and table = '{}'".format(table))
|
||||||
if not result:
|
if not result:
|
||||||
@ -84,3 +77,84 @@ def test_no_ttl_merges_in_busy_pool(started_cluster):
|
|||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
|
|
||||||
assert_eq_with_retry(node1, "SELECT COUNT() FROM test_ttl", "0")
|
assert_eq_with_retry(node1, "SELECT COUNT() FROM test_ttl", "0")
|
||||||
|
|
||||||
|
|
||||||
|
def test_limited_ttl_merges_in_empty_pool(started_cluster):
|
||||||
|
node1.query("CREATE TABLE test_ttl_v2 (d DateTime, key UInt64, data UInt64) ENGINE = MergeTree() ORDER BY tuple() PARTITION BY key TTL d + INTERVAL 1 MONTH SETTINGS merge_with_ttl_timeout = 0")
|
||||||
|
|
||||||
|
node1.query("SYSTEM STOP TTL MERGES")
|
||||||
|
|
||||||
|
for i in range(100):
|
||||||
|
node1.query("INSERT INTO test_ttl_v2 SELECT now() - INTERVAL 1 MONTH, {}, number FROM numbers(1)".format(i))
|
||||||
|
|
||||||
|
assert node1.query("SELECT COUNT() FROM test_ttl_v2") == "100\n"
|
||||||
|
|
||||||
|
node1.query("SYSTEM START TTL MERGES")
|
||||||
|
|
||||||
|
merges_with_ttl_count = set({})
|
||||||
|
while True:
|
||||||
|
merges_with_ttl_count.add(count_ttl_merges_in_background_pool(node1, "test_ttl_v2"))
|
||||||
|
time.sleep(0.01)
|
||||||
|
if node1.query("SELECT COUNT() FROM test_ttl_v2") == "0\n":
|
||||||
|
break
|
||||||
|
|
||||||
|
assert max(merges_with_ttl_count) <= 2
|
||||||
|
|
||||||
|
|
||||||
|
def test_limited_ttl_merges_in_empty_pool_replicated(started_cluster):
|
||||||
|
node1.query("CREATE TABLE replicated_ttl (d DateTime, key UInt64, data UInt64) ENGINE = ReplicatedMergeTree('/test/t', '1') ORDER BY tuple() PARTITION BY key TTL d + INTERVAL 1 MONTH SETTINGS merge_with_ttl_timeout = 0")
|
||||||
|
|
||||||
|
node1.query("SYSTEM STOP TTL MERGES")
|
||||||
|
|
||||||
|
for i in range(100):
|
||||||
|
node1.query("INSERT INTO replicated_ttl SELECT now() - INTERVAL 1 MONTH, {}, number FROM numbers(1)".format(i))
|
||||||
|
|
||||||
|
assert node1.query("SELECT COUNT() FROM replicated_ttl") == "100\n"
|
||||||
|
|
||||||
|
node1.query("SYSTEM START TTL MERGES")
|
||||||
|
|
||||||
|
merges_with_ttl_count = set({})
|
||||||
|
entries_with_ttl_count = set({})
|
||||||
|
while True:
|
||||||
|
merges_with_ttl_count.add(count_ttl_merges_in_background_pool(node1, "replicated_ttl"))
|
||||||
|
entries_with_ttl_count.add(count_ttl_merges_in_queue(node1, "replicated_ttl"))
|
||||||
|
time.sleep(0.01)
|
||||||
|
if node1.query("SELECT COUNT() FROM replicated_ttl") == "0\n":
|
||||||
|
break
|
||||||
|
|
||||||
|
assert max(merges_with_ttl_count) <= 2
|
||||||
|
assert max(entries_with_ttl_count) <= 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_limited_ttl_merges_two_replicas(started_cluster):
|
||||||
|
# Actually this test quite fast and often we cannot catch any merges.
|
||||||
|
# To check for sure just add some sleeps in mergePartsToTemporaryPart
|
||||||
|
node1.query("CREATE TABLE replicated_ttl_2 (d DateTime, key UInt64, data UInt64) ENGINE = ReplicatedMergeTree('/test/t2', '1') ORDER BY tuple() PARTITION BY key TTL d + INTERVAL 1 MONTH SETTINGS merge_with_ttl_timeout = 0")
|
||||||
|
node2.query("CREATE TABLE replicated_ttl_2 (d DateTime, key UInt64, data UInt64) ENGINE = ReplicatedMergeTree('/test/t2', '2') ORDER BY tuple() PARTITION BY key TTL d + INTERVAL 1 MONTH SETTINGS merge_with_ttl_timeout = 0")
|
||||||
|
|
||||||
|
node1.query("SYSTEM STOP TTL MERGES")
|
||||||
|
node2.query("SYSTEM STOP TTL MERGES")
|
||||||
|
|
||||||
|
for i in range(100):
|
||||||
|
node1.query("INSERT INTO replicated_ttl_2 SELECT now() - INTERVAL 1 MONTH, {}, number FROM numbers(10000)".format(i))
|
||||||
|
|
||||||
|
node2.query("SYSTEM SYNC REPLICA replicated_ttl_2", timeout=10)
|
||||||
|
assert node1.query("SELECT COUNT() FROM replicated_ttl_2") == "1000000\n"
|
||||||
|
assert node2.query("SELECT COUNT() FROM replicated_ttl_2") == "1000000\n"
|
||||||
|
|
||||||
|
node1.query("SYSTEM START TTL MERGES")
|
||||||
|
node2.query("SYSTEM START TTL MERGES")
|
||||||
|
|
||||||
|
merges_with_ttl_count_node1 = set({})
|
||||||
|
merges_with_ttl_count_node2 = set({})
|
||||||
|
while True:
|
||||||
|
merges_with_ttl_count_node1.add(count_ttl_merges_in_background_pool(node1, "replicated_ttl_2"))
|
||||||
|
merges_with_ttl_count_node2.add(count_ttl_merges_in_background_pool(node2, "replicated_ttl_2"))
|
||||||
|
if node1.query("SELECT COUNT() FROM replicated_ttl_2") == "0\n" and node2.query("SELECT COUNT() FROM replicated_ttl_2") == "0\n":
|
||||||
|
break
|
||||||
|
|
||||||
|
# Both replicas can assign merges with TTL. If one will perform better than
|
||||||
|
# the other slow replica may have several merges in queue, so we don't
|
||||||
|
# check them
|
||||||
|
assert max(merges_with_ttl_count_node1) <= 2
|
||||||
|
assert max(merges_with_ttl_count_node2) <= 2
|
||||||
|
Loading…
Reference in New Issue
Block a user