From 39e2563f930a8a480c72bfb210f68f5742680cb2 Mon Sep 17 00:00:00 2001
From: Ivan Lezhankin
Date: Wed, 30 Jan 2019 20:41:06 +0300
Subject: [PATCH] Create new consumers on-demand and give them some time for graceful assignment.

---
 .../Storages/Kafka/KafkaBlockInputStream.cpp | 79 +++++++------------
 .../Storages/Kafka/KafkaBlockInputStream.h   | 16 ++--
 dbms/src/Storages/Kafka/StorageKafka.cpp     |  3 +
 3 files changed, 38 insertions(+), 60 deletions(-)

diff --git a/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp b/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp
index 5668e0d167c..c511a1053b3 100644
--- a/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp
+++ b/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp
@@ -1,92 +1,69 @@
 #include
 #include
-#include
+#include

 namespace DB
 {
-namespace ErrorCodes
-{
-    extern const int TIMEOUT_EXCEEDED;
-} // namespace ErrorCodes

 KafkaBlockInputStream::KafkaBlockInputStream(
     StorageKafka & storage_, const Context & context_, const String & schema, size_t max_block_size_)
     : storage(storage_), context(context_), max_block_size(max_block_size_)
 {
-    // Always skip unknown fields regardless of the context (JSON or TSKV)
-    context.setSetting("input_format_skip_unknown_fields", 1u);
-
-    // We don't use ratio since the number of Kafka messages may vary from stream to stream.
-    // Thus, ratio is meaningless.
+    context.setSetting("input_format_skip_unknown_fields", 1u); // Always skip unknown fields regardless of the context (JSON or TSKV)
     context.setSetting("input_format_allow_errors_ratio", 0.);
     context.setSetting("input_format_allow_errors_num", storage.skip_broken);

-    if (schema.size() > 0)
+    if (!schema.empty())
         context.setSetting("format_schema", schema);
 }

 KafkaBlockInputStream::~KafkaBlockInputStream()
 {
-    if (!hasClaimed())
+    if (!claimed)
         return;

-    // An error was thrown during the stream or it did not finish successfully
-    // The read offsets weren't comitted, so consumer must rejoin the group from the original starting point
-    if (!finalized)
+    if (broken)
     {
-        LOG_TRACE(storage.log, "KafkaBlockInputStream did not finish successfully, unsubscribing from assignments and rejoining");
+        LOG_TRACE(storage.log, "Re-joining claimed consumer after failure");
         consumer->unsubscribe();
-        consumer->subscribe(storage.topics);
     }

-    // Return consumer for another reader
     storage.pushConsumer(consumer);
 }

-String KafkaBlockInputStream::getName() const
-{
-    return storage.getName();
-}
-
-Block KafkaBlockInputStream::readImpl()
-{
-    if (!hasClaimed())
-        return {};
-
-    return children.back()->read();
-}
-
-Block KafkaBlockInputStream::getHeader() const
-{
-    return storage.getSampleBlock();
-}
-
 void KafkaBlockInputStream::readPrefixImpl()
 {
-    if (!hasClaimed())
-    {
-        // Create a formatted reader on Kafka messages
-        LOG_TRACE(storage.log, "Creating formatted reader");
-        consumer = storage.tryClaimConsumer(context.getSettingsRef().queue_max_wait_ms.totalMilliseconds());
-        if (consumer == nullptr)
-            throw Exception("Failed to claim consumer: ", ErrorCodes::TIMEOUT_EXCEEDED);
+    consumer = storage.tryClaimConsumer(context.getSettingsRef().queue_max_wait_ms.totalMilliseconds());
+    claimed = !!consumer;

-        buffer = std::make_unique<DelimitedReadBuffer>(new ReadBufferFromKafkaConsumer(consumer, storage.log, max_block_size), storage.row_delimiter);
-        addChild(FormatFactory::instance().getInput(storage.format_name, *buffer, storage.getSampleBlock(), context, max_block_size));
+    if (!consumer)
+        consumer = std::make_shared<cppkafka::Consumer>(storage.createConsumerConfiguration());
+
+    // While we wait for an assignment after subscription, we'll poll zero messages anyway.
+    // If we're doing a manual select, it's better to get something after a wait than an immediate nothing.
+    if (consumer->get_subscription().empty())
+    {
+        using namespace std::chrono_literals;
+
+        consumer->pause(); // don't accidentally read any messages
+        consumer->subscribe(storage.topics);
+        consumer->poll(5s);
+        consumer->resume();
     }

-    // Start reading data
-    finalized = false;
+    buffer = std::make_unique<DelimitedReadBuffer>(
+        new ReadBufferFromKafkaConsumer(consumer, storage.log, max_block_size), storage.row_delimiter);
+    addChild(FormatFactory::instance().getInput(storage.format_name, *buffer, storage.getSampleBlock(), context, max_block_size));
+
+    broken = true;
 }

 void KafkaBlockInputStream::readSuffixImpl()
 {
-    if (hasClaimed())
-        buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->commit();
+    buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->commit();

-    // Mark as successfully finished
-    finalized = true;
+    broken = false;
 }

 } // namespace DB
diff --git a/dbms/src/Storages/Kafka/KafkaBlockInputStream.h b/dbms/src/Storages/Kafka/KafkaBlockInputStream.h
index 9414c7c5939..581fc391d32 100644
--- a/dbms/src/Storages/Kafka/KafkaBlockInputStream.h
+++ b/dbms/src/Storages/Kafka/KafkaBlockInputStream.h
@@ -3,11 +3,11 @@
 #include
 #include
 #include
-#include
+
+#include

 namespace DB
 {
-class StorageKafka;

 class KafkaBlockInputStream : public IBlockInputStream
 {
@@ -15,9 +15,10 @@ public:
     KafkaBlockInputStream(StorageKafka & storage_, const Context & context_, const String & schema, size_t max_block_size_);
     ~KafkaBlockInputStream() override;

-    String getName() const override;
-    Block readImpl() override;
-    Block getHeader() const override;
+    String getName() const override { return storage.getName(); }
+    Block readImpl() override { return children.back()->read(); }
+    Block getHeader() const override { return storage.getSampleBlock(); }
+
     void readPrefixImpl() override;
     void readSuffixImpl() override;

@@ -28,10 +29,7 @@ private:
     ConsumerPtr consumer;
     std::unique_ptr<DelimitedReadBuffer> buffer;

-    bool finalized = false;
-
-    // Return true if consumer has been claimed by the stream
-    bool hasClaimed() { return consumer != nullptr; }
+    bool broken = true, claimed = false;
 };

 } // namespace DB
diff --git a/dbms/src/Storages/Kafka/StorageKafka.cpp b/dbms/src/Storages/Kafka/StorageKafka.cpp
index eb7bc04260e..8d7d8b01985 100644
--- a/dbms/src/Storages/Kafka/StorageKafka.cpp
+++ b/dbms/src/Storages/Kafka/StorageKafka.cpp
@@ -177,6 +177,9 @@ cppkafka::Configuration StorageKafka::createConsumerConfiguration()

     conf.set("client.id", VERSION_FULL);

+    // If no offset stored for this group, read all messages from the start
+    conf.set("auto.offset.reset", "smallest");
+
     // We manually commit offsets after a stream successfully finished
     conf.set("enable.auto.commit", "false");
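For illustration only, here is a minimal standalone sketch of the warm-up pattern that readPrefixImpl() now performs (subscribe while paused, poll once so the group rebalance can assign partitions, then resume), together with the two consumer settings added in createConsumerConfiguration(). It uses the same cppkafka calls as the patch; the broker address, group id and topic name are placeholders, and error handling is omitted.

// Sketch of the consumer warm-up from KafkaBlockInputStream::readPrefixImpl().
// "localhost:9092", "example_group" and "example_topic" are placeholders.
#include <chrono>
#include <string>
#include <vector>

#include <cppkafka/cppkafka.h>

static void warmUpConsumer(cppkafka::Consumer & consumer, const std::vector<std::string> & topics)
{
    using namespace std::chrono_literals;

    // Nothing to do if a previous reader already subscribed this consumer.
    if (!consumer.get_subscription().empty())
        return;

    consumer.pause();            // avoid consuming real messages during the warm-up
    consumer.subscribe(topics);
    consumer.poll(5s);           // give the group coordinator time to assign partitions
    consumer.resume();
}

int main()
{
    cppkafka::Configuration config = {
        {"metadata.broker.list", "localhost:9092"},  // placeholder broker
        {"group.id", "example_group"},               // placeholder consumer group
        {"auto.offset.reset", "smallest"},           // no stored offset -> read from the start
        {"enable.auto.commit", "false"},             // offsets are committed manually after a successful stream
    };

    cppkafka::Consumer consumer(config);
    warmUpConsumer(consumer, {"example_topic"});     // placeholder topic
}

With "auto.offset.reset" set to "smallest" and auto-commit disabled, a group that has never committed an offset starts from the beginning of the topic, and a stream that fails before committing simply re-reads the same messages on the next attempt.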