Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-21 23:21:59 +00:00)

Fix bug in parts selection

Commit: 565555e55c
Parent: 4519b4308c
@@ -232,8 +232,20 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
     const String * prev_partition_id = nullptr;
+    /// Previous part only in boundaries of partition frame
     const MergeTreeData::DataPartPtr * prev_part = nullptr;

     for (const MergeTreeData::DataPartPtr & part : data_parts)
     {
+        const String & partition_id = part->info.partition_id;
+
+        if (!prev_partition_id || partition_id != *prev_partition_id)
+        {
+            if (partitions.empty() || !partitions.back().empty())
+                partitions.emplace_back();
+            /// New partition frame.
+            prev_partition_id = &partition_id;
+            prev_part = nullptr;
+        }
+
         /// Check predicate only for first part in each partition.
         if (!prev_part)
         {
@@ -244,15 +256,19 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
             if (!can_merge_callback(nullptr, part, nullptr))
                 continue;
         }
-
-        const String & partition_id = part->info.partition_id;
-        if (!prev_partition_id || partition_id != *prev_partition_id || (prev_part && !can_merge_callback(*prev_part, part, nullptr)))
+        else
         {
-            if (partitions.empty() || !partitions.back().empty())
-                partitions.emplace_back();
-            /// New partition frame.
-            prev_partition_id = &partition_id;
-            prev_part = nullptr;
+            /// If we cannot merge with previous part we had to start new parts
+            /// interval (in the same partition)
+            if (!can_merge_callback(*prev_part, part, nullptr))
+            {
+                /// Starting new interval in the same partition
+                if (!partitions.back().empty())
+                    partitions.emplace_back();
+
+                /// Now we haven't previous part, but it affects only logging
+                prev_part = nullptr;
+            }
        }

         IMergeSelector::Part part_info;
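Read together, the two hunks change how selectPartsToMerge groups parts into merge intervals: a new frame now starts whenever the partition changes, and, with this fix, a part that cannot be merged with its predecessor starts a fresh interval inside the same partition instead of extending the current one. For illustration only, here is a small Python sketch of that grouping behaviour; it is not the ClickHouse implementation, and Part, can_merge and split_into_intervals are hypothetical stand-ins.

# Illustrative sketch of the grouping logic in the diff above (hypothetical names).
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class Part:
    partition_id: str
    name: str


def split_into_intervals(parts: List[Part],
                         can_merge: Callable[[Optional[Part], Part], bool]) -> List[List[Part]]:
    intervals: List[List[Part]] = []
    prev_partition_id: Optional[str] = None
    prev_part: Optional[Part] = None

    for part in parts:
        if prev_partition_id is None or part.partition_id != prev_partition_id:
            # New partition frame: reuse the last interval if it is still empty.
            if not intervals or intervals[-1]:
                intervals.append([])
            prev_partition_id = part.partition_id
            prev_part = None

        if prev_part is None:
            # First part of a frame: ask the predicate whether it may participate at all.
            if not can_merge(None, part):
                continue
        elif not can_merge(prev_part, part):
            # The fix: a non-mergeable boundary inside the same partition
            # starts a new interval instead of extending the current one.
            if intervals[-1]:
                intervals.append([])

        intervals[-1].append(part)
        prev_part = part

    return intervals


if __name__ == "__main__":
    parts = [Part("202008", "p1"), Part("202008", "p2"), Part("202009", "p3")]
    forbidden = {("p1", "p2")}

    def can_merge(prev, cur):
        return prev is None or (prev.name, cur.name) not in forbidden

    # p2 cannot merge with p1, so it opens a second interval in the same partition.
    print(split_into_intervals(parts, can_merge))

With this grouping, a part that cannot be merged with its predecessor no longer blocks the rest of the partition: later mergeable parts still form their own interval.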
tests/integration/test_concurrent_ttl_merges/configs/fast_background_pool.xml (new file, 9 lines)
@@ -0,0 +1,9 @@
<yandex>
    <background_processing_pool_thread_sleep_seconds>1</background_processing_pool_thread_sleep_seconds>
    <background_processing_pool_thread_sleep_seconds_random_part>0</background_processing_pool_thread_sleep_seconds_random_part>
    <background_processing_pool_thread_sleep_seconds_if_nothing_to_do>0.0</background_processing_pool_thread_sleep_seconds_if_nothing_to_do>
    <background_processing_pool_task_sleep_seconds_when_no_work_min>0</background_processing_pool_task_sleep_seconds_when_no_work_min>
    <background_processing_pool_task_sleep_seconds_when_no_work_max>1</background_processing_pool_task_sleep_seconds_when_no_work_max>
    <background_processing_pool_task_sleep_seconds_when_no_work_multiplier>1</background_processing_pool_task_sleep_seconds_when_no_work_multiplier>
    <background_processing_pool_task_sleep_seconds_when_no_work_random_part>0</background_processing_pool_task_sleep_seconds_when_no_work_random_part>
</yandex>
tests/integration/test_concurrent_ttl_merges/test.py (new file, 83 lines)
@@ -0,0 +1,83 @@
import time
import pytest

import helpers.client as client
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
from helpers.test_tools import assert_eq_with_retry


cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/fast_background_pool.xml'], with_zookeeper=True)
node2 = cluster.add_instance('node2', main_configs=['configs/fast_background_pool.xml'], with_zookeeper=True)


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()

        yield cluster

    finally:
        cluster.shutdown()


def count_ttl_merges_in_queue(node, table):
    result = node.query("SELECT count() FROM system.replication_queue WHERE merge_type = 'TTL_DELETE' and table = '{}'".format(table))
    if not result:
        return 0
    return int(result.strip())


def count_regular_merges_in_queue(node, table):
    result = node.query("SELECT count() FROM system.replication_queue WHERE merge_type = 'REGULAR' and table = '{}'".format(table))
    if not result:
        return 0
    return int(result.strip())


def count_ttl_merges_in_background_pool(node, table):
    result = node.query("SELECT count() FROM system.merges WHERE merge_type = 'TTL_DELETE' and table = '{}'".format(table))
    if not result:
        return 0
    return int(result.strip())


def count_regular_merges_in_background_pool(node, table):
    result = node.query("SELECT count() FROM system.merges WHERE merge_type = 'REGULAR' and table = '{}'".format(table))
    if not result:
        return 0
    return int(result.strip())


def count_running_mutations(node, table):
    result = node.query("SELECT count() FROM system.merges WHERE table = '{}' and is_mutation=1".format(table))
    if not result:
        return 0
    return int(result.strip())


def test_no_ttl_merges_in_busy_pool(started_cluster):
    node1.query("CREATE TABLE test_ttl (d DateTime, key UInt64, data UInt64) ENGINE = MergeTree() ORDER BY tuple() PARTITION BY key TTL d + INTERVAL 1 + sleepEachRow(1) MONTH SETTINGS merge_with_ttl_timeout = 0, number_of_free_entries_in_pool_to_execute_mutation = 0")

    node1.query("SYSTEM STOP TTL MERGES")

    for i in range(1, 7):
        node1.query("INSERT INTO test_ttl SELECT now() - INTERVAL 1 MONTH + number - 1, {}, number FROM numbers(5)".format(i))

    node1.query("ALTER TABLE test_ttl UPDATE data = data + 1 WHERE sleepEachRow(1) = 0")

    while count_running_mutations(node1, "test_ttl") < 6:
        print("Mutations count", count_running_mutations(node1, "test_ttl"))
        assert count_ttl_merges_in_background_pool(node1, "test_ttl") == 0
        time.sleep(0.5)

    node1.query("SYSTEM START TTL MERGES")

    while count_running_mutations(node1, "test_ttl") == 6:
        print("Mutations count after start TTL", count_running_mutations(node1, "test_ttl"))
        assert node1.query("SELECT count() FROM test_ttl") == "30\n"
        time.sleep(0.5)

    assert_eq_with_retry(node1, "SELECT COUNT() FROM test_ttl", "0")
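The final assertion uses assert_eq_with_retry because the TTL deletions happen asynchronously once SYSTEM START TTL MERGES re-enables them; the helper keeps polling until the row count reaches 0 or a timeout expires. A generic sketch of that polling pattern follows; it is not the helpers.test_tools implementation, and run_query and the parameter defaults are hypothetical.

# Generic retry-until-equal sketch (hypothetical names, not ClickHouse's helper).
import time


def wait_for_query_result(run_query, expected, retries=60, sleep_seconds=0.5):
    """run_query is a zero-argument callable returning the query output as a string."""
    last = None
    for _ in range(retries):
        last = run_query().strip()
        if last == expected:
            return
        time.sleep(sleep_seconds)
    raise AssertionError("expected {!r}, last result was {!r}".format(expected, last))

The test itself is meant to run inside ClickHouse's integration-test harness under tests/integration (typically driven by pytest); the exact invocation depends on the local environment.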