From 61ac52cf29c817e9fc1bdbb8733876417e846d07 Mon Sep 17 00:00:00 2001
From: Peng Jian
Date: Fri, 21 Aug 2020 15:52:44 +0800
Subject: [PATCH 1/6] Multithreading implementation of Kafka engine

---
 src/Storages/Kafka/StorageKafka.cpp | 66 +++++++++++++----------
 src/Storages/Kafka/StorageKafka.h   | 12 ++++--
 2 files changed, 38 insertions(+), 40 deletions(-)

diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp
index d4c4be3e2ef..13812c2bcf0 100644
--- a/src/Storages/Kafka/StorageKafka.cpp
+++ b/src/Storages/Kafka/StorageKafka.cpp
@@ -142,8 +142,11 @@ StorageKafka::StorageKafka(
     StorageInMemoryMetadata storage_metadata;
     storage_metadata.setColumns(columns_);
     setInMemoryMetadata(storage_metadata);
-    task = global_context.getSchedulePool().createTask(log->name(), [this]{ threadFunc(); });
-    task->deactivate();
+    for (size_t i = 0; i < num_consumers; i++) {
+        auto task = global_context.getSchedulePool().createTask(log->name(), [this, i]{ threadFunc(i); });
+        task->deactivate();
+        tasks.emplace_back(std::make_shared<TaskContext>(std::move(task)));
+    }
 }

 SettingsChanges StorageKafka::createSettingsAdjustments()
@@ -254,20 +257,22 @@ void StorageKafka::startup()
         {
             tryLogCurrentException(log);
         }
+        // Start the reader thread
+        tasks[i]->holder->activateAndSchedule();
     }

-    // Start the reader thread
-    task->activateAndSchedule();
 }


 void StorageKafka::shutdown()
 {
     // Interrupt streaming thread
-    stream_cancelled = true;
-
     LOG_TRACE(log, "Waiting for cleanup");
-    task->deactivate();
+    for (size_t i = 0; i < num_consumers; i++)
+    {
+        tasks[i]->stream_cancelled = false;
+        tasks[i]->holder->deactivate();
+    }

     LOG_TRACE(log, "Closing consumers");
     for (size_t i = 0; i < num_created_consumers; ++i)
@@ -368,6 +373,7 @@ ConsumerBufferPtr StorageKafka::createReadBuffer(const size_t consumer_number)
     consumer->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);

     /// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage.
+    auto& stream_cancelled = tasks[consumer_number]->stream_cancelled;
     return std::make_shared<ReadBufferFromKafkaConsumer>(consumer, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, stream_cancelled, topics);
 }

@@ -464,8 +470,9 @@ bool StorageKafka::checkDependencies(const StorageID & table_id)
     return true;
 }

-void StorageKafka::threadFunc()
+void StorageKafka::threadFunc(size_t idx)
 {
+    auto& stream_cancelled = tasks[idx]->stream_cancelled;
     try
     {
         auto table_id = getStorageID();
@@ -508,7 +515,9 @@ void StorageKafka::threadFunc()

     // Wait for attached views
     if (!stream_cancelled)
-        task->scheduleAfter(RESCHEDULE_MS);
+    {
+        tasks[idx]->holder->scheduleAfter(RESCHEDULE_MS);
+    }
 }


@@ -535,44 +544,27 @@ bool StorageKafka::streamToViews()
     InterpreterInsertQuery interpreter(insert, *kafka_context, false, true, true);
     auto block_io = interpreter.execute();

-    // Create a stream for each consumer and join them in a union stream
-    BlockInputStreams streams;
-    streams.reserve(num_created_consumers);
+    auto stream = std::make_shared<KafkaBlockInputStream>(*this, metadata_snapshot, kafka_context, block_io.out->getHeader().getNames(), log, block_size, false);

-    for (size_t i = 0; i < num_created_consumers; ++i)
-    {
-        auto stream = std::make_shared<KafkaBlockInputStream>(*this, metadata_snapshot, kafka_context, block_io.out->getHeader().getNames(), log, block_size, false);
-        streams.emplace_back(stream);
+    // Limit read batch to maximum block size to allow DDL
+    IBlockInputStream::LocalLimits limits;

-        // Limit read batch to maximum block size to allow DDL
-        IBlockInputStream::LocalLimits limits;
+    limits.speed_limits.max_execution_time = kafka_settings->kafka_flush_interval_ms.changed
+        ? kafka_settings->kafka_flush_interval_ms
+        : global_context.getSettingsRef().stream_flush_interval_ms;

-        limits.speed_limits.max_execution_time = kafka_settings->kafka_flush_interval_ms.changed
-            ? kafka_settings->kafka_flush_interval_ms
-            : global_context.getSettingsRef().stream_flush_interval_ms;
+    limits.timeout_overflow_mode = OverflowMode::BREAK;
+    stream->setLimits(limits);

-        limits.timeout_overflow_mode = OverflowMode::BREAK;
-        stream->setLimits(limits);
-    }
-
-    // Join multiple streams if necessary
-    BlockInputStreamPtr in;
-    if (streams.size() > 1)
-        in = std::make_shared<UnionBlockInputStream>(streams, nullptr, streams.size());
-    else
-        in = streams[0];

     // We can't cancel during copyData, as it's not aware of commits and other kafka-related stuff.
     // It will be cancelled on underlying layer (kafka buffer)
     std::atomic<bool> stub = {false};
-    copyData(*in, *block_io.out, &stub);
+    copyData(*stream, *block_io.out, &stub);

     bool some_stream_is_stalled = false;
-    for (auto & stream : streams)
-    {
-        some_stream_is_stalled = some_stream_is_stalled || stream->as<KafkaBlockInputStream>()->isStalled();
-        stream->as<KafkaBlockInputStream>()->commit();
-    }
+    some_stream_is_stalled = some_stream_is_stalled || stream->isStalled();
+    stream->commit();

     return some_stream_is_stalled;
 }
diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h
index dc9569839fa..862781f9b60 100644
--- a/src/Storages/Kafka/StorageKafka.h
+++ b/src/Storages/Kafka/StorageKafka.h
@@ -94,8 +94,14 @@ private:
     std::mutex mutex;

     // Stream thread
-    BackgroundSchedulePool::TaskHolder task;
-    std::atomic<bool> stream_cancelled{false};
+    struct TaskContext {
+        BackgroundSchedulePool::TaskHolder holder;
+        std::atomic<bool> stream_cancelled {false};
+        explicit TaskContext(BackgroundSchedulePool::TaskHolder&& task_) : holder(std::move(task_))
+        {
+        }
+    };
+    std::vector<std::shared_ptr<TaskContext>> tasks;

     SettingsChanges createSettingsAdjustments();
     ConsumerBufferPtr createReadBuffer(const size_t consumer_number);
@@ -103,7 +109,7 @@ private:
     // Update Kafka configuration with values from CH user configuration.
     void updateConfiguration(cppkafka::Configuration & conf);

-    void threadFunc();
+    void threadFunc(size_t idx);

     size_t getPollMaxBatchSize() const;
     size_t getMaxBlockSize() const;

From de0a40aedc147cbda30933ea4be101ad6877a552 Mon Sep 17 00:00:00 2001
From: Peng Jian
Date: Fri, 21 Aug 2020 17:14:05 +0800
Subject: [PATCH 2/6] fix code style

---
 src/Storages/Kafka/StorageKafka.cpp | 5 +++--
 src/Storages/Kafka/StorageKafka.h   | 3 ++-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp
index 13812c2bcf0..b82e9fdbf78 100644
--- a/src/Storages/Kafka/StorageKafka.cpp
+++ b/src/Storages/Kafka/StorageKafka.cpp
@@ -142,7 +142,8 @@ StorageKafka::StorageKafka(
     StorageInMemoryMetadata storage_metadata;
     storage_metadata.setColumns(columns_);
     setInMemoryMetadata(storage_metadata);
-    for (size_t i = 0; i < num_consumers; i++) {
+    for (size_t i = 0; i < num_consumers; i++)
+    {
         auto task = global_context.getSchedulePool().createTask(log->name(), [this, i]{ threadFunc(i); });
         task->deactivate();
         tasks.emplace_back(std::make_shared<TaskContext>(std::move(task)));
@@ -270,7 +271,7 @@ void StorageKafka::shutdown()
     LOG_TRACE(log, "Waiting for cleanup");
     for (size_t i = 0; i < num_consumers; i++)
     {
-        tasks[i]->stream_cancelled = false;
+        tasks[i]->stream_cancelled = true;
         tasks[i]->holder->deactivate();
     }

diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h
index 862781f9b60..b6b064212e1 100644
--- a/src/Storages/Kafka/StorageKafka.h
+++ b/src/Storages/Kafka/StorageKafka.h
@@ -94,7 +94,8 @@ private:
     std::mutex mutex;

     // Stream thread
-    struct TaskContext {
+    struct TaskContext
+    {
         BackgroundSchedulePool::TaskHolder holder;
         std::atomic<bool> stream_cancelled {false};
         explicit TaskContext(BackgroundSchedulePool::TaskHolder&& task_) : holder(std::move(task_))

From e6bfd9d5861dd0d18f5e31deb5059d987c151700 Mon Sep 17 00:00:00 2001
From: Peng Jian
Date: Mon, 31 Aug 2020 19:06:35 +0800
Subject: [PATCH 3/6] 1. Add a new setting for the Kafka engine, kafka_thread_per_consumer, with a default value of false. 2. Create a separate thread pool for the Kafka engine.
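
The setting can be enabled per table. A minimal sketch (broker, topic and
group names below are placeholders, not part of this patch):

    CREATE TABLE test.kafka (key UInt64, value UInt64)
        ENGINE = Kafka
        SETTINGS kafka_broker_list = 'kafka1:19092',  -- placeholder broker
                 kafka_topic_list = 'topic',          -- placeholder topic
                 kafka_group_name = 'group',          -- placeholder consumer group
                 kafka_format = 'CSV',
                 kafka_num_consumers = 4,
                 kafka_thread_per_consumer = 1;

With kafka_thread_per_consumer = 1 each of the four consumers is polled and
flushed by its own background task; with the default 0 a single task drives
all consumers, as before.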
---
 src/Common/CurrentMetrics.cpp       |  2 +
 src/Core/Settings.h                 |  1 +
 src/Interpreters/Context.cpp        | 17 +++++++
 src/Interpreters/Context.h          |  1 +
 src/Storages/Kafka/KafkaSettings.h  |  3 +-
 src/Storages/Kafka/StorageKafka.cpp | 77 +++++++++++++++++++----------
 src/Storages/Kafka/StorageKafka.h   |  1 +
 7 files changed, 76 insertions(+), 26 deletions(-)

diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp
index a82853f9961..9e8086aec54 100644
--- a/src/Common/CurrentMetrics.cpp
+++ b/src/Common/CurrentMetrics.cpp
@@ -14,6 +14,7 @@
     M(BackgroundSchedulePoolTask, "Number of active tasks in BackgroundSchedulePool. This pool is used for periodic ReplicatedMergeTree tasks, like cleaning old data parts, altering data parts, replica re-initialization, etc.") \
     M(BackgroundBufferFlushSchedulePoolTask, "Number of active tasks in BackgroundBufferFlushSchedulePool. This pool is used for periodic Buffer flushes") \
     M(BackgroundDistributedSchedulePoolTask, "Number of active tasks in BackgroundDistributedSchedulePool. This pool is used for distributed sends that is done in background.") \
+    M(BackgroundMessageBrokerSchedulePoolTask, "Number of active tasks in BackgroundProcessingPool for message streaming") \
     M(CacheDictionaryUpdateQueueBatches, "Number of 'batches' (a set of keys) in update queue in CacheDictionaries.") \
     M(CacheDictionaryUpdateQueueKeys, "Exact number of keys in update queue in CacheDictionaries.") \
     M(DiskSpaceReservedForMerge, "Disk space reserved for currently running background merges. It is slightly more than the total size of currently merging parts.") \
@@ -38,6 +39,7 @@
     M(MemoryTrackingInBackgroundSchedulePool, "Total amount of memory (bytes) allocated in background schedule pool (that is dedicated for bookkeeping tasks of Replicated tables).") \
     M(MemoryTrackingInBackgroundBufferFlushSchedulePool, "Total amount of memory (bytes) allocated in background buffer flushes pool (that is dedicated for background buffer flushes).") \
     M(MemoryTrackingInBackgroundDistributedSchedulePool, "Total amount of memory (bytes) allocated in background distributed schedule pool (that is dedicated for distributed sends).") \
+    M(MemoryTrackingInBackgroundMessageBrokerSchedulePool, "Total amount of memory (bytes) allocated in background message broker pool (that is dedicated for background message streaming).") \
     M(MemoryTrackingForMerges, "Total amount of memory (bytes) allocated for background merges. Included in MemoryTrackingInBackgroundProcessingPool. Note that this value may include a drift when the memory was allocated in a context of background processing pool and freed in other context or vice-versa. This happens naturally due to caches for tables indexes and doesn't indicate memory leaks.") \
     M(EphemeralNode, "Number of ephemeral nodes hold in ZooKeeper.") \
     M(ZooKeeperSession, "Number of sessions (connections) to ZooKeeper. Should be no more than one, because using more than one connection to ZooKeeper may lead to bugs due to lack of linearizability (stale reads) that ZooKeeper consistency model allows.") \
diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index 00df95c2b69..f81ebb81d1f 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -72,6 +72,7 @@ class IColumn;
     M(UInt64, background_pool_size, 16, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.", 0) \
     M(UInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. Only has meaning at server startup.", 0) \
     M(UInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables, kafka streaming, dns cache updates. Only has meaning at server startup.", 0) \
+    M(UInt64, background_message_broker_schedule_pool_size, 16, "Number of threads performing background tasks for kafka streaming. Only has meaning at server startup.", 0) \
     M(UInt64, background_distributed_schedule_pool_size, 16, "Number of threads performing background tasks for distributed sends. Only has meaning at server startup.", 0) \
 \
     M(Milliseconds, distributed_directory_monitor_sleep_time_ms, 100, "Sleep time for StorageDistributed DirectoryMonitors, in case of any errors delay grows exponentially.", 0) \
diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index 1318f0331c4..a43f19db771 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -82,6 +82,9 @@ namespace CurrentMetrics

     extern const Metric BackgroundDistributedSchedulePoolTask;
     extern const Metric MemoryTrackingInBackgroundDistributedSchedulePool;
+
+    extern const Metric BackgroundMessageBrokerSchedulePoolTask;
+    extern const Metric MemoryTrackingInBackgroundMessageBrokerSchedulePool;
 }


@@ -341,6 +344,7 @@ struct ContextShared
     std::optional<BackgroundProcessingPool> background_move_pool;          /// The thread pool for the background moves performed by the tables.
     std::optional<BackgroundSchedulePool> schedule_pool;                   /// A thread pool that can run different jobs in background (used in replicated tables)
     std::optional<BackgroundSchedulePool> distributed_schedule_pool;       /// A thread pool that can run different jobs in background (used for distributed sends)
+    std::optional<BackgroundSchedulePool> message_broker_schedule_pool;    /// A thread pool that can run different jobs in background (used in kafka streaming)
     MultiVersion<Macros> macros;                                           /// Substitutions extracted from config.
     std::unique_ptr<DDLWorker> ddl_worker;                                 /// Process ddl commands from zk.
     /// Rules for selecting the compression settings, depending on the size of the part.
@@ -441,6 +445,7 @@ struct ContextShared
         schedule_pool.reset();
         distributed_schedule_pool.reset();
         ddl_worker.reset();
+        message_broker_schedule_pool.reset();

         /// Stop trace collector if any
         trace_collector.reset();
@@ -1421,6 +1426,18 @@ BackgroundSchedulePool & Context::getDistributedSchedulePool()
     return *shared->distributed_schedule_pool;
 }

+BackgroundSchedulePool & Context::getMessageBrokerSchedulePool()
+{
+    auto lock = getLock();
+    if (!shared->message_broker_schedule_pool)
+        shared->message_broker_schedule_pool.emplace(
+            settings.background_message_broker_schedule_pool_size,
+            CurrentMetrics::BackgroundMessageBrokerSchedulePoolTask,
+            CurrentMetrics::MemoryTrackingInBackgroundMessageBrokerSchedulePool,
+            "BgMBSchPool");
+    return *shared->message_broker_schedule_pool;
+}
+
 void Context::setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker)
 {
     auto lock = getLock();
diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h
index ff06146254f..7c6d3e197f8 100644
--- a/src/Interpreters/Context.h
+++ b/src/Interpreters/Context.h
@@ -503,6 +503,7 @@ public:
     BackgroundProcessingPool & getBackgroundPool();
     BackgroundProcessingPool & getBackgroundMovePool();
     BackgroundSchedulePool & getSchedulePool();
+    BackgroundSchedulePool & getMessageBrokerSchedulePool();
     BackgroundSchedulePool & getDistributedSchedulePool();

     void setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker);
diff --git a/src/Storages/Kafka/KafkaSettings.h b/src/Storages/Kafka/KafkaSettings.h
index b9b606e4660..53fbe8adc6b 100644
--- a/src/Storages/Kafka/KafkaSettings.h
+++ b/src/Storages/Kafka/KafkaSettings.h
@@ -28,7 +28,8 @@ class ASTStorage;
     M(String, kafka_format, "", "The message format for Kafka engine.", 0) \
     M(Char, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.", 0) \
     M(String, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine", 0) \
-    M(UInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block", 0)
+    M(UInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block", 0) \
+    M(Bool, kafka_thread_per_consumer, false, "Provide independent thread for each consumer", 0)

     /** TODO: */
     /* https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md */
diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp
index b82e9fdbf78..9bc0adaa95c 100644
--- a/src/Storages/Kafka/StorageKafka.cpp
+++ b/src/Storages/Kafka/StorageKafka.cpp
@@ -138,11 +138,13 @@ StorageKafka::StorageKafka(
     , semaphore(0, num_consumers)
     , intermediate_commit(kafka_settings->kafka_commit_every_batch.value)
     , settings_adjustments(createSettingsAdjustments())
+    , thread_per_consumer(kafka_settings->kafka_thread_per_consumer.value)
 {
     StorageInMemoryMetadata storage_metadata;
     storage_metadata.setColumns(columns_);
     setInMemoryMetadata(storage_metadata);
-    for (size_t i = 0; i < num_consumers; i++)
+    auto task_count = thread_per_consumer ? num_consumers : 1;
+    for (size_t i = 0; i < task_count; ++i)
     {
         auto task = global_context.getSchedulePool().createTask(log->name(), [this, i]{ threadFunc(i); });
         task->deactivate();
@@ -258,20 +260,24 @@ void StorageKafka::startup()
         {
             tryLogCurrentException(log);
         }
-        // Start the reader thread
-        tasks[i]->holder->activateAndSchedule();
     }
+    // Start the reader thread
+    for (size_t i = 0; i < tasks.size(); ++i)
+    {
+        tasks[i]->holder->activateAndSchedule();
+    }
 }


 void StorageKafka::shutdown()
 {
-    // Interrupt streaming thread
-    LOG_TRACE(log, "Waiting for cleanup");
-    for (size_t i = 0; i < num_consumers; i++)
+    for (size_t i = 0; i < tasks.size(); ++i)
     {
+        // Interrupt streaming thread
         tasks[i]->stream_cancelled = true;
+
+        LOG_TRACE(log, "Waiting for cleanup");
         tasks[i]->holder->deactivate();
     }

     LOG_TRACE(log, "Closing consumers");
@@ -374,8 +380,12 @@ ConsumerBufferPtr StorageKafka::createReadBuffer(const size_t consumer_number)
     consumer->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);

     /// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage.
-    auto& stream_cancelled = tasks[consumer_number]->stream_cancelled;
-    return std::make_shared<ReadBufferFromKafkaConsumer>(consumer, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, stream_cancelled, topics);
+    if (thread_per_consumer)
+    {
+        auto& stream_cancelled = tasks[consumer_number]->stream_cancelled;
+        return std::make_shared<ReadBufferFromKafkaConsumer>(consumer, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, stream_cancelled, topics);
+    }
+    return std::make_shared<ReadBufferFromKafkaConsumer>(consumer, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, tasks.back()->stream_cancelled, topics);
 }

 size_t StorageKafka::getMaxBlockSize() const
@@ -473,7 +483,8 @@ bool StorageKafka::checkDependencies(const StorageID & table_id)

 void StorageKafka::threadFunc(size_t idx)
 {
-    auto& stream_cancelled = tasks[idx]->stream_cancelled;
+    assert(idx < tasks.size());
+    auto task = tasks[idx];
     try
     {
         auto table_id = getStorageID();
@@ -484,7 +495,7 @@ void StorageKafka::threadFunc(size_t idx)
         auto start_time = std::chrono::steady_clock::now();

         // Keep streaming as long as there are attached views and streaming is not cancelled
-        while (!stream_cancelled && num_created_consumers > 0)
+        while (!task->stream_cancelled && num_created_consumers > 0)
         {
             if (!checkDependencies(table_id))
                 break;
@@ -515,10 +526,8 @@ void StorageKafka::threadFunc(size_t idx)
     }

     // Wait for attached views
-    if (!stream_cancelled)
-    {
-        tasks[idx]->holder->scheduleAfter(RESCHEDULE_MS);
-    }
+    if (!task->stream_cancelled)
+        task->holder->scheduleAfter(RESCHEDULE_MS);
 }


@@ -545,27 +554,45 @@ bool StorageKafka::streamToViews()
     InterpreterInsertQuery interpreter(insert, *kafka_context, false, true, true);
     auto block_io = interpreter.execute();

-    auto stream = std::make_shared<KafkaBlockInputStream>(*this, metadata_snapshot, kafka_context, block_io.out->getHeader().getNames(), log, block_size, false);
+    // Create a stream for each consumer and join them in a union stream
+    BlockInputStreams streams;
+
+    auto stream_count = thread_per_consumer ? 1 : num_created_consumers;
+    streams.reserve(stream_count);
+    for (size_t i = 0; i < stream_count; ++i)
+    {
+        auto stream = std::make_shared<KafkaBlockInputStream>(*this, metadata_snapshot, kafka_context, block_io.out->getHeader().getNames(), log, block_size, false);
+        streams.emplace_back(stream);

-    // Limit read batch to maximum block size to allow DDL
-    IBlockInputStream::LocalLimits limits;
+        // Limit read batch to maximum block size to allow DDL
+        IBlockInputStream::LocalLimits limits;

-    limits.speed_limits.max_execution_time = kafka_settings->kafka_flush_interval_ms.changed
-        ? kafka_settings->kafka_flush_interval_ms
-        : global_context.getSettingsRef().stream_flush_interval_ms;
+        limits.speed_limits.max_execution_time = kafka_settings->kafka_flush_interval_ms.changed
+            ? kafka_settings->kafka_flush_interval_ms
+            : global_context.getSettingsRef().stream_flush_interval_ms;

-    limits.timeout_overflow_mode = OverflowMode::BREAK;
-    stream->setLimits(limits);
+        limits.timeout_overflow_mode = OverflowMode::BREAK;
+        stream->setLimits(limits);
+    }

+    // Join multiple streams if necessary
+    BlockInputStreamPtr in;
+    if (streams.size() > 1)
+        in = std::make_shared<UnionBlockInputStream>(streams, nullptr, streams.size());
+    else
+        in = streams[0];

     // We can't cancel during copyData, as it's not aware of commits and other kafka-related stuff.
     // It will be cancelled on underlying layer (kafka buffer)
     std::atomic<bool> stub = {false};
-    copyData(*stream, *block_io.out, &stub);
+    copyData(*in, *block_io.out, &stub);

     bool some_stream_is_stalled = false;
-    some_stream_is_stalled = some_stream_is_stalled || stream->isStalled();
-    stream->commit();
+    for (auto & stream : streams)
+    {
+        some_stream_is_stalled = some_stream_is_stalled || stream->as<KafkaBlockInputStream>()->isStalled();
+        stream->as<KafkaBlockInputStream>()->commit();
+    }

     return some_stream_is_stalled;
 }
diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h
index b6b064212e1..ea6dfeec741 100644
--- a/src/Storages/Kafka/StorageKafka.h
+++ b/src/Storages/Kafka/StorageKafka.h
@@ -103,6 +103,7 @@ private:
         }
     };
     std::vector<std::shared_ptr<TaskContext>> tasks;
+    bool thread_per_consumer = false;

     SettingsChanges createSettingsAdjustments();
     ConsumerBufferPtr createReadBuffer(const size_t consumer_number);

From fd82272cfed1b67a8f66f51eb4ee34e10eaa4bea Mon Sep 17 00:00:00 2001
From: Peng Jian
Date: Tue, 1 Sep 2020 16:37:12 +0800
Subject: [PATCH 4/6] Fix code style and update docs for Kafka engine

---
 docs/en/engines/table-engines/integrations/kafka.md |  4 +++-
 src/Interpreters/Context.cpp                        |  2 +-
 src/Storages/Kafka/StorageKafka.cpp                 | 12 ++++++------
 src/Storages/Kafka/StorageKafka.h                   |  2 +-
 4 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/docs/en/engines/table-engines/integrations/kafka.md b/docs/en/engines/table-engines/integrations/kafka.md
index 3324386e1c5..6cb4b20a470 100644
--- a/docs/en/engines/table-engines/integrations/kafka.md
+++ b/docs/en/engines/table-engines/integrations/kafka.md
@@ -32,7 +32,8 @@ SETTINGS
     [kafka_num_consumers = N,]
     [kafka_max_block_size = 0,]
     [kafka_skip_broken_messages = N,]
-    [kafka_commit_every_batch = 0]
+    [kafka_commit_every_batch = 0,]
+    [kafka_thread_per_consumer = 0]
 ```

 Required parameters:
@@ -50,6 +51,7 @@ Optional parameters:
 - `kafka_max_block_size` - The maximum batch size (in messages) for poll (default: `max_block_size`).
 - `kafka_skip_broken_messages` – Kafka message parser tolerance to schema-incompatible messages per block. Default: `0`. If `kafka_skip_broken_messages = N` then the engine skips *N* Kafka messages that cannot be parsed (a message equals a row of data).
 - `kafka_commit_every_batch` - Commit every consumed and handled batch instead of a single commit after writing a whole block (default: `0`).
+- `kafka_thread_per_consumer` - Provide independent thread for each consumer (default: `0`).

 Examples:

diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index a43f19db771..b1de4d006c4 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -82,7 +82,7 @@ namespace CurrentMetrics

     extern const Metric BackgroundDistributedSchedulePoolTask;
     extern const Metric MemoryTrackingInBackgroundDistributedSchedulePool;
-    
+
     extern const Metric BackgroundMessageBrokerSchedulePoolTask;
     extern const Metric MemoryTrackingInBackgroundMessageBrokerSchedulePool;
 }
diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp
index 9bc0adaa95c..1ee1f5de909 100644
--- a/src/Storages/Kafka/StorageKafka.cpp
+++ b/src/Storages/Kafka/StorageKafka.cpp
@@ -263,22 +263,22 @@ void StorageKafka::startup()
     }
     // Start the reader thread
-    for (size_t i = 0; i < tasks.size(); ++i)
+    for (auto & task : tasks)
     {
-        tasks[i]->holder->activateAndSchedule();
+        task->holder->activateAndSchedule();
     }
 }


 void StorageKafka::shutdown()
 {
-    for (size_t i = 0; i < tasks.size(); ++i)
+    for (auto & task : tasks)
     {
         // Interrupt streaming thread
-        tasks[i]->stream_cancelled = true;
+        task->stream_cancelled = true;

         LOG_TRACE(log, "Waiting for cleanup");
-        tasks[i]->holder->deactivate();
+        task->holder->deactivate();
     }

     LOG_TRACE(log, "Closing consumers");
     for (size_t i = 0; i < num_created_consumers; ++i)
@@ -556,7 +556,7 @@ bool StorageKafka::streamToViews()

     // Create a stream for each consumer and join them in a union stream
     BlockInputStreams streams;
-    
+
     auto stream_count = thread_per_consumer ? 1 : num_created_consumers;
     streams.reserve(stream_count);
     for (size_t i = 0; i < stream_count; ++i)
diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h
index ea6dfeec741..272e419bebe 100644
--- a/src/Storages/Kafka/StorageKafka.h
+++ b/src/Storages/Kafka/StorageKafka.h
@@ -100,7 +100,7 @@ private:
         std::atomic<bool> stream_cancelled {false};
         explicit TaskContext(BackgroundSchedulePool::TaskHolder&& task_) : holder(std::move(task_))
         {
-        }    
+        }
     };
     std::vector<std::shared_ptr<TaskContext>> tasks;
     bool thread_per_consumer = false;

From ac25c41f2254090e6ff490fab9fd075396d06cfa Mon Sep 17 00:00:00 2001
From: Peng Jian
Date: Tue, 1 Sep 2020 22:49:36 +0800
Subject: [PATCH 5/6] Update docs for Kafka engine

---
 docs/en/engines/table-engines/integrations/kafka.md | 2 +-
 src/Core/Settings.h                                 | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/en/engines/table-engines/integrations/kafka.md b/docs/en/engines/table-engines/integrations/kafka.md
index 6cb4b20a470..fe9aa2ca25e 100644
--- a/docs/en/engines/table-engines/integrations/kafka.md
+++ b/docs/en/engines/table-engines/integrations/kafka.md
@@ -51,7 +51,7 @@ Optional parameters:
 - `kafka_max_block_size` - The maximum batch size (in messages) for poll (default: `max_block_size`).
 - `kafka_skip_broken_messages` – Kafka message parser tolerance to schema-incompatible messages per block. Default: `0`. If `kafka_skip_broken_messages = N` then the engine skips *N* Kafka messages that cannot be parsed (a message equals a row of data).
 - `kafka_commit_every_batch` - Commit every consumed and handled batch instead of a single commit after writing a whole block (default: `0`).
-- `kafka_thread_per_consumer` - Provide independent thread for each consumer (default: `0`).
+- `kafka_thread_per_consumer` - Provide independent thread for each consumer (default: `0`). When enabled, every consumer flushes the data independently, in parallel (otherwise, rows from several consumers are squashed to form one block).

 Examples:

diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index f81ebb81d1f..b565d03a91b 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -71,8 +71,8 @@ class IColumn;
     M(UInt64, background_buffer_flush_schedule_pool_size, 16, "Number of threads performing background flush for tables with Buffer engine. Only has meaning at server startup.", 0) \
     M(UInt64, background_pool_size, 16, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.", 0) \
     M(UInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. Only has meaning at server startup.", 0) \
-    M(UInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables, kafka streaming, dns cache updates. Only has meaning at server startup.", 0) \
-    M(UInt64, background_message_broker_schedule_pool_size, 16, "Number of threads performing background tasks for kafka streaming. Only has meaning at server startup.", 0) \
+    M(UInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables, dns cache updates. Only has meaning at server startup.", 0) \
+    M(UInt64, background_message_broker_schedule_pool_size, 16, "Number of threads performing background tasks for message streaming. Only has meaning at server startup.", 0) \
     M(UInt64, background_distributed_schedule_pool_size, 16, "Number of threads performing background tasks for distributed sends. Only has meaning at server startup.", 0) \
 \
     M(Milliseconds, distributed_directory_monitor_sleep_time_ms, 100, "Sleep time for StorageDistributed DirectoryMonitors, in case of any errors delay grows exponentially.", 0) \

From 77420c82ead609b20566fdb631209b2daae2c41a Mon Sep 17 00:00:00 2001
From: Peng Jian
Date: Wed, 2 Sep 2020 17:42:58 +0800
Subject: [PATCH 6/6] Add test case for Kafka engine with kafka_thread_per_consumer enabled.

---
 tests/integration/test_storage_kafka/test.py | 27 ++++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py
index dbd822476da..8f605e1bbd4 100644
--- a/tests/integration/test_storage_kafka/test.py
+++ b/tests/integration/test_storage_kafka/test.py
@@ -2192,6 +2192,33 @@ def test_kafka_issue14202(kafka_cluster):
         DROP TABLE test.kafka_q;
     ''')

+@pytest.mark.timeout(180)
+def test_kafka_csv_with_thread_per_consumer(kafka_cluster):
+    instance.query('''
+        CREATE TABLE test.kafka (key UInt64, value UInt64)
+            ENGINE = Kafka
+            SETTINGS kafka_broker_list = 'kafka1:19092',
+                     kafka_topic_list = 'csv',
+                     kafka_group_name = 'csv',
+                     kafka_format = 'CSV',
+                     kafka_row_delimiter = '\\n',
+                     kafka_num_consumers = 4,
+                     kafka_thread_per_consumer = 1;
+        ''')
+
+    messages = []
+    for i in range(50):
+        messages.append('{i}, {i}'.format(i=i))
+    kafka_produce('csv', messages)
+
+    result = ''
+    while True:
+        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
+        if kafka_check_result(result):
+            break
+
+    kafka_check_result(result, True)
+
 if __name__ == '__main__':
     cluster.start()
     raw_input("Cluster created, press any key to destroy...")
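
A minimal end-to-end sketch of the new mode, assuming the usual Kafka-table plus
materialized-view pipeline from the Kafka engine docs (broker, topic, group and
table names below are placeholders):

    CREATE TABLE queue (key UInt64, value UInt64)
        ENGINE = Kafka
        SETTINGS kafka_broker_list = 'kafka1:19092',  -- placeholder broker
                 kafka_topic_list = 'events',         -- placeholder topic
                 kafka_group_name = 'events_group',   -- placeholder consumer group
                 kafka_format = 'CSV',
                 kafka_num_consumers = 4,
                 kafka_thread_per_consumer = 1;       -- one background task per consumer

    CREATE TABLE target (key UInt64, value UInt64)
        ENGINE = MergeTree ORDER BY key;

    CREATE MATERIALIZED VIEW queue_consumer TO target AS
        SELECT key, value FROM queue;

With kafka_thread_per_consumer = 1, the four consumers are read and flushed to the
view by four independent background tasks; with the default 0, a single task joins
all consumers into one union stream before writing a block.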