From 8f29320a73c394357b20495433a1ac919f8be9c6 Mon Sep 17 00:00:00 2001 From: alesapin Date: Sun, 18 Feb 2024 13:15:24 +0100 Subject: [PATCH] Flush StorageBuffer into multiple threads if num_layers > 1 --- src/Common/CurrentMetrics.cpp | 3 +++ src/Storages/StorageBuffer.cpp | 12 +++++++++++- src/Storages/StorageBuffer.h | 2 ++ 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index c6fbafa8dc3..6931001202d 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -262,6 +262,9 @@ M(ActiveTimersInQueryProfiler, "Number of Active thread local timers in QueryProfiler") \ M(RefreshableViews, "Number materialized views with periodic refreshing (REFRESH)") \ M(RefreshingViews, "Number of materialized views currently executing a refresh") \ + M(StorageBufferFlushThreads, "Number of threads for background flushes in StorageBuffer") \ + M(StorageBufferFlushThreadsActive, "Number of threads for background flushes in StorageBuffer running a task") \ + M(StorageBufferFlushThreadsScheduled, "Number of queued or active threads for background flushes in StorageBuffer") #ifdef APPLY_FOR_EXTERNAL_METRICS #define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M) diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index d5c135bb81d..dbf6c7c7657 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -56,6 +56,9 @@ namespace CurrentMetrics { extern const Metric StorageBufferRows; extern const Metric StorageBufferBytes; + extern const Metric StorageBufferFlushThreads; + extern const Metric StorageBufferFlushThreadsActive; + extern const Metric StorageBufferFlushThreadsScheduled; } @@ -131,6 +134,7 @@ StorageBuffer::StorageBuffer( : IStorage(table_id_) , WithContext(context_->getBufferContext()) , num_shards(num_shards_) + , flush_pool(CurrentMetrics::StorageBufferFlushThreads, CurrentMetrics::StorageBufferFlushThreadsActive, CurrentMetrics::StorageBufferFlushThreadsScheduled, num_shards, 0, num_shards_) , buffers(num_shards_) , min_thresholds(min_thresholds_) , max_thresholds(max_thresholds_) @@ -802,7 +806,13 @@ bool StorageBuffer::checkThresholdsImpl(bool direct, size_t rows, size_t bytes, void StorageBuffer::flushAllBuffers(bool check_thresholds) { for (auto & buf : buffers) - flushBuffer(buf, check_thresholds, false); + { + flush_pool.scheduleOrThrowOnError([&] () + { + flushBuffer(buf, check_thresholds, false); + }); + } + flush_pool.wait(); } diff --git a/src/Storages/StorageBuffer.h b/src/Storages/StorageBuffer.h index 47f6239b173..50f12be5aef 100644 --- a/src/Storages/StorageBuffer.h +++ b/src/Storages/StorageBuffer.h @@ -3,6 +3,7 @@ #include #include #include +#include #include @@ -149,6 +150,7 @@ private: /// There are `num_shards` of independent buffers. const size_t num_shards; + ThreadPool flush_pool; std::vector buffers; const Thresholds min_thresholds;