2019-01-21 14:02:03 +00:00
|
|
|
#include <Storages/Kafka/KafkaBlockInputStream.h>
|
|
|
|
|
2019-05-23 13:20:25 +00:00
|
|
|
#include <DataStreams/ConvertingBlockInputStream.h>
|
|
|
|
#include <DataStreams/OneBlockInputStream.h>
|
2019-01-21 14:02:03 +00:00
|
|
|
#include <Formats/FormatFactory.h>
|
2019-01-30 17:41:06 +00:00
|
|
|
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
|
2019-01-21 14:02:03 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Creates a stream that reads `columns` from `storage_`, using up to `max_block_size_` rows per block.
/// The stream's own `context` member (initialized from `context_`) gets its input-format settings
/// overridden here, and the set of virtual column buffers is prepared empty.
KafkaBlockInputStream::KafkaBlockInputStream(
    StorageKafka & storage_, const Context & context_, const Names & columns, size_t max_block_size_)
    : storage(storage_)
    , context(context_)
    , column_names(columns)
    , max_block_size(max_block_size_)
{
    /// Unknown fields are always skipped regardless of the context (JSON or TSKV).
    context.setSetting("input_format_skip_unknown_fields", 1u);

    /// Error tolerance is expressed only as an absolute count taken from the storage; the ratio is zeroed.
    context.setSetting("input_format_allow_errors_ratio", 0.);
    context.setSetting("input_format_allow_errors_num", storage.skipBroken());

    /// Forward the format schema only when the storage has one configured.
    const auto & schema_name = storage.getSchemaName();
    if (!schema_name.empty())
        context.setSetting("format_schema", schema_name);

    /// Empty mutable columns, in this fixed order, that the per-row read callback appends to.
    virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}).cloneEmptyColumns();
}
|
|
|
|
|
|
|
|
/// Returns the consumer buffer to the storage if readPrefixImpl() managed to claim one.
/// When the stream is still flagged as broken (readSuffixImpl() has not cleared the flag),
/// the consumer is unsubscribed and the buffer is reset before it is handed back.
KafkaBlockInputStream::~KafkaBlockInputStream()
{
    if (claimed)
    {
        if (broken)
        {
            auto * consumer = buffer->subBufferAs<ReadBufferFromKafkaConsumer>();
            consumer->unsubscribe();
            buffer->reset();
        }

        storage.pushReadBuffer(buffer);
    }
}
|
|
|
|
|
2019-05-22 19:38:43 +00:00
|
|
|
/// Header of this stream: the storage's sample block for the columns requested at construction.
Block KafkaBlockInputStream::getHeader() const
{
    Block header = storage.getSampleBlockForColumns(column_names);
    return header;
}
|
|
|
|
|
2019-01-30 17:41:06 +00:00
|
|
|
void KafkaBlockInputStream::readPrefixImpl()
|
2019-01-21 14:02:03 +00:00
|
|
|
{
|
2019-08-28 15:24:23 +00:00
|
|
|
auto timeout = std::chrono::milliseconds(context.getSettingsRef().kafka_max_wait_ms.totalMilliseconds());
|
2019-08-20 11:17:57 +00:00
|
|
|
buffer = storage.popReadBuffer(timeout);
|
2019-04-22 13:23:05 +00:00
|
|
|
claimed = !!buffer;
|
2019-01-21 14:02:03 +00:00
|
|
|
|
2019-04-22 13:23:05 +00:00
|
|
|
if (!buffer)
|
2019-08-20 11:17:57 +00:00
|
|
|
return;
|
2019-01-21 14:02:03 +00:00
|
|
|
|
2019-05-22 19:38:43 +00:00
|
|
|
buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->subscribe(storage.getTopics());
|
2019-01-30 17:41:06 +00:00
|
|
|
|
2019-08-03 11:02:40 +00:00
|
|
|
const auto & limits_ = getLimits();
|
2019-05-14 15:52:03 +00:00
|
|
|
const size_t poll_timeout = buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->pollTimeout();
|
2019-08-03 11:02:40 +00:00
|
|
|
size_t rows_portion_size = poll_timeout ? std::min<size_t>(max_block_size, limits_.max_execution_time.totalMilliseconds() / poll_timeout) : max_block_size;
|
2019-05-14 15:52:03 +00:00
|
|
|
rows_portion_size = std::max(rows_portion_size, 1ul);
|
|
|
|
|
2019-05-22 19:38:43 +00:00
|
|
|
auto non_virtual_header = storage.getSampleBlockNonMaterialized(); /// FIXME: add materialized columns support
|
2019-05-23 13:20:25 +00:00
|
|
|
auto read_callback = [this]
|
2019-05-23 11:15:18 +00:00
|
|
|
{
|
|
|
|
const auto * sub_buffer = buffer->subBufferAs<ReadBufferFromKafkaConsumer>();
|
2019-07-22 11:32:11 +00:00
|
|
|
virtual_columns[0]->insert(sub_buffer->currentTopic()); // "topic"
|
|
|
|
virtual_columns[1]->insert(sub_buffer->currentKey()); // "key"
|
|
|
|
virtual_columns[2]->insert(sub_buffer->currentOffset()); // "offset"
|
|
|
|
virtual_columns[3]->insert(sub_buffer->currentPartition()); // "partition"
|
2019-08-06 14:18:37 +00:00
|
|
|
|
|
|
|
auto timestamp = sub_buffer->currentTimestamp();
|
|
|
|
if (timestamp)
|
|
|
|
virtual_columns[4]->insert(std::chrono::duration_cast<std::chrono::seconds>(timestamp->get_timestamp()).count()); // "timestamp"
|
2019-05-23 11:15:18 +00:00
|
|
|
};
|
|
|
|
|
2019-05-22 19:38:43 +00:00
|
|
|
auto child = FormatFactory::instance().getInput(
|
2019-05-23 13:20:25 +00:00
|
|
|
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, rows_portion_size, read_callback);
|
2019-08-03 11:02:40 +00:00
|
|
|
child->setLimits(limits_);
|
2019-05-14 15:52:03 +00:00
|
|
|
addChild(child);
|
2019-01-30 17:41:06 +00:00
|
|
|
|
|
|
|
broken = true;
|
2019-01-21 14:02:03 +00:00
|
|
|
}
|
|
|
|
|
2019-05-22 19:38:43 +00:00
|
|
|
/// Reads the next block from the child stream, appends the virtual columns accumulated by the
/// read callback, and converts the result to this stream's header (columns matched by name).
/// Returns an empty block if no buffer was claimed or the child stream has nothing more to give.
Block KafkaBlockInputStream::readImpl()
{
    if (!buffer)
        return Block();

    Block result = children.back()->read();
    if (!result)
        return result;

    const Names virtual_names{"_topic", "_key", "_offset", "_partition", "_timestamp"};

    /// Take the accumulated virtual columns, then start a fresh empty set for the next block.
    Block virtual_block = storage.getSampleBlockForColumns(virtual_names).cloneWithColumns(std::move(virtual_columns));
    virtual_columns = storage.getSampleBlockForColumns(virtual_names).cloneEmptyColumns();

    for (const auto & virtual_column : virtual_block.getColumnsWithTypeAndName())
        result.insert(virtual_column);

    /// FIXME: materialize MATERIALIZED columns here.

    ConvertingBlockInputStream converter(
        context, std::make_shared<OneBlockInputStream>(result), getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Name);
    return converter.read();
}
|
|
|
|
|
2019-01-21 14:02:03 +00:00
|
|
|
void KafkaBlockInputStream::readSuffixImpl()
|
|
|
|
{
|
2019-08-20 11:17:57 +00:00
|
|
|
if (!buffer)
|
|
|
|
return;
|
|
|
|
|
2019-01-30 17:41:06 +00:00
|
|
|
buffer->subBufferAs<ReadBufferFromKafkaConsumer>()->commit();
|
2019-01-21 14:02:03 +00:00
|
|
|
|
2019-01-30 17:41:06 +00:00
|
|
|
broken = false;
|
2019-01-21 14:02:03 +00:00
|
|
|
}
|
|
|
|
|
2019-04-22 13:23:05 +00:00
|
|
|
}
|