diff --git a/dbms/src/DataStreams/SquashingTransform.cpp b/dbms/src/DataStreams/SquashingTransform.cpp
index 00e3a51582c..22f35b1fe8f 100644
--- a/dbms/src/DataStreams/SquashingTransform.cpp
+++ b/dbms/src/DataStreams/SquashingTransform.cpp
@@ -16,7 +16,7 @@ SquashingTransform::Result SquashingTransform::add(MutableColumns && columns)
     if (columns.empty())
         return Result(std::move(accumulated_columns));
 
-    /// Just read block is alredy enough.
+    /// Just read block is already enough.
     if (isEnoughSize(columns))
     {
         /// If no accumulated data, return just read block.
diff --git a/dbms/src/Formats/FormatFactory.cpp b/dbms/src/Formats/FormatFactory.cpp
index c1798b2d84f..b3253d3cbf1 100644
--- a/dbms/src/Formats/FormatFactory.cpp
+++ b/dbms/src/Formats/FormatFactory.cpp
@@ -83,7 +83,6 @@ BlockInputStreamPtr FormatFactory::getInput(
     const Block & sample,
     const Context & context,
     UInt64 max_block_size,
-    UInt64 rows_portion_size,
     ReadCallback callback) const
 {
     if (name == "Native")
@@ -98,11 +97,10 @@ BlockInputStreamPtr FormatFactory::getInput(
         const Settings & settings = context.getSettingsRef();
         FormatSettings format_settings = getInputFormatSetting(settings);
 
-        return input_getter(
-            buf, sample, context, max_block_size, rows_portion_size, callback ? callback : ReadCallback(), format_settings);
+        return input_getter(buf, sample, context, max_block_size, callback ? callback : ReadCallback(), format_settings);
     }
 
-    auto format = getInputFormat(name, buf, sample, context, max_block_size, rows_portion_size, std::move(callback));
+    auto format = getInputFormat(name, buf, sample, context, max_block_size, std::move(callback));
     return std::make_shared<InputStreamFromInputFormat>(std::move(format));
 }
 
@@ -150,7 +148,6 @@ InputFormatPtr FormatFactory::getInputFormat(
     const Block & sample,
     const Context & context,
     UInt64 max_block_size,
-    UInt64 rows_portion_size,
     ReadCallback callback) const
 {
     const auto & input_getter = getCreators(name).input_processor_creator;
@@ -164,7 +161,6 @@ InputFormatPtr FormatFactory::getInputFormat(
     params.max_block_size = max_block_size;
     params.allow_errors_num = format_settings.input_allow_errors_num;
     params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
-    params.rows_portion_size = rows_portion_size;
     params.callback = std::move(callback);
     params.max_execution_time = settings.max_execution_time;
     params.timeout_overflow_mode = settings.timeout_overflow_mode;
diff --git a/dbms/src/Formats/FormatFactory.h b/dbms/src/Formats/FormatFactory.h
index 04e9ce22e09..1c6fbc1b97e 100644
--- a/dbms/src/Formats/FormatFactory.h
+++ b/dbms/src/Formats/FormatFactory.h
@@ -51,7 +51,6 @@ private:
         const Block & sample,
         const Context & context,
         UInt64 max_block_size,
-        UInt64 rows_portion_size,
         ReadCallback callback,
         const FormatSettings & settings)>;
 
@@ -96,7 +95,6 @@ public:
         const Block & sample,
         const Context & context,
         UInt64 max_block_size,
-        UInt64 rows_portion_size = 0,
         ReadCallback callback = {}) const;
 
     BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
@@ -108,7 +106,6 @@ public:
         const Block & sample,
         const Context & context,
         UInt64 max_block_size,
-        UInt64 rows_portion_size = 0,
         ReadCallback callback = {}) const;
 
     OutputFormatPtr getOutputFormat(
diff --git a/dbms/src/Formats/NativeFormat.cpp b/dbms/src/Formats/NativeFormat.cpp
index f9063e0eced..11835c01123 100644
--- a/dbms/src/Formats/NativeFormat.cpp
+++ b/dbms/src/Formats/NativeFormat.cpp
@@ -13,7 +13,6 @@ void registerInputFormatNative(FormatFactory & factory)
         const Block & sample,
         const Context &,
         UInt64 /* max_block_size */,
-        UInt64 /* min_read_rows */,
         FormatFactory::ReadCallback /* callback */,
         const FormatSettings &)
     {
diff --git a/dbms/src/Formats/tests/tab_separated_streams.cpp b/dbms/src/Formats/tests/tab_separated_streams.cpp
index f05a83bc751..671043b9aac 100644
--- a/dbms/src/Formats/tests/tab_separated_streams.cpp
+++ b/dbms/src/Formats/tests/tab_separated_streams.cpp
@@ -39,7 +39,7 @@ try
 
     FormatSettings format_settings;
 
-    RowInputFormatParams params{DEFAULT_INSERT_BLOCK_SIZE, 0, 0, 0, []{}};
+    RowInputFormatParams params{DEFAULT_INSERT_BLOCK_SIZE, 0, 0, []{}};
 
     InputFormatPtr input_format = std::make_shared<TabSeparatedRowInputFormat>(sample, in_buf, params, false, false, format_settings);
     BlockInputStreamPtr block_input = std::make_shared<InputStreamFromInputFormat>(std::move(input_format));
diff --git a/dbms/src/Processors/Formats/IRowInputFormat.cpp b/dbms/src/Processors/Formats/IRowInputFormat.cpp
index b45c714ea07..a63fd80a087 100644
--- a/dbms/src/Processors/Formats/IRowInputFormat.cpp
+++ b/dbms/src/Processors/Formats/IRowInputFormat.cpp
@@ -20,8 +20,10 @@ namespace ErrorCodes
     extern const int TIMEOUT_EXCEEDED;
 }
 
