Merge pull request #7935 from filimonov/kafka_perf

[wip] attempt to improve kafka parsing performance
commit 87adf355e8
alexey-milovidov 2019-12-03 03:52:40 +03:00 committed by GitHub
29 changed files with 234 additions and 57 deletions

View File

@@ -57,6 +57,13 @@ NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server
}
}
void NativeBlockInputStream::resetParser()
{
istr_concrete = nullptr;
use_index = false;
header.clear();
avg_value_size_hints.clear();
}
void NativeBlockInputStream::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint)
{

View File

@@ -78,6 +78,9 @@ public:
Block getHeader() const override;
void resetParser();
protected:
Block readImpl() override;

View File

@@ -0,0 +1,30 @@
#include <Processors/Formats/IInputFormat.h>
#include <IO/ReadBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
IInputFormat::IInputFormat(Block header, ReadBuffer & in_)
: ISource(std::move(header)), in(in_)
{
}
void IInputFormat::resetParser()
{
if (in.hasPendingData())
throw Exception("Unread data in IInputFormat::resetParser. Most likely it's a bug.", ErrorCodes::LOGICAL_ERROR);
// These are protected attributes from ISource (we didn't want to propagate resetParser up there)
finished = false;
got_exception = false;
getPort().getInputPort().reopen();
}
}

View File

@@ -23,10 +23,15 @@ protected:
#pragma GCC diagnostic pop
public:
IInputFormat(Block header, ReadBuffer & in_)
: ISource(std::move(header)), in(in_)
{
}
IInputFormat(Block header, ReadBuffer & in_);
/** In some use cases (hello, Kafka) we need to read many tiny streams in exactly the same format.
 * Recreating the parser for each small stream takes too long, so we introduce resetParser(),
 * which resets the parser state so reading can continue from the source stream
 * without recreating the parser.
 * It should be called only after the current buffer has been fully read.
 */
virtual void resetParser();
virtual const BlockMissingValues & getMissingValues() const
{
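A self-contained toy sketch of the reset-instead-of-recreate pattern this comment describes (illustrative types only, not the ClickHouse API):

#include <cstddef>
#include <iostream>
#include <string>
#include <string_view>
#include <vector>

/// Toy parser that counts newline-terminated rows; a stand-in for an IInputFormat.
struct TinyParser
{
    size_t total_rows = 0;

    size_t parse(std::string_view payload)
    {
        size_t rows = 0;
        for (char c : payload)
            rows += (c == '\n');
        total_rows += rows;
        return rows;
    }

    /// Analogue of resetParser(): clear per-stream state so the same object
    /// can consume the next tiny stream without being reconstructed.
    void resetParser() { total_rows = 0; }
};

int main()
{
    const std::vector<std::string> messages{"a\nb\n", "c\n", "d\ne\nf\n"};
    TinyParser parser; /// constructed once
    for (const auto & msg : messages)
    {
        std::cout << parser.parse(msg) << " rows\n";
        parser.resetParser(); /// reset and reuse instead of recreating
    }
}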

View File

@@ -159,4 +159,13 @@ void IRowInputFormat::syncAfterError()
throw Exception("Method syncAfterError is not implemented for input format", ErrorCodes::NOT_IMPLEMENTED);
}
void IRowInputFormat::resetParser()
{
IInputFormat::resetParser();
total_rows = 0;
num_errors = 0;
block_missing_values.clear();
}
}

View File

@@ -53,6 +53,8 @@ public:
Chunk generate() override;
void resetParser() override;
protected:
/** Read next row and append it to the columns.
* If no more rows - return false.

View File

@@ -405,6 +405,14 @@ bool CSVRowInputFormat::readField(IColumn & column, const DataTypePtr & type, bo
}
}
void CSVRowInputFormat::resetParser()
{
RowInputFormatWithDiagnosticInfo::resetParser();
column_indexes_for_input_fields.clear();
read_columns.clear();
have_always_default_columns = false;
}
void registerInputFormatProcessorCSV(FormatFactory & factory)
{

View File

@@ -28,6 +28,7 @@ public:
void readPrefix() override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
void resetParser() override;
private:
bool with_names;

View File

@@ -256,6 +256,15 @@ void JSONEachRowRowInputFormat::syncAfterError()
skipToUnescapedNextLineOrEOF(in);
}
void JSONEachRowRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
nested_prefix_length = 0;
read_columns.clear();
seen_columns.clear();
prev_positions.clear();
}
void registerInputFormatProcessorJSONEachRow(FormatFactory & factory)
{

View File

@@ -27,6 +27,7 @@ public:
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
void resetParser() override;
private:
const String & columnName(size_t i) const;

View File

@@ -20,6 +20,15 @@ public:
String getName() const override { return "NativeInputFormatFromNativeBlockInputStream"; }
protected:
void resetParser() override
{
IInputFormat::resetParser();
stream->resetParser();
read_prefix = false;
read_suffix = false;
}
Chunk generate() override
{
/// TODO: do something with totals and extremes.

View File

@@ -62,6 +62,16 @@ namespace DB
return res;
}
void ORCBlockInputFormat::resetParser()
{
IInputFormat::resetParser();
file_reader.reset();
file_data.clear();
row_group_total = 0;
row_group_current = 0;
}
void registerInputFormatProcessorORC(FormatFactory &factory)
{
factory.registerInputFormatProcessor(

View File

@@ -21,6 +21,8 @@ public:
String getName() const override { return "ORCBlockInputFormat"; }
void resetParser() override;
protected:
Chunk generate() override;

View File

@@ -63,6 +63,17 @@ namespace DB
return res;
}
void ParquetBlockInputFormat::resetParser()
{
IInputFormat::resetParser();
file_reader.reset();
file_data.clear();
buffer.reset();
row_group_total = 0;
row_group_current = 0;
}
void registerInputFormatProcessorParquet(FormatFactory &factory)
{
factory.registerInputFormatProcessor(

View File

@@ -18,6 +18,9 @@ class ParquetBlockInputFormat: public IInputFormat
public:
ParquetBlockInputFormat(ReadBuffer & in_, Block header_, const Context & context_);
void resetParser() override;
String getName() const override { return "ParquetBlockInputFormat"; }
protected:

View File

@@ -65,7 +65,6 @@ void ProtobufRowInputFormat::syncAfterError()
reader.endMessage(true);
}
void registerInputFormatProcessorProtobuf(FormatFactory & factory)
{
factory.registerInputFormatProcessor("Protobuf", [](

View File

@@ -197,6 +197,14 @@ void TSKVRowInputFormat::syncAfterError()
}
void TSKVRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
read_columns.clear();
seen_columns.clear();
name_buf.clear();
}
void registerInputFormatProcessorTSKV(FormatFactory & factory)
{
factory.registerInputFormatProcessor("TSKV", [](

View File

@@ -30,6 +30,8 @@ public:
bool readRow(MutableColumns & columns, RowReadExtension &) override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
void resetParser() override;
private:
const FormatSettings format_settings;

View File

@@ -341,6 +341,13 @@ void TabSeparatedRowInputFormat::syncAfterError()
skipToUnescapedNextLineOrEOF(in);
}
void TabSeparatedRowInputFormat::resetParser()
{
RowInputFormatWithDiagnosticInfo::resetParser();
column_indexes_for_input_fields.clear();
read_columns.clear();
columns_to_fill_with_default_values.clear();
}
void registerInputFormatProcessorTabSeparated(FormatFactory & factory)
{

View File

@@ -26,6 +26,8 @@ public:
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
void resetParser() override;
private:
bool with_names;
bool with_types;

View File

@@ -496,6 +496,11 @@ void TemplateRowInputFormat::throwUnexpectedEof()
ErrorCodes::CANNOT_READ_ALL_DATA);
}
void TemplateRowInputFormat::resetParser()
{
RowInputFormatWithDiagnosticInfo::resetParser();
end_of_stream = false;
}
void registerInputFormatProcessorTemplate(FormatFactory & factory)
{

View File

@@ -28,6 +28,8 @@ public:
bool allowSyncAfterError() const override;
void syncAfterError() override;
void resetParser() override;
private:
bool deserializeField(const DataTypePtr & type, IColumn & column, size_t file_column);
void skipField(ColumnFormat col_format);

View File

@@ -411,6 +411,13 @@ void ValuesBlockInputFormat::readSuffix()
throw Exception("Unread data in PeekableReadBuffer will be lost. Most likely it's a bug.", ErrorCodes::LOGICAL_ERROR);
}
void ValuesBlockInputFormat::resetParser()
{
IInputFormat::resetParser();
// I'm not resetting parser modes here.
// There is a good chance that all messages have the same format.
total_rows = 0;
}
void registerInputFormatProcessorValues(FormatFactory & factory)
{

View File

@@ -33,6 +33,8 @@ public:
String getName() const override { return "ValuesBlockInputFormat"; }
void resetParser() override;
const BlockMissingValues & getMissingValues() const override { return block_missing_values; }
private:

View File

@@ -164,4 +164,17 @@ String RowInputFormatWithDiagnosticInfo::alignedName(const String & name, size_t
return name + ", " + std::string(spaces_count, ' ');
}
void RowInputFormatWithDiagnosticInfo::resetParser()
{
IRowInputFormat::resetParser();
row_num = 0;
bytes_read_at_start_of_buffer_on_current_row = 0;
bytes_read_at_start_of_buffer_on_prev_row = 0;
offset_of_current_row = std::numeric_limits<size_t>::max();
offset_of_prev_row = std::numeric_limits<size_t>::max();
max_length_of_column_name = 0;
max_length_of_data_type_name = 0;
}
}

View File

@@ -16,6 +16,8 @@ public:
String getDiagnosticInfo() override;
void resetParser() override;
protected:
void updateDiagnosticInfo();
bool deserializeFieldAndPrintDiagnosticInfo(const String & col_name, const DataTypePtr & type, IColumn & column,

View File

@@ -316,6 +316,17 @@ public:
is_finished = true;
}
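/// Re-open a port that was previously marked finished, so the pipeline can be
/// reused after IInputFormat::resetParser().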
void ALWAYS_INLINE reopen()
{
assumeConnected();
if (!isFinished())
return;
state->setFlags(0, State::IS_FINISHED);
is_finished = false;
}
OutputPort & getOutputPort()
{
assumeConnected();

View File

@@ -4,6 +4,8 @@
#include <DataStreams/OneBlockInputStream.h>
#include <Formats/FormatFactory.h>
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
namespace DB
{
@@ -16,6 +18,7 @@ KafkaBlockInputStream::KafkaBlockInputStream(
, commit_in_suffix(commit_in_suffix_)
, non_virtual_header(storage.getSampleBlockNonMaterialized()) /// FIXME: add materialized columns support
, virtual_header(storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}))
{
context.setSetting("input_format_skip_unknown_fields", 1u); // Always skip unknown fields regardless of the context (JSON or TSKV)
context.setSetting("input_format_allow_errors_ratio", 0.);
@@ -23,8 +26,6 @@ KafkaBlockInputStream::KafkaBlockInputStream(
if (!storage.getSchemaName().empty())
context.setSetting("format_schema", storage.getSchemaName());
virtual_columns = virtual_header.cloneEmptyColumns();
}
KafkaBlockInputStream::~KafkaBlockInputStream()
@@ -62,7 +63,10 @@ Block KafkaBlockInputStream::readImpl()
if (!buffer)
return Block();
auto read_callback = [this]
MutableColumns result_columns = non_virtual_header.cloneEmptyColumns();
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
auto read_callback = [&]
{
virtual_columns[0]->insert(buffer->currentTopic()); // "topic"
virtual_columns[1]->insert(buffer->currentKey()); // "key"
@@ -74,69 +78,74 @@ Block KafkaBlockInputStream::readImpl()
virtual_columns[4]->insert(std::chrono::duration_cast<std::chrono::seconds>(timestamp->get_timestamp()).count()); // "timestamp"
};
auto merge_blocks = [] (Block & block1, Block && block2)
auto input_format = FormatFactory::instance().getInputFormat(
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, read_callback);
InputPort port(input_format->getPort().getHeader(), input_format.get());
connect(input_format->getPort(), port);
port.setNeeded();
auto read_kafka_message = [&]
{
if (!block1)
size_t new_rows = 0;
while (true)
{
// Need to make sure that resulting block has the same structure
block1 = std::move(block2);
return;
auto status = input_format->prepare();
switch (status)
{
case IProcessor::Status::Ready:
input_format->work();
break;
case IProcessor::Status::Finished:
input_format->resetParser();
return new_rows;
case IProcessor::Status::PortFull:
{
auto chunk = port.pull();
new_rows = new_rows + chunk.getNumRows();
/// FIXME: materialize MATERIALIZED columns here.
auto columns = chunk.detachColumns();
for (size_t i = 0, s = columns.size(); i < s; ++i)
result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
break;
}
case IProcessor::Status::NeedData:
case IProcessor::Status::Async:
case IProcessor::Status::Wait:
case IProcessor::Status::ExpandPipeline:
throw Exception("Source processor returned status " + IProcessor::statusToName(status), ErrorCodes::LOGICAL_ERROR);
}
}
if (!block2)
return;
auto columns1 = block1.mutateColumns();
auto columns2 = block2.mutateColumns();
for (size_t i = 0, s = columns1.size(); i < s; ++i)
columns1[i]->insertRangeFrom(*columns2[i], 0, columns2[i]->size());
block1.setColumns(std::move(columns1));
};
auto read_kafka_message = [&, this]
{
Block result;
auto child = FormatFactory::instance().getInput(
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, read_callback);
while (auto block = child->read())
{
auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));
virtual_columns = virtual_header.cloneEmptyColumns();
for (const auto & column : virtual_block.getColumnsWithTypeAndName())
block.insert(column);
/// FIXME: materialize MATERIALIZED columns here.
merge_blocks(result, std::move(block));
}
return result;
};
Block single_block;
UInt64 total_rows = 0;
size_t total_rows = 0;
while (total_rows < max_block_size)
{
auto new_block = read_kafka_message();
auto new_rows = new_block.rows();
total_rows += new_rows;
merge_blocks(single_block, std::move(new_block));
auto new_rows = read_kafka_message();
total_rows = total_rows + new_rows;
buffer->allowNext();
if (!new_rows || !checkTimeLimit())
break;
}
if (!single_block)
if (total_rows == 0)
return Block();
auto result_block = non_virtual_header.cloneWithColumns(std::move(result_columns));
auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));
for (const auto & column : virtual_block.getColumnsWithTypeAndName())
result_block.insert(column);
return ConvertingBlockInputStream(
context,
std::make_shared<OneBlockInputStream>(single_block),
std::make_shared<OneBlockInputStream>(result_block),
getHeader(),
ConvertingBlockInputStream::MatchColumnsMode::Name)
.read();
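For readers unfamiliar with the processors API, the readImpl loop above drives the source by hand: prepare() reports a status; Ready means call work(), PortFull means pull a chunk from the output port, and Finished ends the current message and triggers resetParser(). A self-contained toy of that drive loop (toy Status enum and source, not the real IProcessor interface):

#include <cstddef>
#include <iostream>
#include <optional>
#include <vector>

/// Toy stand-ins for the IProcessor handshake; all names are illustrative.
enum class Status { Ready, PortFull, Finished };

struct ToySource
{
    std::vector<size_t> chunk_rows{3, 1, 4}; /// rows per produced chunk
    size_t pos = 0;
    std::optional<size_t> out; /// the "output port" slot

    Status prepare() const
    {
        if (out)
            return Status::PortFull; /// consumer must pull before more work
        if (pos >= chunk_rows.size())
            return Status::Finished;
        return Status::Ready;
    }

    void work() { out = chunk_rows[pos++]; } /// produce one chunk
    size_t pull() { size_t rows = *out; out.reset(); return rows; }
};

int main()
{
    ToySource source;
    size_t new_rows = 0;
    while (true) /// same shape as the switch in readImpl above
    {
        switch (source.prepare())
        {
            case Status::Ready: source.work(); break;
            case Status::PortFull: new_rows += source.pull(); break;
            case Status::Finished: std::cout << new_rows << " rows\n"; return 0;
        }
    }
}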

View File

@@ -33,9 +33,7 @@ private:
UInt64 max_block_size;
ConsumerBufferPtr buffer;
MutableColumns virtual_columns;
bool broken = true, claimed = false, commit_in_suffix;
const Block non_virtual_header, virtual_header;
};