diff --git a/docs/en/development/build-osx.md b/docs/en/development/build-osx.md index 03abcb3f820..cee2693e345 100644 --- a/docs/en/development/build-osx.md +++ b/docs/en/development/build-osx.md @@ -42,15 +42,14 @@ brew install ccache cmake ninja libtool gettext llvm gcc binutils grep findutils ## Checkout ClickHouse Sources {#checkout-clickhouse-sources} -:::note -The ClickHouse build assumes a case-sensitive file system. Case-insensitive file systems may cause errors during the build process. If necessary, please follow [these instructions](https://brianboyko.medium.com/a-case-sensitive-src-folder-for-mac-programmers-176cc82a3830) to create a new disk image and checkout the code into it. -::: - ``` bash git clone --recursive git@github.com:ClickHouse/ClickHouse.git # ...alternatively, you can use https://github.com/ClickHouse/ClickHouse.git as the repo URL. ``` +Apple uses a case-insensitive file system by default. While this usually does not affect compilation (especially scratch makes will work), it can confuse file operations like `git mv`. +For serious development on macOS, make sure that the source code is stored on a case-sensitive disk volume, e.g. see [these instructions](https://brianboyko.medium.com/a-case-sensitive-src-folder-for-mac-programmers-176cc82a3830). + ## Build ClickHouse {#build-clickhouse} To build using Homebrew's vanilla Clang compiler (the only **recommended** way): diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index 253c140340d..aca815fef4b 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -6,6 +6,7 @@ import random import subprocess import threading import time +import uuid from random import randrange import pika @@ -18,6 +19,8 @@ from helpers.test_tools import TSV from . import rabbitmq_pb2 +DEFAULT_TIMEOUT_SEC = 60 + cluster = ClickHouseCluster(__file__) instance = cluster.add_instance( "instance", @@ -173,12 +176,18 @@ def test_rabbitmq_select(rabbitmq_cluster, secure): time.sleep(1) result = "" - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result += instance.query( "SELECT * FROM test.rabbitmq ORDER BY key", ignore_error=True ) if rabbitmq_check_result(result): break + time.sleep(0.05) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) rabbitmq_check_result(result, True) @@ -244,13 +253,20 @@ def test_rabbitmq_json_without_delimiter(rabbitmq_cluster): time.sleep(1) result = "" - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result += instance.query( "SELECT * FROM test.rabbitmq ORDER BY key", ignore_error=True ) if rabbitmq_check_result(result): break + time.sleep(0.05) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) + rabbitmq_check_result(result, True) @@ -287,13 +303,20 @@ def test_rabbitmq_csv_with_delimiter(rabbitmq_cluster): time.sleep(1) result = "" - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result += instance.query( "SELECT * FROM test.rabbitmq ORDER BY key", ignore_error=True ) if rabbitmq_check_result(result): break + time.sleep(0.05) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) + rabbitmq_check_result(result, True) @@ -334,10 +357,16 @@ def test_rabbitmq_tsv_with_delimiter(rabbitmq_cluster): connection.close() result = "" - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query("SELECT * FROM test.view ORDER BY key") if rabbitmq_check_result(result): break + time.sleep(0.05) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) rabbitmq_check_result(result, True) @@ -372,12 +401,18 @@ def test_rabbitmq_macros(rabbitmq_cluster): time.sleep(1) result = "" - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result += instance.query( "SELECT * FROM test.rabbitmq ORDER BY key", ignore_error=True ) if rabbitmq_check_result(result): break + time.sleep(0.05) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) rabbitmq_check_result(result, True) @@ -428,6 +463,11 @@ def test_rabbitmq_materialized_view(rabbitmq_cluster): result = instance.query("SELECT * FROM test.view ORDER BY key") if rabbitmq_check_result(result): break + time.sleep(0.05) + else: + pytest.fail( + f"Time limit of {time_limit_sec} seconds reached. The result did not match the expected value." + ) rabbitmq_check_result(result, True) @@ -439,6 +479,10 @@ def test_rabbitmq_materialized_view(rabbitmq_cluster): if rabbitmq_check_result(result): break time.sleep(1) + else: + pytest.fail( + f"Time limit of {time_limit_sec} seconds reached. The result did not match the expected value." + ) rabbitmq_check_result(result, True) connection.close() @@ -476,10 +520,17 @@ def test_rabbitmq_materialized_view_with_subquery(rabbitmq_cluster): for message in messages: channel.basic_publish(exchange="mvsq", routing_key="", body=message) - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + + while time.monotonic() < deadline: result = instance.query("SELECT * FROM test.view ORDER BY key") if rabbitmq_check_result(result): break + time.sleep(0.05) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) connection.close() rabbitmq_check_result(result, True) @@ -536,7 +587,7 @@ def test_rabbitmq_many_materialized_views(rabbitmq_cluster): channel.basic_publish(exchange="mmv", routing_key="", body=message) is_check_passed = False - deadline = time.monotonic() + 60 + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC while time.monotonic() < deadline: result1 = instance.query("SELECT * FROM test.view1 ORDER BY key") result2 = instance.query("SELECT * FROM test.view2 ORDER BY key") @@ -610,10 +661,16 @@ def test_rabbitmq_big_message(rabbitmq_cluster): for message in messages: channel.basic_publish(exchange="big", routing_key="", body=message) - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query("SELECT count() FROM test.view") if int(result) == batch_messages * rabbitmq_messages: break + time.sleep(0.05) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) connection.close() @@ -687,13 +744,18 @@ def test_rabbitmq_sharding_between_queues_publish(rabbitmq_cluster): thread.start() result1 = "" - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + expected = messages_num * threads_num + while time.monotonic() < deadline: result1 = instance.query("SELECT count() FROM test.view") time.sleep(1) - expected = messages_num * threads_num if int(result1) == expected: break logging.debug(f"Result {result1} / {expected}") + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) result2 = instance.query("SELECT count(DISTINCT channel_id) FROM test.view") @@ -778,17 +840,24 @@ def test_rabbitmq_mv_combo(rabbitmq_cluster): time.sleep(random.uniform(0, 1)) thread.start() - while True: + # with threadsanitizer the speed of execution is about 8-13k rows per second. + # so consumption of 1 mln rows will require about 125 seconds + deadline = time.monotonic() + 180 + expected = messages_num * threads_num * NUM_MV + while time.monotonic() < deadline: result = 0 for mv_id in range(NUM_MV): result += int( instance.query("SELECT count() FROM test.combo_{0}".format(mv_id)) ) - expected = messages_num * threads_num * NUM_MV if int(result) == expected: break logging.debug(f"Result: {result} / {expected}") time.sleep(1) + else: + pytest.fail( + f"Time limit of 180 seconds reached. The result did not match the expected value." + ) for thread in threads: thread.join() @@ -840,7 +909,8 @@ def test_rabbitmq_insert(rabbitmq_cluster): values.append("({i}, {i})".format(i=i)) values = ",".join(values) - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: try: instance.query("INSERT INTO test.rabbitmq VALUES {}".format(values)) break @@ -849,6 +919,10 @@ def test_rabbitmq_insert(rabbitmq_cluster): continue else: raise + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The query could not be executed successfully." + ) insert_messages = [] @@ -903,7 +977,8 @@ def test_rabbitmq_insert_headers_exchange(rabbitmq_cluster): values.append("({i}, {i})".format(i=i)) values = ",".join(values) - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: try: instance.query("INSERT INTO test.rabbitmq VALUES {}".format(values)) break @@ -912,6 +987,10 @@ def test_rabbitmq_insert_headers_exchange(rabbitmq_cluster): continue else: raise + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The query could not be executed successfully." + ) insert_messages = [] @@ -966,7 +1045,8 @@ def test_rabbitmq_many_inserts(rabbitmq_cluster): values = ",".join(values) def insert(): - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: try: instance.query( "INSERT INTO test.rabbitmq_many VALUES {}".format(values) @@ -977,6 +1057,10 @@ def test_rabbitmq_many_inserts(rabbitmq_cluster): continue else: raise + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The query could not be executed successfully." + ) threads = [] threads_num = 10 @@ -999,12 +1083,17 @@ def test_rabbitmq_many_inserts(rabbitmq_cluster): for thread in threads: thread.join() - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query("SELECT count() FROM test.view_many") logging.debug(result, messages_num * threads_num) if int(result) == messages_num * threads_num: break time.sleep(1) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) instance.query( """ @@ -1064,7 +1153,8 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster): values.append("({i}, {i})".format(i=i)) values = ",".join(values) - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: try: instance.query( "INSERT INTO test.rabbitmq_overload VALUES {}".format(values) @@ -1075,6 +1165,10 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster): continue else: raise + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The query could not be executed successfully." + ) threads = [] threads_num = 2 @@ -1087,13 +1181,18 @@ def test_rabbitmq_overloaded_insert(rabbitmq_cluster): for thread in threads: thread.join() - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query("SELECT count() FROM test.view_overload") expected = messages_num * threads_num if int(result) == expected: break logging.debug(f"Result: {result} / {expected}") time.sleep(1) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) instance.query( """ @@ -1177,11 +1276,16 @@ def test_rabbitmq_direct_exchange(rabbitmq_cluster): connection.close() - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query("SELECT count() FROM test.destination") time.sleep(1) if int(result) == messages_num * num_tables: break + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) for consumer_id in range(num_tables): instance.query( @@ -1265,11 +1369,16 @@ def test_rabbitmq_fanout_exchange(rabbitmq_cluster): connection.close() - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query("SELECT count() FROM test.destination") time.sleep(1) if int(result) == messages_num * num_tables: break + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) for consumer_id in range(num_tables): instance.query( @@ -1389,11 +1498,16 @@ def test_rabbitmq_topic_exchange(rabbitmq_cluster): connection.close() - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query("SELECT count() FROM test.destination") time.sleep(1) if int(result) == messages_num * num_tables + messages_num * num_tables: break + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) for consumer_id in range(num_tables * 2): instance.query( @@ -1486,11 +1600,16 @@ def test_rabbitmq_hash_exchange(rabbitmq_cluster): thread.start() result1 = "" - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result1 = instance.query("SELECT count() FROM test.destination") time.sleep(1) if int(result1) == messages_num * threads_num: break + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) result2 = instance.query("SELECT count(DISTINCT channel_id) FROM test.destination") @@ -1586,11 +1705,16 @@ def test_rabbitmq_multiple_bindings(rabbitmq_cluster): time.sleep(random.uniform(0, 1)) thread.start() - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query("SELECT count() FROM test.destination") time.sleep(1) if int(result) == messages_num * threads_num * 5: break + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) for thread in threads: thread.join() @@ -1699,11 +1823,16 @@ def test_rabbitmq_headers_exchange(rabbitmq_cluster): connection.close() - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query("SELECT count() FROM test.destination") time.sleep(1) if int(result) == messages_num * num_tables_to_receive: break + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) for consumer_id in range(num_tables_to_receive + num_tables_to_ignore): instance.query( @@ -1758,11 +1887,16 @@ def test_rabbitmq_virtual_columns(rabbitmq_cluster): for message in messages: channel.basic_publish(exchange="virtuals", routing_key="", body=message) - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query("SELECT count() FROM test.view") time.sleep(1) if int(result) == message_num: break + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) connection.close() @@ -1832,11 +1966,16 @@ def test_rabbitmq_virtual_columns_with_materialized_view(rabbitmq_cluster): for message in messages: channel.basic_publish(exchange="virtuals_mv", routing_key="", body=message) - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query("SELECT count() FROM test.view") time.sleep(1) if int(result) == message_num: break + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) connection.close() @@ -1937,11 +2076,16 @@ def test_rabbitmq_many_consumers_to_each_queue(rabbitmq_cluster): thread.start() result1 = "" - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result1 = instance.query("SELECT count() FROM test.destination") time.sleep(1) if int(result1) == messages_num * threads_num: break + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) result2 = instance.query("SELECT count(DISTINCT channel_id) FROM test.destination") @@ -2014,7 +2158,8 @@ def test_rabbitmq_restore_failed_connection_without_losses_1(rabbitmq_cluster): values.append("({i}, {i})".format(i=i)) values = ",".join(values) - while True: + deadline = time.monotonic() + 180 + while time.monotonic() < deadline: try: instance.query( "INSERT INTO test.producer_reconnect VALUES {}".format(values) @@ -2025,9 +2170,18 @@ def test_rabbitmq_restore_failed_connection_without_losses_1(rabbitmq_cluster): continue else: raise + else: + pytest.fail( + f"Time limit of 180 seconds reached. The query could not be executed successfully." + ) - while int(instance.query("SELECT count() FROM test.view")) == 0: + deadline = time.monotonic() + 180 + while time.monotonic() < deadline: + if int(instance.query("SELECT count() FROM test.view")) != 0: + break time.sleep(0.1) + else: + pytest.fail(f"Time limit of 180 seconds reached. The count is still 0.") kill_rabbitmq(rabbitmq_cluster.rabbitmq_docker_id) time.sleep(4) @@ -2035,11 +2189,16 @@ def test_rabbitmq_restore_failed_connection_without_losses_1(rabbitmq_cluster): rabbitmq_cluster.rabbitmq_docker_id, rabbitmq_cluster.rabbitmq_cookie ) - while True: + deadline = time.monotonic() + 180 + while time.monotonic() < deadline: result = instance.query("SELECT count(DISTINCT key) FROM test.view") time.sleep(1) if int(result) == messages_num: break + else: + pytest.fail( + f"Time limit of 180 seconds reached. The result did not match the expected value." + ) instance.query( """ @@ -2102,9 +2261,14 @@ def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster): """ ) - while int(instance.query("SELECT count() FROM test.view")) == 0: + deadline = time.monotonic() + 180 + while time.monotonic() < deadline: + if int(instance.query("SELECT count() FROM test.view")) != 0: + break logging.debug(3) time.sleep(0.1) + else: + pytest.fail(f"Time limit of 180 seconds reached. The count is still 0.") kill_rabbitmq(rabbitmq_cluster.rabbitmq_docker_id) time.sleep(8) @@ -2119,12 +2283,17 @@ def test_rabbitmq_restore_failed_connection_without_losses_2(rabbitmq_cluster): # time.sleep(2) # revive_rabbitmq() - while True: + deadline = time.monotonic() + 180 + while time.monotonic() < deadline: result = instance.query("SELECT count(DISTINCT key) FROM test.view").strip() if int(result) == messages_num: break logging.debug(f"Result: {result} / {messages_num}") time.sleep(1) + else: + pytest.fail( + f"Time limit of 180 seconds reached. The result did not match the expected value." + ) instance.query( """ @@ -2182,27 +2351,49 @@ def test_rabbitmq_commit_on_block_write(rabbitmq_cluster): rabbitmq_thread = threading.Thread(target=produce) rabbitmq_thread.start() - while int(instance.query("SELECT count() FROM test.view")) == 0: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: + if int(instance.query("SELECT count() FROM test.view")) != 0: + break time.sleep(1) + else: + cancel.set() + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The count is still 0." + ) cancel.set() instance.query("DETACH TABLE test.rabbitmq;") - while ( - int( - instance.query( - "SELECT count() FROM system.tables WHERE database='test' AND name='rabbitmq'" + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: + if ( + int( + instance.query( + "SELECT count() FROM system.tables WHERE database='test' AND name='rabbitmq'" + ) ) - ) - == 1 - ): + != 1 + ): + break time.sleep(1) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The table 'rabbitmq' still exists." + ) instance.query("ATTACH TABLE test.rabbitmq;") - while int(instance.query("SELECT uniqExact(key) FROM test.view")) < i[0]: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: + if int(instance.query("SELECT uniqExact(key) FROM test.view")) >= i[0]: + break time.sleep(1) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The uniqExact(key) is still less than {i[0]}." + ) result = int(instance.query("SELECT count() == uniqExact(key) FROM test.view")) @@ -2276,11 +2467,16 @@ def test_rabbitmq_no_connection_at_startup_2(rabbitmq_cluster): ) connection.close() - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query("SELECT count() FROM test.view") time.sleep(1) if int(result) == messages_num: break + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) instance.query( """ @@ -2324,10 +2520,16 @@ def test_rabbitmq_format_factory_settings(rabbitmq_cluster): channel.basic_publish(exchange="format_settings", routing_key="", body=message) result = "" - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query("SELECT date FROM test.format_settings") if result == expected: break + time.sleep(0.05) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) instance.query( """ @@ -2340,11 +2542,18 @@ def test_rabbitmq_format_factory_settings(rabbitmq_cluster): ) channel.basic_publish(exchange="format_settings", routing_key="", body=message) + result = "" - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query("SELECT date FROM test.view") if result == expected: break + time.sleep(0.05) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) connection.close() instance.query( @@ -2380,12 +2589,18 @@ def test_rabbitmq_vhost(rabbitmq_cluster): exchange="vhost", routing_key="", body=json.dumps({"key": 1, "value": 2}) ) connection.close() - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query( "SELECT * FROM test.rabbitmq_vhost ORDER BY key", ignore_error=True ) if result == "1\t2\n": break + time.sleep(0.05) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value '1\\t2\\n'." + ) def test_rabbitmq_drop_table_properly(rabbitmq_cluster): @@ -2411,12 +2626,18 @@ def test_rabbitmq_drop_table_properly(rabbitmq_cluster): channel.basic_publish( exchange="drop", routing_key="", body=json.dumps({"key": 1, "value": 2}) ) - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query( "SELECT * FROM test.rabbitmq_drop ORDER BY key", ignore_error=True ) if result == "1\t2\n": break + time.sleep(0.05) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value '1\\t2\\n'." + ) exists = channel.queue_declare(queue="rabbit_queue_drop", passive=True) assert exists @@ -2472,11 +2693,16 @@ def test_rabbitmq_queue_settings(rabbitmq_cluster): time.sleep(5) - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query("SELECT count() FROM test.view", ignore_error=True) if int(result) == 10: break time.sleep(0.5) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value of 10." + ) instance.query("DROP TABLE test.rabbitmq_settings") @@ -2530,11 +2756,16 @@ def test_rabbitmq_queue_consume(rabbitmq_cluster): ) result = "" - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query("SELECT count() FROM test.view") if int(result) == messages_num * threads_num: break time.sleep(1) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value." + ) for thread in threads: thread.join() @@ -2662,11 +2893,17 @@ def test_rabbitmq_issue_30691(rabbitmq_cluster): ), ) result = "" - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query("SELECT * FROM test.rabbitmq_drop", ignore_error=True) logging.debug(result) if result != "": break + time.sleep(0.05) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result is still empty." + ) assert ( result.strip() == """{"event_type": "purge", "as_src": 1234, "as_dst": 0, "as_path": "", "local_pref": 100, "med": 0, "peer_as_dst": 0, "ip_src": "", "ip_dst": "", "port_src": 443, "port_dst": 41930, "ip_proto": "tcp", "tos": 0, "stamp_inserted": "2021-10-26 15:20:00", "stamp_updated": "2021-10-26 15:23:14", "packets": 2, "bytes": 1216, "writer_id": "default_amqp/449206"}""" @@ -2712,13 +2949,19 @@ def test_rabbitmq_drop_mv(rabbitmq_cluster): exchange="mv", routing_key="", body=json.dumps({"key": i, "value": i}) ) - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: res = instance.query("SELECT COUNT(*) FROM test.view") logging.debug(f"Current count (1): {res}") if int(res) == 20: break else: logging.debug(f"Number of rows in test.view: {res}") + time.sleep(0.05) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The row count did not reach 20." + ) instance.query("DROP VIEW test.consumer SYNC") for i in range(20, 40): @@ -2737,12 +2980,17 @@ def test_rabbitmq_drop_mv(rabbitmq_cluster): exchange="mv", routing_key="", body=json.dumps({"key": i, "value": i}) ) - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query("SELECT count() FROM test.view") logging.debug(f"Current count (2): {result}") if int(result) == 50: break time.sleep(1) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The row count did not reach 50." + ) result = instance.query("SELECT * FROM test.view ORDER BY key") rabbitmq_check_result(result, True) @@ -2756,11 +3004,14 @@ def test_rabbitmq_drop_mv(rabbitmq_cluster): connection.close() count = 0 - start = time.time() - while time.time() - start < 30: + deadline = time.monotonic() + 30 + while time.monotonic() < deadline: count = int(instance.query("SELECT count() FROM test.drop_mv")) if count: break + time.sleep(0.05) + else: + pytest.fail(f"Time limit of 30 seconds reached. The count is still 0.") instance.query("DROP TABLE test.drop_mv") assert count > 0 @@ -2850,22 +3101,36 @@ def test_rabbitmq_predefined_configuration(rabbitmq_cluster): channel.basic_publish( exchange="named", routing_key="", body=json.dumps({"key": 1, "value": 2}) ) - while True: + + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query( "SELECT * FROM test.rabbitmq ORDER BY key", ignore_error=True ) if result == "1\t2\n": break + time.sleep(0.05) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value '1\\t2\\n'." + ) + instance.restart_clickhouse() channel.basic_publish( exchange="named", routing_key="", body=json.dumps({"key": 1, "value": 2}) ) - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query( "SELECT * FROM test.rabbitmq ORDER BY key", ignore_error=True ) if result == "1\t2\n": break + time.sleep(0.05) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match the expected value '1\\t2\\n'." + ) def test_rabbitmq_msgpack(rabbitmq_cluster): @@ -2895,16 +3160,17 @@ def test_rabbitmq_msgpack(rabbitmq_cluster): ) result = "" - try_no = 0 - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance.query("select * from rabbit_in;") if result.strip() == "kek": break - else: - try_no = try_no + 1 - if try_no == 20: - break time.sleep(1) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match 'kek'." + ) + assert result.strip() == "kek" instance.query("drop table rabbit_in sync") @@ -2937,16 +3203,17 @@ def test_rabbitmq_address(rabbitmq_cluster): ) result = "" - try_no = 0 - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: result = instance2.query("select * from rabbit_in;") if result.strip() == "kek": break - else: - try_no = try_no + 1 - if try_no == 20: - break time.sleep(1) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The result did not match 'kek'." + ) + assert result.strip() == "kek" instance2.query("drop table rabbit_in sync") @@ -3059,13 +3326,17 @@ def test_max_rows_per_message(rabbitmq_cluster): == "\n0\t0\n10\t100\n20\t200\n\n\n30\t300\n40\t400\n\n" ) - attempt = 0 + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC rows = 0 - while attempt < 100: + while time.monotonic() < deadline: rows = int(instance.query("SELECT count() FROM test.view")) if rows == num_rows: break - attempt += 1 + time.sleep(0.05) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The number of rows did not match {num_rows}." + ) assert rows == num_rows @@ -3151,13 +3422,17 @@ def test_row_based_formats(rabbitmq_cluster): assert insert_messages == 2 - attempt = 0 + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC rows = 0 - while attempt < 100: + while time.monotonic() < deadline: rows = int(instance.query("SELECT count() FROM test.view")) if rows == num_rows: break - attempt += 1 + time.sleep(0.05) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The number of rows did not match {num_rows}." + ) assert rows == num_rows @@ -3297,13 +3572,17 @@ def test_block_based_formats_2(rabbitmq_cluster): assert insert_messages == 9 - attempt = 0 + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC rows = 0 - while attempt < 100: + while time.monotonic() < deadline: rows = int(instance.query("SELECT count() FROM test.view")) if rows == num_rows: break - attempt += 1 + time.sleep(0.05) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The number of rows did not match {num_rows}." + ) assert rows == num_rows @@ -3366,12 +3645,22 @@ def test_rabbitmq_flush_by_block_size(rabbitmq_cluster): produce_thread = threading.Thread(target=produce) produce_thread.start() - while 0 == int( - instance.query( - "SELECT count() FROM system.parts WHERE database = 'test' AND table = 'view' AND name = 'all_1_1_0'" - ) - ): + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: + if ( + int( + instance.query( + "SELECT count() FROM system.parts WHERE database = 'test' AND table = 'view' AND name = 'all_1_1_0'" + ) + ) + != 0 + ): + break time.sleep(0.5) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The part 'all_1_1_0' is still missing." + ) cancel.set() produce_thread.join() @@ -3451,12 +3740,13 @@ def test_rabbitmq_flush_by_time(rabbitmq_cluster): """ ) - while True: + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC + while time.monotonic() < deadline: time.sleep(0.2) - count = instance.query( + total_count = instance.query( "SELECT count() FROM system.parts WHERE database = 'test' AND table = 'view'" ) - logging.debug(f"kssenii total count: {count}") + logging.debug(f"kssenii total count: {total_count}") count = int( instance.query( "SELECT count() FROM system.parts WHERE database = 'test' AND table = 'view' AND name = 'all_1_1_0'" @@ -3465,6 +3755,10 @@ def test_rabbitmq_flush_by_time(rabbitmq_cluster): logging.debug(f"kssenii count: {count}") if count > 0: break + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The part 'all_1_1_0' is still missing." + ) time.sleep(12) result = instance.query("SELECT uniqExact(ts) FROM test.view") @@ -3542,13 +3836,17 @@ def test_rabbitmq_handle_error_mode_stream(rabbitmq_cluster): # The order of messages in select * from test.rabbitmq is not guaranteed, so sleep to collect everything in one select time.sleep(1) - attempt = 0 + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC rows = 0 - while attempt < 500: + while time.monotonic() < deadline: rows = int(instance.query("SELECT count() FROM test.data")) if rows == num_rows: break - attempt += 1 + time.sleep(0.05) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The number of rows did not match {num_rows}." + ) assert rows == num_rows @@ -3560,13 +3858,17 @@ def test_rabbitmq_handle_error_mode_stream(rabbitmq_cluster): assert result == expected - attempt = 0 + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC errors_count = 0 - while attempt < 500: + while time.monotonic() < deadline: errors_count = int(instance.query("SELECT count() FROM test.errors")) - if errors_count == num_rows: + if errors_count == num_rows / 2: break - attempt += 1 + time.sleep(0.05) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The number of errors ({errors_count}) did not match the expected {num_rows}." + ) assert errors_count == num_rows / 2 @@ -3584,7 +3886,7 @@ def test_rabbitmq_handle_error_mode_stream(rabbitmq_cluster): def test_attach_broken_table(rabbitmq_cluster): instance.query( - "ATTACH TABLE rabbit_queue UUID '2d1cdf1a-f060-4a61-a7c9-5b59e59992c6' (`payload` String) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'nonexisting:5671', rabbitmq_format = 'JSONEachRow', rabbitmq_username = 'test', rabbitmq_password = 'test'" + f"ATTACH TABLE rabbit_queue UUID '{uuid.uuid4()}' (`payload` String) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'nonexisting:5671', rabbitmq_format = 'JSONEachRow', rabbitmq_username = 'test', rabbitmq_password = 'test'" ) error = instance.query_and_get_error("SELECT * FROM rabbit_queue") @@ -3658,13 +3960,17 @@ def test_rabbitmq_nack_failed_insert(rabbitmq_cluster): channel.basic_consume(queue_name, on_consume) channel.start_consuming() - attempt = 0 + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC count = 0 - while attempt < 100: + while time.monotonic() < deadline: count = int(instance3.query("SELECT count() FROM test.view")) if count == num_rows: break - attempt += 1 + time.sleep(0.05) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The count did not match {num_rows}." + ) assert count == num_rows @@ -3743,14 +4049,17 @@ def test_rabbitmq_reject_broken_messages(rabbitmq_cluster): time.sleep(1) - attempt = 0 + deadline = time.monotonic() + DEFAULT_TIMEOUT_SEC rows = 0 - while attempt < 500: + while time.monotonic() < deadline: rows = int(instance.query("SELECT count() FROM test.data")) if rows == num_rows: break - attempt += 1 time.sleep(1) + else: + pytest.fail( + f"Time limit of {DEFAULT_TIMEOUT_SEC} seconds reached. The number of rows did not match {num_rows}." + ) assert rows == num_rows