ClickHouse/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp

#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
#include <common/logger_useful.h>
#include <cppkafka/cppkafka.h>
#include <boost/algorithm/string/join.hpp>
#include <algorithm>
#include <cassert>
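// Rough consumption lifecycle, as driven by KafkaBlockInputStream / StorageKafka::streamToViews
// (a sketch of the intended call order, not an exhaustive contract):
//
//     buffer.subscribe();                        // join the consumer group, reset the stalled state
//     while (buffer.poll())                      // fetch a new batch when the current one is exhausted
//     {
//         /* parse rows from the buffer */       // nextImpl() exposes one message payload at a time
//         buffer.storeLastReadMessageOffset();   // remember the offset of the processed message
//     }
//     buffer.commit();                           // commit the stored offsets once the block is written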
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_COMMIT_OFFSET;
}
using namespace std::chrono_literals;
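// MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS - total time poll() keeps waiting for an assignment; after that, shortened polls are used.
// POLL_TIMEOUT_WO_ASSIGNMENT_MS - shortened poll timeout used once the assignment wait above has been exhausted.
// DRAIN_TIMEOUT_MS - upper bound on the time drain() spends flushing the consumer queue after unsubscribe.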
const auto MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS = 15000;
const std::size_t POLL_TIMEOUT_WO_ASSIGNMENT_MS = 50;
const auto DRAIN_TIMEOUT_MS = 5000ms;
ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
ConsumerPtr consumer_,
Poco::Logger * log_,
size_t max_batch_size,
size_t poll_timeout_,
bool intermediate_commit_,
const std::atomic<bool> & stopped_,
const Names & _topics)
: ReadBuffer(nullptr, 0)
, consumer(consumer_)
, log(log_)
, batch_size(max_batch_size)
, poll_timeout(poll_timeout_)
, intermediate_commit(intermediate_commit_)
, stopped(stopped_)
, current(messages.begin())
, topics(_topics)
{
// called (synchronously, during poll) when we enter the consumer group
consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
{
LOG_TRACE(log, "Topics/partitions assigned: {}", topic_partitions);
assignment = topic_partitions;
});
// called (synchronously, during poll) when we leave the consumer group
consumer->set_revocation_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
{
// Rebalance is happening now, and now we have a chance to finish the work
// with topics/partitions we were working with before rebalance
LOG_TRACE(log, "Rebalance initiated. Revoking partitions: {}", topic_partitions);
// we cannot flush data to the target from this point (it is pulled, not pushed),
// so the best we can do now is to
// 1) repeat the last commit in sync mode (an async one could still be in the queue, we need to be sure it is properly committed before rebalance)
// 2) stop / break the current reading:
// * clean buffered non-committed messages
// * set the flag / flush
cleanUnprocessed();
stalled_status = REBALANCE_HAPPENED;
assignment.clear();
waited_for_assignment = 0;
// for now we use slower (but reliable) sync commit in main loop, so no need to repeat
// try
// {
// consumer->commit();
// }
// catch (cppkafka::HandleException & e)
// {
// LOG_WARNING(log, "Commit error: {}", e.what());
// }
});
consumer->set_rebalance_error_callback([this](cppkafka::Error err)
{
LOG_ERROR(log, "Rebalance error: {}", err);
});
}
ReadBufferFromKafkaConsumer::~ReadBufferFromKafkaConsumer()
{
try
{
if (!consumer->get_subscription().empty())
{
try
{
consumer->unsubscribe();
}
catch (const cppkafka::HandleException & e)
{
LOG_ERROR(log, "Error during unsubscribe: {}", e.what());
}
drain();
}
}
catch (const cppkafka::HandleException & e)
{
LOG_ERROR(log, "Error while destructing consumer: {}", e.what());
}
}
// Needed to drain the rest of the messages / queued callback calls from the consumer
// after unsubscribe, otherwise the consumer will hang on destruction
// see https://github.com/edenhill/librdkafka/issues/2077
// https://github.com/confluentinc/confluent-kafka-go/issues/189 etc.
void ReadBufferFromKafkaConsumer::drain()
{
auto start_time = std::chrono::steady_clock::now();
cppkafka::Error last_error(RD_KAFKA_RESP_ERR_NO_ERROR);
while (true)
{
auto msg = consumer->poll(100ms);
if (!msg)
break;
auto error = msg.get_error();
if (error)
{
if (msg.is_eof() || error == last_error)
{
break;
}
else
{
LOG_ERROR(log, "Error during draining: {}", error);
}
}
// we don't stop draining on the first error,
// only if it repeats once again sequentially
last_error = error;
auto ts = std::chrono::steady_clock::now();
if (std::chrono::duration_cast<std::chrono::milliseconds>(ts-start_time) > DRAIN_TIMEOUT_MS)
{
LOG_ERROR(log, "Timeout during draining.");
break;
}
}
}
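// Commit the offsets stored via storeLastReadMessageOffset(). The commit is retried a few times on
// temporary failures; if all attempts fail, CANNOT_COMMIT_OFFSET is thrown (the block has already been
// written to the target table(s) by then).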
void ReadBufferFromKafkaConsumer::commit()
{
auto print_offsets = [this] (const char * prefix, const cppkafka::TopicPartitionList & offsets)
{
for (const auto & topic_part : offsets)
{
auto print_special_offset = [&topic_part]
{
switch (topic_part.get_offset())
{
case cppkafka::TopicPartition::OFFSET_BEGINNING: return "BEGINNING";
case cppkafka::TopicPartition::OFFSET_END: return "END";
case cppkafka::TopicPartition::OFFSET_STORED: return "STORED";
case cppkafka::TopicPartition::OFFSET_INVALID: return "INVALID";
default: return "";
}
};
if (topic_part.get_offset() < 0)
{
LOG_TRACE(log, "{} {} (topic: {}, partition: {})", prefix, print_special_offset(), topic_part.get_topic(), topic_part.get_partition());
}
else
{
LOG_TRACE(log, "{} {} (topic: {}, partition: {})", prefix, topic_part.get_offset(), topic_part.get_topic(), topic_part.get_partition());
}
}
};
print_offsets("Polled offset", consumer->get_offsets_position(consumer->get_assignment()));
if (hasMorePolledMessages())
{
LOG_WARNING(log, "Logical error. Non all polled messages were processed.");
}
if (offsets_stored > 0)
{
// if we did an async commit here (which is faster),
// we might need to repeat the commit in sync mode in the revocation callback,
// but it seems like the existing API doesn't allow us to do that
// in a controlled manner (i.e. we don't know the offsets to commit then)
size_t max_retries = 5;
bool committed = false;
while (!committed && max_retries > 0)
{
try
{
// See https://github.com/edenhill/librdkafka/issues/1470
// the broker may reject the commit if, during offsets.commit.timeout.ms (5000 by default),
// there were not enough replicas available for the __consumer_offsets topic;
// also some other temporary issues like client-server connectivity problems are possible
consumer->commit();
committed = true;
print_offsets("Committed offset", consumer->get_offsets_committed(consumer->get_assignment()));
}
catch (const cppkafka::HandleException & e)
{
LOG_ERROR(log, "Exception during commit attempt: {}", e.what());
}
--max_retries;
}
if (!committed)
{
// TODO: insert atomicity / transactions are needed here (possibility to rollback, or two-phase commits)
throw Exception("All commit attempts failed. Last block was already written to target table(s), but was not committed to Kafka.", ErrorCodes::CANNOT_COMMIT_OFFSET);
}
}
else
{
LOG_TRACE(log, "Nothing to commit.");
}
offsets_stored = 0;
}
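// Subscribe to the configured topics if there is no active subscription yet,
// and reset the stalled state (except CONSUMER_STOPPED) before reading a new block of data.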
void ReadBufferFromKafkaConsumer::subscribe()
{
LOG_TRACE(log, "Already subscribed to topics: [{}]", boost::algorithm::join(consumer->get_subscription(), ", "));
LOG_TRACE(log, "Already assigned to: {}", assignment);
size_t max_retries = 5;
while (consumer->get_subscription().empty())
{
--max_retries;
try
{
consumer->subscribe(topics);
// FIXME: if we failed to receive "subscribe" response while polling and destroy consumer now, then we may hang up.
// see https://github.com/edenhill/librdkafka/issues/2077
}
catch (cppkafka::HandleException & e)
{
if (max_retries > 0 && e.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT)
continue;
throw;
}
}
cleanUnprocessed();
allowed = false;
// we can reset any flags (except CONSUMER_STOPPED) before an attempt to read a new block of data
if (stalled_status != CONSUMER_STOPPED)
stalled_status = NO_MESSAGES_RETURNED;
}
void ReadBufferFromKafkaConsumer::cleanUnprocessed()
{
messages.clear();
current = messages.begin();
BufferBase::set(nullptr, 0, 0);
offsets_stored = 0;
}
void ReadBufferFromKafkaConsumer::unsubscribe()
{
LOG_TRACE(log, "Re-joining claimed consumer after failure");
cleanUnprocessed();
// it should not throw exceptions, as it is used in a destructor
try
{
// From docs: Any previous subscription will be unassigned and unsubscribed first.
consumer->subscribe(topics);
// I wanted to avoid explicit unsubscribe as it requires draining the messages
// to close the consumer safely after unsubscribe
// see https://github.com/edenhill/librdkafka/issues/2077
// https://github.com/confluentinc/confluent-kafka-go/issues/189 etc.
}
catch (const cppkafka::HandleException & e)
{
LOG_ERROR(log, "Exception from ReadBufferFromKafkaConsumer::unsubscribe: {}", e.what());
}
}
void ReadBufferFromKafkaConsumer::resetToLastCommitted(const char * msg)
{
if (assignment.empty())
{
LOG_TRACE(log, "Not assignned. Can't reset to last committed position.");
return;
}
auto committed_offset = consumer->get_offsets_committed(consumer->get_assignment());
consumer->assign(committed_offset);
LOG_TRACE(log, "{} Returned to committed position: {}", msg, committed_offset);
}
// Does the poll when needed.
bool ReadBufferFromKafkaConsumer::poll()
{
resetIfStopped();
if (polledDataUnusable())
return false;
if (hasMorePolledMessages())
{
allowed = true;
return true;
}
if (intermediate_commit)
commit();
while (true)
{
stalled_status = NO_MESSAGES_RETURNED;
// if we have already waited long enough for an assignment in the past,
// make the polls shorter so that we don't block other consumers
// which can work successfully in parallel.
// POLL_TIMEOUT_WO_ASSIGNMENT_MS (50ms) is more than enough just to check whether we got an assignment
// (see https://github.com/ClickHouse/ClickHouse/issues/11218)
auto actual_poll_timeout_ms = (waited_for_assignment >= MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS)
? std::min(POLL_TIMEOUT_WO_ASSIGNMENT_MS, poll_timeout)
: poll_timeout;
/// Don't drop old messages immediately, since we may need them for virtual columns.
auto new_messages = consumer->poll_batch(batch_size,
std::chrono::milliseconds(actual_poll_timeout_ms));
resetIfStopped();
if (stalled_status == CONSUMER_STOPPED)
{
return false;
}
else if (stalled_status == REBALANCE_HAPPENED)
{
if (!new_messages.empty())
{
// we have polled something just after rebalance.
// we will not use current batch, so we need to return to last committed position
// otherwise we will continue polling from that position
resetToLastCommitted("Rewind last poll after rebalance.");
}
return false;
}
if (new_messages.empty())
{
// While we wait for an assignment after subscription, we'll poll zero messages anyway.
// If we're doing a manual select, it's better to get something after a wait than nothing immediately.
if (assignment.empty())
{
waited_for_assignment += poll_timeout; // slightly inaccurate, but a rough calculation is OK.
if (waited_for_assignment < MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS)
{
continue;
}
else
{
LOG_WARNING(log, "Can't get assignment. It can be caused by some issue with consumer group (not enough partitions?). Will keep trying.");
stalled_status = NO_ASSIGNMENT;
return false;
}
}
else
{
LOG_TRACE(log, "Stalled");
return false;
}
}
else
{
messages = std::move(new_messages);
current = messages.begin();
LOG_TRACE(log, "Polled batch of {} messages. Offsets position: {}",
messages.size(), consumer->get_offsets_position(consumer->get_assignment()));
break;
}
}
filterMessageErrors();
if (current == messages.end())
{
LOG_ERROR(log, "Only errors left");
stalled_status = ERRORS_RETURNED;
return false;
}
stalled_status = NOT_STALLED;
allowed = true;
return true;
}
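// Filter out messages with an error right after poll. If an errored message stayed in the middle of the
// batch, later code could treat it as a valid one (e.g. while storing its offset or getting its topic name)
// and crash, since the underlying librdkafka message can have no topic handle and a meaningless offset.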
size_t ReadBufferFromKafkaConsumer::filterMessageErrors()
{
assert(current == messages.begin());
auto new_end = std::remove_if(messages.begin(), messages.end(), [this](auto & message)
{
if (auto error = message.get_error())
{
LOG_ERROR(log, "Consumer error: {}", error);
return true;
}
return false;
});
size_t skipped = std::distance(new_end, messages.end());
if (skipped)
{
LOG_ERROR(log, "There were {} messages with an error", skipped);
messages.erase(new_end, messages.end());
}
return skipped;
}
void ReadBufferFromKafkaConsumer::resetIfStopped()
{
// we can react to stop only while fetching data;
// after the block is formed (i.e. while copying data to MV / committing) we ignore stop attempts
if (stopped)
{
stalled_status = CONSUMER_STOPPED;
cleanUnprocessed();
}
}
/// Do commit messages implicitly after we processed the previous batch.
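/// Exposes the payload of the current message as the working buffer (no copying) and advances the iterator.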
bool ReadBufferFromKafkaConsumer::nextImpl()
{
if (!allowed || !hasMorePolledMessages())
return false;
// XXX: very fishy place with const casting.
auto * new_position = reinterpret_cast<char *>(const_cast<unsigned char *>(current->get_payload().get_data()));
BufferBase::set(new_position, current->get_payload().get_size(), 0);
allowed = false;
++current;
return true;
}
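// Store the offset of the message that has just been read (current already points one past it),
// so that a later commit() can commit it.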
void ReadBufferFromKafkaConsumer::storeLastReadMessageOffset()
{
if (!isStalled())
{
consumer->store_offset(*(current - 1));
++offsets_stored;
}
}
}