diff --git a/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp b/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp
new file mode 100644
index 00000000000..f4f5288a216
--- /dev/null
+++ b/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp
@@ -0,0 +1,102 @@
+#include <Storages/Kafka/KafkaBlockInputStream.h>
+
+#include <Formats/FormatFactory.h>
+#include <Storages/Kafka/StorageKafka.h>
+
+namespace DB
+{
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+    extern const int TIMEOUT_EXCEEDED;
+} // namespace ErrorCodes
+
+KafkaBlockInputStream::KafkaBlockInputStream(
+    StorageKafka & storage_, const Context & context_, const String & schema, size_t max_block_size_)
+    : storage(storage_), context(context_), max_block_size(max_block_size_)
+{
+    // Always skip unknown fields regardless of the context (JSON or TSKV)
+    context.setSetting("input_format_skip_unknown_fields", 1u);
+
+    // We don't use ratio since the number of Kafka messages may vary from stream to stream.
+    // Thus, ratio is meaningless.
+    context.setSetting("input_format_allow_errors_ratio", 1.);
+    context.setSetting("input_format_allow_errors_num", storage.skip_broken);
+
+    if (schema.size() > 0)
+        context.setSetting("format_schema", schema);
+}
+
+KafkaBlockInputStream::~KafkaBlockInputStream()
+{
+    if (!hasClaimed())
+        return;
+
+    // An error was thrown during the stream or it did not finish successfully
+    // The read offsets weren't committed, so consumer must rejoin the group from the original starting point
+    if (!finalized)
+    {
+        LOG_TRACE(storage.log, "KafkaBlockInputStream did not finish successfully, unsubscribing from assignments and rejoining");
+        consumer->unsubscribe();
+        consumer->subscribe(storage.topics);
+    }
+
+    // Return consumer for another reader
+    storage.pushConsumer(consumer);
+    consumer = nullptr;
+}
+
+String KafkaBlockInputStream::getName() const
+{
+    return storage.getName();
+}
+
+Block KafkaBlockInputStream::readImpl()
+{
+    if (isCancelledOrThrowIfKilled() || !hasClaimed())
+        return {};
+
+    if (!reader)
+        throw Exception("Logical error: reader is not initialized", ErrorCodes::LOGICAL_ERROR);
+
+    return reader->read();
+}
+
+Block KafkaBlockInputStream::getHeader() const
+{
+    return storage.getSampleBlock();
+}
+
+void KafkaBlockInputStream::readPrefixImpl()
+{
+    if (!hasClaimed())
+    {
+        // Create a formatted reader on Kafka messages
+        LOG_TRACE(storage.log, "Creating formatted reader");
+        consumer = storage.tryClaimConsumer(context.getSettingsRef().queue_max_wait_ms.totalMilliseconds());
+        if (consumer == nullptr)
+            throw Exception("Failed to claim consumer", ErrorCodes::TIMEOUT_EXCEEDED);
+
+        read_buf = std::make_unique<ReadBufferFromKafkaConsumer>(consumer, storage.log, storage.row_delimiter);
+        reader = FormatFactory::instance().getInput(storage.format_name, *read_buf, storage.getSampleBlock(), context, max_block_size);
+    }
+
+    // Start reading data
+    finalized = false;
+    reader->readPrefix();
+}
+
+void KafkaBlockInputStream::readSuffixImpl()
+{
+    if (hasClaimed())
+    {
+        reader->readSuffix();
+        // Store offsets read in this stream
+        read_buf->commit();
+    }
+
+    // Mark as successfully finished
+    finalized = true;
+}
+
+} // namespace DB
diff --git a/dbms/src/Storages/Kafka/KafkaBlockInputStream.h b/dbms/src/Storages/Kafka/KafkaBlockInputStream.h
new file mode 100644
index 00000000000..30c7bb9ea2c
--- /dev/null
+++ b/dbms/src/Storages/Kafka/KafkaBlockInputStream.h
@@ -0,0 +1,37 @@
+#pragma once
+
+#include <DataStreams/IProfilingBlockInputStream.h>
+#include <Interpreters/Context.h>
+#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
+
+namespace DB
+{
+class StorageKafka;
+
+class KafkaBlockInputStream : public IProfilingBlockInputStream
+{
+public:
+    KafkaBlockInputStream(StorageKafka & storage_, const Context & context_, const String & schema, size_t max_block_size_);
+    ~KafkaBlockInputStream() override;
+
+    String getName() const override;
+    Block readImpl() override;
+    Block getHeader() const override;
+    void readPrefixImpl() override;
+    void readSuffixImpl() override;
+
+private:
+    StorageKafka & storage;
+    ConsumerPtr consumer;
+    Context context;
+    size_t max_block_size;
+    Block sample_block;
+    std::unique_ptr<ReadBufferFromKafkaConsumer> read_buf;
+    BlockInputStreamPtr reader;
+    bool finalized = false;
+
+    // Return true if consumer has been claimed by the stream
+    bool hasClaimed() { return consumer != nullptr; }
+};
+
+} // namespace DB
diff --git a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
new file mode 100644
index 00000000000..b417996e2ba
--- /dev/null
+++ b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
@@ -0,0 +1,62 @@
+#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
+
+namespace DB
+{
+
+namespace
+{
+    /// How long to wait for a single message (applies to each individual message)
+    const auto READ_POLL_MS = 500;
+} // namespace
+
+bool ReadBufferFromKafkaConsumer::nextImpl()
+{
+    if (current_pending)
+    {
+        // XXX: very fishy place with const casting.
+        BufferBase::set(
+            reinterpret_cast<char *>(const_cast<unsigned char *>(current.get_payload().get_data())), current.get_payload().get_size(), 0);
+        current_pending = false;
+        return true;
+    }
+
+    // Process next buffered message
+    auto message = consumer->poll(std::chrono::milliseconds(READ_POLL_MS));
+    if (!message)
+        return false;
+
+    if (message.is_eof())
+    {
+        // Reached EOF while reading current batch, skip it.
+        LOG_TRACE(log, "EOF reached for partition " << message.get_partition() << " offset " << message.get_offset());
+        return nextImpl();
+    }
+    else if (auto err = message.get_error())
+    {
+        LOG_ERROR(log, "Consumer error: " << err);
+        return false;
+    }
+
+    ++read_messages;
+
+    // Now we've received a new message. Check if we need to produce a delimiter
+    if (row_delimiter != '\0' && current)
+    {
+        BufferBase::set(&row_delimiter, 1, 0);
+        current = std::move(message);
+        current_pending = true;
+        return true;
+    }
+
+    // Consume message and mark the topic/partition offset
+    // The offsets will be committed in the readSuffix() method after the block is completed
+    // If an exception is thrown before that would occur, the client will rejoin without committing offsets
+    current = std::move(message);
+
+    // XXX: very fishy place with const casting.
+    BufferBase::set(
+        reinterpret_cast<char *>(const_cast<unsigned char *>(current.get_payload().get_data())), current.get_payload().get_size(), 0);
+    return true;
+}
+
+} // namespace DB
diff --git a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h
new file mode 100644
index 00000000000..ed1a734ebb5
--- /dev/null
+++ b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h
@@ -0,0 +1,44 @@
+#pragma once
+
+#include <IO/ReadBuffer.h>
+#include <common/logger_useful.h>
+
+#include <cppkafka/cppkafka.h>
+
+namespace DB
+{
+using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
+
+class ReadBufferFromKafkaConsumer : public ReadBuffer
+{
+public:
+    ReadBufferFromKafkaConsumer(ConsumerPtr consumer_, Poco::Logger * log_, char row_delimiter_)
+        : ReadBuffer(nullptr, 0), consumer(consumer_), log(log_), row_delimiter(row_delimiter_)
+    {
+        if (row_delimiter != '\0')
+            LOG_TRACE(log, "Row delimiter is: " << row_delimiter);
+    }
+
+    /// Commit messages read with this consumer
+    void commit()
+    {
+        LOG_TRACE(log, "Committing " << read_messages << " messages");
+        if (read_messages == 0)
+            return;
+
+        consumer->async_commit();
+        read_messages = 0;
+    }
+
+private:
+    ConsumerPtr consumer;
+    cppkafka::Message current;
+    bool current_pending = false; /// We've fetched "current" message and need to process it on the next iteration.
+    Poco::Logger * log;
+    size_t read_messages = 0;
+    char row_delimiter;
+
+    bool nextImpl() override;
+};
+
+} // namespace DB
diff --git a/dbms/src/Storages/Kafka/StorageKafka.cpp b/dbms/src/Storages/Kafka/StorageKafka.cpp
index 2d8b85ad639..bc2bce29dd3 100644
--- a/dbms/src/Storages/Kafka/StorageKafka.cpp
+++ b/dbms/src/Storages/Kafka/StorageKafka.cpp
@@ -6,9 +6,6 @@
 #include
 #include
 #include