+namespace
+{
 
-static bool isParseError(int code)
+bool isParseError(int code)
 {
     return code == ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED
         || code == ErrorCodes::CANNOT_PARSE_QUOTED_STRING
@@ -33,34 +35,8 @@ static bool isParseError(int code)
         || code == ErrorCodes::TOO_LARGE_STRING_SIZE;
 }
 
-
-static bool handleOverflowMode(OverflowMode mode, const String & message, int code)
-{
-    switch (mode)
-    {
-        case OverflowMode::THROW:
-            throw Exception(message, code);
-        case OverflowMode::BREAK:
-            return false;
-        default:
-            throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
-    }
 }
 
-
-static bool checkTimeLimit(const IRowInputFormat::Params & params, const Stopwatch & stopwatch)
-{
-    if (params.max_execution_time != 0
-        && stopwatch.elapsed() > static_cast<UInt64>(params.max_execution_time.totalMicroseconds()) * 1000)
-        return handleOverflowMode(params.timeout_overflow_mode,
-            "Timeout exceeded: elapsed " + toString(stopwatch.elapsedSeconds())
-            + " seconds, maximum: " + toString(params.max_execution_time.totalMicroseconds() / 1000000.0),
-            ErrorCodes::TIMEOUT_EXCEEDED);
-
-    return true;
-}
-
-
 Chunk IRowInputFormat::generate()
 {
     if (total_rows == 0)
@@ -76,15 +52,8 @@ Chunk IRowInputFormat::generate()
 
     try
     {
-        for (size_t rows = 0, batch = 0; rows < params.max_block_size; ++rows, ++batch)
+        for (size_t rows = 0; rows < params.max_block_size; ++rows)
         {
-            if (params.rows_portion_size && batch == params.rows_portion_size)
-            {
-                batch = 0;
-                if (!checkTimeLimit(params, total_stopwatch) || isCancelled())
-                    break;
-            }
-
             try
             {
                 ++total_rows;
diff --git a/dbms/src/Processors/Formats/IRowInputFormat.h b/dbms/src/Processors/Formats/IRowInputFormat.h
index 72a6c813701..ff6c60b3999 100644
--- a/dbms/src/Processors/Formats/IRowInputFormat.h
+++ b/dbms/src/Processors/Formats/IRowInputFormat.h
@@ -27,8 +27,6 @@ struct RowInputFormatParams
     UInt64 allow_errors_num;
     Float64 allow_errors_ratio;
 
-    UInt64 rows_portion_size;
-
     using ReadCallback = std::function<void()>;
     ReadCallback callback;
 
@@ -85,4 +83,3 @@ private:
 };
 
 }
-
diff --git a/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp b/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp
index 29adb061e29..adfa5300435 100644
--- a/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp
+++ b/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp
@@ -49,10 +49,13 @@ void KafkaBlockInputStream::readPrefixImpl()
 
     buffer->subscribe(storage.getTopics());
 
-    const auto & limits_ = getLimits();
-    const size_t poll_timeout = buffer->pollTimeout();
-    size_t rows_portion_size = poll_timeout ? std::min<size_t>(max_block_size, limits_.max_execution_time.totalMilliseconds() / poll_timeout) : max_block_size;
-    rows_portion_size = std::max(rows_portion_size, 1ul);
+    broken = true;
+}
+
+Block KafkaBlockInputStream::readImpl()
+{
+    if (!buffer)
+        return Block();
 
     auto non_virtual_header = storage.getSampleBlockNonMaterialized(); /// FIXME: add materialized columns support
     auto read_callback = [this]
@@ -67,33 +70,72 @@ void KafkaBlockInputStream::readPrefixImpl()
             virtual_columns[4]->insert(std::chrono::duration_cast<std::chrono::milliseconds>(timestamp->get_timestamp()).count()); // "timestamp"
     };
 
-    auto child = FormatFactory::instance().getInput(
-        storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, rows_portion_size, read_callback);
-    child->setLimits(limits_);
-    addChild(child);
+    auto merge_blocks = [] (Block & block1, Block && block2)
+    {
+        if (!block1)
+        {
+            // Need to make sure that resulting block has the same structure
+            block1 = std::move(block2);
+            return;
+        }
 
-    broken = true;
-}
+        if (!block2)
+            return;
 
-Block KafkaBlockInputStream::readImpl()
-{
-    if (!buffer)
+        auto columns1 = block1.mutateColumns();
+        auto columns2 = block2.mutateColumns();
+        for (size_t i = 0, s = columns1.size(); i < s; ++i)
+            columns1[i]->insertRangeFrom(*columns2[i], 0, columns2[i]->size());
+        block1.setColumns(std::move(columns1));
+    };
+
+    auto read_kafka_message = [&, this]
+    {
+        Block result;
+        auto child = FormatFactory::instance().getInput(
+            storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, read_callback);
+        const auto virtual_header = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"});
+
+        while (auto block = child->read())
+        {
+            auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));
+            virtual_columns = virtual_header.cloneEmptyColumns();
+
+            for (const auto & column : virtual_block.getColumnsWithTypeAndName())
+                block.insert(column);
+
+            /// FIXME: materialize MATERIALIZED columns here.
+
+            merge_blocks(result, std::move(block));
+        }
+
+        return result;
+    };
+
+    Block single_block;
+
+    UInt64 total_rows = 0;
+    while (total_rows < max_block_size)
+    {
+        auto new_block = read_kafka_message();
+        auto new_rows = new_block.rows();
+        total_rows += new_rows;
+        merge_blocks(single_block, std::move(new_block));
+
+        buffer->allowNext();
+
+        if (!new_rows || !checkTimeLimit())
+            break;
+    }
+
+    if (!single_block)
         return Block();
 
