Merge pull request #19722 from azat/background_message_broker_schedule_pool_size

Add separate pool for message brokers (RabbitMQ and Kafka)
alexey-milovidov 2021-01-31 06:07:45 +03:00 committed by GitHub
commit d93dad0ec3
5 changed files with 33 additions and 4 deletions


@@ -1944,6 +1944,21 @@ Possible values:
Default value: 16.
+## background_message_broker_schedule_pool_size {#background_message_broker_schedule_pool_size}
+Sets the number of threads performing background tasks for message streaming. This setting is applied at the ClickHouse server start and can't be changed in a user session.
+Possible values:
+- Any positive integer.
+Default value: 16.
+**See Also**
+- [Kafka](../../engines/table-engines/integrations/kafka.md#kafka) engine
+- [RabbitMQ](../../engines/table-engines/integrations/rabbitmq.md#rabbitmq-engine) engine
## validate_polygons {#validate_polygons}
Enables or disables throwing an exception in the [pointInPolygon](../../sql-reference/functions/geo/index.md#pointinpolygon) function, if the polygon is self-intersecting or self-tangent.
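
A note on the `background_message_broker_schedule_pool_size` setting added above: because it is fixed at server start, it has to be configured before startup rather than per session. Below is a minimal sketch of one way to do that, assuming (as with the other `background_*_pool_size` settings of this era) the value is read from the default settings profile, e.g. in `users.xml`; the value 32 is purely illustrative.

```xml
<!-- users.xml (illustrative sketch): raise the message-broker pool above its default of 16 -->
<profiles>
    <default>
        <background_message_broker_schedule_pool_size>32</background_message_broker_schedule_pool_size>
    </default>
</profiles>
```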


@@ -341,6 +341,7 @@ struct ContextShared
mutable std::optional<BackgroundSchedulePool> buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables.
mutable std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
mutable std::optional<BackgroundSchedulePool> distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends)
+mutable std::optional<BackgroundSchedulePool> message_broker_schedule_pool; /// A thread pool that can run different jobs in background (used for message brokers, like RabbitMQ and Kafka)
MultiVersion<Macros> macros; /// Substitutions extracted from config.
std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
/// Rules for selecting the compression settings, depending on the size of the part.
@@ -439,6 +440,7 @@ struct ContextShared
buffer_flush_schedule_pool.reset();
schedule_pool.reset();
distributed_schedule_pool.reset();
+message_broker_schedule_pool.reset();
ddl_worker.reset();
/// Stop trace collector if any
@@ -1525,6 +1527,17 @@ BackgroundSchedulePool & Context::getDistributedSchedulePool() const
return *shared->distributed_schedule_pool;
}
+BackgroundSchedulePool & Context::getMessageBrokerSchedulePool() const
+{
+auto lock = getLock();
+if (!shared->message_broker_schedule_pool)
+shared->message_broker_schedule_pool.emplace(
+settings.background_message_broker_schedule_pool_size,
+CurrentMetrics::BackgroundDistributedSchedulePoolTask,
+"BgMsgBrkSchPool");
+return *shared->message_broker_schedule_pool;
+}
bool Context::hasDistributedDDL() const
{
return getConfigRef().has("distributed_ddl");


@@ -611,6 +611,7 @@ public:
BackgroundSchedulePool & getBufferFlushSchedulePool() const;
BackgroundSchedulePool & getSchedulePool() const;
+BackgroundSchedulePool & getMessageBrokerSchedulePool() const;
BackgroundSchedulePool & getDistributedSchedulePool() const;
/// Has distributed_ddl configuration or not.


@@ -196,7 +196,7 @@ StorageKafka::StorageKafka(
auto task_count = thread_per_consumer ? num_consumers : 1;
for (size_t i = 0; i < task_count; ++i)
{
-auto task = global_context.getSchedulePool().createTask(log->name(), [this, i]{ threadFunc(i); });
+auto task = global_context.getMessageBrokerSchedulePool().createTask(log->name(), [this, i]{ threadFunc(i); });
task->deactivate();
tasks.emplace_back(std::make_shared<TaskContext>(std::move(task)));
}
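
The Kafka hunk above (and the RabbitMQ one below) shows only task creation and deactivation in the constructor. For orientation, here is a minimal sketch of the rest of a pool task's lifecycle, assuming the usual BackgroundSchedulePool task API of this codebase (activateAndSchedule, scheduleAfter, deactivate); the task name, lambda body, and the 500 ms delay are illustrative, not part of this PR.

```cpp
// Illustrative sketch, not part of the diff: driving a task on the new pool.
auto task = global_context.getMessageBrokerSchedulePool().createTask(
    "ExampleBrokerTask", [&] { /* poll the broker, push rows to views */ });

task->deactivate();           // constructor: keep the task idle until startup
task->activateAndSchedule();  // startup(): enable the task and schedule its first run
task->scheduleAfter(500);     // inside the task body: re-run itself after ~500 ms
task->deactivate();           // shutdown(): stop new runs and wait for the current one
```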


@@ -112,13 +112,13 @@ StorageRabbitMQ::StorageRabbitMQ(
/// One looping task for all consumers as they share the same connection == the same handler == the same event loop
event_handler->updateLoopState(Loop::STOP);
-looping_task = global_context.getSchedulePool().createTask("RabbitMQLoopingTask", [this]{ loopingFunc(); });
+looping_task = global_context.getMessageBrokerSchedulePool().createTask("RabbitMQLoopingTask", [this]{ loopingFunc(); });
looping_task->deactivate();
-streaming_task = global_context.getSchedulePool().createTask("RabbitMQStreamingTask", [this]{ streamingToViewsFunc(); });
+streaming_task = global_context.getMessageBrokerSchedulePool().createTask("RabbitMQStreamingTask", [this]{ streamingToViewsFunc(); });
streaming_task->deactivate();
-connection_task = global_context.getSchedulePool().createTask("RabbitMQConnectionTask", [this]{ connectionFunc(); });
+connection_task = global_context.getMessageBrokerSchedulePool().createTask("RabbitMQConnectionTask", [this]{ connectionFunc(); });
connection_task->deactivate();
if (queue_base.empty())