From a74ffa70bc4ae9dd031f641bbe5295b3dcadfa36 Mon Sep 17 00:00:00 2001
From: avogar
Date: Fri, 30 Dec 2022 16:40:04 +0000
Subject: [PATCH] Don't use async producing in Kafka

---
 src/Storages/Kafka/KafkaProducer.cpp | 74 +++++++++-------------------
 src/Storages/Kafka/KafkaProducer.h   | 20 ++------
 2 files changed, 27 insertions(+), 67 deletions(-)

diff --git a/src/Storages/Kafka/KafkaProducer.cpp b/src/Storages/Kafka/KafkaProducer.cpp
index e4c78b53036..f50e15e2133 100644
--- a/src/Storages/Kafka/KafkaProducer.cpp
+++ b/src/Storages/Kafka/KafkaProducer.cpp
@@ -16,8 +16,6 @@ namespace ProfileEvents
 namespace DB
 {
 
-static const auto BATCH = 1000;
-
 namespace ErrorCodes
 {
     extern const int LOGICAL_ERROR;
@@ -25,7 +23,7 @@ namespace ErrorCodes
 
 KafkaProducer::KafkaProducer(
     ProducerPtr producer_, const std::string & topic_, std::chrono::milliseconds poll_timeout, std::atomic<bool> & shutdown_called_, const Block & header)
-    : producer(producer_), topic(topic_), timeout(poll_timeout), shutdown_called(shutdown_called_), payloads(BATCH)
+    : producer(producer_), topic(topic_), timeout(poll_timeout), shutdown_called(shutdown_called_)
 {
     if (header.has("_key"))
     {
@@ -48,72 +46,46 @@ KafkaProducer::KafkaProducer(
 void KafkaProducer::produce(const String & message, size_t rows_in_message, const Columns & columns, size_t last_row)
 {
     ProfileEvents::increment(ProfileEvents::KafkaRowsWritten, rows_in_message);
-    Payload payload;
-    payload.message = message;
+    cppkafka::MessageBuilder builder(topic);
+    builder.payload(message);
 
     // Note: if it will be few rows per message - it will take the value from last row of block
     if (key_column_index)
     {
         const auto & key_column = assert_cast<const ColumnString &>(*columns[key_column_index.value()]);
-        payload.key = key_column.getDataAt(last_row).toString();
+        const auto key_data = key_column.getDataAt(last_row).toString();
+        builder.key(cppkafka::Buffer(key_data.data(), key_data.size()));
     }
 
     if (timestamp_column_index)
     {
         const auto & timestamp_column = assert_cast<const ColumnUInt32 &>(*columns[timestamp_column_index.value()]);
-        payload.timestamp = std::chrono::seconds{timestamp_column.getElement(last_row)};
+        const auto timestamp = std::chrono::seconds{timestamp_column.getElement(last_row)};
+        builder.timestamp(timestamp);
     }
 
-    if (!payloads.push(std::move(payload)))
-        throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to payloads queue");
-}
-
-
-void KafkaProducer::startProducingTaskLoop()
-{
-    cppkafka::MessageBuilder builder(topic);
-    while ((!payloads.isFinishedAndEmpty()) && !shutdown_called.load())
+    while (!shutdown_called)
     {
-        Payload payload;
-        if (!payloads.pop(payload))
-            break;
-
-        builder.payload(payload.message);
-
-        if (payload.key)
-            builder.key(cppkafka::Buffer(payload.key->data(), payload.key->size()));
-
-        if (payload.timestamp)
-            builder.timestamp(*payload.timestamp);
-
-        while (true)
+        try
         {
-            try
-            {
-                producer->produce(builder);
-            }
-            catch (cppkafka::HandleException & e)
-            {
-                if (e.get_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL)
-                {
-                    producer->poll(timeout);
-                    continue;
-                }
-                ProfileEvents::increment(ProfileEvents::KafkaProducerErrors);
-                throw;
-            }
-            ProfileEvents::increment(ProfileEvents::KafkaMessagesProduced);
-            break;
+            producer->produce(builder);
         }
+        catch (cppkafka::HandleException & e)
+        {
+            if (e.get_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL)
+            {
+                producer->poll(timeout);
+                continue;
+            }
+            ProfileEvents::increment(ProfileEvents::KafkaProducerErrors);
+            throw;
+        }
+        ProfileEvents::increment(ProfileEvents::KafkaMessagesProduced);
+        break;
     }
 }
 
-void KafkaProducer::stopProducingTask()
-{
-    payloads.finish();
-}
-
-void KafkaProducer::finishImpl()
+void KafkaProducer::finish()
 {
     // For unknown reason we may hit some internal timeout when inserting for the first time.
     while (true)
diff --git a/src/Storages/Kafka/KafkaProducer.h b/src/Storages/Kafka/KafkaProducer.h
index 221d20fd40c..2e4bbaf7658 100644
--- a/src/Storages/Kafka/KafkaProducer.h
+++ b/src/Storages/Kafka/KafkaProducer.h
@@ -18,7 +18,7 @@ namespace DB
 class Block;
 using ProducerPtr = std::shared_ptr<cppkafka::Producer>;
 
-class KafkaProducer : public AsynchronousMessageProducer
+class KafkaProducer : public IMessageProducer
 {
 public:
     KafkaProducer(
@@ -30,21 +30,10 @@ public:
 
     void produce(const String & message, size_t rows_in_message, const Columns & columns, size_t last_row) override;
 
+    void start(const ContextPtr &) override {}
+    void finish() override;
+
 private:
-    void stopProducingTask() override;
-    void finishImpl() override;
-
-    String getProducingTaskName() const override { return "KafkaProducingTask"; }
-
-    void startProducingTaskLoop() override;
-
-    struct Payload
-    {
-        String message;
-        std::optional<String> key;
-        std::optional<std::chrono::milliseconds> timestamp;
-    };
-
     CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaProducers};
 
     ProducerPtr producer;
@@ -55,7 +44,6 @@ private:
     std::optional<size_t> key_column_index;
     std::optional<size_t> timestamp_column_index;
 
-    ConcurrentBoundedQueue<Payload> payloads;
 };
 
 }
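
Note (not part of the patch): the queue-full retry that produce() now performs inline can be exercised with cppkafka alone. The following is a minimal standalone sketch of that pattern; the broker address, topic name, message text, and the 100 ms poll interval are illustrative assumptions, not values taken from ClickHouse.

#include <chrono>
#include <string>
#include <cppkafka/cppkafka.h>

int main()
{
    // Assumed broker and topic; adjust for a real environment.
    cppkafka::Configuration config = {{"metadata.broker.list", "localhost:9092"}};
    cppkafka::Producer producer(config);

    cppkafka::MessageBuilder builder("example_topic");
    const std::string message = "hello";
    builder.payload(message);

    // Same shape as the loop in KafkaProducer::produce(): retry only when
    // librdkafka's local queue is full, polling so delivery reports can drain it.
    while (true)
    {
        try
        {
            producer.produce(builder);
        }
        catch (const cppkafka::HandleException & e)
        {
            if (e.get_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL)
            {
                producer.poll(std::chrono::milliseconds(100));
                continue;
            }
            throw;
        }
        break;
    }

    producer.flush();    // block until outstanding messages are delivered or fail
    return 0;
}

This mirrors the design choice in the patch: instead of handing payloads to a background producing task through a bounded queue, the caller of produce() now blocks until librdkafka accepts the message.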