-    Block block = children.back()->read();
-    if (!block)
-        return block;
-
-    Block virtual_block = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}).cloneWithColumns(std::move(virtual_columns));
-    virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}).cloneEmptyColumns();
-
-    for (const auto & column : virtual_block.getColumnsWithTypeAndName())
-        block.insert(column);
-
-    /// FIXME: materialize MATERIALIZED columns here.
-
     return ConvertingBlockInputStream(
-        context, std::make_shared<OneBlockInputStream>(block), getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Name)
+        context,
+        std::make_shared<OneBlockInputStream>(single_block),
+        getHeader(),
+        ConvertingBlockInputStream::MatchColumnsMode::Name)
         .read();
 }
 
diff --git a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
index 083b471d4f1..9a3bd73a6b2 100644
--- a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
+++ b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
@@ -13,7 +13,6 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
     size_t max_batch_size,
     size_t poll_timeout_,
     bool intermediate_commit_,
-    char delimiter_,
     const std::atomic<bool> & stopped_)
     : ReadBuffer(nullptr, 0)
     , consumer(consumer_)
@@ -21,7 +20,6 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
     , batch_size(max_batch_size)
     , poll_timeout(poll_timeout_)
     , intermediate_commit(intermediate_commit_)
-    , delimiter(delimiter_)
     , stopped(stopped_)
     , current(messages.begin())
 {
@@ -140,16 +138,9 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
     /// NOTE: ReadBuffer was implemented with an immutable underlying contents in mind.
     /// If we failed to poll any message once - don't try again.
     /// Otherwise, the |poll_timeout| expectations get flawn.
-    if (stalled || stopped)
+    if (stalled || stopped || !allowed)
         return false;
 
-    if (put_delimiter)
-    {
-        BufferBase::set(&delimiter, 1, 0);
-        put_delimiter = false;
-        return true;
-    }
-
     if (current == messages.end())
     {
         if (intermediate_commit)
@@ -181,7 +172,7 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
     // XXX: very fishy place with const casting.
     auto new_position = reinterpret_cast<char *>(const_cast<unsigned char *>(current->get_payload().get_data()));
     BufferBase::set(new_position, current->get_payload().get_size(), 0);
-    put_delimiter = (delimiter != 0);
+    allowed = false;
 
     /// Since we can poll more messages than we already processed - commit only processed messages.
     consumer->store_offset(*current);
diff --git a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h
index d5d847dd153..8c2fcd3c7bb 100644
--- a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h
+++ b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h
@@ -25,10 +25,10 @@ public:
         size_t max_batch_size,
         size_t poll_timeout_,
         bool intermediate_commit_,
-        char delimiter_,
         const std::atomic<bool> & stopped_);
     ~ReadBufferFromKafkaConsumer() override;
 
+    void allowNext() { allowed = true; } // Allow to read next message.
     void commit(); // Commit all processed messages.
     void subscribe(const Names & topics); // Subscribe internal consumer to topics.
     void unsubscribe(); // Unsubscribe internal consumer in case of failure.
@@ -51,9 +51,7 @@ private:
     const size_t poll_timeout = 0;
     bool stalled = false;
     bool intermediate_commit = true;
-
-    char delimiter;
-    bool put_delimiter = false;
+    bool allowed = true;
 
     const std::atomic<bool> & stopped;
 
diff --git a/dbms/src/Storages/Kafka/StorageKafka.cpp b/dbms/src/Storages/Kafka/StorageKafka.cpp
index ed067993a18..c27fdaf4fe7 100644
--- a/dbms/src/Storages/Kafka/StorageKafka.cpp
+++ b/dbms/src/Storages/Kafka/StorageKafka.cpp
@@ -278,7 +278,7 @@ ConsumerBufferPtr StorageKafka::createReadBuffer()
     size_t poll_timeout = settings.stream_poll_timeout_ms.totalMilliseconds();
 
     /// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage.
-    return std::make_shared<ReadBufferFromKafkaConsumer>(consumer, log, batch_size, poll_timeout, intermediate_commit, row_delimiter, stream_cancelled);
+    return std::make_shared<ReadBufferFromKafkaConsumer>(consumer, log, batch_size, poll_timeout, intermediate_commit, stream_cancelled);
 }