Merge pull request #12991 from ClickHouse/kafka-fix-msan

Fix MSan error in "rdkafka" library #12990
commit 566044f9c8 by alexey-milovidov, 2020-08-02 00:22:44 +03:00, committed via GitHub
4 changed files with 12 additions and 7 deletions
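
For context: MemorySanitizer (MSan) is Clang instrumentation that reports reads of uninitialized memory, and the submodule bump below switches to a librdkafka revision that avoids such a report. A minimal, hypothetical C++ illustration of the class of error MSan flags (not the actual rdkafka code):

    // Build with: clang++ -fsanitize=memory -g -O1 msan_demo.cpp
    int main()
    {
        int flag;   // deliberately left uninitialized
        if (flag)   // MSan reports "use-of-uninitialized-value" on this branch
            return 1;
        return 0;
    }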

.gitmodules

@@ -10,7 +10,7 @@
 url = https://github.com/lz4/lz4.git
 [submodule "contrib/librdkafka"]
 path = contrib/librdkafka
-url = https://github.com/edenhill/librdkafka.git
+url = https://github.com/ClickHouse-Extras/librdkafka.git
 [submodule "contrib/cctz"]
 path = contrib/cctz
 url = https://github.com/ClickHouse-Extras/cctz.git

contrib/librdkafka

@@ -1 +1 @@
-Subproject commit b0d91bd74abb5f0e1ee972d326a317ad610f6300
+Subproject commit 2090cbf56b715247ec2be7f768707a7ab1bf7ede

src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp

@@ -384,7 +384,8 @@ bool ReadBufferFromKafkaConsumer::poll()
 {
 messages = std::move(new_messages);
 current = messages.begin();
-LOG_TRACE(log, "Polled batch of {} messages. Offset position: {}", messages.size(), consumer->get_offsets_position(consumer->get_assignment()));
+LOG_TRACE(log, "Polled batch of {} messages. Offsets position: {}",
+    messages.size(), consumer->get_offsets_position(consumer->get_assignment()));
 break;
 }
 }
@@ -416,7 +417,7 @@ size_t ReadBufferFromKafkaConsumer::filterMessageErrors()
 return false;
 });
-size_t skipped = std::distance(messages.end(), new_end);
+size_t skipped = std::distance(new_end, messages.end());
 if (skipped)
 {
 LOG_ERROR(log, "There were {} messages with an error", skipped);
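
A note on the filterMessageErrors() hunk above: std::distance(first, last) counts steps from first to last, so with the arguments reversed the count of elements dropped by the preceding std::remove_if comes out negative, and storing it in a size_t turns it into a huge bogus value. A standalone sketch of the corrected pattern, using toy data rather than the actual Kafka message type:

    #include <algorithm>
    #include <cassert>
    #include <vector>

    int main()
    {
        std::vector<int> messages = {1, -1, 2, -2, 3};

        // Move "error" messages (negative values in this toy example) to the tail.
        auto new_end = std::remove_if(messages.begin(), messages.end(),
                                      [](int m) { return m < 0; });

        // Correct order: the distance from the new logical end to the physical end
        // is the number of filtered-out elements.
        size_t skipped = std::distance(new_end, messages.end());
        assert(skipped == 2);

        // std::distance(messages.end(), new_end) would be -2, which wraps around
        // when assigned to a size_t.
        messages.erase(new_end, messages.end());
        assert(messages.size() == 3);
        return 0;
    }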

tests/integration/test_storage_kafka/test.py

@@ -664,7 +664,8 @@ def test_kafka_issue11308(kafka_cluster):
 FROM test.kafka;
 ''')
-time.sleep(9)
+while int(instance.query('SELECT count() FROM test.persistent_kafka')) < 3:
+    time.sleep(1)
 result = instance.query('SELECT * FROM test.persistent_kafka ORDER BY time;')
@@ -1431,7 +1432,8 @@ def test_kafka_produce_key_timestamp(kafka_cluster):
 instance.query("INSERT INTO test.kafka_writer VALUES ({},{},'{}',toDateTime({})),({},{},'{}',toDateTime({}))".format(3,3,'k3',1577836803,4,4,'k4',1577836804))
 instance.query("INSERT INTO test.kafka_writer VALUES ({},{},'{}',toDateTime({}))".format(5,5,'k5',1577836805))
-time.sleep(10)
+while int(instance.query("SELECT count() FROM test.view")) < 5:
+    time.sleep(1)
 result = instance.query("SELECT * FROM test.view ORDER BY value", ignore_error=True)
@@ -1535,7 +1537,9 @@ def test_kafka_flush_by_block_size(kafka_cluster):
 messages.append(json.dumps({'key': 0, 'value': 0}))
 kafka_produce('flush_by_block_size', messages)
-time.sleep(1)
+# Wait for Kafka engine to consume this data
+while 1 != int(instance.query("SELECT count() FROM system.parts WHERE database = 'test' AND table = 'view' AND name = 'all_1_1_0'")):
+    time.sleep(1)
 # TODO: due to https://github.com/ClickHouse/ClickHouse/issues/11216
 # second flush happens earlier than expected, so we have 2 parts here instead of one
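
The three test hunks above all make the same change: a fixed time.sleep() is replaced by a loop that polls the server until the expected state appears, which removes the guesswork about how long consumption takes. As written the loops have no upper bound; a common hardening is to poll against a deadline instead. A minimal C++ sketch of that bounded wait-until-condition idea (the helper name, timeout, and poll interval are made up for illustration and are not part of the ClickHouse test framework):

    #include <chrono>
    #include <functional>
    #include <thread>

    // Hypothetical helper: poll a condition until it holds or a deadline expires,
    // instead of sleeping for a fixed, guessed amount of time.
    bool wait_until(const std::function<bool()> & condition,
                    std::chrono::seconds timeout = std::chrono::seconds(60))
    {
        const auto deadline = std::chrono::steady_clock::now() + timeout;
        while (!condition())
        {
            if (std::chrono::steady_clock::now() > deadline)
                return false;
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
        return true;
    }

    int main()
    {
        int polls = 0;
        // Toy condition: "data has arrived" after the third poll.
        return wait_until([&] { return ++polls >= 3; }) ? 0 : 1;
    }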