From 8f29320a73c394357b20495433a1ac919f8be9c6 Mon Sep 17 00:00:00 2001 From: alesapin Date: Sun, 18 Feb 2024 13:15:24 +0100 Subject: [PATCH 1/4] Flush StorageBuffer into multiple threads if num_layers > 1 --- src/Common/CurrentMetrics.cpp | 3 +++ src/Storages/StorageBuffer.cpp | 12 +++++++++++- src/Storages/StorageBuffer.h | 2 ++ 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index c6fbafa8dc3..6931001202d 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -262,6 +262,9 @@ M(ActiveTimersInQueryProfiler, "Number of Active thread local timers in QueryProfiler") \ M(RefreshableViews, "Number materialized views with periodic refreshing (REFRESH)") \ M(RefreshingViews, "Number of materialized views currently executing a refresh") \ + M(StorageBufferFlushThreads, "Number of threads for background flushes in StorageBuffer") \ + M(StorageBufferFlushThreadsActive, "Number of threads for background flushes in StorageBuffer running a task") \ + M(StorageBufferFlushThreadsScheduled, "Number of queued or active threads for background flushes in StorageBuffer") #ifdef APPLY_FOR_EXTERNAL_METRICS #define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M) diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index d5c135bb81d..dbf6c7c7657 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -56,6 +56,9 @@ namespace CurrentMetrics { extern const Metric StorageBufferRows; extern const Metric StorageBufferBytes; + extern const Metric StorageBufferFlushThreads; + extern const Metric StorageBufferFlushThreadsActive; + extern const Metric StorageBufferFlushThreadsScheduled; } @@ -131,6 +134,7 @@ StorageBuffer::StorageBuffer( : IStorage(table_id_) , WithContext(context_->getBufferContext()) , num_shards(num_shards_) + , flush_pool(CurrentMetrics::StorageBufferFlushThreads, CurrentMetrics::StorageBufferFlushThreadsActive, CurrentMetrics::StorageBufferFlushThreadsScheduled, num_shards, 0, num_shards_) , buffers(num_shards_) , min_thresholds(min_thresholds_) , max_thresholds(max_thresholds_) @@ -802,7 +806,13 @@ bool StorageBuffer::checkThresholdsImpl(bool direct, size_t rows, size_t bytes, void StorageBuffer::flushAllBuffers(bool check_thresholds) { for (auto & buf : buffers) - flushBuffer(buf, check_thresholds, false); + { + flush_pool.scheduleOrThrowOnError([&] () + { + flushBuffer(buf, check_thresholds, false); + }); + } + flush_pool.wait(); } diff --git a/src/Storages/StorageBuffer.h b/src/Storages/StorageBuffer.h index 47f6239b173..50f12be5aef 100644 --- a/src/Storages/StorageBuffer.h +++ b/src/Storages/StorageBuffer.h @@ -3,6 +3,7 @@ #include #include #include +#include #include @@ -149,6 +150,7 @@ private: /// There are `num_shards` of independent buffers. const size_t num_shards; + ThreadPool flush_pool; std::vector buffers; const Thresholds min_thresholds; From 29e3e7cb965b5ada347028282e87005c570d3400 Mon Sep 17 00:00:00 2001 From: alesapin Date: Sun, 18 Feb 2024 13:19:35 +0100 Subject: [PATCH 2/4] Better if only 1 layer --- src/Storages/StorageBuffer.cpp | 22 ++++++++++++++++++---- src/Storages/StorageBuffer.h | 2 +- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index dbf6c7c7657..c2b63101d11 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -134,7 +134,6 @@ StorageBuffer::StorageBuffer( : IStorage(table_id_) , WithContext(context_->getBufferContext()) , num_shards(num_shards_) - , flush_pool(CurrentMetrics::StorageBufferFlushThreads, CurrentMetrics::StorageBufferFlushThreadsActive, CurrentMetrics::StorageBufferFlushThreadsScheduled, num_shards, 0, num_shards_) , buffers(num_shards_) , min_thresholds(min_thresholds_) , max_thresholds(max_thresholds_) @@ -157,6 +156,12 @@ StorageBuffer::StorageBuffer( storage_metadata.setComment(comment); setInMemoryMetadata(storage_metadata); + if (num_shards > 1) + { + flush_pool = std::make_unique( + CurrentMetrics::StorageBufferFlushThreads, CurrentMetrics::StorageBufferFlushThreadsActive, CurrentMetrics::StorageBufferFlushThreadsScheduled, + num_shards, 0, num_shards); + } flush_handle = bg_pool.createTask(log->name() + "/Bg", [this]{ backgroundFlush(); }); } @@ -807,12 +812,21 @@ void StorageBuffer::flushAllBuffers(bool check_thresholds) { for (auto & buf : buffers) { - flush_pool.scheduleOrThrowOnError([&] () + if (flush_pool) + { + flush_pool->scheduleOrThrowOnError([&] () + { + flushBuffer(buf, check_thresholds, false); + }); + } + else { flushBuffer(buf, check_thresholds, false); - }); + } } - flush_pool.wait(); + + if (flush_pool) + flush_pool->wait(); } diff --git a/src/Storages/StorageBuffer.h b/src/Storages/StorageBuffer.h index 50f12be5aef..6c15c7e0238 100644 --- a/src/Storages/StorageBuffer.h +++ b/src/Storages/StorageBuffer.h @@ -150,7 +150,7 @@ private: /// There are `num_shards` of independent buffers. const size_t num_shards; - ThreadPool flush_pool; + std::unique_ptr flush_pool; std::vector buffers; const Thresholds min_thresholds; From 6565423b1a3ca7a6127b848fc112e8c2eadb66ae Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 19 Feb 2024 10:32:36 +0100 Subject: [PATCH 3/4] Review fix --- src/Storages/StorageBuffer.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index c2b63101d11..5d717f84a1d 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -814,10 +815,10 @@ void StorageBuffer::flushAllBuffers(bool check_thresholds) { if (flush_pool) { - flush_pool->scheduleOrThrowOnError([&] () + scheduleFromThreadPool([&] () { flushBuffer(buf, check_thresholds, false); - }); + }, *flush_pool, "BufferFlush"); } else { From 9361946d151a082ca190b9d7489804b9c30ef3b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Mon, 19 Feb 2024 17:48:53 +0100 Subject: [PATCH 4/4] Fix build in master --- src/Storages/StorageBuffer.cpp | 58 +++++++++++++++++----------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index 5d717f84a1d..2925038ec8e 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -1,41 +1,41 @@ -#include -#include #include +#include +#include #include #include -#include #include -#include -#include -#include -#include -#include -#include -#include -#include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include #include +#include +#include +#include #include #include #include -#include -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include namespace ProfileEvents