2019-01-21 14:02:03 +00:00
|
|
|
#include <Storages/Kafka/KafkaBlockInputStream.h>
|
|
|
|
|
2019-05-23 13:20:25 +00:00
|
|
|
#include <DataStreams/ConvertingBlockInputStream.h>
|
|
|
|
#include <DataStreams/OneBlockInputStream.h>
|
2019-01-21 14:02:03 +00:00
|
|
|
#include <Formats/FormatFactory.h>
|
2019-01-30 17:41:06 +00:00
|
|
|
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
|
2019-11-26 23:46:19 +00:00
|
|
|
#include <Processors/Formats/InputStreamFromInputFormat.h>
|
2019-01-21 14:02:03 +00:00
|
|
|
|
2019-11-28 23:06:03 +00:00
|
|
|
|
2019-01-21 14:02:03 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
/// Creates a streaming reader over a Kafka consumer owned by `storage_`.
///
/// @param storage_          Kafka storage that owns the consumer-buffer pool.
/// @param context_          Query context; a private copy is kept so format settings
///                          can be overridden below without affecting the caller.
///                          NOTE(review): `setSetting` is called on the member, so the
///                          member must be a mutable copy, not a reference — confirm in header.
/// @param columns           Names of the columns the caller wants (see getHeader()).
/// @param max_block_size_   Upper bound on rows accumulated per readImpl() call.
/// @param commit_in_suffix_ If true, offsets are committed in readSuffixImpl().
KafkaBlockInputStream::KafkaBlockInputStream(
    StorageKafka & storage_, const Context & context_, const Names & columns, size_t max_block_size_, bool commit_in_suffix_)
    : storage(storage_)
    , context(context_)
    , column_names(columns)
    , max_block_size(max_block_size_)
    , commit_in_suffix(commit_in_suffix_)
    , non_virtual_header(storage.getSampleBlockNonMaterialized()) /// FIXME: add materialized columns support
    , virtual_header(storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}))
{
    context.setSetting("input_format_skip_unknown_fields", 1u); // Always skip unknown fields regardless of the context (JSON or TSKV)
    // A single malformed message must not poison the whole stream: allow up to
    // storage.skipBroken() parse errors, with no ratio-based limit.
    context.setSetting("input_format_allow_errors_ratio", 0.);
    context.setSetting("input_format_allow_errors_num", storage.skipBroken());

    // Propagate the table-level format schema (e.g. for Protobuf/CapnProto) if one was given.
    if (!storage.getSchemaName().empty())
        context.setSetting("format_schema", storage.getSchemaName());
}
|
|
|
|
|
|
|
|
KafkaBlockInputStream::~KafkaBlockInputStream()
{
    /// Nothing to release unless readPrefixImpl() actually claimed a buffer.
    if (claimed)
    {
        /// `broken` stays true whenever the stream did not reach readSuffixImpl(),
        /// i.e. the read was aborted mid-way — drop the subscription in that case.
        if (broken)
            buffer->unsubscribe();

        /// Hand the consumer buffer back to the storage's pool for reuse.
        storage.pushReadBuffer(buffer);
    }
}
|
|
|
|
|
2019-05-22 19:38:43 +00:00
|
|
|
Block KafkaBlockInputStream::getHeader() const
{
    /// The stream's header is the storage sample block restricted to the
    /// columns requested at construction time.
    auto sample = storage.getSampleBlockForColumns(column_names);
    return sample;
}
|
|
|
|
|
2019-01-30 17:41:06 +00:00
|
|
|
void KafkaBlockInputStream::readPrefixImpl()
|
2019-01-21 14:02:03 +00:00
|
|
|
{
|
2019-08-28 15:24:23 +00:00
|
|
|
auto timeout = std::chrono::milliseconds(context.getSettingsRef().kafka_max_wait_ms.totalMilliseconds());
|
2019-08-20 11:17:57 +00:00
|
|
|
buffer = storage.popReadBuffer(timeout);
|
2019-04-22 13:23:05 +00:00
|
|
|
claimed = !!buffer;
|
2019-01-21 14:02:03 +00:00
|
|
|
|
2019-04-22 13:23:05 +00:00
|
|
|
if (!buffer)
|
2019-08-20 11:17:57 +00:00
|
|
|
return;
|
2019-01-21 14:02:03 +00:00
|
|
|
|
2019-09-04 21:25:33 +00:00
|
|
|
buffer->subscribe(storage.getTopics());
|
2019-01-30 17:41:06 +00:00
|
|
|
|
2019-09-20 12:12:32 +00:00
|
|
|
broken = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Reads up to max_block_size rows from the claimed Kafka buffer, parses them with
/// the storage's input format, appends the per-message virtual columns
/// (_topic/_key/_offset/_partition/_timestamp), and converts the result to the
/// header expected by the caller. Returns an empty Block when no buffer was
/// claimed or no rows could be read.
Block KafkaBlockInputStream::readImpl()
{
    // readPrefixImpl() may have timed out claiming a buffer.
    if (!buffer)
        return Block();

    MutableColumns result_columns = non_virtual_header.cloneEmptyColumns();
    MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();

    // Invoked by the input format once per consumed message; the indices below
    // must match the column order of virtual_header set in the constructor.
    auto read_callback = [&]
    {
        virtual_columns[0]->insert(buffer->currentTopic()); // "topic"
        virtual_columns[1]->insert(buffer->currentKey()); // "key"
        virtual_columns[2]->insert(buffer->currentOffset()); // "offset"
        virtual_columns[3]->insert(buffer->currentPartition()); // "partition"

        // Timestamp is optional on a Kafka message; leave a default value when absent.
        auto timestamp = buffer->currentTimestamp();
        if (timestamp)
            virtual_columns[4]->insert(std::chrono::duration_cast<std::chrono::seconds>(timestamp->get_timestamp()).count()); // "timestamp"
    };

    auto input_format = FormatFactory::instance().getInputFormat(
        storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, read_callback);

    // Drive the format processor manually through a single connected port
    // instead of wrapping it in a stream.
    InputPort port(input_format->getPort().getHeader(), input_format.get());
    connect(input_format->getPort(), port);
    port.setNeeded();

    // Pumps the processor until the current message is fully parsed;
    // returns the number of rows it produced.
    auto read_kafka_message = [&]
    {
        size_t new_rows = 0;

        while (true)
        {
            auto status = input_format->prepare();

            switch (status)
            {
                case IProcessor::Status::Ready:
                    input_format->work();
                    break;

                case IProcessor::Status::Finished:
                    // Reset so the same format object can parse the next message.
                    input_format->resetParser();
                    return new_rows;

                case IProcessor::Status::PortFull:
                {
                    auto chunk = port.pull();
                    new_rows = new_rows + chunk.getNumRows();

                    /// FIXME: materialize MATERIALIZED columns here.

                    // Append the parsed chunk column-by-column to the accumulated result.
                    auto columns = chunk.detachColumns();
                    for (size_t i = 0, s = columns.size(); i < s; ++i)
                        result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());

                    break;
                }
                case IProcessor::Status::NeedData:
                case IProcessor::Status::Async:
                case IProcessor::Status::Wait:
                case IProcessor::Status::ExpandPipeline:
                    // A source processor must never ask for input or expand the pipeline.
                    throw Exception("Source processor returned status " + IProcessor::statusToName(status), ErrorCodes::LOGICAL_ERROR);
            }
        }
    };

    size_t total_rows = 0;

    buffer->allowNext();
    // Some formats (like RowBinaryWithNamesAndTypes / CSVWithNames) throw an
    // exception when the buffer is empty, so only invoke the parser while the
    // buffer still has data (checked via eof() below).
    while (!buffer->eof())
    {
        auto new_rows = read_kafka_message();
        total_rows = total_rows + new_rows;
        // Permit the consumer to advance to the next message.
        buffer->allowNext();
        // Stop on an empty message, a full block, or an expired time limit.
        if (!new_rows || total_rows >= max_block_size || !checkTimeLimit())
            break;
    }

    if (total_rows == 0)
        return Block();

    auto result_block = non_virtual_header.cloneWithColumns(std::move(result_columns));
    auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));

    // Attach the virtual columns alongside the parsed data columns.
    for (const auto & column : virtual_block.getColumnsWithTypeAndName())
        result_block.insert(column);

    // Reorder/convert columns to match getHeader(), since the caller selected
    // an arbitrary subset/order of columns.
    return ConvertingBlockInputStream(
               context,
               std::make_shared<OneBlockInputStream>(result_block),
               getHeader(),
               ConvertingBlockInputStream::MatchColumnsMode::Name)
        .read();
}
|
|
|
|
|
2019-01-21 14:02:03 +00:00
|
|
|
void KafkaBlockInputStream::readSuffixImpl()
|
2019-10-02 21:17:19 +00:00
|
|
|
{
|
|
|
|
broken = false;
|
|
|
|
|
|
|
|
if (commit_in_suffix)
|
|
|
|
commit();
|
|
|
|
}
|
|
|
|
|
|
|
|
void KafkaBlockInputStream::commit()
|
2019-01-21 14:02:03 +00:00
|
|
|
{
|
2019-08-20 11:17:57 +00:00
|
|
|
if (!buffer)
|
|
|
|
return;
|
|
|
|
|
2019-09-04 21:25:33 +00:00
|
|
|
buffer->commit();
|
2019-01-21 14:02:03 +00:00
|
|
|
}
|
|
|
|
|
2019-04-22 13:23:05 +00:00
|
|
|
}
|