diff --git a/.gitmodules b/.gitmodules
index 23a93ea6652..6a240f298ad 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -10,7 +10,7 @@
 	url = https://github.com/lz4/lz4.git
 [submodule "contrib/librdkafka"]
 	path = contrib/librdkafka
-	url = https://github.com/edenhill/librdkafka.git
+	url = https://github.com/ClickHouse-Extras/librdkafka.git
 [submodule "contrib/cctz"]
 	path = contrib/cctz
 	url = https://github.com/ClickHouse-Extras/cctz.git
diff --git a/contrib/librdkafka b/contrib/librdkafka
index b0d91bd74ab..2090cbf56b7 160000
--- a/contrib/librdkafka
+++ b/contrib/librdkafka
@@ -1 +1 @@
-Subproject commit b0d91bd74abb5f0e1ee972d326a317ad610f6300
+Subproject commit 2090cbf56b715247ec2be7f768707a7ab1bf7ede
diff --git a/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp b/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
index bbd93bc5aad..2498b719648 100644
--- a/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
+++ b/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
@@ -384,7 +384,8 @@ bool ReadBufferFromKafkaConsumer::poll()
         {
             messages = std::move(new_messages);
             current = messages.begin();
-            LOG_TRACE(log, "Polled batch of {} messages. Offset position: {}", messages.size(), consumer->get_offsets_position(consumer->get_assignment()));
+            LOG_TRACE(log, "Polled batch of {} messages. Offsets position: {}",
+                messages.size(), consumer->get_offsets_position(consumer->get_assignment()));
             break;
         }
     }
@@ -416,7 +417,7 @@ size_t ReadBufferFromKafkaConsumer::filterMessageErrors()
         return false;
     });
 
-    size_t skipped = std::distance(messages.end(), new_end);
+    size_t skipped = std::distance(new_end, messages.end());
     if (skipped)
     {
         LOG_ERROR(log, "There were {} messages with an error", skipped);
diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py
index 75270512fc4..5d23a9cfa40 100644
--- a/tests/integration/test_storage_kafka/test.py
+++ b/tests/integration/test_storage_kafka/test.py
@@ -664,7 +664,8 @@ def test_kafka_issue11308(kafka_cluster):
         FROM test.kafka;
     ''')
 
-    time.sleep(9)
+    while int(instance.query('SELECT count() FROM test.persistent_kafka')) < 3:
+        time.sleep(1)
 
     result = instance.query('SELECT * FROM test.persistent_kafka ORDER BY time;')
 
@@ -1431,7 +1432,8 @@ def test_kafka_produce_key_timestamp(kafka_cluster):
     instance.query("INSERT INTO test.kafka_writer VALUES ({},{},'{}',toDateTime({})),({},{},'{}',toDateTime({}))".format(3,3,'k3',1577836803,4,4,'k4',1577836804))
     instance.query("INSERT INTO test.kafka_writer VALUES ({},{},'{}',toDateTime({}))".format(5,5,'k5',1577836805))
 
-    time.sleep(10)
+    while int(instance.query("SELECT count() FROM test.view")) < 5:
+        time.sleep(1)
 
     result = instance.query("SELECT * FROM test.view ORDER BY value", ignore_error=True)
 
@@ -1535,7 +1537,9 @@ def test_kafka_flush_by_block_size(kafka_cluster):
         messages.append(json.dumps({'key': 0, 'value': 0}))
     kafka_produce('flush_by_block_size', messages)
 
-    time.sleep(1)
+    # Wait for Kafka engine to consume this data
+    while 1 != int(instance.query("SELECT count() FROM system.parts WHERE database = 'test' AND table = 'view' AND name = 'all_1_1_0'")):
+        time.sleep(1)
 
     # TODO: due to https://github.com/ClickHouse/ClickHouse/issues/11216
     # second flush happens earlier than expected, so we have 2 parts here instead of one