diff --git a/tests/analyzer_integration_broken_tests.txt b/tests/analyzer_integration_broken_tests.txt
index 7c87a41dae9..23f22209451 100644
--- a/tests/analyzer_integration_broken_tests.txt
+++ b/tests/analyzer_integration_broken_tests.txt
@@ -8,7 +8,6 @@ test_executable_table_function/test.py::test_executable_function_input_python
 test_mask_sensitive_info/test.py::test_encryption_functions
 test_merge_table_over_distributed/test.py::test_global_in
 test_merge_table_over_distributed/test.py::test_select_table_name_from_merge_over_distributed
-test_merge_tree_s3/test.py::test_heavy_insert_select_check_memory[node]
 test_mutations_with_merge_tree/test.py::test_mutations_with_merge_background_task
 test_mysql_database_engine/test.py::test_mysql_ddl_for_mysql_database
 test_passing_max_partitions_to_read_remotely/test.py::test_default_database_on_cluster
diff --git a/tests/integration/test_merge_tree_s3/test.py b/tests/integration/test_merge_tree_s3/test.py
index 507f25209a4..3b2f1c0f6a6 100644
--- a/tests/integration/test_merge_tree_s3/test.py
+++ b/tests/integration/test_merge_tree_s3/test.py
@@ -941,215 +941,3 @@ def test_s3_disk_heavy_write_check_mem(cluster, broken_s3, node_name):
     assert int(result) > 0.8 * memory
 
     check_no_objects_after_drop(cluster, node_name=node_name)
