Fix for the hang during deletion of engine=Kafka (one more time) (#11145)

* Added a flag for safer rdkafka destruction, but further testing uncovered another hang case which has no straightforward solution and can currently only be worked around by draining the consumer queue, so the destructor is back

* After review fixes

* After review fixes2

* After review fixes3
Author: filimonov, 2020-05-25 23:13:10 +02:00 (committed by GitHub)
Commit: 73a0ef6c0e
Parent: 30d0e7759f
5 changed files with 129 additions and 5 deletions
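
To make the fix easier to follow before the diff itself: the shutdown order this commit settles on is unsubscribe, then drain whatever is still sitting in the consumer queue, and only then let the consumer be destroyed. Below is a minimal standalone sketch of that pattern, assuming the cppkafka API as vendored in contrib/cppkafka; the helper name is made up, the 5-second budget mirrors DRAIN_TIMEOUT_MS from the diff, and error handling is omitted for brevity.

#include <chrono>
#include <cppkafka/consumer.h>

using namespace std::chrono_literals;

// Sketch only: unsubscribe, then poll until the local queue is empty (or a
// timeout elapses), so that the blocking consumer close performed during
// destruction cannot hang on unprocessed messages / queued callbacks.
void shutdownConsumer(cppkafka::Consumer & consumer)
{
    if (consumer.get_subscription().empty())
        return;

    consumer.unsubscribe();

    const auto deadline = std::chrono::steady_clock::now() + 5s; // mirrors DRAIN_TIMEOUT_MS
    while (std::chrono::steady_clock::now() < deadline)
    {
        auto msg = consumer.poll(100ms);
        if (!msg)
            break; // queue is drained
        // Errors are ignored in this sketch; the real drain() below logs them
        // and stops only if the same error repeats twice in a row.
    }
}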

contrib/cppkafka (vendored)

@@ -1 +1 @@
-Subproject commit 9b184d881c15cc50784b28688c7c99d3d764db24
+Subproject commit f555ee36aaa74d17ca0dab3ce472070a610b2966


@@ -15,6 +15,7 @@ namespace ErrorCodes
using namespace std::chrono_literals;

const auto MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS = 15000;
+const auto DRAIN_TIMEOUT_MS = 5000ms;

ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
@@ -80,9 +81,72 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
    });
}

-// NOTE on removed destructor: There is no need to unsubscribe prior to calling rd_kafka_consumer_close().
-// check: https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#termination
-// manual destruction was a source of weird errors (hangs during dropping a kafka table, etc.)
+ReadBufferFromKafkaConsumer::~ReadBufferFromKafkaConsumer()
+{
+    try
+    {
+        if (!consumer->get_subscription().empty())
+        {
+            try
+            {
+                consumer->unsubscribe();
+            }
+            catch (const cppkafka::HandleException & e)
+            {
+                LOG_ERROR(log, "Error during unsubscribe: " << e.what());
+            }
+
+            drain();
+        }
+    }
+    catch (const cppkafka::HandleException & e)
+    {
+        LOG_ERROR(log, "Error while destructing consumer: " << e.what());
+    }
+}
+
+// Needed to drain the rest of the messages / queued callback calls from the consumer
+// after unsubscribe, otherwise the consumer will hang on destruction.
+// See https://github.com/edenhill/librdkafka/issues/2077
+//     https://github.com/confluentinc/confluent-kafka-go/issues/189 etc.
+void ReadBufferFromKafkaConsumer::drain()
+{
+    auto start_time = std::chrono::steady_clock::now();
+    cppkafka::Error last_error(RD_KAFKA_RESP_ERR_NO_ERROR);
+
+    while (true)
+    {
+        auto msg = consumer->poll(100ms);
+        if (!msg)
+            break;
+
+        auto error = msg.get_error();
+
+        if (error)
+        {
+            if (msg.is_eof() || error == last_error)
+            {
+                break;
+            }
+            else
+            {
+                LOG_ERROR(log, "Error during draining: " << error);
+            }
+        }
+
+        // Don't stop draining on the first error, only when the same error repeats sequentially.
+        last_error = error;
+
+        auto ts = std::chrono::steady_clock::now();
+        if (std::chrono::duration_cast<std::chrono::milliseconds>(ts - start_time) > DRAIN_TIMEOUT_MS)
+        {
+            LOG_ERROR(log, "Timeout during draining.");
+            break;
+        }
+    }
+}
+
void ReadBufferFromKafkaConsumer::commit()
{


@@ -28,7 +28,7 @@ public:
        const std::atomic<bool> & stopped_,
        const Names & _topics
    );
+    ~ReadBufferFromKafkaConsumer() override;

    void allowNext() { allowed = true; } // Allow to read next message.
    void commit(); // Commit all processed messages.
    void subscribe(); // Subscribe internal consumer to topics.
@@ -75,6 +75,8 @@ private:
    cppkafka::TopicPartitionList assignment;
    const Names topics;

+    void drain();
+
    bool nextImpl() override;
};


@@ -293,6 +293,7 @@ ConsumerBufferPtr StorageKafka::createReadBuffer()
    // Create a consumer and subscribe to topics
    auto consumer = std::make_shared<cppkafka::Consumer>(conf);
+    consumer->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);

    // Limit the number of batched messages to allow early cancellations
    const Settings & settings = global_context.getSettingsRef();
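
A note on the flag set here: RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE asks librdkafka to skip the implicit (and potentially blocking) consumer close when the handle is finally destroyed, since the destructor above already unsubscribes and drains explicitly. The set_destroy_flags() method is assumed to come from the vendored cppkafka fork referenced by the submodule bump at the top of this commit. The sketch below only illustrates attaching the flag at consumer creation time; the broker address and group name are purely illustrative.

#include <memory>
#include <cppkafka/configuration.h>
#include <cppkafka/consumer.h>
#include <librdkafka/rdkafka.h> // RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE

std::shared_ptr<cppkafka::Consumer> makeConsumer()
{
    cppkafka::Configuration conf = {
        { "metadata.broker.list", "kafka1:19092" }, // illustrative
        { "group.id", "example_group" },            // illustrative
    };

    auto consumer = std::make_shared<cppkafka::Consumer>(conf);
    // Skip rd_kafka_consumer_close() inside the destroy path; cleanup is done
    // explicitly (unsubscribe + drain) before the object goes away.
    consumer->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
    return consumer;
}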


@@ -246,6 +246,50 @@ def test_kafka_consumer_hang(kafka_cluster):
    # 'dr'||'op' to avoid self matching
    assert int(instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")) == 0

+@pytest.mark.timeout(180)
+def test_kafka_consumer_hang2(kafka_cluster):
+    instance.query('''
+        DROP TABLE IF EXISTS test.kafka;
+
+        CREATE TABLE test.kafka (key UInt64, value UInt64)
+            ENGINE = Kafka
+            SETTINGS kafka_broker_list = 'kafka1:19092',
+                     kafka_topic_list = 'consumer_hang2',
+                     kafka_group_name = 'consumer_hang2',
+                     kafka_format = 'JSONEachRow';
+
+        CREATE TABLE test.kafka2 (key UInt64, value UInt64)
+            ENGINE = Kafka
+            SETTINGS kafka_broker_list = 'kafka1:19092',
+                     kafka_topic_list = 'consumer_hang2',
+                     kafka_group_name = 'consumer_hang2',
+                     kafka_format = 'JSONEachRow';
+        ''')
+
+    # the first consumer subscribes to the topic, tries to poll some data, and then goes idle
+    instance.query('SELECT * FROM test.kafka')
+
+    # the second consumer does the same, which triggers a rebalance in the first consumer
+    instance.query('SELECT * FROM test.kafka2')
+
+    # echo 'SELECT * FROM test.kafka; SELECT * FROM test.kafka2; DROP TABLE test.kafka;' | clickhouse client -mn &
+    # kafka_cluster.open_bash_shell('instance')
+
+    # the first consumer has a pending rebalance callback that was never processed (no poll after the select);
+    # one of these queries used to hang because of
+    # https://github.com/edenhill/librdkafka/issues/2077
+    # https://github.com/edenhill/librdkafka/issues/2898
+    instance.query('DROP TABLE test.kafka')
+    instance.query('DROP TABLE test.kafka2')
+
+    # from a user perspective: we expect no hanging 'drop' queries
+    # 'dr'||'op' to avoid self matching
+    assert int(instance.query("select count() from system.processes where position(lower(query),'dr'||'op')>0")) == 0
@pytest.mark.timeout(180)
def test_kafka_csv_with_delimiter(kafka_cluster):
    instance.query('''
@@ -1130,6 +1174,7 @@ def test_kafka_rebalance(kafka_cluster):
    print(instance.query('SELECT count(), uniqExact(key), max(key) + 1 FROM test.destination'))

+    # Some queries to debug...
    # SELECT * FROM test.destination where key in (SELECT key FROM test.destination group by key having count() <> 1)
    # select number + 1 as key from numbers(4141) left join test.destination using (key) where test.destination.key = 0;
    # SELECT * FROM test.destination WHERE key between 2360 and 2370 order by key;
@@ -1137,6 +1182,18 @@ def test_kafka_rebalance(kafka_cluster):
    # select toUInt64(0) as _partition, number + 1 as _offset from numbers(400) left join test.destination using (_partition,_offset) where test.destination.key = 0 order by _offset;
    # SELECT * FROM test.destination WHERE _partition = 0 and _offset between 220 and 240 order by _offset;
+
+    # CREATE TABLE test.reference (key UInt64, value UInt64) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka1:19092',
+    #                       kafka_topic_list = 'topic_with_multiple_partitions',
+    #                       kafka_group_name = 'rebalance_test_group_reference',
+    #                       kafka_format = 'JSONEachRow',
+    #                       kafka_max_block_size = 100000;
+    #
+    # CREATE MATERIALIZED VIEW test.reference_mv Engine=Log AS
+    #     SELECT key, value, _topic,_key,_offset, _partition, _timestamp, 'reference' as _consumed_by
+    #     FROM test.reference;
+    #
+    # select * from test.reference_mv left join test.destination using (key,_topic,_offset,_partition) where test.destination._consumed_by = '';

    result = int(instance.query('SELECT count() == uniqExact(key) FROM test.destination'))
    for consumer_index in range(NUMBER_OF_CONSURRENT_CONSUMERS):