Flush StorageBuffer into multiple threads if num_layers > 1

This commit is contained in:
alesapin 2024-02-18 13:15:24 +01:00
parent ea06825ddc
commit 8f29320a73
3 changed files with 16 additions and 1 deletions

View File

@ -262,6 +262,9 @@
M(ActiveTimersInQueryProfiler, "Number of Active thread local timers in QueryProfiler") \ M(ActiveTimersInQueryProfiler, "Number of Active thread local timers in QueryProfiler") \
M(RefreshableViews, "Number materialized views with periodic refreshing (REFRESH)") \ M(RefreshableViews, "Number materialized views with periodic refreshing (REFRESH)") \
M(RefreshingViews, "Number of materialized views currently executing a refresh") \ M(RefreshingViews, "Number of materialized views currently executing a refresh") \
M(StorageBufferFlushThreads, "Number of threads for background flushes in StorageBuffer") \
M(StorageBufferFlushThreadsActive, "Number of threads for background flushes in StorageBuffer running a task") \
M(StorageBufferFlushThreadsScheduled, "Number of queued or active threads for background flushes in StorageBuffer")
#ifdef APPLY_FOR_EXTERNAL_METRICS #ifdef APPLY_FOR_EXTERNAL_METRICS
#define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M) #define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M)

View File

@ -56,6 +56,9 @@ namespace CurrentMetrics
{ {
extern const Metric StorageBufferRows; extern const Metric StorageBufferRows;
extern const Metric StorageBufferBytes; extern const Metric StorageBufferBytes;
extern const Metric StorageBufferFlushThreads;
extern const Metric StorageBufferFlushThreadsActive;
extern const Metric StorageBufferFlushThreadsScheduled;
} }
@ -131,6 +134,7 @@ StorageBuffer::StorageBuffer(
: IStorage(table_id_) : IStorage(table_id_)
, WithContext(context_->getBufferContext()) , WithContext(context_->getBufferContext())
, num_shards(num_shards_) , num_shards(num_shards_)
, flush_pool(CurrentMetrics::StorageBufferFlushThreads, CurrentMetrics::StorageBufferFlushThreadsActive, CurrentMetrics::StorageBufferFlushThreadsScheduled, num_shards, 0, num_shards_)
, buffers(num_shards_) , buffers(num_shards_)
, min_thresholds(min_thresholds_) , min_thresholds(min_thresholds_)
, max_thresholds(max_thresholds_) , max_thresholds(max_thresholds_)
@ -802,7 +806,13 @@ bool StorageBuffer::checkThresholdsImpl(bool direct, size_t rows, size_t bytes,
void StorageBuffer::flushAllBuffers(bool check_thresholds) void StorageBuffer::flushAllBuffers(bool check_thresholds)
{ {
for (auto & buf : buffers) for (auto & buf : buffers)
flushBuffer(buf, check_thresholds, false); {
flush_pool.scheduleOrThrowOnError([&] ()
{
flushBuffer(buf, check_thresholds, false);
});
}
flush_pool.wait();
} }

View File

@ -3,6 +3,7 @@
#include <Core/BackgroundSchedulePool.h> #include <Core/BackgroundSchedulePool.h>
#include <Core/NamesAndTypes.h> #include <Core/NamesAndTypes.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <Common/ThreadPool.h>
#include <Poco/Event.h> #include <Poco/Event.h>
@ -149,6 +150,7 @@ private:
/// There are `num_shards` of independent buffers. /// There are `num_shards` of independent buffers.
const size_t num_shards; const size_t num_shards;
ThreadPool flush_pool;
std::vector<Buffer> buffers; std::vector<Buffer> buffers;
const Thresholds min_thresholds; const Thresholds min_thresholds;