-
-
-def get_memory_usage(node, query_id):
-    node.query("SYSTEM FLUSH LOGS")
-    memory_usage = node.query(
-        "SELECT memory_usage"
-        " FROM system.query_log"
-        f" WHERE query_id='{query_id}'"
-        " AND type='QueryFinish'"
-    )
-    return int(memory_usage)
-
-
-def get_memory_usages(node, query_ids):
-    node.query("SYSTEM FLUSH LOGS")
-    result = []
-    for query_id in query_ids:
-        memory_usage = node.query(
-            "SELECT memory_usage"
-            " FROM system.query_log"
-            f" WHERE query_id='{query_id}'"
-            " AND type='QueryFinish'"
-        )
-        result.append(int(memory_usage))
-    return result
-
-
-@pytest.mark.parametrize("node_name", ["node"])
-def test_heavy_insert_select_check_memory(cluster, broken_s3, node_name):
-    node = cluster.instances[node_name]
-
-    node.query(
-        """
-        CREATE TABLE central_query_log
-        (
-            control_plane_id UUID,
-            pod_id LowCardinality(String),
-            scrape_ts_microseconds DateTime64(6) CODEC(Delta(8), LZ4),
-            event_date Date,
-            event_time DateTime,
-            payload Array(String),
-            payload_01 String,
-            payload_02 String,
-            payload_03 String,
-            payload_04 String,
-            payload_05 String,
-            payload_06 String,
-            payload_07 String,
-            payload_08 String,
-            payload_09 String,
-            payload_10 String,
-            payload_11 String,
-            payload_12 String,
-            payload_13 String,
-            payload_14 String,
-            payload_15 String,
-            payload_16 String,
-            payload_17 String,
-            payload_18 String,
-            payload_19 String
-        )
-        ENGINE=MergeTree()
-        PARTITION BY toYYYYMM(event_date)
-        ORDER BY (control_plane_id, event_date, pod_id)
-        SETTINGS
-            storage_policy='s3'
-        """
-    )
-
-    node.query("SYSTEM STOP MERGES central_query_log")
-
-    write_count = 2
-    write_query_ids = []
-    for x in range(write_count):
-        query_id = f"INSERT_INTO_TABLE_RANDOM_DATA_QUERY_ID_{x}"
-        write_query_ids.append(query_id)
-        node.query(
-            """
-            INSERT INTO central_query_log
-            SELECT
-                control_plane_id,
-                pod_id,
-                toStartOfHour(event_time) + toIntervalSecond(randUniform(0,60)) as scrape_ts_microseconds,
-                toDate(event_time) as event_date,
-                event_time,
-                payload,
-                payload[1] as payload_01,
-                payload[2] as payload_02,
-                payload[3] as payload_03,
-                payload[4] as payload_04,
-                payload[5] as payload_05,
-                payload[6] as payload_06,
-                payload[7] as payload_07,
-                payload[8] as payload_08,
-                payload[9] as payload_09,
-                payload[10] as payload_10,
-                payload[11] as payload_11,
-                payload[12] as payload_12,
-                payload[13] as payload_13,
-                payload[14] as payload_14,
-                payload[15] as payload_15,
-                payload[16] as payload_16,
-                payload[17] as payload_17,
-                payload[18] as payload_18,
-                payload[19] as payload_19
-            FROM
-            (
-                SELECT
-                    control_plane_id,
-                    substring(payload[1], 1, 5) as pod_id,
-                    toDateTime('2022-12-12 00:00:00')
-                        + toIntervalDay(floor(randUniform(0,3)))
-                        + toIntervalHour(floor(randUniform(0,24)))
-                        + toIntervalSecond(floor(randUniform(0,60)))
-                        as event_time,
-                    payload
-                FROM
-                    generateRandom(
-                        'control_plane_id UUID, payload Array(String)',
-                        NULL,
-                        100,
-                        100
-                    )
-                LIMIT 10000
-            )
-            SETTINGS
-                max_insert_block_size=256000000,
-                min_insert_block_size_rows=1000000,
-                min_insert_block_size_bytes=256000000
-            """,
-            query_id=query_id,
-        )
-
-    memory = 845346116
-    for memory_usage, query_id in zip(
-        get_memory_usages(node, write_query_ids), write_query_ids
-    ):
-        assert int(memory_usage) < 1.2 * memory, f"{memory_usage} : {query_id}"
-        assert int(memory_usage) > 0.8 * memory, f"{memory_usage} : {query_id}"
-
-    broken_s3.setup_slow_answers(minimal_length=1000, timeout=5, count=20)
-    broken_s3.setup_fake_multpartuploads()
-
-    insert_query_id = f"INSERT_INTO_S3_FUNCTION_QUERY_ID"
-    node.query(
-        """
-        INSERT INTO
-            TABLE FUNCTION s3(
-                'http://resolver:8083/root/data/test-upload_{_partition_id}.csv.gz',
-                'minio', 'minio123',
-                'CSV', auto, 'gzip'
-            )
-        PARTITION BY formatDateTime(subtractHours(toDateTime('2022-12-13 00:00:00'), 1),'%Y-%m-%d_%H:00')
-        WITH toDateTime('2022-12-13 00:00:00') as time_point
-        SELECT
-            *
-        FROM central_query_log
-        WHERE
-            event_date >= subtractDays(toDate(time_point), 1)
-            AND scrape_ts_microseconds >= subtractHours(toStartOfHour(time_point), 12)
-            AND scrape_ts_microseconds < toStartOfDay(time_point)
-        SETTINGS
-            s3_max_inflight_parts_for_one_file=1
-        """,
-        query_id=insert_query_id,
-    )
-
-    query_id = f"SELECT_QUERY_ID"
-    total = node.query(
-        """
-        SELECT
-            count()
-        FROM central_query_log
-        """,
-        query_id=query_id,
-    )
-    assert int(total) == 10000 * write_count
-
-    query_id = f"SELECT_WHERE_QUERY_ID"
-    selected = node.query(
-        """
-        WITH toDateTime('2022-12-13 00:00:00') as time_point
-        SELECT
-            count()
-        FROM central_query_log
-        WHERE
-            event_date >= subtractDays(toDate(time_point), 1)
-            AND scrape_ts_microseconds >= subtractHours(toStartOfHour(time_point), 12)
-            AND scrape_ts_microseconds < toStartOfDay(time_point)
-        """,
-        query_id=query_id,
-    )
-    assert int(selected) < 4500, selected
-    assert int(selected) > 2500, selected
-
-    node.query("SYSTEM FLUSH LOGS")
-    profile_events = node.query(
-        f"""
-        SELECT ProfileEvents
-        FROM system.query_log
-        WHERE query_id='{insert_query_id}'
-        AND type='QueryFinish'
-        """
-    )
-
-    memory_usage = get_memory_usage(node, insert_query_id)
-    memory = 123507857
-    assert int(memory_usage) < 1.2 * memory, f"{memory_usage} {profile_events}"
-    assert int(memory_usage) > 0.8 * memory, f"{memory_usage} {profile_events}"
-
-    node.query(f"DROP TABLE IF EXISTS central_query_log SYNC")
-    remove_all_s3_objects(cluster)