Style+build fixes; make long tests time limited

This commit is contained in:
tchepavel 2022-05-25 22:02:47 +03:00
parent 0eae83aebc
commit d4afc36f8e
4 changed files with 25 additions and 22 deletions

View File

@ -94,10 +94,11 @@ void ReadBufferFromNATSConsumer::onMsg(natsConnection *, natsSubscription *, nat
if (buffer->row_delimiter != '\0') if (buffer->row_delimiter != '\0')
message_received += buffer->row_delimiter; message_received += buffer->row_delimiter;
if (!buffer->received.push({ MessageData data = {
.message = std::move(message_received), .message = message_received,
.subject = std::move(subject), .subject = subject,
})) };
if (!buffer->received.push(std::move(data)))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to received queue"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not push to received queue");
} }

View File

@ -48,7 +48,7 @@ public:
private: private:
bool nextImpl() override; bool nextImpl() override;
static void onMsg(natsConnection * nc, natsSubscription * sub, natsMsg * msg, void * closure); static void onMsg(natsConnection * nc, natsSubscription * sub, natsMsg * msg, void * consumer);
std::shared_ptr<NATSConnectionManager> connection; std::shared_ptr<NATSConnectionManager> connection;
std::vector<SubscriptionPtr> subscriptions; std::vector<SubscriptionPtr> subscriptions;

View File

@ -476,7 +476,8 @@ ProducerBufferPtr StorageNATS::createWriteBuffer(const std::string & subject)
row_delimiter ? std::optional<char>{row_delimiter} : std::nullopt, 1, 1024); row_delimiter ? std::optional<char>{row_delimiter} : std::nullopt, 1, 1024);
} }
bool StorageNATS::isSubjectInSubscriptions(const std::string & subject) { bool StorageNATS::isSubjectInSubscriptions(const std::string & subject)
{
auto subject_levels = parseList(subject, '.'); auto subject_levels = parseList(subject, '.');
for (const auto & nats_subject : subjects) for (const auto & nats_subject : subjects)

View File

@ -827,6 +827,11 @@ def test_nats_many_inserts(nats_cluster):
nats_subjects = 'many_inserts', nats_subjects = 'many_inserts',
nats_format = 'TSV', nats_format = 'TSV',
nats_row_delimiter = '\\n'; nats_row_delimiter = '\\n';
CREATE TABLE test.view_many (key UInt64, value UInt64)
ENGINE = MergeTree
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer_many TO test.view_many AS
SELECT * FROM test.nats_consume;
""" """
) )
while not check_table_is_ready(instance, "test.nats_consume"): while not check_table_is_ready(instance, "test.nats_consume"):
@ -858,23 +863,16 @@ def test_nats_many_inserts(nats_cluster):
time.sleep(random.uniform(0, 1)) time.sleep(random.uniform(0, 1))
thread.start() thread.start()
instance.query(
"""
CREATE TABLE test.view_many (key UInt64, value UInt64)
ENGINE = MergeTree
ORDER BY key;
CREATE MATERIALIZED VIEW test.consumer_many TO test.view_many AS
SELECT * FROM test.nats_consume;
"""
)
for thread in threads: for thread in threads:
thread.join() thread.join()
while True: time_limit_sec = 300
deadline = time.monotonic() + time_limit_sec
while time.monotonic() < deadline:
result = instance.query("SELECT count() FROM test.view_many") result = instance.query("SELECT count() FROM test.view_many")
print(result, messages_num * threads_num) print(result, messages_num * threads_num)
if int(result) == messages_num * threads_num: if int(result) >= messages_num * threads_num:
break break
time.sleep(1) time.sleep(1)
@ -889,7 +887,7 @@ def test_nats_many_inserts(nats_cluster):
assert ( assert (
int(result) == messages_num * threads_num int(result) == messages_num * threads_num
), "ClickHouse lost some messages: {}".format(result) ), "ClickHouse lost some messages or got duplicated ones. Total count: {}".format(result)
def test_nats_overloaded_insert(nats_cluster): def test_nats_overloaded_insert(nats_cluster):
@ -952,10 +950,13 @@ def test_nats_overloaded_insert(nats_cluster):
time.sleep(random.uniform(0, 1)) time.sleep(random.uniform(0, 1))
thread.start() thread.start()
while True: time_limit_sec = 300
deadline = time.monotonic() + time_limit_sec
while time.monotonic() < deadline:
result = instance.query("SELECT count() FROM test.view_overload") result = instance.query("SELECT count() FROM test.view_overload")
time.sleep(1) time.sleep(1)
if int(result) == messages_num * threads_num: if int(result) >= messages_num * threads_num:
break break
instance.query( instance.query(
@ -972,7 +973,7 @@ def test_nats_overloaded_insert(nats_cluster):
assert ( assert (
int(result) == messages_num * threads_num int(result) == messages_num * threads_num
), "ClickHouse lost some messages: {}".format(result) ), "ClickHouse lost some messages or got duplicated ones. Total count: {}".format(result)
def test_nats_virtual_column(nats_cluster): def test_nats_virtual_column(nats_cluster):