Mirror of https://github.com/ClickHouse/ClickHouse.git
Merge pull request #6617 from yandex/fix_too_many_parts_while_mutation_6502

Fix "Too many parts" while mutating

Commit 3a237b4375

In short: mutations could previously saturate the background pool and the replication queue, starving regular merges until inserts failed with "Too many parts". This commit makes mutations yield to merges: a part is mutated only while enough background threads remain free (number_of_free_entries_in_pool_to_execute_mutation), replicated mutations get their own queue cap (max_replicated_mutations_in_queue), and a new integration test checks that concurrent mutations do not block merges.
@@ -157,7 +157,14 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_siz
 UInt64 MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation()
 {
-    return static_cast<UInt64>(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_path) / DISK_USAGE_COEFFICIENT_TO_RESERVE);
+    size_t total_threads_in_pool = pool.getNumberOfThreads();
+    size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);
+
+    /// Allow mutations only if there are enough threads, leave free threads for merges else
+    if (total_threads_in_pool - busy_threads_in_pool >= data.settings.number_of_free_entries_in_pool_to_execute_mutation)
+        return static_cast<UInt64>(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_path) / DISK_USAGE_COEFFICIENT_TO_RESERVE);
+
+    return 0;
 }

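For context: the new gating rule, condensed into a standalone sketch. Names like PoolState and free_entries_needed are illustrative, not from the commit; the real function reads the background pool and CurrentMetrics as shown above. The idea: grant a mutation a non-zero size budget only while enough pool threads remain free for merges; a budget of 0 postpones mutations entirely.

    #include <cstddef>
    #include <cstdint>
    #include <iostream>

    // Hypothetical stand-in for the background pool state used above.
    struct PoolState
    {
        std::size_t total_threads;
        std::size_t busy_threads;
    };

    // Sketch of getMaxSourcePartSizeForMutation: allow a mutation (by granting
    // it a disk-size budget) only while at least free_entries_needed threads
    // stay free for regular merges; otherwise return 0 ("do not mutate now").
    std::uint64_t maxSourcePartSizeForMutation(const PoolState & pool, std::size_t free_entries_needed, std::uint64_t unreserved_free_space)
    {
        if (pool.total_threads - pool.busy_threads >= free_entries_needed)
            return unreserved_free_space; // the real code also divides by a reserve coefficient
        return 0;
    }

    int main()
    {
        PoolState pool{16, 10}; // 6 threads free
        std::cout << maxSourcePartSizeForMutation(pool, 10, 1000000000) << '\n'; // 0: only 6 threads free, 10 required
        std::cout << maxSourcePartSizeForMutation(pool, 5, 1000000000) << '\n';  // full budget: 1000000000
    }
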
@@ -30,8 +30,10 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
     /** Merge settings. */ \
     M(SettingUInt64, max_bytes_to_merge_at_max_space_in_pool, 150ULL * 1024 * 1024 * 1024, "Maximum in total size of parts to merge, when there are maximum free threads in background pool (or entries in replication queue).") \
     M(SettingUInt64, max_bytes_to_merge_at_min_space_in_pool, 1024 * 1024, "Maximum in total size of parts to merge, when there are minimum free threads in background pool (or entries in replication queue).") \
-    M(SettingUInt64, max_replicated_merges_in_queue, 16, "How many tasks of merging parts are allowed simultaneously in ReplicatedMergeTree queue.") \
+    M(SettingUInt64, max_replicated_merges_in_queue, 16, "How many tasks of merging and mutating parts are allowed simultaneously in ReplicatedMergeTree queue.") \
+    M(SettingUInt64, max_replicated_mutations_in_queue, 8, "How many tasks of mutating parts are allowed simultaneously in ReplicatedMergeTree queue.") \
     M(SettingUInt64, number_of_free_entries_in_pool_to_lower_max_size_of_merge, 8, "When there is less than specified number of free entries in pool (or replicated queue), start to lower maximum size of merge to process (or to put in queue). This is to allow small merges to process - not filling the pool with long running merges.") \
+    M(SettingUInt64, number_of_free_entries_in_pool_to_execute_mutation, 10, "When there is less than specified number of free entries in pool, do not execute part mutations. This is to leave free threads for regular merges and avoid \"Too many parts\"") \
     M(SettingSeconds, old_parts_lifetime, 8 * 60, "How many seconds to keep obsolete parts.") \
     M(SettingSeconds, temporary_directories_lifetime, 86400, "How many seconds to keep tmp_-directories.") \
     \

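The M(...) rows above are an X-macro list: each setting is declared once, with type, name, default, and documentation, and the list is re-expanded wherever member declarations or iteration over settings are needed. A reduced, self-contained sketch of the pattern (simplified types and macros; not ClickHouse's actual SettingsCollection machinery):

    #include <cstdint>
    #include <iostream>

    // The settings list is written once...
    #define APPLY_FOR_SETTINGS(M) \
        M(std::uint64_t, max_replicated_merges_in_queue, 16) \
        M(std::uint64_t, max_replicated_mutations_in_queue, 8) \
        M(std::uint64_t, number_of_free_entries_in_pool_to_execute_mutation, 10)

    struct Settings
    {
        // ...expansion 1: declare one member per setting, with its default...
    #define DECLARE(TYPE, NAME, DEFAULT) TYPE NAME = DEFAULT;
        APPLY_FOR_SETTINGS(DECLARE)
    #undef DECLARE

        // ...expansion 2: iterate over all settings by name.
        void dump() const
        {
    #define PRINT(TYPE, NAME, DEFAULT) std::cout << #NAME << " = " << NAME << '\n';
            APPLY_FOR_SETTINGS(PRINT)
    #undef PRINT
        }
    };

    int main()
    {
        Settings settings;
        settings.dump();
    }
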
@@ -956,15 +956,19 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
         return false;
     }
 
-    /** Execute merge only if there are enough free threads in background pool to do merges of that size.
-      * But if all threads are free (maximal size of merge is allowed) then execute any merge,
-      * (because it may be ordered by OPTIMIZE or early with differrent settings).
+    UInt64 max_source_parts_size = entry.type == LogEntry::MERGE_PARTS ? merger_mutator.getMaxSourcePartsSizeForMerge()
+                                                                       : merger_mutator.getMaxSourcePartSizeForMutation();
+    /** If there are enough free threads in background pool to do large merges (maximal size of merge is allowed),
+      * then ignore value returned by getMaxSourcePartsSizeForMerge() and execute merge of any size,
+      * because it may be ordered by OPTIMIZE or early with different settings.
+      * Setting max_bytes_to_merge_at_max_space_in_pool still working for regular merges,
+      * because the leader replica does not assign merges of greater size (except OPTIMIZE PARTITION and OPTIMIZE FINAL).
       */
-    UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge();
-    if (max_source_parts_size != data.settings.max_bytes_to_merge_at_max_space_in_pool
-        && sum_parts_size_in_bytes > max_source_parts_size)
+    bool ignore_max_size = (entry.type == LogEntry::MERGE_PARTS) && (max_source_parts_size == data.settings.max_bytes_to_merge_at_max_space_in_pool);
+
+    if (!ignore_max_size && sum_parts_size_in_bytes > max_source_parts_size)
     {
-        String reason = "Not executing log entry for part " + entry.new_part_name
+        String reason = "Not executing log entry " + entry.typeToString() + " for part " + entry.new_part_name
             + " because source parts size (" + formatReadableSizeWithBinarySuffix(sum_parts_size_in_bytes)
             + ") is greater than the current maximum (" + formatReadableSizeWithBinarySuffix(max_source_parts_size) + ").";
         LOG_DEBUG(log, reason);

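The check above reduces to a small predicate: a merge entry may bypass the size cap only when the merger is currently granting the configured maximum (i.e. the pool is idle enough that the merge was probably ordered by OPTIMIZE), while mutations are never exempt. A hedged sketch with hypothetical types, not the committed code:

    #include <cstdint>
    #include <iostream>

    enum class EntryType { MERGE_PARTS, MUTATE_PART };

    // Sketch of the size check in shouldExecuteLogEntry: true means the entry
    // should be postponed because its source parts are too large right now.
    bool shouldPostpone(EntryType type, std::uint64_t sum_parts_size, std::uint64_t max_source_parts_size, std::uint64_t max_bytes_at_max_space)
    {
        // A merge ignores the cap only when the full budget is currently granted;
        // mutations never get this exemption.
        bool ignore_max_size = (type == EntryType::MERGE_PARTS) && (max_source_parts_size == max_bytes_at_max_space);
        return !ignore_max_size && sum_parts_size > max_source_parts_size;
    }

    int main()
    {
        // A merge larger than the cap while the full budget is granted: executed.
        std::cout << shouldPostpone(EntryType::MERGE_PARTS, 200, 150, 150) << '\n'; // 0
        // The same sizes for a mutation: postponed.
        std::cout << shouldPostpone(EntryType::MUTATE_PART, 200, 150, 150) << '\n'; // 1
    }
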
@@ -1154,17 +1158,21 @@ bool ReplicatedMergeTreeQueue::processEntry(
 }
 
 
-size_t ReplicatedMergeTreeQueue::countMergesAndPartMutations() const
+std::pair<size_t, size_t> ReplicatedMergeTreeQueue::countMergesAndPartMutations() const
 {
     std::lock_guard lock(state_mutex);
 
-    size_t count = 0;
+    size_t count_merges = 0;
+    size_t count_mutations = 0;
     for (const auto & entry : queue)
-        if (entry->type == ReplicatedMergeTreeLogEntry::MERGE_PARTS
-            || entry->type == ReplicatedMergeTreeLogEntry::MUTATE_PART)
-            ++count;
+    {
+        if (entry->type == ReplicatedMergeTreeLogEntry::MERGE_PARTS)
+            ++count_merges;
+        else if (entry->type == ReplicatedMergeTreeLogEntry::MUTATE_PART)
+            ++count_mutations;
+    }
 
-    return count;
+    return std::make_pair(count_merges, count_mutations);
 }

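Returning a pair keeps the two tallies separate for the caller, which can still sum them when the joint limit applies. A sketch of the caller side (hypothetical queue representation, C++17 structured bindings):

    #include <cstddef>
    #include <iostream>
    #include <utility>
    #include <vector>

    enum class LogEntryType { MERGE_PARTS, MUTATE_PART, GET_PART };

    // Same single-pass, two-tally shape as the function above.
    std::pair<std::size_t, std::size_t> countMergesAndPartMutations(const std::vector<LogEntryType> & queue)
    {
        std::size_t count_merges = 0;
        std::size_t count_mutations = 0;
        for (auto type : queue)
        {
            if (type == LogEntryType::MERGE_PARTS)
                ++count_merges;
            else if (type == LogEntryType::MUTATE_PART)
                ++count_mutations;
        }
        return {count_merges, count_mutations};
    }

    int main()
    {
        std::vector<LogEntryType> queue{LogEntryType::MERGE_PARTS, LogEntryType::MUTATE_PART, LogEntryType::MERGE_PARTS};
        auto [merges, mutations] = countMergesAndPartMutations(queue);
        std::cout << merges << " merges, " << mutations << " mutations, " << (merges + mutations) << " total\n";
    }
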
@@ -296,7 +296,7 @@ public:
     bool processEntry(std::function<zkutil::ZooKeeperPtr()> get_zookeeper, LogEntryPtr & entry, const std::function<bool(LogEntryPtr &)> func);
 
     /// Count the number of merges and mutations of single parts in the queue.
-    size_t countMergesAndPartMutations() const;
+    std::pair<size_t, size_t> countMergesAndPartMutations() const;
 
     /// Count the total number of active mutations.
     size_t countMutations() const;

@@ -630,8 +630,6 @@ bool StorageMergeTree::tryMutatePart()
     /// You must call destructor with unlocked `currently_merging_mutex`.
     std::optional<CurrentlyMergingPartsTagger> tagger;
     {
-        auto disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
-
         std::lock_guard lock(currently_merging_mutex);
 
         if (current_mutations_by_version.empty())

@@ -647,8 +645,7 @@ bool StorageMergeTree::tryMutatePart()
         if (mutations_begin_it == mutations_end_it)
             continue;
 
-        auto estimated_needed_space = MergeTreeDataMergerMutator::estimateNeededDiskSpace({part});
-        if (estimated_needed_space > disk_space)
+        if (merger_mutator.getMaxSourcePartSizeForMutation() < part->bytes_on_disk)
             continue;
 
         for (auto it = mutations_begin_it; it != mutations_end_it; ++it)

@@ -661,7 +658,7 @@ bool StorageMergeTree::tryMutatePart()
             future_part.part_info = new_part_info;
             future_part.name = part->getNewName(new_part_info);
 
-            tagger.emplace(future_part, estimated_needed_space, *this);
+            tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this);
             break;
         }
     }

@@ -2219,17 +2219,18 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
         /// If many merges is already queued, then will queue only small enough merges.
         /// Otherwise merge queue could be filled with only large merges,
         /// and in the same time, many small parts could be created and won't be merged.
-        size_t merges_and_mutations_queued = queue.countMergesAndPartMutations();
-        if (merges_and_mutations_queued >= settings.max_replicated_merges_in_queue)
+        auto merges_and_mutations_queued = queue.countMergesAndPartMutations();
+        size_t merges_and_mutations_sum = merges_and_mutations_queued.first + merges_and_mutations_queued.second;
+        if (merges_and_mutations_sum >= settings.max_replicated_merges_in_queue)
         {
-            LOG_TRACE(log, "Number of queued merges and part mutations (" << merges_and_mutations_queued
-                << ") is greater than max_replicated_merges_in_queue ("
+            LOG_TRACE(log, "Number of queued merges (" << merges_and_mutations_queued.first << ") and part mutations ("
+                << merges_and_mutations_queued.second << ") is greater than max_replicated_merges_in_queue ("
                 << settings.max_replicated_merges_in_queue << "), so won't select new parts to merge or mutate.");
         }
         else
         {
             UInt64 max_source_parts_size_for_merge = merger_mutator.getMaxSourcePartsSizeForMerge(
-                settings.max_replicated_merges_in_queue, merges_and_mutations_queued);
+                settings.max_replicated_merges_in_queue, merges_and_mutations_sum);
+            UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation();
 
             FutureMergedMutatedPart future_merged_part;

@@ -2239,7 +2240,9 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
                 success = createLogEntryToMergeParts(zookeeper, future_merged_part.parts,
                     future_merged_part.name, deduplicate, force_ttl);
             }
-            else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0)
+            /// If there are many mutations in queue it may happen, that we cannot enqueue enough merges to merge all new parts
+            else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0
+                && merges_and_mutations_queued.second < settings.max_replicated_mutations_in_queue)
             {
                 /// Choose a part to mutate.
                 DataPartsVector data_parts = getDataPartsVector();

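Taken together, the selection step now enforces two limits: a joint cap on queued merges plus mutations (max_replicated_merges_in_queue) and a separate cap on queued mutations (max_replicated_mutations_in_queue), so a backlog of mutations cannot crowd merges out of the queue. A condensed sketch of that control flow, with hypothetical types standing in for the real queue and merger state:

    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <utility>

    struct Limits
    {
        std::size_t max_replicated_merges_in_queue = 16;
        std::size_t max_replicated_mutations_in_queue = 8;
    };

    enum class Action { None, Merge, Mutate };

    // Condensed decision from mergeSelectingTask.
    // counts = {queued merges, queued mutations}.
    Action selectNext(std::pair<std::size_t, std::size_t> counts, std::uint64_t max_merge_size, bool have_parts_to_merge, std::uint64_t max_mutation_size, const Limits & limits)
    {
        if (counts.first + counts.second >= limits.max_replicated_merges_in_queue)
            return Action::None; // joint cap reached: enqueue nothing

        if (max_merge_size > 0 && have_parts_to_merge)
            return Action::Merge;

        // Mutations additionally respect their own cap, so they cannot
        // consume the whole shared queue budget.
        if (max_mutation_size > 0 && counts.second < limits.max_replicated_mutations_in_queue)
            return Action::Mutate;

        return Action::None;
    }

    int main()
    {
        Limits limits;
        std::cout << static_cast<int>(selectNext({4, 8}, 0, false, 1, limits)) << '\n'; // 0 (None): mutation cap reached
        std::cout << static_cast<int>(selectNext({4, 7}, 0, false, 1, limits)) << '\n'; // 2 (Mutate)
    }
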
New file (evidently configs/merge_tree_max_parts.xml, referenced by the test below): tight part limits so that "Too many parts" is easy to trigger.

@@ -0,0 +1,6 @@
+<yandex>
+    <merge_tree>
+        <parts_to_delay_insert>50</parts_to_delay_insert>
+        <parts_to_throw_insert>50</parts_to_throw_insert>
+    </merge_tree>
+</yandex>

@@ -10,21 +10,29 @@ from helpers.cluster import ClickHouseCluster
 
 cluster = ClickHouseCluster(__file__)
 
-node1 = cluster.add_instance('node1', with_zookeeper=True)
+node1 = cluster.add_instance('node1', macros={'cluster': 'test1'}, with_zookeeper=True)
 # Check, that limits on max part size for merges doesn`t affect mutations
-node2 = cluster.add_instance('node2', main_configs=["configs/merge_tree.xml"], with_zookeeper=True)
-nodes = [node1, node2]
+node2 = cluster.add_instance('node2', macros={'cluster': 'test1'}, main_configs=["configs/merge_tree.xml"], with_zookeeper=True)
+
+node3 = cluster.add_instance('node3', macros={'cluster': 'test2'}, main_configs=["configs/merge_tree_max_parts.xml"], with_zookeeper=True)
+node4 = cluster.add_instance('node4', macros={'cluster': 'test2'}, main_configs=["configs/merge_tree_max_parts.xml"], with_zookeeper=True)
+
+node5 = cluster.add_instance('node5', macros={'cluster': 'test3'}, main_configs=["configs/merge_tree_max_parts.xml"])
+
+all_nodes = [node1, node2, node3, node4, node5]
 
 @pytest.fixture(scope="module")
 def started_cluster():
     try:
         cluster.start()
 
-        for node in nodes:
+        for node in all_nodes:
             node.query("DROP TABLE IF EXISTS test_mutations")
 
-        for node in nodes:
-            node.query("CREATE TABLE test_mutations(d Date, x UInt32, i UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/test_mutations', '{instance}') ORDER BY x PARTITION BY toYYYYMM(d)")
+        for node in [node1, node2, node3, node4]:
+            node.query("CREATE TABLE test_mutations(d Date, x UInt32, i UInt32) ENGINE ReplicatedMergeTree('/clickhouse/{cluster}/tables/test/test_mutations', '{instance}') ORDER BY x PARTITION BY toYYYYMM(d)")
+
+        node5.query("CREATE TABLE test_mutations(d Date, x UInt32, i UInt32) ENGINE MergeTree() ORDER BY x PARTITION BY toYYYYMM(d)")
 
         yield cluster

@@ -33,7 +41,8 @@ def started_cluster():
 
 
 class Runner:
-    def __init__(self):
+    def __init__(self, nodes):
+        self.nodes = nodes
         self.mtx = threading.Lock()
         self.total_inserted_xs = 0
         self.total_inserted_rows = 0

@@ -49,7 +58,9 @@ class Runner:
 
         self.stop_ev = threading.Event()
 
-    def do_insert(self, thread_num):
+        self.exceptions = []
+
+    def do_insert(self, thread_num, partitions_num):
         self.stop_ev.wait(random.random())
 
         # Each thread inserts a small random number of rows with random year, month 01 and day determined

@@ -67,7 +78,7 @@
             for x in xs:
                 self.currently_inserting_xs[x] += 1
 
-        year = 2000 + random.randint(0, 10)
+        year = 2000 + random.randint(0, partitions_num)
         date_str = '{year}-{month}-{day}'.format(year=year, month=month, day=day)
         payload = ''
         for x in xs:

@@ -76,7 +87,7 @@
 
         try:
             print 'thread {}: insert for {}: {}'.format(thread_num, date_str, ','.join(str(x) for x in xs))
-            random.choice(nodes).query("INSERT INTO test_mutations FORMAT TSV", payload)
+            random.choice(self.nodes).query("INSERT INTO test_mutations FORMAT TSV", payload)
 
             with self.mtx:
                 for x in xs:

@@ -86,6 +97,7 @@
 
         except Exception, e:
             print 'Exception while inserting,', e
+            self.exceptions.append(e)
         finally:
             with self.mtx:
                 for x in xs:

@@ -113,7 +125,7 @@
 
         try:
             print 'thread {}: delete {} * {}'.format(thread_num, to_delete_count, x)
-            random.choice(nodes).query("ALTER TABLE test_mutations DELETE WHERE x = {}".format(x))
+            random.choice(self.nodes).query("ALTER TABLE test_mutations DELETE WHERE x = {}".format(x))
 
             with self.mtx:
                 self.total_mutations += 1

@@ -130,14 +142,27 @@
         self.stop_ev.wait(1.0 + random.random() * 2)
 
 
+def wait_for_mutations(nodes, number_of_mutations):
+    for i in range(100): # wait for replication 80 seconds max
+        time.sleep(0.8)
+
+        def get_done_mutations(node):
+            return int(node.query("SELECT sum(is_done) FROM system.mutations WHERE table = 'test_mutations'").rstrip())
+
+        if all([get_done_mutations(n) == number_of_mutations for n in nodes]):
+            return True
+    return False
+
+
 def test_mutations(started_cluster):
     DURATION_SECONDS = 30
+    nodes = [node1, node2]
 
-    runner = Runner()
+    runner = Runner(nodes)
 
     threads = []
     for thread_num in range(5):
-        threads.append(threading.Thread(target=runner.do_insert, args=(thread_num, )))
+        threads.append(threading.Thread(target=runner.do_insert, args=(thread_num, 10)))
 
     for thread_num in (11, 12, 13):
         threads.append(threading.Thread(target=runner.do_delete, args=(thread_num,)))

@@ -155,18 +180,11 @@ def test_mutations(started_cluster):
     assert runner.total_inserted_rows > 0
     assert runner.total_mutations > 0
 
-    all_done = False
-    for i in range(100): # wait for replication 80 seconds max
-        time.sleep(0.8)
-
-        def get_done_mutations(node):
-            return int(node.query("SELECT sum(is_done) FROM system.mutations WHERE table = 'test_mutations'").rstrip())
-
-        if all([get_done_mutations(n) == runner.total_mutations for n in nodes]):
-            all_done = True
-            break
+    all_done = wait_for_mutations(nodes, runner.total_mutations)
 
-    print node1.query("SELECT mutation_id, command, parts_to_do, is_done FROM system.mutations WHERE table = 'test_mutations' FORMAT TSVWithNames")
+    print "Total mutations: ", runner.total_mutations
+    for node in nodes:
+        print node.query("SELECT mutation_id, command, parts_to_do, is_done FROM system.mutations WHERE table = 'test_mutations' FORMAT TSVWithNames")
     assert all_done
 
     expected_sum = runner.total_inserted_xs - runner.total_deleted_xs

@@ -174,3 +192,44 @@ def test_mutations(started_cluster):
     for i, node in enumerate(nodes):
         actual_sums.append(int(node.query("SELECT sum(x) FROM test_mutations").rstrip()))
         assert actual_sums[i] == expected_sum
+
+
+@pytest.mark.parametrize(
+    ('nodes', ),
+    [
+        ([node5, ], ), # MergeTree
+        ([node3, node4], ), # ReplicatedMergeTree
+    ]
+)
+def test_mutations_dont_prevent_merges(started_cluster, nodes):
+    for year in range(2000, 2016):
+        rows = ''
+        date_str = '{}-01-{}'.format(year, random.randint(1, 10))
+        for i in range(10):
+            rows += '{} {} {}\n'.format(date_str, random.randint(1, 10), i)
+        nodes[0].query("INSERT INTO test_mutations FORMAT TSV", rows)
+
+    # will run mutations of 16 parts in parallel, mutations will sleep for about 20 seconds
+    nodes[0].query("ALTER TABLE test_mutations UPDATE i = sleepEachRow(2) WHERE 1")
+
+    runner = Runner(nodes)
+    threads = []
+    for thread_num in range(2):
+        threads.append(threading.Thread(target=runner.do_insert, args=(thread_num, 0)))
+
+    # will insert approx 8-10 new parts per 1 second into one partition
+    for t in threads:
+        t.start()
+
+    all_done = wait_for_mutations(nodes, 1)
+
+    runner.stop_ev.set()
+    for t in threads:
+        t.join()
+
+    for node in nodes:
+        print node.query("SELECT mutation_id, command, parts_to_do, is_done FROM system.mutations WHERE table = 'test_mutations' FORMAT TSVWithNames")
+        print node.query("SELECT partition, count(name), sum(active), sum(active*rows) FROM system.parts WHERE table ='test_mutations' GROUP BY partition FORMAT TSVWithNames")
+
+    assert all_done
+    assert all([str(e).find("Too many parts") < 0 for e in runner.exceptions])