Fix kafka failover issue (#21118).

Distinguish a lack of assignment from a lack of partitions. Better / clearer logging.
Mikhail Filimonov 2021-02-27 00:26:01 +01:00
parent 82b8d45cd7
commit 3627ab7cbe
4 changed files with 166 additions and 9 deletions
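
The crux of the change: the consumer's assignment member becomes a std::optional<cppkafka::TopicPartitionList>, so "no assignment received yet" is no longer conflated with "assigned zero partitions". Here is a minimal, self-contained sketch of the three states this separates (hypothetical stand-in types, not the actual ClickHouse code):

// Minimal sketch (stand-in types, not ClickHouse code) of the three
// consumer states that wrapping the assignment in std::optional exposes.
#include <iostream>
#include <optional>
#include <string>
#include <vector>

using TopicPartitionList = std::vector<std::string>; // stand-in for cppkafka::TopicPartitionList

std::string describe(const std::optional<TopicPartitionList> & assignment)
{
    if (!assignment.has_value())
        return "no assignment yet: keep polling and wait for the group rebalance";
    if (assignment->empty())
        return "empty assignment: not enough partitions in the topic for all consumers";
    return "assigned " + std::to_string(assignment->size()) + " partition(s): consume normally";
}

int main()
{
    std::optional<TopicPartitionList> assignment;            // state right after subscribe()
    std::cout << describe(assignment) << '\n';

    assignment = TopicPartitionList{};                       // assignment callback fired with zero partitions
    std::cout << describe(assignment) << '\n';

    assignment = TopicPartitionList{"topic[0]", "topic[1]"}; // normal assignment
    std::cout << describe(assignment) << '\n';

    assignment.reset();                                      // rebalance happened: back to "unknown"
    std::cout << describe(assignment) << '\n';
}

Before this change, the first two cases both looked like an empty TopicPartitionList, so a consumer that legitimately received zero partitions (more consumers in the group than partitions in the topic) was indistinguishable from one still waiting for its first rebalance.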

contrib/cppkafka vendored

@@ -1 +1 @@
-Subproject commit 57a599d99c540e647bcd0eb9ea77c523cca011b3
+Subproject commit 5a119f689f8a4d90d10a9635e7ee2bee5c127de1

src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp

@@ -42,7 +42,15 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
     // called (synchronously, during poll) when we enter the consumer group
     consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
     {
-        LOG_TRACE(log, "Topics/partitions assigned: {}", topic_partitions);
+        if (topic_partitions.empty())
+        {
+            LOG_INFO(log, "Got empty assignment: Not enough partitions in the topic for all consumers?");
+        }
+        else
+        {
+            LOG_TRACE(log, "Topics/partitions assigned: {}", topic_partitions);
+        }
         assignment = topic_partitions;
     });
@@ -63,7 +71,7 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
         cleanUnprocessed();
         stalled_status = REBALANCE_HAPPENED;
-        assignment.clear();
+        assignment.reset();
         waited_for_assignment = 0;
         // for now we use slower (but reliable) sync commit in main loop, so no need to repeat
@@ -232,7 +240,16 @@ void ReadBufferFromKafkaConsumer::commit()
 void ReadBufferFromKafkaConsumer::subscribe()
 {
     LOG_TRACE(log, "Already subscribed to topics: [{}]", boost::algorithm::join(consumer->get_subscription(), ", "));
-    LOG_TRACE(log, "Already assigned to: {}", assignment);
+    if (assignment.has_value())
+    {
+        LOG_TRACE(log, "Already assigned to: {}", assignment.value());
+    }
+    else
+    {
+        LOG_TRACE(log, "No assignment");
+    }
     size_t max_retries = 5;
@@ -295,7 +312,7 @@ void ReadBufferFromKafkaConsumer::unsubscribe()
 void ReadBufferFromKafkaConsumer::resetToLastCommitted(const char * msg)
 {
-    if (assignment.empty())
+    if (!assignment.has_value() || assignment->empty())
     {
         LOG_TRACE(log, "Not assigned. Can't reset to last committed position.");
         return;
@@ -360,7 +377,7 @@ bool ReadBufferFromKafkaConsumer::poll()
     {
         // While we wait for an assignment after subscription, we'll poll zero messages anyway.
         // If we're doing a manual select, it's better to get something after a wait than an immediate nothing.
-        if (assignment.empty())
+        if (!assignment.has_value())
        {
             waited_for_assignment += poll_timeout; // slightly inaccurate, but a rough calculation is ok
             if (waited_for_assignment < MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS)
@@ -369,11 +386,15 @@ bool ReadBufferFromKafkaConsumer::poll()
             }
             else
             {
-                LOG_WARNING(log, "Can't get assignment. It can be caused by some issue with consumer group (not enough partitions?). Will keep trying.");
+                LOG_WARNING(log, "Can't get assignment. Will keep trying.");
                 stalled_status = NO_ASSIGNMENT;
                 return false;
             }
         }
+        else if (assignment->empty())
+        {
+            LOG_TRACE(log, "Empty assignment.");
+            return false;
+        }
         else
         {

src/Storages/Kafka/ReadBufferFromKafkaConsumer.h

@@ -97,7 +97,7 @@ private:
     Messages::const_iterator current;
     // order is important, needs to be destructed before the consumer
-    cppkafka::TopicPartitionList assignment;
+    std::optional<cppkafka::TopicPartitionList> assignment;
     const Names topics;
     void drain();
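
Design note: this one-line header change drives the rest of the diff. assignment.reset() in the rebalance callback returns the consumer to the "assignment unknown" state (nullopt), while an empty list delivered by the assignment callback now reliably means "this consumer got zero partitions", which poll() can report and return from immediately instead of waiting out MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS.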

tests/integration/test_storage_kafka/test.py

@@ -2976,6 +2976,142 @@ def test_kafka_formats_with_broken_message(kafka_cluster):
     # print(errors_expected.strip())
     assert errors_result.strip() == errors_expected.strip(), 'Proper errors for format: {}'.format(format_name)
+
+def wait_for_new_data(table_name, prev_count=0, max_retries=120):
+    retries = 0
+    while True:
+        new_count = int(instance.query("SELECT count() FROM {}".format(table_name)))
+        print(new_count)
+        if new_count > prev_count:
+            return new_count
+        else:
+            retries += 1
+            time.sleep(0.5)
+            if retries > max_retries:
+                raise Exception("No new data :(")
+
+
+@pytest.mark.timeout(120)
+def test_kafka_consumer_failover(kafka_cluster):
+    # for backporting:
+    # admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
+    admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
+
+    topic_list = []
+    topic_list.append(NewTopic(name="kafka_consumer_failover", num_partitions=2, replication_factor=1))
+    admin_client.create_topics(new_topics=topic_list, validate_only=False)
+
+    instance.query('''
+        DROP TABLE IF EXISTS test.kafka;
+        DROP TABLE IF EXISTS test.kafka2;
+        DROP TABLE IF EXISTS test.kafka3;
+
+        CREATE TABLE test.kafka (key UInt64, value UInt64)
+            ENGINE = Kafka
+            SETTINGS kafka_broker_list = 'kafka1:19092',
+                     kafka_topic_list = 'kafka_consumer_failover',
+                     kafka_group_name = 'kafka_consumer_failover_group',
+                     kafka_format = 'JSONEachRow',
+                     kafka_max_block_size = 1,
+                     kafka_poll_timeout_ms = 200;
+
+        CREATE TABLE test.kafka2 (key UInt64, value UInt64)
+            ENGINE = Kafka
+            SETTINGS kafka_broker_list = 'kafka1:19092',
+                     kafka_topic_list = 'kafka_consumer_failover',
+                     kafka_group_name = 'kafka_consumer_failover_group',
+                     kafka_format = 'JSONEachRow',
+                     kafka_max_block_size = 1,
+                     kafka_poll_timeout_ms = 200;
+
+        CREATE TABLE test.kafka3 (key UInt64, value UInt64)
+            ENGINE = Kafka
+            SETTINGS kafka_broker_list = 'kafka1:19092',
+                     kafka_topic_list = 'kafka_consumer_failover',
+                     kafka_group_name = 'kafka_consumer_failover_group',
+                     kafka_format = 'JSONEachRow',
+                     kafka_max_block_size = 1,
+                     kafka_poll_timeout_ms = 200;
+
+        CREATE TABLE test.destination (
+            key UInt64,
+            value UInt64,
+            _consumed_by LowCardinality(String)
+        )
+        ENGINE = MergeTree()
+        ORDER BY key;
+
+        CREATE MATERIALIZED VIEW test.kafka_mv TO test.destination AS
+        SELECT key, value, 'kafka' as _consumed_by
+        FROM test.kafka;
+
+        CREATE MATERIALIZED VIEW test.kafka2_mv TO test.destination AS
+        SELECT key, value, 'kafka2' as _consumed_by
+        FROM test.kafka2;
+
+        CREATE MATERIALIZED VIEW test.kafka3_mv TO test.destination AS
+        SELECT key, value, 'kafka3' as _consumed_by
+        FROM test.kafka3;
+        ''')
+
+    producer = KafkaProducer(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port), value_serializer=producer_serializer, key_serializer=producer_serializer)
+
+    ## all 3 attached, 2 working
+    producer.send(topic='kafka_consumer_failover', value=json.dumps({'key': 1, 'value': 1}), partition=0)
+    producer.send(topic='kafka_consumer_failover', value=json.dumps({'key': 1, 'value': 1}), partition=1)
+    producer.flush()
+    prev_count = wait_for_new_data('test.destination')
+
+    ## 2 attached, 2 working
+    instance.query('DETACH TABLE test.kafka')
+    producer.send(topic='kafka_consumer_failover', value=json.dumps({'key': 2, 'value': 2}), partition=0)
+    producer.send(topic='kafka_consumer_failover', value=json.dumps({'key': 2, 'value': 2}), partition=1)
+    producer.flush()
+    prev_count = wait_for_new_data('test.destination', prev_count)
+
+    ## 1 attached, 1 working
+    instance.query('DETACH TABLE test.kafka2')
+    producer.send(topic='kafka_consumer_failover', value=json.dumps({'key': 3, 'value': 3}), partition=0)
+    producer.send(topic='kafka_consumer_failover', value=json.dumps({'key': 3, 'value': 3}), partition=1)
+    producer.flush()
+    prev_count = wait_for_new_data('test.destination', prev_count)
+
+    ## 2 attached, 2 working
+    instance.query('ATTACH TABLE test.kafka')
+    producer.send(topic='kafka_consumer_failover', value=json.dumps({'key': 4, 'value': 4}), partition=0)
+    producer.send(topic='kafka_consumer_failover', value=json.dumps({'key': 4, 'value': 4}), partition=1)
+    producer.flush()
+    prev_count = wait_for_new_data('test.destination', prev_count)
+
+    ## 1 attached, 1 working
+    instance.query('DETACH TABLE test.kafka3')
+    producer.send(topic='kafka_consumer_failover', value=json.dumps({'key': 5, 'value': 5}), partition=0)
+    producer.send(topic='kafka_consumer_failover', value=json.dumps({'key': 5, 'value': 5}), partition=1)
+    producer.flush()
+    prev_count = wait_for_new_data('test.destination', prev_count)
+
+    ## 2 attached, 2 working
+    instance.query('ATTACH TABLE test.kafka2')
+    producer.send(topic='kafka_consumer_failover', value=json.dumps({'key': 6, 'value': 6}), partition=0)
+    producer.send(topic='kafka_consumer_failover', value=json.dumps({'key': 6, 'value': 6}), partition=1)
+    producer.flush()
+    prev_count = wait_for_new_data('test.destination', prev_count)
+
+    ## 3 attached, 2 working
+    instance.query('ATTACH TABLE test.kafka3')
+    producer.send(topic='kafka_consumer_failover', value=json.dumps({'key': 7, 'value': 7}), partition=0)
+    producer.send(topic='kafka_consumer_failover', value=json.dumps({'key': 7, 'value': 7}), partition=1)
+    producer.flush()
+    prev_count = wait_for_new_data('test.destination', prev_count)
+
+    ## 2 attached, same 2 working
+    instance.query('DETACH TABLE test.kafka3')
+    producer.send(topic='kafka_consumer_failover', value=json.dumps({'key': 8, 'value': 8}), partition=0)
+    producer.send(topic='kafka_consumer_failover', value=json.dumps({'key': 8, 'value': 8}), partition=1)
+    producer.flush()
+    prev_count = wait_for_new_data('test.destination', prev_count)
+
 if __name__ == '__main__':
     cluster.start()
     input("Cluster created, press any key to destroy...")