diff --git a/dbms/src/Core/Types.h b/dbms/src/Core/Types.h
index 1209b1b1d72..61216a637f3 100644
--- a/dbms/src/Core/Types.h
+++ b/dbms/src/Core/Types.h
@@ -1,8 +1,8 @@
 #pragma once
 
+#include
 #include
 #include
-#include
 
 
 namespace DB
diff --git a/dbms/src/Storages/ColumnsDescription.h b/dbms/src/Storages/ColumnsDescription.h
index 44a60d2dc7e..e7f2919c3bd 100644
--- a/dbms/src/Storages/ColumnsDescription.h
+++ b/dbms/src/Storages/ColumnsDescription.h
@@ -68,8 +68,8 @@ public:
     NamesAndTypesList getOrdinary() const;
     NamesAndTypesList getMaterialized() const;
     NamesAndTypesList getAliasesAndVirtuals() const;
-    /// ordinary + materialized + aliases + virtuals.
-    NamesAndTypesList getAll() const;
+    NamesAndTypesList getAllPhysical() const; /// ordinary + materialized.
+    NamesAndTypesList getAll() const; /// ordinary + materialized + aliases + virtuals.
 
     using ColumnTTLs = std::unordered_map;
     ColumnTTLs getColumnTTLs() const;
@@ -88,8 +88,6 @@ public:
             throw Exception("Cannot modify ColumnDescription for column " + column_name + ": column name cannot be changed", ErrorCodes::LOGICAL_ERROR);
     }
 
-    /// ordinary + materialized.
-    NamesAndTypesList getAllPhysical() const;
     Names getNamesOfPhysical() const;
     bool hasPhysical(const String & column_name) const;
     NameAndTypePair getPhysical(const String & column_name) const;
diff --git a/dbms/src/Storages/IStorage.h b/dbms/src/Storages/IStorage.h
index 913b97a445b..8f1a7b06d9e 100644
--- a/dbms/src/Storages/IStorage.h
+++ b/dbms/src/Storages/IStorage.h
@@ -91,9 +91,9 @@ public: /// thread-unsafe part. lockStructure must be acquired
     virtual NameAndTypePair getColumn(const String & column_name) const;
     virtual bool hasColumn(const String & column_name) const;
 
-    Block getSampleBlock() const;
-    Block getSampleBlockNonMaterialized() const;
-    Block getSampleBlockForColumns(const Names & column_names) const; /// including virtual and alias columns.
+    Block getSampleBlock() const; /// ordinary + materialized.
+    Block getSampleBlockNonMaterialized() const; /// ordinary.
+    Block getSampleBlockForColumns(const Names & column_names) const; /// ordinary + materialized + aliases + virtuals.
 
     /// Verify that all the requested names are in the table and are set correctly:
     /// list of names is not empty and the names do not repeat.
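For orientation before the Kafka changes below: each of the three getSampleBlock* accessors is just a header Block assembled from one of the ColumnsDescription lists named in the comments above. A minimal sketch of that assembly, assuming the usual Core/Block and Core/NamesAndTypes types; buildHeader is a made-up helper name for illustration, and the real implementations live elsewhere and are not touched by this diff:

#include <Core/Block.h>
#include <Core/NamesAndTypes.h>

/// Sketch of the correspondence spelled out in the comments above:
///   getSampleBlockNonMaterialized()  -> header over getOrdinary()
///   getSampleBlock()                 -> header over getAllPhysical()
///   getSampleBlockForColumns(names)  -> the requested subset of getAll(), so aliases and virtuals are allowed
static Block buildHeader(const NamesAndTypesList & columns)
{
    Block header;
    for (const auto & column : columns)
        header.insert({column.type->createColumn(), column.type, column.name});
    return header;
}

This is only meant to keep the renamed accessors apart while reading the stream changes that follow.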
diff --git a/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp b/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp
index 56b1db85a3f..abc4e702a6e 100644
--- a/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp
+++ b/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp
@@ -7,15 +7,15 @@ namespace DB
 {
 
 KafkaBlockInputStream::KafkaBlockInputStream(
-    StorageKafka & storage_, const Context & context_, const String & schema, size_t max_block_size_)
-    : storage(storage_), context(context_), max_block_size(max_block_size_)
+    StorageKafka & storage_, const Context & context_, const Names & columns, size_t max_block_size_)
+    : storage(storage_), context(context_), column_names(columns), max_block_size(max_block_size_)
 {
     context.setSetting("input_format_skip_unknown_fields", 1u); // Always skip unknown fields regardless of the context (JSON or TSKV)
     context.setSetting("input_format_allow_errors_ratio", 0.);
-    context.setSetting("input_format_allow_errors_num", storage.skip_broken);
+    context.setSetting("input_format_allow_errors_num", storage.skipBroken());
 
-    if (!schema.empty())
-        context.setSetting("format_schema", schema);
+    if (!storage.getSchemaName().empty())
+        context.setSetting("format_schema", storage.getSchemaName());
 }
 
 KafkaBlockInputStream::~KafkaBlockInputStream()
@@ -29,6 +29,11 @@ KafkaBlockInputStream::~KafkaBlockInputStream()
     storage.pushBuffer(buffer);
 }
 
+Block KafkaBlockInputStream::getHeader() const
+{
+    return storage.getSampleBlockForColumns(column_names);
+}
+
 void KafkaBlockInputStream::readPrefixImpl()
 {
     buffer = storage.tryClaimBuffer(context.getSettingsRef().queue_max_wait_ms.totalMilliseconds());
@@ -37,20 +42,30 @@ void KafkaBlockInputStream::readPrefixImpl()
     if (!buffer)
         buffer = storage.createBuffer();
 
-    buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->subscribe(storage.topics);
+    buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->subscribe(storage.getTopics());
 
     const auto & limits = getLimits();
     const size_t poll_timeout = buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->pollTimeout();
     size_t rows_portion_size = poll_timeout ? std::min(max_block_size, limits.max_execution_time.totalMilliseconds() / poll_timeout) : max_block_size;
     rows_portion_size = std::max(rows_portion_size, 1ul);
 
-    auto child = FormatFactory::instance().getInput(storage.format_name, *buffer, storage.getSampleBlock(), context, max_block_size, rows_portion_size);
+    auto non_virtual_header = storage.getSampleBlockNonMaterialized(); /// FIXME: add materialized columns support
+    auto child = FormatFactory::instance().getInput(
+        storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, rows_portion_size);
     child->setLimits(limits);
     addChild(child);
 
     broken = true;
 }
 
+Block KafkaBlockInputStream::readImpl()
+{
+    /// FIXME: materialize MATERIALIZED columns here.
+    Block block = children.back()->read();
+    /// TODO: add virtual columns here
+    return block;
+}
+
 void KafkaBlockInputStream::readSuffixImpl()
 {
     buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->commit();
diff --git a/dbms/src/Storages/Kafka/KafkaBlockInputStream.h b/dbms/src/Storages/Kafka/KafkaBlockInputStream.h
index 1b6c8b8ae25..dcaec1f5066 100644
--- a/dbms/src/Storages/Kafka/KafkaBlockInputStream.h
+++ b/dbms/src/Storages/Kafka/KafkaBlockInputStream.h
@@ -11,19 +11,20 @@ namespace DB
 class KafkaBlockInputStream : public IBlockInputStream
 {
 public:
-    KafkaBlockInputStream(StorageKafka & storage_, const Context & context_, const String & schema, size_t max_block_size_);
+    KafkaBlockInputStream(StorageKafka & storage_, const Context & context_, const Names & columns, size_t max_block_size_);
     ~KafkaBlockInputStream() override;
 
     String getName() const override { return storage.getName(); }
-    Block readImpl() override { return children.back()->read(); }
-    Block getHeader() const override { return storage.getSampleBlock(); }
+    Block getHeader() const override;
 
     void readPrefixImpl() override;
+    Block readImpl() override;
     void readSuffixImpl() override;
 
 private:
     StorageKafka & storage;
     Context context;
+    Names column_names;
     UInt64 max_block_size;
 
     BufferPtr buffer;
diff --git a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h
index 20a1c5830d7..9bb3fd473ab 100644
--- a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h
+++ b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h
@@ -1,6 +1,7 @@
 #pragma once
 
 #include
+#include
 #include
 #include
 
@@ -33,6 +34,11 @@ public:
 
     auto pollTimeout() { return poll_timeout; }
 
+    // Return values for the message that's being read.
+    String currentTopic() { return current[-1].get_topic(); }
+    String currentKey() { return current[-1].get_key(); }
+    auto currentOffset() { return current[-1].get_offset(); }
+
 private:
     using Messages = std::vector<cppkafka::Message>;
 
diff --git a/dbms/src/Storages/Kafka/StorageKafka.cpp b/dbms/src/Storages/Kafka/StorageKafka.cpp
index b7bd6607836..79622b79856 100644
--- a/dbms/src/Storages/Kafka/StorageKafka.cpp
+++ b/dbms/src/Storages/Kafka/StorageKafka.cpp
@@ -108,7 +108,7 @@ StorageKafka::StorageKafka(
 
 
 BlockInputStreams StorageKafka::read(
-    const Names & /* column_names */,
+    const Names & column_names,
     const SelectQueryInfo & /* query_info */,
     const Context & context,
     QueryProcessingStage::Enum /* processed_stage */,
@@ -127,8 +127,8 @@ BlockInputStreams StorageKafka::read(
     for (size_t i = 0; i < stream_count; ++i)
     {
         /// Use block size of 1, otherwise LIMIT won't work properly as it will buffer excess messages in the last block
-        /// TODO: that leads to awful performance.
-        streams.emplace_back(std::make_shared<KafkaBlockInputStream>(*this, context, schema_name, 1));
+        /// TODO: probably that leads to awful performance.
+        streams.emplace_back(std::make_shared<KafkaBlockInputStream>(*this, context, column_names, 1));
     }
 
     LOG_DEBUG(log, "Starting reading " << streams.size() << " streams");
@@ -182,46 +182,6 @@ void StorageKafka::updateDependencies()
 }
 
 
-cppkafka::Configuration StorageKafka::createConsumerConfiguration()
-{
-    cppkafka::Configuration conf;
-
-    LOG_TRACE(log, "Setting brokers: " << brokers);
-    conf.set("metadata.broker.list", brokers);
-
-    LOG_TRACE(log, "Setting Group ID: " << group << " Client ID: clickhouse");
-    conf.set("group.id", group);
-
-    conf.set("client.id", VERSION_FULL);
-
-    // If no offset stored for this group, read all messages from the start
-    conf.set("auto.offset.reset", "smallest");
-
-    // We manually commit offsets after a stream successfully finished
-    conf.set("enable.auto.commit", "false");
-
-    // Ignore EOF messages
-    conf.set("enable.partition.eof", "false");
-
-    // for debug logs inside rdkafka
-    // conf.set("debug", "consumer,cgrp,topic,fetch");
-
-    // Update consumer configuration from the configuration
-    const auto & config = global_context.getConfigRef();
-    if (config.has(CONFIG_PREFIX))
-        loadFromConfig(conf, config, CONFIG_PREFIX);
-
-    // Update consumer topic-specific configuration
-    for (const auto & topic : topics)
-    {
-        const auto topic_config_key = CONFIG_PREFIX + "_" + topic;
-        if (config.has(topic_config_key))
-            loadFromConfig(conf, config, topic_config_key);
-    }
-
-    return conf;
-}
-
 BufferPtr StorageKafka::createBuffer()
 {
     // Create a consumer and subscribe to topics
@@ -269,6 +229,47 @@ void StorageKafka::pushBuffer(BufferPtr buffer)
     semaphore.set();
 }
 
+
+cppkafka::Configuration StorageKafka::createConsumerConfiguration()
+{
+    cppkafka::Configuration conf;
+
+    LOG_TRACE(log, "Setting brokers: " << brokers);
+    conf.set("metadata.broker.list", brokers);
+
+    LOG_TRACE(log, "Setting Group ID: " << group << " Client ID: clickhouse");
+    conf.set("group.id", group);
+
+    conf.set("client.id", VERSION_FULL);
+
+    // If no offset stored for this group, read all messages from the start
+    conf.set("auto.offset.reset", "smallest");
+
+    // We manually commit offsets after a stream successfully finished
+    conf.set("enable.auto.commit", "false");
+
+    // Ignore EOF messages
+    conf.set("enable.partition.eof", "false");
+
+    // for debug logs inside rdkafka
+    // conf.set("debug", "consumer,cgrp,topic,fetch");
+
+    // Update consumer configuration from the configuration
+    const auto & config = global_context.getConfigRef();
+    if (config.has(CONFIG_PREFIX))
+        loadFromConfig(conf, config, CONFIG_PREFIX);
+
+    // Update consumer topic-specific configuration
+    for (const auto & topic : topics)
+    {
+        const auto topic_config_key = CONFIG_PREFIX + "_" + topic;
+        if (config.has(topic_config_key))
+            loadFromConfig(conf, config, topic_config_key);
+    }
+
+    return conf;
+}
+
 bool StorageKafka::checkDependencies(const String & current_database_name, const String & current_table_name)
 {
     // Check if all dependencies are attached
@@ -344,12 +345,16 @@ bool StorageKafka::streamToViews()
     if (block_size == 0)
         block_size = settings.max_block_size.value;
 
+    // Execute the query
+    InterpreterInsertQuery interpreter{insert, global_context};
+    auto block_io = interpreter.execute();
+
     // Create a stream for each consumer and join them in a union stream
     BlockInputStreams streams;
     streams.reserve(num_created_consumers);
     for (size_t i = 0; i < num_created_consumers; ++i)
     {
-        auto stream = std::make_shared<KafkaBlockInputStream>(*this, global_context, schema_name, block_size);
+        auto stream = std::make_shared<KafkaBlockInputStream>(*this, global_context,
+            block_io.out->getHeader().getNames(), block_size);
         streams.emplace_back(stream);
 
         // Limit read batch to maximum block size to allow DDL
@@ -366,9 +371,6 @@ bool StorageKafka::streamToViews()
     else
         in = streams[0];
 
-    // Execute the query
-    InterpreterInsertQuery interpreter{insert, global_context};
-    auto block_io = interpreter.execute();
-
     copyData(*in, *block_io.out, &stream_cancelled);
 
     // Check whether the limits were applied during query execution
diff --git a/dbms/src/Storages/Kafka/StorageKafka.h b/dbms/src/Storages/Kafka/StorageKafka.h
index ae9e9baa724..f9b6609def5 100644
--- a/dbms/src/Storages/Kafka/StorageKafka.h
+++ b/dbms/src/Storages/Kafka/StorageKafka.h
@@ -20,9 +20,6 @@ namespace DB
  */
 class StorageKafka : public ext::shared_ptr_helper<StorageKafka>, public IStorage
 {
-    friend class KafkaBlockInputStream;
-    friend class KafkaBlockOutputStream;
-
 public:
     std::string getName() const override { return "Kafka"; }
     std::string getTableName() const override { return table_name; }
@@ -43,6 +40,27 @@ public:
 
     void updateDependencies() override;
 
+    BufferPtr createBuffer();
+    BufferPtr claimBuffer();
+    BufferPtr tryClaimBuffer(long wait_ms);
+    void pushBuffer(BufferPtr buf);
+
+    const auto & getTopics() const { return topics; }
+    const auto & getFormatName() const { return format_name; }
+    const auto & getSchemaName() const { return schema_name; }
+    const auto & skipBroken() const { return skip_broken; }
+
+protected:
+    StorageKafka(
+        const std::string & table_name_,
+        const std::string & database_name_,
+        Context & context_,
+        const ColumnsDescription & columns_,
+        const String & brokers_, const String & group_, const Names & topics_,
+        const String & format_name_, char row_delimiter_, const String & schema_name_,
+        size_t num_consumers_, UInt64 max_block_size_, size_t skip_broken,
+        bool intermediate_commit_);
+
 private:
     // Configuration and state
     String table_name;
@@ -77,25 +95,10 @@ private:
     std::atomic<bool> stream_cancelled{false};
 
     cppkafka::Configuration createConsumerConfiguration();
-    BufferPtr createBuffer();
-    BufferPtr claimBuffer();
-    BufferPtr tryClaimBuffer(long wait_ms);
-    void pushBuffer(BufferPtr buf);
     void streamThread();
     bool streamToViews();
     bool checkDependencies(const String & database_name, const String & table_name);
-
-protected:
-    StorageKafka(
-        const std::string & table_name_,
-        const std::string & database_name_,
-        Context & context_,
-        const ColumnsDescription & columns_,
-        const String & brokers_, const String & group_, const Names & topics_,
-        const String & format_name_, char row_delimiter_, const String & schema_name_,
-        size_t num_consumers_, UInt64 max_block_size_, size_t skip_broken,
-        bool intermediate_commit_);
 };
 
 }
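The patch deliberately stops short of filling in the virtual columns: readImpl() above still carries "TODO: add virtual columns here", even though ReadBufferFromKafkaConsumer now exposes currentTopic(), currentKey() and currentOffset(). As a rough, hypothetical sketch of where that could go (not part of this diff), the values might be appended to the block produced by the inner format stream. The column names _topic/_key/_offset, their types, and the one-message-per-portion assumption are illustrative guesses, not something the patch defines:

/// Hypothetical follow-up, for illustration only; would need Columns/ColumnString.h,
/// Columns/ColumnsNumber.h, DataTypes/DataTypeString.h and DataTypes/DataTypesNumber.h.
/// Assumes the whole portion read by the child stream came from the single message
/// the consumer currently points at.
Block KafkaBlockInputStream::readImpl()
{
    Block block = children.back()->read();
    if (!block)
        return block;

    auto * consumer = buffer->subBufferAs<ReadBufferFromKafkaConsumer>();

    auto topic_column = ColumnString::create();
    auto key_column = ColumnString::create();
    auto offset_column = ColumnUInt64::create();
    for (size_t i = 0; i < block.rows(); ++i)
    {
        topic_column->insert(consumer->currentTopic());
        key_column->insert(consumer->currentKey());
        offset_column->insert(static_cast<UInt64>(consumer->currentOffset()));
    }

    block.insert({std::move(topic_column), std::make_shared<DataTypeString>(), "_topic"});
    block.insert({std::move(key_column), std::make_shared<DataTypeString>(), "_key"});
    block.insert({std::move(offset_column), std::make_shared<DataTypeUInt64>(), "_offset"});
    return block;
}

For such a block to line up with getHeader(), the same three columns would also have to be declared as virtuals of StorageKafka so that getSampleBlockForColumns() can return them.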