From d4afc36f8e96f9447968c661a5213f51b7a1575e Mon Sep 17 00:00:00 2001 From: tchepavel Date: Wed, 25 May 2022 22:02:47 +0300 Subject: [PATCH] Style+build fixes; make long tests time limited --- .../NATS/ReadBufferFromNATSConsumer.cpp | 9 ++--- .../NATS/ReadBufferFromNATSConsumer.h | 2 +- src/Storages/NATS/StorageNATS.cpp | 3 +- tests/integration/test_storage_nats/test.py | 33 ++++++++++--------- 4 files changed, 25 insertions(+), 22 deletions(-) diff --git a/src/Storages/NATS/ReadBufferFromNATSConsumer.cpp b/src/Storages/NATS/ReadBufferFromNATSConsumer.cpp index e8abe9a5ed4..55c3c18677d 100644 --- a/src/Storages/NATS/ReadBufferFromNATSConsumer.cpp +++ b/src/Storages/NATS/ReadBufferFromNATSConsumer.cpp @@ -94,10 +94,11 @@ void ReadBufferFromNATSConsumer::onMsg(natsConnection *, natsSubscription *, nat if (buffer->row_delimiter != '\0') message_received += buffer->row_delimiter; - if (!buffer->received.push({ - .message = std::move(message_received), - .subject = std::move(subject), - })) + MessageData data = { + .message = message_received, + .subject = subject, + }; + if (!buffer->received.push(std::move(data))) throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to received queue"); } diff --git a/src/Storages/NATS/ReadBufferFromNATSConsumer.h b/src/Storages/NATS/ReadBufferFromNATSConsumer.h index 6f2ee071c81..dd38c08224d 100644 --- a/src/Storages/NATS/ReadBufferFromNATSConsumer.h +++ b/src/Storages/NATS/ReadBufferFromNATSConsumer.h @@ -48,7 +48,7 @@ public: private: bool nextImpl() override; - static void onMsg(natsConnection * nc, natsSubscription * sub, natsMsg * msg, void * closure); + static void onMsg(natsConnection * nc, natsSubscription * sub, natsMsg * msg, void * consumer); std::shared_ptr connection; std::vector subscriptions; diff --git a/src/Storages/NATS/StorageNATS.cpp b/src/Storages/NATS/StorageNATS.cpp index 74791fddd47..0f14044df7f 100644 --- a/src/Storages/NATS/StorageNATS.cpp +++ b/src/Storages/NATS/StorageNATS.cpp @@ -476,7 +476,8 @@ ProducerBufferPtr StorageNATS::createWriteBuffer(const std::string & subject) row_delimiter ? std::optional{row_delimiter} : std::nullopt, 1, 1024); } -bool StorageNATS::isSubjectInSubscriptions(const std::string & subject) { +bool StorageNATS::isSubjectInSubscriptions(const std::string & subject) +{ auto subject_levels = parseList(subject, '.'); for (const auto & nats_subject : subjects) diff --git a/tests/integration/test_storage_nats/test.py b/tests/integration/test_storage_nats/test.py index a29a72ea909..23b6817410c 100644 --- a/tests/integration/test_storage_nats/test.py +++ b/tests/integration/test_storage_nats/test.py @@ -827,6 +827,11 @@ def test_nats_many_inserts(nats_cluster): nats_subjects = 'many_inserts', nats_format = 'TSV', nats_row_delimiter = '\\n'; + CREATE TABLE test.view_many (key UInt64, value UInt64) + ENGINE = MergeTree + ORDER BY key; + CREATE MATERIALIZED VIEW test.consumer_many TO test.view_many AS + SELECT * FROM test.nats_consume; """ ) while not check_table_is_ready(instance, "test.nats_consume"): @@ -858,23 +863,16 @@ def test_nats_many_inserts(nats_cluster): time.sleep(random.uniform(0, 1)) thread.start() - instance.query( - """ - CREATE TABLE test.view_many (key UInt64, value UInt64) - ENGINE = MergeTree - ORDER BY key; - CREATE MATERIALIZED VIEW test.consumer_many TO test.view_many AS - SELECT * FROM test.nats_consume; - """ - ) - for thread in threads: thread.join() - while True: + time_limit_sec = 300 + deadline = time.monotonic() + time_limit_sec + + while time.monotonic() < deadline: result = instance.query("SELECT count() FROM test.view_many") print(result, messages_num * threads_num) - if int(result) == messages_num * threads_num: + if int(result) >= messages_num * threads_num: break time.sleep(1) @@ -889,7 +887,7 @@ def test_nats_many_inserts(nats_cluster): assert ( int(result) == messages_num * threads_num - ), "ClickHouse lost some messages: {}".format(result) + ), "ClickHouse lost some messages or got duplicated ones. Total count: {}".format(result) def test_nats_overloaded_insert(nats_cluster): @@ -952,10 +950,13 @@ def test_nats_overloaded_insert(nats_cluster): time.sleep(random.uniform(0, 1)) thread.start() - while True: + time_limit_sec = 300 + deadline = time.monotonic() + time_limit_sec + + while time.monotonic() < deadline: result = instance.query("SELECT count() FROM test.view_overload") time.sleep(1) - if int(result) == messages_num * threads_num: + if int(result) >= messages_num * threads_num: break instance.query( @@ -972,7 +973,7 @@ def test_nats_overloaded_insert(nats_cluster): assert ( int(result) == messages_num * threads_num - ), "ClickHouse lost some messages: {}".format(result) + ), "ClickHouse lost some messages or got duplicated ones. Total count: {}".format(result) def test_nats_virtual_column(nats_cluster):