Merge pull request #26461 from azat/integration-tests-fixes-v21.9.1.7477

Fix flaky test_replicated_mutations (due to lack of threads in pool)
This commit is contained in:
alexey-milovidov 2021-07-17 19:22:30 +03:00 committed by GitHub
commit 7dda0da027
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 11 deletions

View File

@ -7,8 +7,14 @@
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric BackgroundPoolTask;
}
namespace DB
{
@ -1139,16 +1145,18 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (!ignore_max_size && sum_parts_size_in_bytes > max_source_parts_size)
{
const char * format_str = "Not executing log entry {} of type {} for part {}"
" because source parts size ({}) is greater than the current maximum ({}).";
size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);
size_t thread_pool_size = data.getContext()->getSettingsRef().background_pool_size;
size_t free_threads = thread_pool_size - busy_threads_in_pool;
size_t required_threads = data_settings->number_of_free_entries_in_pool_to_execute_mutation;
out_postpone_reason = fmt::format("Not executing log entry {} of type {} for part {}"
" because source parts size ({}) is greater than the current maximum ({})."
" {} free of {} threads, required {} free threads.",
entry.znode_name, entry.typeToString(), entry.new_part_name,
ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size),
free_threads, thread_pool_size, required_threads);
LOG_DEBUG(log, format_str, entry.znode_name,
entry.typeToString(), entry.new_part_name,
ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size));
out_postpone_reason = fmt::format(format_str, entry.znode_name,
entry.typeToString(), entry.new_part_name,
ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size));
LOG_DEBUG(log, out_postpone_reason);
return false;
}

View File

@ -33,8 +33,13 @@ def started_cluster():
node.query("DROP TABLE IF EXISTS test_mutations")
for node in [node1, node2, node3, node4]:
node.query(
"CREATE TABLE test_mutations(d Date, x UInt32, i UInt32) ENGINE ReplicatedMergeTree('/clickhouse/{cluster}/tables/test/test_mutations', '{instance}') ORDER BY x PARTITION BY toYYYYMM(d)")
node.query("""
CREATE TABLE test_mutations(d Date, x UInt32, i UInt32)
ENGINE ReplicatedMergeTree('/clickhouse/{cluster}/tables/test/test_mutations', '{instance}')
ORDER BY x
PARTITION BY toYYYYMM(d)
SETTINGS number_of_free_entries_in_pool_to_execute_mutation=0
""")
node5.query(
"CREATE TABLE test_mutations(d Date, x UInt32, i UInt32) ENGINE MergeTree() ORDER BY x PARTITION BY toYYYYMM(d)")