diff --git a/docs/en/engines/table-engines/integrations/kafka.md b/docs/en/engines/table-engines/integrations/kafka.md index 3324386e1c5..fe9aa2ca25e 100644 --- a/docs/en/engines/table-engines/integrations/kafka.md +++ b/docs/en/engines/table-engines/integrations/kafka.md @@ -32,7 +32,8 @@ SETTINGS [kafka_num_consumers = N,] [kafka_max_block_size = 0,] [kafka_skip_broken_messages = N,] - [kafka_commit_every_batch = 0] + [kafka_commit_every_batch = 0,] + [kafka_thread_per_consumer = 0] ``` Required parameters: @@ -50,6 +51,7 @@ Optional parameters: - `kafka_max_block_size` - The maximum batch size (in messages) for poll (default: `max_block_size`). - `kafka_skip_broken_messages` – Kafka message parser tolerance to schema-incompatible messages per block. Default: `0`. If `kafka_skip_broken_messages = N` then the engine skips *N* Kafka messages that cannot be parsed (a message equals a row of data). - `kafka_commit_every_batch` - Commit every consumed and handled batch instead of a single commit after writing a whole block (default: `0`). +- `kafka_thread_per_consumer` - Provide independent thread for each consumer (default: `0`). When enabled, every consumer flush the data independently, in parallel (otherwise - rows from several consumers squashed to form one block). Examples: diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index a82853f9961..9e8086aec54 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -14,6 +14,7 @@ M(BackgroundSchedulePoolTask, "Number of active tasks in BackgroundSchedulePool. This pool is used for periodic ReplicatedMergeTree tasks, like cleaning old data parts, altering data parts, replica re-initialization, etc.") \ M(BackgroundBufferFlushSchedulePoolTask, "Number of active tasks in BackgroundBufferFlushSchedulePool. This pool is used for periodic Buffer flushes") \ M(BackgroundDistributedSchedulePoolTask, "Number of active tasks in BackgroundDistributedSchedulePool. This pool is used for distributed sends that is done in background.") \ + M(BackgroundMessageBrokerSchedulePoolTask, "Number of active tasks in BackgroundProcessingPool for message streaming") \ M(CacheDictionaryUpdateQueueBatches, "Number of 'batches' (a set of keys) in update queue in CacheDictionaries.") \ M(CacheDictionaryUpdateQueueKeys, "Exact number of keys in update queue in CacheDictionaries.") \ M(DiskSpaceReservedForMerge, "Disk space reserved for currently running background merges. It is slightly more than the total size of currently merging parts.") \ @@ -38,6 +39,7 @@ M(MemoryTrackingInBackgroundSchedulePool, "Total amount of memory (bytes) allocated in background schedule pool (that is dedicated for bookkeeping tasks of Replicated tables).") \ M(MemoryTrackingInBackgroundBufferFlushSchedulePool, "Total amount of memory (bytes) allocated in background buffer flushes pool (that is dedicated for background buffer flushes).") \ M(MemoryTrackingInBackgroundDistributedSchedulePool, "Total amount of memory (bytes) allocated in background distributed schedule pool (that is dedicated for distributed sends).") \ + M(MemoryTrackingInBackgroundMessageBrokerSchedulePool, "Total amount of memory (bytes) allocated in background message broker pool (that is dedicated for background message streaming).") \ M(MemoryTrackingForMerges, "Total amount of memory (bytes) allocated for background merges. Included in MemoryTrackingInBackgroundProcessingPool. Note that this value may include a drift when the memory was allocated in a context of background processing pool and freed in other context or vice-versa. This happens naturally due to caches for tables indexes and doesn't indicate memory leaks.") \ M(EphemeralNode, "Number of ephemeral nodes hold in ZooKeeper.") \ M(ZooKeeperSession, "Number of sessions (connections) to ZooKeeper. Should be no more than one, because using more than one connection to ZooKeeper may lead to bugs due to lack of linearizability (stale reads) that ZooKeeper consistency model allows.") \ diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 7dfe59f8b63..d367297f900 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -71,7 +71,8 @@ class IColumn; M(UInt64, background_buffer_flush_schedule_pool_size, 16, "Number of threads performing background flush for tables with Buffer engine. Only has meaning at server startup.", 0) \ M(UInt64, background_pool_size, 16, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.", 0) \ M(UInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. Only has meaning at server startup.", 0) \ - M(UInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables, kafka streaming, dns cache updates. Only has meaning at server startup.", 0) \ + M(UInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables, dns cache updates. Only has meaning at server startup.", 0) \ + M(UInt64, background_message_broker_schedule_pool_size, 16, "Number of threads performing background tasks for message streaming. Only has meaning at server startup.", 0) \ M(UInt64, background_distributed_schedule_pool_size, 16, "Number of threads performing background tasks for distributed sends. Only has meaning at server startup.", 0) \ \ M(Milliseconds, distributed_directory_monitor_sleep_time_ms, 100, "Sleep time for StorageDistributed DirectoryMonitors, in case of any errors delay grows exponentially.", 0) \ diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 1318f0331c4..b1de4d006c4 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -82,6 +82,9 @@ namespace CurrentMetrics extern const Metric BackgroundDistributedSchedulePoolTask; extern const Metric MemoryTrackingInBackgroundDistributedSchedulePool; + + extern const Metric BackgroundMessageBrokerSchedulePoolTask; + extern const Metric MemoryTrackingInBackgroundMessageBrokerSchedulePool; } @@ -341,6 +344,7 @@ struct ContextShared std::optional background_move_pool; /// The thread pool for the background moves performed by the tables. std::optional schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables) std::optional distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends) + std::optional message_broker_schedule_pool; /// A thread pool that can run different jobs in background (used in kafka streaming) MultiVersion macros; /// Substitutions extracted from config. std::unique_ptr ddl_worker; /// Process ddl commands from zk. /// Rules for selecting the compression settings, depending on the size of the part. @@ -441,6 +445,7 @@ struct ContextShared schedule_pool.reset(); distributed_schedule_pool.reset(); ddl_worker.reset(); + message_broker_schedule_pool.reset(); /// Stop trace collector if any trace_collector.reset(); @@ -1421,6 +1426,18 @@ BackgroundSchedulePool & Context::getDistributedSchedulePool() return *shared->distributed_schedule_pool; } +BackgroundSchedulePool & Context::getMessageBrokerSchedulePool() +{ + auto lock = getLock(); + if (!shared->message_broker_schedule_pool) + shared->message_broker_schedule_pool.emplace( + settings.background_message_broker_schedule_pool_size, + CurrentMetrics::BackgroundMessageBrokerSchedulePoolTask, + CurrentMetrics::MemoryTrackingInBackgroundMessageBrokerSchedulePool, + "BgMBSchPool"); + return *shared->message_broker_schedule_pool; +} + void Context::setDDLWorker(std::unique_ptr ddl_worker) { auto lock = getLock(); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index ff06146254f..7c6d3e197f8 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -503,6 +503,7 @@ public: BackgroundProcessingPool & getBackgroundPool(); BackgroundProcessingPool & getBackgroundMovePool(); BackgroundSchedulePool & getSchedulePool(); + BackgroundSchedulePool & getMessageBrokerSchedulePool(); BackgroundSchedulePool & getDistributedSchedulePool(); void setDDLWorker(std::unique_ptr ddl_worker); diff --git a/src/Storages/Kafka/KafkaSettings.h b/src/Storages/Kafka/KafkaSettings.h index b9b606e4660..53fbe8adc6b 100644 --- a/src/Storages/Kafka/KafkaSettings.h +++ b/src/Storages/Kafka/KafkaSettings.h @@ -28,7 +28,8 @@ class ASTStorage; M(String, kafka_format, "", "The message format for Kafka engine.", 0) \ M(Char, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.", 0) \ M(String, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine", 0) \ - M(UInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block", 0) + M(UInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block", 0) \ + M(Bool, kafka_thread_per_consumer, false, "Provide independent thread for each consumer", 0) /** TODO: */ /* https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md */ diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index d4c4be3e2ef..1ee1f5de909 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -138,12 +138,18 @@ StorageKafka::StorageKafka( , semaphore(0, num_consumers) , intermediate_commit(kafka_settings->kafka_commit_every_batch.value) , settings_adjustments(createSettingsAdjustments()) + , thread_per_consumer(kafka_settings->kafka_thread_per_consumer.value) { StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns_); setInMemoryMetadata(storage_metadata); - task = global_context.getSchedulePool().createTask(log->name(), [this]{ threadFunc(); }); - task->deactivate(); + auto task_count = thread_per_consumer ? num_consumers : 1; + for (size_t i = 0; i < task_count; ++i) + { + auto task = global_context.getSchedulePool().createTask(log->name(), [this, i]{ threadFunc(i); }); + task->deactivate(); + tasks.emplace_back(std::make_shared(std::move(task))); + } } SettingsChanges StorageKafka::createSettingsAdjustments() @@ -257,17 +263,23 @@ void StorageKafka::startup() } // Start the reader thread - task->activateAndSchedule(); + for (auto & task : tasks) + { + task->holder->activateAndSchedule(); + } } void StorageKafka::shutdown() { - // Interrupt streaming thread - stream_cancelled = true; + for (auto & task : tasks) + { + // Interrupt streaming thread + task->stream_cancelled = true; - LOG_TRACE(log, "Waiting for cleanup"); - task->deactivate(); + LOG_TRACE(log, "Waiting for cleanup"); + task->holder->deactivate(); + } LOG_TRACE(log, "Closing consumers"); for (size_t i = 0; i < num_created_consumers; ++i) @@ -368,7 +380,12 @@ ConsumerBufferPtr StorageKafka::createReadBuffer(const size_t consumer_number) consumer->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE); /// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage. - return std::make_shared(consumer, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, stream_cancelled, topics); + if (thread_per_consumer) + { + auto& stream_cancelled = tasks[consumer_number]->stream_cancelled; + return std::make_shared(consumer, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, stream_cancelled, topics); + } + return std::make_shared(consumer, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, tasks.back()->stream_cancelled, topics); } size_t StorageKafka::getMaxBlockSize() const @@ -464,8 +481,10 @@ bool StorageKafka::checkDependencies(const StorageID & table_id) return true; } -void StorageKafka::threadFunc() +void StorageKafka::threadFunc(size_t idx) { + assert(idx < tasks.size()); + auto task = tasks[idx]; try { auto table_id = getStorageID(); @@ -476,7 +495,7 @@ void StorageKafka::threadFunc() auto start_time = std::chrono::steady_clock::now(); // Keep streaming as long as there are attached views and streaming is not cancelled - while (!stream_cancelled && num_created_consumers > 0) + while (!task->stream_cancelled && num_created_consumers > 0) { if (!checkDependencies(table_id)) break; @@ -507,8 +526,8 @@ void StorageKafka::threadFunc() } // Wait for attached views - if (!stream_cancelled) - task->scheduleAfter(RESCHEDULE_MS); + if (!task->stream_cancelled) + task->holder->scheduleAfter(RESCHEDULE_MS); } @@ -537,9 +556,10 @@ bool StorageKafka::streamToViews() // Create a stream for each consumer and join them in a union stream BlockInputStreams streams; - streams.reserve(num_created_consumers); - for (size_t i = 0; i < num_created_consumers; ++i) + auto stream_count = thread_per_consumer ? 1 : num_created_consumers; + streams.reserve(stream_count); + for (size_t i = 0; i < stream_count; ++i) { auto stream = std::make_shared(*this, metadata_snapshot, kafka_context, block_io.out->getHeader().getNames(), log, block_size, false); streams.emplace_back(stream); diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h index dc9569839fa..272e419bebe 100644 --- a/src/Storages/Kafka/StorageKafka.h +++ b/src/Storages/Kafka/StorageKafka.h @@ -94,8 +94,16 @@ private: std::mutex mutex; // Stream thread - BackgroundSchedulePool::TaskHolder task; - std::atomic stream_cancelled{false}; + struct TaskContext + { + BackgroundSchedulePool::TaskHolder holder; + std::atomic stream_cancelled {false}; + explicit TaskContext(BackgroundSchedulePool::TaskHolder&& task_) : holder(std::move(task_)) + { + } + }; + std::vector> tasks; + bool thread_per_consumer = false; SettingsChanges createSettingsAdjustments(); ConsumerBufferPtr createReadBuffer(const size_t consumer_number); @@ -103,7 +111,7 @@ private: // Update Kafka configuration with values from CH user configuration. void updateConfiguration(cppkafka::Configuration & conf); - void threadFunc(); + void threadFunc(size_t idx); size_t getPollMaxBatchSize() const; size_t getMaxBlockSize() const; diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py index dbd822476da..8f605e1bbd4 100644 --- a/tests/integration/test_storage_kafka/test.py +++ b/tests/integration/test_storage_kafka/test.py @@ -2192,6 +2192,33 @@ def test_kafka_issue14202(kafka_cluster): DROP TABLE test.kafka_q; ''') +@pytest.mark.timeout(180) +def test_kafka_csv_with_thread_per_consumer(kafka_cluster): + instance.query(''' + CREATE TABLE test.kafka (key UInt64, value UInt64) + ENGINE = Kafka + SETTINGS kafka_broker_list = 'kafka1:19092', + kafka_topic_list = 'csv', + kafka_group_name = 'csv', + kafka_format = 'CSV', + kafka_row_delimiter = '\\n', + kafka_num_consumers = 4, + kafka_thread_per_consumer = 1; + ''') + + messages = [] + for i in range(50): + messages.append('{i}, {i}'.format(i=i)) + kafka_produce('csv', messages) + + result = '' + while True: + result += instance.query('SELECT * FROM test.kafka', ignore_error=True) + if kafka_check_result(result): + break + + kafka_check_result(result, True) + if __name__ == '__main__': cluster.start() raw_input("Cluster created, press any key to destroy...")