diff --git a/tests/integration/test_storage_s3/test.py b/tests/integration/test_storage_s3/test.py
index dd29d0a5d6a..7364af4d2dd 100644
--- a/tests/integration/test_storage_s3/test.py
+++ b/tests/integration/test_storage_s3/test.py
@@ -535,8 +535,9 @@ def test_multipart_put(started_cluster, maybe_auth, positive):
 
     one_line_length = 6  # 3 digits, 2 commas, 1 line separator.
 
+    total_rows = csv_size_bytes // one_line_length
     # Generate data having size more than one part
-    int_data = [[1, 2, 3] for i in range(csv_size_bytes // one_line_length)]
+    int_data = [[1, 2, 3] for i in range(total_rows)]
     csv_data = "".join(["{},{},{}\n".format(x, y, z) for x, y, z in int_data])
 
     assert len(csv_data) > min_part_size_bytes
@@ -573,6 +574,37 @@ def test_multipart_put(started_cluster, maybe_auth, positive):
 
     assert csv_data == get_s3_file_content(started_cluster, bucket, filename)
 
+    # select uploaded data from many threads
+    select_query = (
+        "select sum(column1), sum(column2), sum(column3) "
+        "from s3('http://{host}:{port}/{bucket}/{filename}', {auth}'CSV', '{table_format}')".format(
+            host=started_cluster.minio_redirect_host,
+            port=started_cluster.minio_redirect_port,
+            bucket=bucket,
+            filename=filename,
+            auth=maybe_auth,
+            table_format=table_format,
+        )
+    )
+    try:
+        select_result = run_query(
+            instance,
+            select_query,
+            settings={
+                "max_download_threads": random.randint(4, 16),
+                "max_download_buffer_size": 1024 * 1024,
+            },
+        )
+    except helpers.client.QueryRuntimeException:
+        if positive:
+            raise
+    else:
+        assert positive
+        assert (
+            select_result
+            == "\t".join(map(str, [total_rows, total_rows * 2, total_rows * 3])) + "\n"
+        )
+
 
 def test_remote_host_filter(started_cluster):
     instance = started_cluster.instances["restricted_dummy"]