From cad54d4e2cadaaf73f8816fa3073c8fc4597367a Mon Sep 17 00:00:00 2001 From: Peng Jian Date: Fri, 19 Mar 2021 11:49:30 +0800 Subject: [PATCH] reset the parser if exception catched. --- src/Storages/Kafka/KafkaBlockInputStream.cpp | 75 ++++++++++++-------- 1 file changed, 46 insertions(+), 29 deletions(-) diff --git a/src/Storages/Kafka/KafkaBlockInputStream.cpp b/src/Storages/Kafka/KafkaBlockInputStream.cpp index 70b6d3680f4..0041184c19d 100644 --- a/src/Storages/Kafka/KafkaBlockInputStream.cpp +++ b/src/Storages/Kafka/KafkaBlockInputStream.cpp @@ -90,12 +90,10 @@ Block KafkaBlockInputStream::readImpl() connect(input_format->getPort(), port); port.setNeeded(); - std::string exception_message; + std::optional exception_message; auto read_kafka_message = [&] { size_t new_rows = 0; - exception_message.clear(); - while (true) { auto status = input_format->prepare(); @@ -103,22 +101,7 @@ Block KafkaBlockInputStream::readImpl() switch (status) { case IProcessor::Status::Ready: - try - { - input_format->work(); - } - catch (const Exception & e) - { - if (put_error_to_stream) - { - exception_message = e.message(); - new_rows++; - } - else - { - throw e; - } - } + input_format->work(); break; case IProcessor::Status::Finished: @@ -154,13 +137,42 @@ Block KafkaBlockInputStream::readImpl() while (true) { - auto new_rows = buffer->poll() ? read_kafka_message() : 0; - - if (!exception_message.empty()) + size_t new_rows = 0; + exception_message.reset(); + if (buffer->poll()) { - for (size_t i = 0; i < result_columns.size(); ++i) - result_columns[i]->insertDefault(); + try + { + new_rows = read_kafka_message(); + } + catch (Exception & e) + { + if (put_error_to_stream) + { + input_format->resetParser(); + exception_message = e.message(); + for (size_t i = 0, s = result_columns.size(); i < s; ++i) + { + // read_kafka_message could already push some rows to result_columns + // before exception, we need to fix it. 
+ auto cur_rows = result_columns[i]->size(); + if (cur_rows > total_rows) + { + result_columns[i]->popBack(cur_rows - total_rows); + } + // all data columns will get default value in case of error + result_columns[i]->insertDefault(); + } + new_rows = 1; + } + else + { + e.addMessage("while parsing Kafka message (topic: {}, partition: {}, offset: {})'", buffer->currentTopic(), buffer->currentPartition(), buffer->currentOffset()); + throw; + } + } } + if (new_rows) { // In read_kafka_message(), ReadBufferFromKafkaConsumer::nextImpl() @@ -214,12 +226,17 @@ Block KafkaBlockInputStream::readImpl() virtual_columns[7]->insert(headers_values); if (put_error_to_stream) { - auto payload = buffer->currentPayload(); - virtual_columns[8]->insert(payload); - if (exception_message.empty()) - virtual_columns[9]->insertDefault(); + if (exception_message) + { + auto payload = buffer->currentPayload(); + virtual_columns[8]->insert(payload); + virtual_columns[9]->insert(*exception_message); + } else - virtual_columns[9]->insert(exception_message); + { + virtual_columns[8]->insertDefault(); + virtual_columns[9]->insertDefault(); + } } }