reset the parser if exception catched.

This commit is contained in:
Peng Jian 2021-03-19 11:49:30 +08:00 committed by root
parent 85cfb927e6
commit cad54d4e2c

View File

@ -90,12 +90,10 @@ Block KafkaBlockInputStream::readImpl()
connect(input_format->getPort(), port); connect(input_format->getPort(), port);
port.setNeeded(); port.setNeeded();
std::string exception_message; std::optional<std::string> exception_message;
auto read_kafka_message = [&] auto read_kafka_message = [&]
{ {
size_t new_rows = 0; size_t new_rows = 0;
exception_message.clear();
while (true) while (true)
{ {
auto status = input_format->prepare(); auto status = input_format->prepare();
@ -103,22 +101,7 @@ Block KafkaBlockInputStream::readImpl()
switch (status) switch (status)
{ {
case IProcessor::Status::Ready: case IProcessor::Status::Ready:
try input_format->work();
{
input_format->work();
}
catch (const Exception & e)
{
if (put_error_to_stream)
{
exception_message = e.message();
new_rows++;
}
else
{
throw e;
}
}
break; break;
case IProcessor::Status::Finished: case IProcessor::Status::Finished:
@ -154,13 +137,42 @@ Block KafkaBlockInputStream::readImpl()
while (true) while (true)
{ {
auto new_rows = buffer->poll() ? read_kafka_message() : 0; size_t new_rows = 0;
exception_message.reset();
if (!exception_message.empty()) if (buffer->poll())
{ {
for (size_t i = 0; i < result_columns.size(); ++i) try
result_columns[i]->insertDefault(); {
new_rows = read_kafka_message();
}
catch (Exception & e)
{
if (put_error_to_stream)
{
input_format->resetParser();
exception_message = e.message();
for (size_t i = 0, s = result_columns.size(); i < s; ++i)
{
// read_kafka_message could already push some rows to result_columns
// before exception, we need to fix it.
auto cur_rows = result_columns[i]->size();
if (cur_rows > total_rows)
{
result_columns[i]->popBack(cur_rows - total_rows);
}
// all data columns will get defaul value in case of error
result_columns[i]->insertDefault();
}
new_rows = 1;
}
else
{
e.addMessage("while parsing Kafka message (topic: {}, partition: {}, offset: {})'", buffer->currentTopic(), buffer->currentPartition(), buffer->currentOffset());
throw;
}
}
} }
if (new_rows) if (new_rows)
{ {
// In read_kafka_message(), ReadBufferFromKafkaConsumer::nextImpl() // In read_kafka_message(), ReadBufferFromKafkaConsumer::nextImpl()
@ -214,12 +226,17 @@ Block KafkaBlockInputStream::readImpl()
virtual_columns[7]->insert(headers_values); virtual_columns[7]->insert(headers_values);
if (put_error_to_stream) if (put_error_to_stream)
{ {
auto payload = buffer->currentPayload(); if (exception_message)
virtual_columns[8]->insert(payload); {
if (exception_message.empty()) auto payload = buffer->currentPayload();
virtual_columns[9]->insertDefault(); virtual_columns[8]->insert(payload);
virtual_columns[9]->insert(*exception_message);
}
else else
virtual_columns[9]->insert(exception_message); {
virtual_columns[8]->insertDefault();
virtual_columns[9]->insertDefault();
}
} }
} }