ClickHouse/dbms/src/Storages/Kafka/KafkaBlockInputStream.cpp

#include <Storages/Kafka/KafkaBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Formats/FormatFactory.h>
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>

namespace DB
{
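/// Format parsing is made tolerant on the stream's context: unknown fields are skipped
/// and up to storage.skipBroken() malformed messages are allowed.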
KafkaBlockInputStream::KafkaBlockInputStream(
    StorageKafka & storage_, const Context & context_, const Names & columns, size_t max_block_size_, bool commit_in_suffix_)
    : storage(storage_)
    , context(context_)
    , column_names(columns)
    , max_block_size(max_block_size_)
    , commit_in_suffix(commit_in_suffix_)
    , non_virtual_header(storage.getSampleBlockNonMaterialized()) /// FIXME: add materialized columns support
    , virtual_header(storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}))
{
    context.setSetting("input_format_skip_unknown_fields", 1u); // Always skip unknown fields regardless of the context (JSON or TSKV)
    context.setSetting("input_format_allow_errors_ratio", 0.);
    context.setSetting("input_format_allow_errors_num", storage.skipBroken());

    if (!storage.getSchemaName().empty())
        context.setSetting("format_schema", storage.getSchemaName());
}

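/// Return the claimed consumer buffer to the storage pool, unsubscribing it first if the read did not finish cleanly.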
KafkaBlockInputStream::~KafkaBlockInputStream()
{
    if (!claimed)
        return;

    if (broken)
        buffer->unsubscribe();

    storage.pushReadBuffer(buffer);
}

Block KafkaBlockInputStream::getHeader() const
{
    return storage.getSampleBlockForColumns(column_names);
}

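/// Claim a consumer buffer from the storage pool (waiting up to kafka_max_wait_ms) and subscribe it to the table's topics.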
void KafkaBlockInputStream::readPrefixImpl()
{
    auto timeout = std::chrono::milliseconds(context.getSettingsRef().kafka_max_wait_ms.totalMilliseconds());
    buffer = storage.popReadBuffer(timeout);
    claimed = !!buffer;

    if (!buffer)
        return;

    buffer->subscribe(storage.getTopics());

    broken = true;
}

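/// Poll messages from the consumer, parse them with the table's format and return a block
/// with the _topic, _key, _offset, _partition and _timestamp virtual columns attached.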
Block KafkaBlockInputStream::readImpl()
{
    if (!buffer)
        return Block();

    MutableColumns result_columns = non_virtual_header.cloneEmptyColumns();
    MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();

    auto input_format = FormatFactory::instance().getInputFormat(
        storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size);

    InputPort port(input_format->getPort().getHeader(), input_format.get());
    connect(input_format->getPort(), port);
    port.setNeeded();

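    /// Drive the IInputFormat processor by hand: run it until it reports Finished,
    /// pulling every produced chunk into result_columns and counting the parsed rows.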
    auto read_kafka_message = [&]
    {
        size_t new_rows = 0;

        while (true)
        {
            auto status = input_format->prepare();

            switch (status)
            {
                case IProcessor::Status::Ready:
                    input_format->work();
                    break;

                case IProcessor::Status::Finished:
                    input_format->resetParser();
                    return new_rows;

                case IProcessor::Status::PortFull:
                {
                    auto chunk = port.pull();

                    // getNumRows() returned a bad value before https://github.com/ClickHouse/ClickHouse/pull/8005;
                    // if this is backported, it should go together with #8005.
                    auto chunk_rows = chunk.getNumRows();
                    new_rows += chunk_rows;

                    auto columns = chunk.detachColumns();
                    for (size_t i = 0, s = columns.size(); i < s; ++i)
                    {
                        result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
                    }
                    break;
                }
                case IProcessor::Status::NeedData:
                case IProcessor::Status::Async:
                case IProcessor::Status::Wait:
                case IProcessor::Status::ExpandPipeline:
                    throw Exception("Source processor returned status " + IProcessor::statusToName(status), ErrorCodes::LOGICAL_ERROR);
            }
        }
    };

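    /// Poll and parse messages one at a time until there is no more data,
    /// the block is big enough, or the time limit is reached.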
    size_t total_rows = 0;

    while (true)
    {
        buffer->allowNext();

        // some formats (like RowBinaryWithNamesAndTypes / CSVWithNames)
        // throw an exception from readPrefix when the buffer is empty
        if (buffer->eof())
            break;

        auto new_rows = read_kafka_message();

        auto _topic = buffer->currentTopic();
        auto _key = buffer->currentKey();
        auto _offset = buffer->currentOffset();
        auto _partition = buffer->currentPartition();
        auto _timestamp_raw = buffer->currentTimestamp();
        auto _timestamp = _timestamp_raw ? std::chrono::duration_cast<std::chrono::seconds>(_timestamp_raw->get_timestamp()).count()
                                         : 0;

        for (size_t i = 0; i < new_rows; ++i)
        {
            virtual_columns[0]->insert(_topic);
            virtual_columns[1]->insert(_key);
            virtual_columns[2]->insert(_offset);
            virtual_columns[3]->insert(_partition);
            if (_timestamp_raw)
            {
                virtual_columns[4]->insert(_timestamp);
            }
            else
            {
                virtual_columns[4]->insertDefault();
            }
        }

        total_rows = total_rows + new_rows;

        if (!new_rows || total_rows >= max_block_size || !checkTimeLimit())
            break;
    }

    if (total_rows == 0)
        return Block();

    /// MATERIALIZED columns could be added here, but they are not really needed:
    /// columns 'materialized' that way stay ephemeral, i.e. they are never stored anywhere,
    /// and using them here would be misleading. If extra columns are needed, they can be
    /// added with DEFAULT expressions at the materialized view level instead.

    auto result_block = non_virtual_header.cloneWithColumns(std::move(result_columns));
    auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));

    for (const auto & column : virtual_block.getColumnsWithTypeAndName())
        result_block.insert(column);

    return ConvertingBlockInputStream(
               context,
               std::make_shared<OneBlockInputStream>(result_block),
               getHeader(),
               ConvertingBlockInputStream::MatchColumnsMode::Name)
        .read();
}

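/// The block was read successfully, so the buffer is no longer considered broken;
/// commit the offsets immediately when requested.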
void KafkaBlockInputStream::readSuffixImpl()
{
    broken = false;

    if (commit_in_suffix)
        commit();
}

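/// Commit the consumed offsets on the underlying buffer, if one was claimed.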
void KafkaBlockInputStream::commit()
{
    if (!buffer)
        return;

    buffer->commit();
}

}