Merge remote-tracking branch 'blessed/master' into cosineDistance

This commit is contained in:
Raúl Marín 2024-02-19 18:21:51 +01:00
commit 8c984c55e4
3 changed files with 59 additions and 29 deletions

View File

@ -262,6 +262,9 @@
M(ActiveTimersInQueryProfiler, "Number of Active thread local timers in QueryProfiler") \
M(RefreshableViews, "Number materialized views with periodic refreshing (REFRESH)") \
M(RefreshingViews, "Number of materialized views currently executing a refresh") \
M(StorageBufferFlushThreads, "Number of threads for background flushes in StorageBuffer") \
M(StorageBufferFlushThreadsActive, "Number of threads for background flushes in StorageBuffer running a task") \
M(StorageBufferFlushThreadsScheduled, "Number of queued or active threads for background flushes in StorageBuffer")
#ifdef APPLY_FOR_EXTERNAL_METRICS
#define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M)

View File

@ -1,40 +1,41 @@
#include <boost/range/algorithm_ext/erase.hpp>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/addMissingDefaults.h>
#include <Interpreters/castColumn.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/addMissingDefaults.h>
#include <Interpreters/getColumnFromBlock.h>
#include <Storages/StorageBuffer.h>
#include <Storages/StorageFactory.h>
#include <Storages/AlterCommands.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Common/FieldVisitorConvertToNumber.h>
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
#include <Common/ProfileEvents.h>
#include <Common/logger_useful.h>
#include <base/getThreadId.h>
#include <base/range.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/ReverseTransform.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/ISource.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Transforms/ReverseTransform.h>
#include <Storages/AlterCommands.h>
#include <Storages/StorageBuffer.h>
#include <Storages/StorageFactory.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <base/getThreadId.h>
#include <base/range.h>
#include <boost/range/algorithm_ext/erase.hpp>
#include <Common/CurrentMetrics.h>
#include <Common/FieldVisitorConvertToNumber.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Common/ProfileEvents.h>
#include <Common/logger_useful.h>
#include <Common/quoteString.h>
#include <Common/threadPoolCallbackRunner.h>
#include <Common/typeid_cast.h>
namespace ProfileEvents
@ -56,6 +57,9 @@ namespace CurrentMetrics
{
extern const Metric StorageBufferRows;
extern const Metric StorageBufferBytes;
extern const Metric StorageBufferFlushThreads;
extern const Metric StorageBufferFlushThreadsActive;
extern const Metric StorageBufferFlushThreadsScheduled;
}
@ -153,6 +157,12 @@ StorageBuffer::StorageBuffer(
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
if (num_shards > 1)
{
flush_pool = std::make_unique<ThreadPool>(
CurrentMetrics::StorageBufferFlushThreads, CurrentMetrics::StorageBufferFlushThreadsActive, CurrentMetrics::StorageBufferFlushThreadsScheduled,
num_shards, 0, num_shards);
}
flush_handle = bg_pool.createTask(log->name() + "/Bg", [this]{ backgroundFlush(); });
}
@ -802,7 +812,22 @@ bool StorageBuffer::checkThresholdsImpl(bool direct, size_t rows, size_t bytes,
void StorageBuffer::flushAllBuffers(bool check_thresholds)
{
for (auto & buf : buffers)
flushBuffer(buf, check_thresholds, false);
{
if (flush_pool)
{
scheduleFromThreadPool<void>([&] ()
{
flushBuffer(buf, check_thresholds, false);
}, *flush_pool, "BufferFlush");
}
else
{
flushBuffer(buf, check_thresholds, false);
}
}
if (flush_pool)
flush_pool->wait();
}

View File

@ -3,6 +3,7 @@
#include <Core/BackgroundSchedulePool.h>
#include <Core/NamesAndTypes.h>
#include <Storages/IStorage.h>
#include <Common/ThreadPool.h>
#include <Poco/Event.h>
@ -149,6 +150,7 @@ private:
/// There are `num_shards` of independent buffers.
const size_t num_shards;
std::unique_ptr<ThreadPool> flush_pool;
std::vector<Buffer> buffers;
const Thresholds min_thresholds;