Always read and insert Kafka messages as a whole (#6950)

* Always read and insert Kafka messages as a whole.
Ivan 2019-09-20 15:12:32 +03:00 committed by GitHub
parent ef75a45fef
commit cffc254922
11 changed files with 81 additions and 92 deletions
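
In effect, the rows_portion_size knob disappears from FormatFactory, IRowInputFormat and the Native format, and batching moves into the Kafka layer itself: KafkaBlockInputStream::readImpl() now parses one whole Kafka message at a time and merges the resulting blocks until max_block_size rows are collected or the time limit is hit, while ReadBufferFromKafkaConsumer only advances to the next message after an explicit allowNext() call, so a message can no longer be split across blocks. Hedged sketches of the new call shapes follow the relevant hunks below.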

View File

@@ -16,7 +16,7 @@ SquashingTransform::Result SquashingTransform::add(MutableColumns && columns)
if (columns.empty())
return Result(std::move(accumulated_columns));
/// Just read block is alredy enough.
/// Just read block is already enough.
if (isEnoughSize(columns))
{
/// If no accumulated data, return just read block.

View File

@@ -83,7 +83,6 @@ BlockInputStreamPtr FormatFactory::getInput(
const Block & sample,
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
ReadCallback callback) const
{
if (name == "Native")
@@ -98,11 +97,10 @@ BlockInputStreamPtr FormatFactory::getInput(
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getInputFormatSetting(settings);
return input_getter(
buf, sample, context, max_block_size, rows_portion_size, callback ? callback : ReadCallback(), format_settings);
return input_getter(buf, sample, context, max_block_size, callback ? callback : ReadCallback(), format_settings);
}
auto format = getInputFormat(name, buf, sample, context, max_block_size, rows_portion_size, std::move(callback));
auto format = getInputFormat(name, buf, sample, context, max_block_size, std::move(callback));
return std::make_shared<InputStreamFromInputFormat>(std::move(format));
}
@@ -150,7 +148,6 @@ InputFormatPtr FormatFactory::getInputFormat(
const Block & sample,
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
ReadCallback callback) const
{
const auto & input_getter = getCreators(name).input_processor_creator;
@@ -164,7 +161,6 @@ InputFormatPtr FormatFactory::getInputFormat(
params.max_block_size = max_block_size;
params.allow_errors_num = format_settings.input_allow_errors_num;
params.allow_errors_ratio = format_settings.input_allow_errors_ratio;
params.rows_portion_size = rows_portion_size;
params.callback = std::move(callback);
params.max_execution_time = settings.max_execution_time;
params.timeout_overflow_mode = settings.timeout_overflow_mode;
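
With rows_portion_size gone from both overloads, a caller now asks the factory for an input stream with just the block size and an optional per-row callback. A hedged sketch of the new call shape (the local names buf, header, context and read_callback are illustrative; the real call appears in KafkaBlockInputStream::readImpl below):

    // Illustrative only: "CSV", buf, header, context, max_block_size and read_callback
    // stand in for whatever the caller already has at hand.
    auto stream = FormatFactory::instance().getInput(
        "CSV", buf, header, context, max_block_size, read_callback);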

View File

@@ -51,7 +51,6 @@ private:
const Block & sample,
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size,
ReadCallback callback,
const FormatSettings & settings)>;
@@ -96,7 +95,6 @@ public:
const Block & sample,
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size = 0,
ReadCallback callback = {}) const;
BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
@@ -108,7 +106,6 @@ public:
const Block & sample,
const Context & context,
UInt64 max_block_size,
UInt64 rows_portion_size = 0,
ReadCallback callback = {}) const;
OutputFormatPtr getOutputFormat(

View File

@@ -13,7 +13,6 @@ void registerInputFormatNative(FormatFactory & factory)
const Block & sample,
const Context &,
UInt64 /* max_block_size */,
UInt64 /* min_read_rows */,
FormatFactory::ReadCallback /* callback */,
const FormatSettings &)
{

View File

@@ -39,7 +39,7 @@ try
FormatSettings format_settings;
RowInputFormatParams params{DEFAULT_INSERT_BLOCK_SIZE, 0, 0, 0, []{}};
RowInputFormatParams params{DEFAULT_INSERT_BLOCK_SIZE, 0, 0, []{}};
InputFormatPtr input_format = std::make_shared<TabSeparatedRowInputFormat>(sample, in_buf, params, false, false, format_settings);
BlockInputStreamPtr block_input = std::make_shared<InputStreamFromInputFormat>(std::move(input_format));

View File

@@ -20,8 +20,10 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED;
}
namespace
{
static bool isParseError(int code)
bool isParseError(int code)
{
return code == ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED
|| code == ErrorCodes::CANNOT_PARSE_QUOTED_STRING
@@ -33,34 +35,8 @@ static bool isParseError(int code)
|| code == ErrorCodes::TOO_LARGE_STRING_SIZE;
}
static bool handleOverflowMode(OverflowMode mode, const String & message, int code)
{
switch (mode)
{
case OverflowMode::THROW:
throw Exception(message, code);
case OverflowMode::BREAK:
return false;
default:
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
}
static bool checkTimeLimit(const IRowInputFormat::Params & params, const Stopwatch & stopwatch)
{
if (params.max_execution_time != 0
&& stopwatch.elapsed() > static_cast<UInt64>(params.max_execution_time.totalMicroseconds()) * 1000)
return handleOverflowMode(params.timeout_overflow_mode,
"Timeout exceeded: elapsed " + toString(stopwatch.elapsedSeconds())
+ " seconds, maximum: " + toString(params.max_execution_time.totalMicroseconds() / 1000000.0),
ErrorCodes::TIMEOUT_EXCEEDED);
return true;
}
Chunk IRowInputFormat::generate()
{
if (total_rows == 0)
@@ -76,15 +52,8 @@ Chunk IRowInputFormat::generate()
try
{
for (size_t rows = 0, batch = 0; rows < params.max_block_size; ++rows, ++batch)
for (size_t rows = 0; rows < params.max_block_size; ++rows)
{
if (params.rows_portion_size && batch == params.rows_portion_size)
{
batch = 0;
if (!checkTimeLimit(params, total_stopwatch) || isCancelled())
break;
}
try
{
++total_rows;

View File

@@ -27,8 +27,6 @@ struct RowInputFormatParams
UInt64 allow_errors_num;
Float64 allow_errors_ratio;
UInt64 rows_portion_size;
using ReadCallback = std::function<void()>;
ReadCallback callback;
@@ -85,4 +83,3 @@ private:
};
}

View File

@@ -49,10 +49,13 @@ void KafkaBlockInputStream::readPrefixImpl()
buffer->subscribe(storage.getTopics());
const auto & limits_ = getLimits();
const size_t poll_timeout = buffer->pollTimeout();
size_t rows_portion_size = poll_timeout ? std::min<size_t>(max_block_size, limits_.max_execution_time.totalMilliseconds() / poll_timeout) : max_block_size;
rows_portion_size = std::max(rows_portion_size, 1ul);
broken = true;
}
Block KafkaBlockInputStream::readImpl()
{
if (!buffer)
return Block();
auto non_virtual_header = storage.getSampleBlockNonMaterialized(); /// FIXME: add materialized columns support
auto read_callback = [this]
@@ -67,33 +70,72 @@ void KafkaBlockInputStream::readPrefixImpl()
virtual_columns[4]->insert(std::chrono::duration_cast<std::chrono::seconds>(timestamp->get_timestamp()).count()); // "timestamp"
};
auto child = FormatFactory::instance().getInput(
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, rows_portion_size, read_callback);
child->setLimits(limits_);
addChild(child);
auto merge_blocks = [] (Block & block1, Block && block2)
{
if (!block1)
{
// Need to make sure that resulting block has the same structure
block1 = std::move(block2);
return;
}
broken = true;
}
if (!block2)
return;
Block KafkaBlockInputStream::readImpl()
{
if (!buffer)
auto columns1 = block1.mutateColumns();
auto columns2 = block2.mutateColumns();
for (size_t i = 0, s = columns1.size(); i < s; ++i)
columns1[i]->insertRangeFrom(*columns2[i], 0, columns2[i]->size());
block1.setColumns(std::move(columns1));
};
auto read_kafka_message = [&, this]
{
Block result;
auto child = FormatFactory::instance().getInput(
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, read_callback);
const auto virtual_header = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"});
while (auto block = child->read())
{
auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));
virtual_columns = virtual_header.cloneEmptyColumns();
for (const auto & column : virtual_block.getColumnsWithTypeAndName())
block.insert(column);
/// FIXME: materialize MATERIALIZED columns here.
merge_blocks(result, std::move(block));
}
return result;
};
Block single_block;
UInt64 total_rows = 0;
while (total_rows < max_block_size)
{
auto new_block = read_kafka_message();
auto new_rows = new_block.rows();
total_rows += new_rows;
merge_blocks(single_block, std::move(new_block));
buffer->allowNext();
if (!new_rows || !checkTimeLimit())
break;
}
if (!single_block)
return Block();
Block block = children.back()->read();
if (!block)
return block;
Block virtual_block = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}).cloneWithColumns(std::move(virtual_columns));
virtual_columns = storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}).cloneEmptyColumns();
for (const auto & column : virtual_block.getColumnsWithTypeAndName())
block.insert(column);
/// FIXME: materialize MATERIALIZED columns here.
return ConvertingBlockInputStream(
context, std::make_shared<OneBlockInputStream>(block), getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Name)
context,
std::make_shared<OneBlockInputStream>(single_block),
getHeader(),
ConvertingBlockInputStream::MatchColumnsMode::Name)
.read();
}
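
The control flow of the new readImpl() is easier to see in isolation: every iteration parses exactly one whole Kafka message into a block, merges it into the accumulated result, re-arms the buffer, and stops once max_block_size rows are collected, nothing more is polled, or the time limit is hit. A minimal self-contained model of that loop (toy types, not the actual ClickHouse classes; the real code additionally attaches the _topic/_key/_offset/_partition/_timestamp virtual columns per message):

#include <cstddef>
#include <string>
#include <vector>

struct ToyConsumerBuffer
{
    std::vector<std::vector<std::string>> messages; // each Kafka message parses into several rows
    size_t next = 0;
    bool allowed = true;

    void allowNext() { allowed = true; } // mirrors ReadBufferFromKafkaConsumer::allowNext()

    // Serve all rows of one message, then refuse to advance until re-armed.
    std::vector<std::string> readOneMessage()
    {
        if (!allowed || next == messages.size())
            return {};
        allowed = false;
        return messages[next++];
    }
};

std::vector<std::string> readBlock(ToyConsumerBuffer & buffer, size_t max_block_size)
{
    std::vector<std::string> block;
    while (block.size() < max_block_size)
    {
        auto rows = buffer.readOneMessage();                 // a whole message, never a slice of one
        block.insert(block.end(), rows.begin(), rows.end()); // merge into the accumulated block
        buffer.allowNext();                                  // permit reading the next message
        if (rows.empty())                                    // nothing left (the real loop also checks the time limit here)
            break;
    }
    return block;
}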

View File

@@ -13,7 +13,6 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
size_t max_batch_size,
size_t poll_timeout_,
bool intermediate_commit_,
char delimiter_,
const std::atomic<bool> & stopped_)
: ReadBuffer(nullptr, 0)
, consumer(consumer_)
@@ -21,7 +20,6 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
, batch_size(max_batch_size)
, poll_timeout(poll_timeout_)
, intermediate_commit(intermediate_commit_)
, delimiter(delimiter_)
, stopped(stopped_)
, current(messages.begin())
{
@@ -140,16 +138,9 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
/// NOTE: ReadBuffer was implemented with an immutable underlying contents in mind.
/// If we failed to poll any message once - don't try again.
/// Otherwise, the |poll_timeout| expectations get flawn.
if (stalled || stopped)
if (stalled || stopped || !allowed)
return false;
if (put_delimiter)
{
BufferBase::set(&delimiter, 1, 0);
put_delimiter = false;
return true;
}
if (current == messages.end())
{
if (intermediate_commit)
@@ -181,7 +172,7 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
// XXX: very fishy place with const casting.
auto new_position = reinterpret_cast<char *>(const_cast<unsigned char *>(current->get_payload().get_data()));
BufferBase::set(new_position, current->get_payload().get_size(), 0);
put_delimiter = (delimiter != 0);
allowed = false;
/// Since we can poll more messages than we already processed - commit only processed messages.
consumer->store_offset(*current);

View File

@@ -25,10 +25,10 @@ public:
size_t max_batch_size,
size_t poll_timeout_,
bool intermediate_commit_,
char delimiter_,
const std::atomic<bool> & stopped_);
~ReadBufferFromKafkaConsumer() override;
void allowNext() { allowed = true; } // Allow to read next message.
void commit(); // Commit all processed messages.
void subscribe(const Names & topics); // Subscribe internal consumer to topics.
void unsubscribe(); // Unsubscribe internal consumer in case of failure.
@@ -51,9 +51,7 @@ private:
const size_t poll_timeout = 0;
bool stalled = false;
bool intermediate_commit = true;
char delimiter;
bool put_delimiter = false;
bool allowed = true;
const std::atomic<bool> & stopped;
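
allowNext() replaces the old row-delimiter trick: instead of injecting a delimiter byte between messages, the buffer now simply reports end-of-data after each message and stays there until the reader explicitly re-arms it. A hedged sketch of the resulting call pattern (parseOneMessage() is an illustrative stand-in for running the row format over the buffer):

Block block = parseOneMessage(*buffer); // nextImpl() serves exactly one message, then returns false
buffer->allowNext();                    // explicitly allow the buffer to move on to the next message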

View File

@@ -278,7 +278,7 @@ ConsumerBufferPtr StorageKafka::createReadBuffer()
size_t poll_timeout = settings.stream_poll_timeout_ms.totalMilliseconds();
/// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage.
return std::make_shared<ReadBufferFromKafkaConsumer>(consumer, log, batch_size, poll_timeout, intermediate_commit, row_delimiter, stream_cancelled);
return std::make_shared<ReadBufferFromKafkaConsumer>(consumer, log, batch_size, poll_timeout, intermediate_commit, stream_cancelled);
}