diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md
index 99d6a0aad5f..edfd391c71e 100644
--- a/docs/en/operations/settings/settings.md
+++ b/docs/en/operations/settings/settings.md
@@ -1944,6 +1944,21 @@ Possible values:
 
 Default value: 16.
 
+## background_message_broker_schedule_pool_size {#background_message_broker_schedule_pool_size}
+
+Sets the number of threads performing background tasks for message streaming. This setting is applied at ClickHouse server start and can’t be changed in a user session.
+
+Possible values:
+
+- Any positive integer.
+
+Default value: 16.
+
+**See Also**
+
+- [Kafka](../../engines/table-engines/integrations/kafka.md#kafka) engine
+- [RabbitMQ](../../engines/table-engines/integrations/rabbitmq.md#rabbitmq-engine) engine
+
 ## validate_polygons {#validate_polygons}
 
 Enables or disables throwing an exception in the [pointInPolygon](../../sql-reference/functions/geo/index.md#pointinpolygon) function, if the polygon is self-intersecting or self-tangent.
diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index d7fa4da44cd..8ff317764a7 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -341,6 +341,7 @@ struct ContextShared
     mutable std::optional<BackgroundSchedulePool> buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables.
     mutable std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
     mutable std::optional<BackgroundSchedulePool> distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends)
+    mutable std::optional<BackgroundSchedulePool> message_broker_schedule_pool; /// A thread pool that can run different jobs in background (used for message brokers, like RabbitMQ and Kafka)
     MultiVersion<Macros> macros; /// Substitutions extracted from config.
     std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
     /// Rules for selecting the compression settings, depending on the size of the part.
@@ -439,6 +440,7 @@ struct ContextShared
         buffer_flush_schedule_pool.reset();
         schedule_pool.reset();
         distributed_schedule_pool.reset();
+        message_broker_schedule_pool.reset();
         ddl_worker.reset();
 
         /// Stop trace collector if any
@@ -1525,6 +1527,17 @@ BackgroundSchedulePool & Context::getDistributedSchedulePool() const
     return *shared->distributed_schedule_pool;
 }
 
+BackgroundSchedulePool & Context::getMessageBrokerSchedulePool() const
+{
+    auto lock = getLock();
+    if (!shared->message_broker_schedule_pool)
+        shared->message_broker_schedule_pool.emplace(
+            settings.background_message_broker_schedule_pool_size,
+            CurrentMetrics::BackgroundDistributedSchedulePoolTask,
+            "BgMsgBrkSchPool");
+    return *shared->message_broker_schedule_pool;
+}
+
 bool Context::hasDistributedDDL() const
 {
     return getConfigRef().has("distributed_ddl");
diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h
index 8e15d0a4fed..83a46ef5a2b 100644
--- a/src/Interpreters/Context.h
+++ b/src/Interpreters/Context.h
@@ -611,6 +611,7 @@ public:
 
     BackgroundSchedulePool & getBufferFlushSchedulePool() const;
     BackgroundSchedulePool & getSchedulePool() const;
+    BackgroundSchedulePool & getMessageBrokerSchedulePool() const;
     BackgroundSchedulePool & getDistributedSchedulePool() const;
 
     /// Has distributed_ddl configuration or not.
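The new `Context::getMessageBrokerSchedulePool()` follows the same lazy-initialization pattern as the existing schedule-pool getters: the pool is built on first use, under the shared Context lock, with the thread count taken from `background_message_broker_schedule_pool_size`. Below is a minimal, self-contained sketch of that pattern; `WorkerPool`, `SharedState`, and `pool_size` are hypothetical stand-ins, not the real `BackgroundSchedulePool` and `ContextShared` types:

```cpp
#include <cstddef>
#include <mutex>
#include <optional>

// Hypothetical stand-in for BackgroundSchedulePool: it only records its size.
struct WorkerPool
{
    explicit WorkerPool(std::size_t threads) : thread_count(threads) {}
    std::size_t thread_count;
};

// Hypothetical stand-in for the shared Context state.
struct SharedState
{
    std::mutex mutex;                              // plays the role of Context::getLock()
    std::optional<WorkerPool> message_broker_pool; // created lazily, destroyed on shutdown
};

// Mirrors the shape of Context::getMessageBrokerSchedulePool():
// construct the pool on first call, then keep returning the same instance.
WorkerPool & getMessageBrokerPool(SharedState & shared, std::size_t pool_size)
{
    std::lock_guard<std::mutex> lock(shared.mutex);
    if (!shared.message_broker_pool)
        shared.message_broker_pool.emplace(pool_size); // pool_size ~ background_message_broker_schedule_pool_size
    return *shared.message_broker_pool;
}

int main()
{
    SharedState shared;
    auto & pool = getMessageBrokerPool(shared, 16); // 16 is the setting's default value
    return pool.thread_count == 16 ? 0 : 1;
}
```

Keeping the pool inside `std::optional` means it is not constructed until a Kafka or RabbitMQ table actually asks for it, and the matching `reset()` added to shutdown tears it down in the same place as the other schedule pools.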
diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp
index 388c21c6ad6..45e4ec538a1 100644
--- a/src/Storages/Kafka/StorageKafka.cpp
+++ b/src/Storages/Kafka/StorageKafka.cpp
@@ -196,7 +196,7 @@ StorageKafka::StorageKafka(
     auto task_count = thread_per_consumer ? num_consumers : 1;
     for (size_t i = 0; i < task_count; ++i)
     {
-        auto task = global_context.getSchedulePool().createTask(log->name(), [this, i]{ threadFunc(i); });
+        auto task = global_context.getMessageBrokerSchedulePool().createTask(log->name(), [this, i]{ threadFunc(i); });
         task->deactivate();
         tasks.emplace_back(std::make_shared<TaskContext>(std::move(task)));
     }
diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp
index f41c4805d24..3ee9dda2bf3 100644
--- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp
+++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp
@@ -112,13 +112,13 @@ StorageRabbitMQ::StorageRabbitMQ(
 
     /// One looping task for all consumers as they share the same connection == the same handler == the same event loop
     event_handler->updateLoopState(Loop::STOP);
-    looping_task = global_context.getSchedulePool().createTask("RabbitMQLoopingTask", [this]{ loopingFunc(); });
+    looping_task = global_context.getMessageBrokerSchedulePool().createTask("RabbitMQLoopingTask", [this]{ loopingFunc(); });
     looping_task->deactivate();
 
-    streaming_task = global_context.getSchedulePool().createTask("RabbitMQStreamingTask", [this]{ streamingToViewsFunc(); });
+    streaming_task = global_context.getMessageBrokerSchedulePool().createTask("RabbitMQStreamingTask", [this]{ streamingToViewsFunc(); });
     streaming_task->deactivate();
 
-    connection_task = global_context.getSchedulePool().createTask("RabbitMQConnectionTask", [this]{ connectionFunc(); });
+    connection_task = global_context.getMessageBrokerSchedulePool().createTask("RabbitMQConnectionTask", [this]{ connectionFunc(); });
    connection_task->deactivate();
 
     if (queue_base.empty())
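Both storages now draw their background tasks from the dedicated message-broker pool instead of the general schedule pool. In StorageKafka one task is created per consumer when `thread_per_consumer` is set, otherwise a single task serves all consumers, and every task is deactivated right after creation so nothing runs before the storage is started. A rough, self-contained sketch of that task-count logic follows, using hypothetical `Pool` and `Task` types in place of the real `BackgroundSchedulePool` API:

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <vector>

// Hypothetical task holder: created idle, activated later (e.g. from a startup step).
struct Task
{
    std::function<void()> body;
    bool active = false;

    void deactivate() { active = false; }
    void activateAndSchedule() { active = true; body(); } // simplified: runs the body inline
};

// Hypothetical pool handing out tasks (stand-in for getMessageBrokerSchedulePool()).
struct Pool
{
    std::shared_ptr<Task> createTask(std::function<void()> fn)
    {
        auto task = std::make_shared<Task>();
        task->body = std::move(fn);
        return task;
    }
};

int main()
{
    Pool pool;
    const bool thread_per_consumer = true;
    const std::size_t num_consumers = 4;

    // Mirrors StorageKafka: one task per consumer, or a single task for all of them.
    const std::size_t task_count = thread_per_consumer ? num_consumers : 1;

    std::vector<std::shared_ptr<Task>> tasks;
    for (std::size_t i = 0; i < task_count; ++i)
    {
        auto task = pool.createTask([i] { (void)i; /* threadFunc(i) would poll consumer i here */ });
        task->deactivate(); // stays idle until the storage is started
        tasks.push_back(std::move(task));
    }

    // A later startup step would activate every task.
    for (auto & task : tasks)
        task->activateAndSchedule();

    return tasks.size() == task_count ? 0 : 1;
}
```

StorageRabbitMQ follows the same shape with three fixed tasks (looping, streaming, connection), all created deactivated from the same message-broker pool.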