Merge pull request #13939 from fastio/master

Multithreading implementation of Kafka engine
This commit is contained in:
alexey-milovidov 2020-09-07 20:12:06 +03:00 committed by GitHub
commit e1104135c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 99 additions and 20 deletions

View File

@ -32,7 +32,8 @@ SETTINGS
[kafka_num_consumers = N,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0]
[kafka_commit_every_batch = 0,]
[kafka_thread_per_consumer = 0]
```
Required parameters:
@ -50,6 +51,7 @@ Optional parameters:
- `kafka_max_block_size` - The maximum batch size (in messages) for poll (default: `max_block_size`).
- `kafka_skip_broken_messages` Kafka message parser tolerance to schema-incompatible messages per block. Default: `0`. If `kafka_skip_broken_messages = N` then the engine skips *N* Kafka messages that cannot be parsed (a message equals a row of data).
- `kafka_commit_every_batch` - Commit every consumed and handled batch instead of a single commit after writing a whole block (default: `0`).
- `kafka_thread_per_consumer` - Provide independent thread for each consumer (default: `0`). When enabled, every consumer flush the data independently, in parallel (otherwise - rows from several consumers squashed to form one block).
Examples:

View File

@ -14,6 +14,7 @@
M(BackgroundSchedulePoolTask, "Number of active tasks in BackgroundSchedulePool. This pool is used for periodic ReplicatedMergeTree tasks, like cleaning old data parts, altering data parts, replica re-initialization, etc.") \
M(BackgroundBufferFlushSchedulePoolTask, "Number of active tasks in BackgroundBufferFlushSchedulePool. This pool is used for periodic Buffer flushes") \
M(BackgroundDistributedSchedulePoolTask, "Number of active tasks in BackgroundDistributedSchedulePool. This pool is used for distributed sends that is done in background.") \
M(BackgroundMessageBrokerSchedulePoolTask, "Number of active tasks in BackgroundProcessingPool for message streaming") \
M(CacheDictionaryUpdateQueueBatches, "Number of 'batches' (a set of keys) in update queue in CacheDictionaries.") \
M(CacheDictionaryUpdateQueueKeys, "Exact number of keys in update queue in CacheDictionaries.") \
M(DiskSpaceReservedForMerge, "Disk space reserved for currently running background merges. It is slightly more than the total size of currently merging parts.") \
@ -38,6 +39,7 @@
M(MemoryTrackingInBackgroundSchedulePool, "Total amount of memory (bytes) allocated in background schedule pool (that is dedicated for bookkeeping tasks of Replicated tables).") \
M(MemoryTrackingInBackgroundBufferFlushSchedulePool, "Total amount of memory (bytes) allocated in background buffer flushes pool (that is dedicated for background buffer flushes).") \
M(MemoryTrackingInBackgroundDistributedSchedulePool, "Total amount of memory (bytes) allocated in background distributed schedule pool (that is dedicated for distributed sends).") \
M(MemoryTrackingInBackgroundMessageBrokerSchedulePool, "Total amount of memory (bytes) allocated in background message broker pool (that is dedicated for background message streaming).") \
M(MemoryTrackingForMerges, "Total amount of memory (bytes) allocated for background merges. Included in MemoryTrackingInBackgroundProcessingPool. Note that this value may include a drift when the memory was allocated in a context of background processing pool and freed in other context or vice-versa. This happens naturally due to caches for tables indexes and doesn't indicate memory leaks.") \
M(EphemeralNode, "Number of ephemeral nodes hold in ZooKeeper.") \
M(ZooKeeperSession, "Number of sessions (connections) to ZooKeeper. Should be no more than one, because using more than one connection to ZooKeeper may lead to bugs due to lack of linearizability (stale reads) that ZooKeeper consistency model allows.") \

View File

@ -71,7 +71,8 @@ class IColumn;
M(UInt64, background_buffer_flush_schedule_pool_size, 16, "Number of threads performing background flush for tables with Buffer engine. Only has meaning at server startup.", 0) \
M(UInt64, background_pool_size, 16, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.", 0) \
M(UInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. Only has meaning at server startup.", 0) \
M(UInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables, kafka streaming, dns cache updates. Only has meaning at server startup.", 0) \
M(UInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables, dns cache updates. Only has meaning at server startup.", 0) \
M(UInt64, background_message_broker_schedule_pool_size, 16, "Number of threads performing background tasks for message streaming. Only has meaning at server startup.", 0) \
M(UInt64, background_distributed_schedule_pool_size, 16, "Number of threads performing background tasks for distributed sends. Only has meaning at server startup.", 0) \
\
M(Milliseconds, distributed_directory_monitor_sleep_time_ms, 100, "Sleep time for StorageDistributed DirectoryMonitors, in case of any errors delay grows exponentially.", 0) \

View File

@ -82,6 +82,9 @@ namespace CurrentMetrics
extern const Metric BackgroundDistributedSchedulePoolTask;
extern const Metric MemoryTrackingInBackgroundDistributedSchedulePool;
extern const Metric BackgroundMessageBrokerSchedulePoolTask;
extern const Metric MemoryTrackingInBackgroundMessageBrokerSchedulePool;
}
@ -341,6 +344,7 @@ struct ContextShared
std::optional<BackgroundProcessingPool> background_move_pool; /// The thread pool for the background moves performed by the tables.
std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
std::optional<BackgroundSchedulePool> distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends)
std::optional<BackgroundSchedulePool> message_broker_schedule_pool; /// A thread pool that can run different jobs in background (used in kafka streaming)
MultiVersion<Macros> macros; /// Substitutions extracted from config.
std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
/// Rules for selecting the compression settings, depending on the size of the part.
@ -441,6 +445,7 @@ struct ContextShared
schedule_pool.reset();
distributed_schedule_pool.reset();
ddl_worker.reset();
message_broker_schedule_pool.reset();
/// Stop trace collector if any
trace_collector.reset();
@ -1421,6 +1426,18 @@ BackgroundSchedulePool & Context::getDistributedSchedulePool()
return *shared->distributed_schedule_pool;
}
BackgroundSchedulePool & Context::getMessageBrokerSchedulePool()
{
auto lock = getLock();
if (!shared->message_broker_schedule_pool)
shared->message_broker_schedule_pool.emplace(
settings.background_message_broker_schedule_pool_size,
CurrentMetrics::BackgroundMessageBrokerSchedulePoolTask,
CurrentMetrics::MemoryTrackingInBackgroundMessageBrokerSchedulePool,
"BgMBSchPool");
return *shared->message_broker_schedule_pool;
}
void Context::setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker)
{
auto lock = getLock();

View File

@ -503,6 +503,7 @@ public:
BackgroundProcessingPool & getBackgroundPool();
BackgroundProcessingPool & getBackgroundMovePool();
BackgroundSchedulePool & getSchedulePool();
BackgroundSchedulePool & getMessageBrokerSchedulePool();
BackgroundSchedulePool & getDistributedSchedulePool();
void setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker);

View File

@ -28,7 +28,8 @@ class ASTStorage;
M(String, kafka_format, "", "The message format for Kafka engine.", 0) \
M(Char, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.", 0) \
M(String, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine", 0) \
M(UInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block", 0)
M(UInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block", 0) \
M(Bool, kafka_thread_per_consumer, false, "Provide independent thread for each consumer", 0)
/** TODO: */
/* https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md */

View File

@ -138,12 +138,18 @@ StorageKafka::StorageKafka(
, semaphore(0, num_consumers)
, intermediate_commit(kafka_settings->kafka_commit_every_batch.value)
, settings_adjustments(createSettingsAdjustments())
, thread_per_consumer(kafka_settings->kafka_thread_per_consumer.value)
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
setInMemoryMetadata(storage_metadata);
task = global_context.getSchedulePool().createTask(log->name(), [this]{ threadFunc(); });
task->deactivate();
auto task_count = thread_per_consumer ? num_consumers : 1;
for (size_t i = 0; i < task_count; ++i)
{
auto task = global_context.getSchedulePool().createTask(log->name(), [this, i]{ threadFunc(i); });
task->deactivate();
tasks.emplace_back(std::make_shared<TaskContext>(std::move(task)));
}
}
SettingsChanges StorageKafka::createSettingsAdjustments()
@ -257,17 +263,23 @@ void StorageKafka::startup()
}
// Start the reader thread
task->activateAndSchedule();
for (auto & task : tasks)
{
task->holder->activateAndSchedule();
}
}
void StorageKafka::shutdown()
{
// Interrupt streaming thread
stream_cancelled = true;
for (auto & task : tasks)
{
// Interrupt streaming thread
task->stream_cancelled = true;
LOG_TRACE(log, "Waiting for cleanup");
task->deactivate();
LOG_TRACE(log, "Waiting for cleanup");
task->holder->deactivate();
}
LOG_TRACE(log, "Closing consumers");
for (size_t i = 0; i < num_created_consumers; ++i)
@ -368,7 +380,12 @@ ConsumerBufferPtr StorageKafka::createReadBuffer(const size_t consumer_number)
consumer->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
/// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage.
return std::make_shared<ReadBufferFromKafkaConsumer>(consumer, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, stream_cancelled, topics);
if (thread_per_consumer)
{
auto& stream_cancelled = tasks[consumer_number]->stream_cancelled;
return std::make_shared<ReadBufferFromKafkaConsumer>(consumer, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, stream_cancelled, topics);
}
return std::make_shared<ReadBufferFromKafkaConsumer>(consumer, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, tasks.back()->stream_cancelled, topics);
}
size_t StorageKafka::getMaxBlockSize() const
@ -464,8 +481,10 @@ bool StorageKafka::checkDependencies(const StorageID & table_id)
return true;
}
void StorageKafka::threadFunc()
void StorageKafka::threadFunc(size_t idx)
{
assert(idx < tasks.size());
auto task = tasks[idx];
try
{
auto table_id = getStorageID();
@ -476,7 +495,7 @@ void StorageKafka::threadFunc()
auto start_time = std::chrono::steady_clock::now();
// Keep streaming as long as there are attached views and streaming is not cancelled
while (!stream_cancelled && num_created_consumers > 0)
while (!task->stream_cancelled && num_created_consumers > 0)
{
if (!checkDependencies(table_id))
break;
@ -507,8 +526,8 @@ void StorageKafka::threadFunc()
}
// Wait for attached views
if (!stream_cancelled)
task->scheduleAfter(RESCHEDULE_MS);
if (!task->stream_cancelled)
task->holder->scheduleAfter(RESCHEDULE_MS);
}
@ -537,9 +556,10 @@ bool StorageKafka::streamToViews()
// Create a stream for each consumer and join them in a union stream
BlockInputStreams streams;
streams.reserve(num_created_consumers);
for (size_t i = 0; i < num_created_consumers; ++i)
auto stream_count = thread_per_consumer ? 1 : num_created_consumers;
streams.reserve(stream_count);
for (size_t i = 0; i < stream_count; ++i)
{
auto stream = std::make_shared<KafkaBlockInputStream>(*this, metadata_snapshot, kafka_context, block_io.out->getHeader().getNames(), log, block_size, false);
streams.emplace_back(stream);

View File

@ -94,8 +94,16 @@ private:
std::mutex mutex;
// Stream thread
BackgroundSchedulePool::TaskHolder task;
std::atomic<bool> stream_cancelled{false};
struct TaskContext
{
BackgroundSchedulePool::TaskHolder holder;
std::atomic<bool> stream_cancelled {false};
explicit TaskContext(BackgroundSchedulePool::TaskHolder&& task_) : holder(std::move(task_))
{
}
};
std::vector<std::shared_ptr<TaskContext>> tasks;
bool thread_per_consumer = false;
SettingsChanges createSettingsAdjustments();
ConsumerBufferPtr createReadBuffer(const size_t consumer_number);
@ -103,7 +111,7 @@ private:
// Update Kafka configuration with values from CH user configuration.
void updateConfiguration(cppkafka::Configuration & conf);
void threadFunc();
void threadFunc(size_t idx);
size_t getPollMaxBatchSize() const;
size_t getMaxBlockSize() const;

View File

@ -2192,6 +2192,33 @@ def test_kafka_issue14202(kafka_cluster):
DROP TABLE test.kafka_q;
''')
@pytest.mark.timeout(180)
def test_kafka_csv_with_thread_per_consumer(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'csv',
kafka_group_name = 'csv',
kafka_format = 'CSV',
kafka_row_delimiter = '\\n',
kafka_num_consumers = 4,
kafka_thread_per_consumer = 1;
''')
messages = []
for i in range(50):
messages.append('{i}, {i}'.format(i=i))
kafka_produce('csv', messages)
result = ''
while True:
result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
if kafka_check_result(result):
break
kafka_check_result(result, True)
if __name__ == '__main__':
cluster.start()
raw_input("Cluster created, press any key to destroy...")