-#include
-#include
-#include
 #include
 #include
 #include
@@ -16,6 +13,7 @@
 #include
 #include
 #include
+#include <Storages/Kafka/KafkaBlockInputStream.h>
 #include
 #include
 #include
@@ -42,210 +40,31 @@ namespace ErrorCodes
     extern const int LOGICAL_ERROR;
     extern const int BAD_ARGUMENTS;
     extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
-    extern const int TIMEOUT_EXCEEDED;
 }
 
-using namespace Poco::Util;
-
-/// How long to wait for a single message (applies to each individual message)
-static const auto READ_POLL_MS = 500;
-static const auto CLEANUP_TIMEOUT_MS = 3000;
-
-/// Configuration prefix
-static const String CONFIG_PREFIX = "kafka";
-
-class ReadBufferFromKafkaConsumer : public ReadBuffer
+namespace
 {
-    ConsumerPtr consumer;
-    cppkafka::Message current;
-    bool current_pending = false; /// We've fetched "current" message and need to process it on the next iteration.
-    Poco::Logger * log;
-    size_t read_messages = 0;
-    char row_delimiter;
+    const auto RESCHEDULE_MS = 500;
+    const auto CLEANUP_TIMEOUT_MS = 3000;
 
-    bool nextImpl() override
+    /// Configuration prefix
+    const String CONFIG_PREFIX = "kafka";
+
+    void loadFromConfig(cppkafka::Configuration & conf, const Poco::Util::AbstractConfiguration & config, const std::string & path)
     {
-        if (current_pending)
+        Poco::Util::AbstractConfiguration::Keys keys;
+        std::vector<char> errstr(512);
+
+        config.keys(path, keys);
+
+        for (const auto & key : keys)
         {
-            // XXX: very fishy place with const casting.
-            BufferBase::set(reinterpret_cast<char *>(const_cast<unsigned char *>(current.get_payload().get_data())), current.get_payload().get_size(), 0);
-            current_pending = false;
-            return true;
+            const String key_path = path + "." + key;
+            const String key_name = boost::replace_all_copy(key, "_", ".");
+            conf.set(key_name, config.getString(key_path));
         }
-
-        // Process next buffered message
-        auto message = consumer->poll(std::chrono::milliseconds(READ_POLL_MS));
-        if (!message)
-            return false;
-
-        if (message.is_eof())
-        {
-            // Reached EOF while reading current batch, skip it.
-            LOG_TRACE(log, "EOF reached for partition " << message.get_partition() << " offset " << message.get_offset());
-            return nextImpl();
-        }
-        else if (auto err = message.get_error())
-        {
-            LOG_ERROR(log, "Consumer error: " << err);
-            return false;
-        }
-
-        ++read_messages;
-
-        // Now we've received a new message. Check if we need to produce a delimiter
-        if (row_delimiter != '\0' && current)
-        {
-            BufferBase::set(&row_delimiter, 1, 0);
-            current = std::move(message);
-            current_pending = true;
-            return true;
-        }
-
-        // Consume message and mark the topic/partition offset
-        // The offsets will be committed in the readSuffix() method after the block is completed
-        // If an exception is thrown before that would occur, the client will rejoin without committing offsets
-        current = std::move(message);
-
-        // XXX: very fishy place with const casting.
-        BufferBase::set(reinterpret_cast<char *>(const_cast<unsigned char *>(current.get_payload().get_data())), current.get_payload().get_size(), 0);
-        return true;
     }
-
-public:
-    ReadBufferFromKafkaConsumer(ConsumerPtr consumer_, Poco::Logger * log_, char row_delimiter_)
-        : ReadBuffer(nullptr, 0), consumer(consumer_), log(log_), row_delimiter(row_delimiter_)
-    {
-        if (row_delimiter != '\0')
-            LOG_TRACE(log, "Row delimiter is: " << row_delimiter);
-    }
-
-    /// Commit messages read with this consumer
-    void commit()
-    {
-        LOG_TRACE(log, "Committing " << read_messages << " messages");
-        if (read_messages == 0)
-            return;
-
-        consumer->async_commit();
-        read_messages = 0;
-    }
-};
-
-class KafkaBlockInputStream : public IBlockInputStream
-{
-public:
-    KafkaBlockInputStream(StorageKafka & storage_, const Context & context_, const String & schema, size_t max_block_size_)
-        : storage(storage_), context(context_), max_block_size(max_block_size_)
-    {
-        // Always skip unknown fields regardless of the context (JSON or TSKV)
-        context.setSetting("input_format_skip_unknown_fields", 1u);
-
-        // We don't use ratio since the number of Kafka messages may vary from stream to stream.
-        // Thus, ratio is meaningless.
- context.setSetting("input_format_allow_errors_ratio", 1.); - context.setSetting("input_format_allow_errors_num", storage.skip_broken); - - if (schema.size() > 0) - context.setSetting("format_schema", schema); - } - - ~KafkaBlockInputStream() override - { - if (!hasClaimed()) - return; - - // An error was thrown during the stream or it did not finish successfully - // The read offsets weren't committed, so consumer must rejoin the group from the original starting point - if (!finalized) - { - LOG_TRACE(storage.log, "KafkaBlockInputStream did not finish successfully, unsubscribing from assignments and rejoining"); - consumer->unsubscribe(); - consumer->subscribe(storage.topics); - } - - // Return consumer for another reader - storage.pushConsumer(consumer); - consumer = nullptr; - } - - String getName() const override - { - return storage.getName(); - } - - Block readImpl() override - { - if (isCancelledOrThrowIfKilled() || !hasClaimed()) - return {}; - - if (!reader) - throw Exception("Logical error: reader is not initialized", ErrorCodes::LOGICAL_ERROR); - - return reader->read(); - } - - Block getHeader() const override { return storage.getSampleBlock(); } - - void readPrefixImpl() override - { - if (!hasClaimed()) - { - // Create a formatted reader on Kafka messages - LOG_TRACE(storage.log, "Creating formatted reader"); - consumer = storage.tryClaimConsumer(context.getSettingsRef().queue_max_wait_ms.totalMilliseconds()); - if (consumer == nullptr) - throw Exception("Failed to claim consumer: ", ErrorCodes::TIMEOUT_EXCEEDED); - - read_buf = std::make_unique(consumer, storage.log, storage.row_delimiter); - reader = FormatFactory::instance().getInput(storage.format_name, *read_buf, storage.getSampleBlock(), context, max_block_size); - } - - // Start reading data - finalized = false; - reader->readPrefix(); - } - - void readSuffixImpl() override - { - if (hasClaimed()) - { - reader->readSuffix(); - // Store offsets read in this stream - read_buf->commit(); - } - - // Mark as successfully finished - finalized = true; - } - -private: - StorageKafka & storage; - ConsumerPtr consumer; - Context context; - size_t max_block_size; - Block sample_block; - std::unique_ptr read_buf; - BlockInputStreamPtr reader; - bool finalized = false; - - // Return true if consumer has been claimed by the stream - bool hasClaimed() { return consumer != nullptr; } -}; - -static void loadFromConfig(cppkafka::Configuration & conf, const AbstractConfiguration & config, const std::string & path) -{ - AbstractConfiguration::Keys keys; - std::vector errstr(512); - - config.keys(path, keys); - - for (const auto & key : keys) - { - const String key_path = path + "." + key; - const String key_name = boost::replace_all_copy(key, "_", "."); - conf.set(key_name, config.getString(key_path)); - } -} +} // namespace StorageKafka::StorageKafka( const std::string & table_name_, @@ -361,6 +180,9 @@ cppkafka::Configuration StorageKafka::createConsumerConfiguration() // We manually commit offsets after a stream successfully finished conf.set("enable.auto.commit", "false"); + // for debug logs inside rdkafka + conf.set("debug", "consumer,cgrp,topic,fetch"); + // Update consumer configuration from the configuration const auto & config = global_context.getConfigRef(); if (config.has(CONFIG_PREFIX)) @@ -461,7 +283,7 @@ void StorageKafka::streamThread() // Wait for attached views if (!stream_cancelled) - task->scheduleAfter(READ_POLL_MS); + task->scheduleAfter(RESCHEDULE_MS); }