diff --git a/docs/en/sql-reference/table-functions/deltalake.md b/docs/en/sql-reference/table-functions/deltalake.md
index 885d8df6a1e..4f8515a539f 100644
--- a/docs/en/sql-reference/table-functions/deltalake.md
+++ b/docs/en/sql-reference/table-functions/deltalake.md
@@ -49,4 +49,4 @@ LIMIT 2
 **See Also**
 
 - [DeltaLake engine](/docs/en/engines/table-engines/integrations/deltalake.md)
-
+- [DeltaLake cluster table function](/docs/en/sql-reference/table-functions/deltalakeCluster.md)
diff --git a/docs/en/sql-reference/table-functions/deltalakeCluster.md b/docs/en/sql-reference/table-functions/deltalakeCluster.md
new file mode 100644
index 00000000000..49c2264823f
--- /dev/null
+++ b/docs/en/sql-reference/table-functions/deltalakeCluster.md
@@ -0,0 +1,30 @@
+---
+slug: /en/sql-reference/table-functions/deltalakeCluster
+sidebar_position: 46
+sidebar_label: deltaLakeCluster
+title: "deltaLakeCluster Table Function"
+---
+This is an extension to the [deltaLake](/docs/en/sql-reference/table-functions/deltalake.md) table function.
+
+Allows processing files from [Delta Lake](https://github.com/delta-io/delta) tables in Amazon S3 in parallel from many nodes in a specified cluster. On the initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. A worker node asks the initiator for the next task to process and processes it; this is repeated until all tasks are finished.
+
+**Syntax**
+
+``` sql
+deltaLakeCluster(cluster_name, url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
+```
+
+**Arguments**
+
+- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
+
+- All other arguments have the same meaning as in the equivalent [deltaLake](/docs/en/sql-reference/table-functions/deltalake.md) table function.
+
+**Returned value**
+
+A table with the specified structure for reading data from the cluster in the specified Delta Lake table in S3.
+
+**See Also**
+
+- [deltaLake engine](/docs/en/engines/table-engines/integrations/deltalake.md)
+- [deltaLake table function](/docs/en/sql-reference/table-functions/deltalake.md)
diff --git a/docs/en/sql-reference/table-functions/hudi.md b/docs/en/sql-reference/table-functions/hudi.md
index 959a32fe26d..f4cdb0bf948 100644
--- a/docs/en/sql-reference/table-functions/hudi.md
+++ b/docs/en/sql-reference/table-functions/hudi.md
@@ -29,4 +29,4 @@ A table with the specified structure for reading data in the specified Hudi tabl
 **See Also**
 
 - [Hudi engine](/docs/en/engines/table-engines/integrations/hudi.md)
-
+- [Hudi cluster table function](/docs/en/sql-reference/table-functions/hudiCluster.md)
diff --git a/docs/en/sql-reference/table-functions/hudiCluster.md b/docs/en/sql-reference/table-functions/hudiCluster.md
new file mode 100644
index 00000000000..985b7479f66
--- /dev/null
+++ b/docs/en/sql-reference/table-functions/hudiCluster.md
@@ -0,0 +1,30 @@
+---
+slug: /en/sql-reference/table-functions/hudiCluster
+sidebar_position: 86
+sidebar_label: hudiCluster
+title: "hudiCluster Table Function"
+---
+This is an extension to the [hudi](/docs/en/sql-reference/table-functions/hudi.md) table function.
+
+Allows processing files from Apache [Hudi](https://hudi.apache.org/) tables in Amazon S3 in parallel from many nodes in a specified cluster. On the initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. A worker node asks the initiator for the next task to process and processes it; this is repeated until all tasks are finished.
+
+**Syntax**
+
+``` sql
+hudiCluster(cluster_name, url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
+```
+
+**Arguments**
+
+- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
+
+- All other arguments have the same meaning as in the equivalent [hudi](/docs/en/sql-reference/table-functions/hudi.md) table function.
+
+**Returned value**
+
+A table with the specified structure for reading data from the cluster in the specified Hudi table in S3.
+
+**See Also**
+
+- [Hudi engine](/docs/en/engines/table-engines/integrations/hudi.md)
+- [Hudi table function](/docs/en/sql-reference/table-functions/hudi.md)
diff --git a/docs/en/sql-reference/table-functions/iceberg.md b/docs/en/sql-reference/table-functions/iceberg.md
index 4f54b2cd440..28063330008 100644
--- a/docs/en/sql-reference/table-functions/iceberg.md
+++ b/docs/en/sql-reference/table-functions/iceberg.md
@@ -72,3 +72,4 @@ Table function `iceberg` is an alias to `icebergS3` now.
 **See Also**
 
 - [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md)
+- [Iceberg cluster table function](/docs/en/sql-reference/table-functions/icebergCluster.md)
diff --git a/docs/en/sql-reference/table-functions/icebergCluster.md b/docs/en/sql-reference/table-functions/icebergCluster.md
new file mode 100644
index 00000000000..bc444f361d5
--- /dev/null
+++ b/docs/en/sql-reference/table-functions/icebergCluster.md
@@ -0,0 +1,43 @@
+---
+slug: /en/sql-reference/table-functions/icebergCluster
+sidebar_position: 91
+sidebar_label: icebergCluster
+title: "icebergCluster Table Function"
+---
+This is an extension to the [iceberg](/docs/en/sql-reference/table-functions/iceberg.md) table function.
+
+Allows processing files from Apache [Iceberg](https://iceberg.apache.org/) tables in parallel from many nodes in a specified cluster. On the initiator it creates a connection to all nodes in the cluster and dispatches each file dynamically. A worker node asks the initiator for the next task to process and processes it; this is repeated until all tasks are finished.
+
+**Syntax**
+
+``` sql
+icebergS3Cluster(cluster_name, url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,compression_method])
+icebergS3Cluster(cluster_name, named_collection[, option=value [,..]])
+
+icebergAzureCluster(cluster_name, connection_string|storage_account_url, container_name, blobpath, [,account_name], [,account_key] [,format] [,compression_method])
+icebergAzureCluster(cluster_name, named_collection[, option=value [,..]])
+
+icebergHDFSCluster(cluster_name, path_to_table, [,format] [,compression_method])
+icebergHDFSCluster(cluster_name, named_collection[, option=value [,..]])
+```
+
+**Arguments**
+
+- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
+
+- All other arguments have the same meaning as in the equivalent [iceberg](/docs/en/sql-reference/table-functions/iceberg.md) table function.
+
+**Returned value**
+
+A table with the specified structure for reading data from the cluster in the specified Iceberg table.
+ +**Examples** + +```sql +SELECT * FROM icebergS3Cluster('cluster_simple', 'http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test') +``` + +**See Also** + +- [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md) +- [Iceberg table function](/docs/en/sql-reference/table-functions/iceberg.md) diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index 12de08afad0..1ed803ae5ce 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -226,6 +226,26 @@ template class TableFunctionObjectStorage; +#if USE_AVRO && USE_AWS_S3 +template class TableFunctionObjectStorage; +#endif + +#if USE_AVRO && USE_AZURE_BLOB_STORAGE +template class TableFunctionObjectStorage; +#endif + +#if USE_AVRO && USE_HDFS +template class TableFunctionObjectStorage; +#endif + +#if USE_PARQUET && USE_AWS_S3 +template class TableFunctionObjectStorage; +#endif + +#if USE_AWS_S3 +template class TableFunctionObjectStorage; +#endif + #if USE_AVRO void registerTableFunctionIceberg(TableFunctionFactory & factory) { diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp index 5ca26aabe32..be7603f18e6 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp @@ -96,7 +96,7 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory) { .documentation = { .description=R"(The table function can be used to read the data stored on HDFS in parallel for many nodes in a specified cluster.)", - .examples{{"HDFSCluster", "SELECT * FROM HDFSCluster(cluster_name, uri, format)", ""}}}, + .examples{{"HDFSCluster", "SELECT * FROM HDFSCluster(cluster, uri, format)", ""}}}, .allow_readonly = false } ); @@ -105,15 +105,77 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory) UNUSED(factory); } + +#if USE_AVRO +void registerTableFunctionIcebergCluster(TableFunctionFactory & factory) +{ + UNUSED(factory); + #if USE_AWS_S3 -template class TableFunctionObjectStorageCluster; + factory.registerFunction( + {.documentation + = {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store in parallel for many nodes in a specified cluster.)", + .examples{{"icebergS3Cluster", "SELECT * FROM icebergS3Cluster(cluster, url, [, NOSIGN | access_key_id, secret_access_key, [session_token]], format, [,compression])", ""}}, + .categories{"DataLake"}}, + .allow_readonly = false}); #endif #if USE_AZURE_BLOB_STORAGE -template class TableFunctionObjectStorageCluster; + factory.registerFunction( + {.documentation + = {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store in parallel for many nodes in a specified cluster.)", + .examples{{"icebergAzureCluster", "SELECT * FROM icebergAzureCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression])", ""}}, + .categories{"DataLake"}}, + .allow_readonly = false}); #endif #if USE_HDFS -template class TableFunctionObjectStorageCluster; + factory.registerFunction( + {.documentation + = {.description = R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem in parallel for many nodes in a specified cluster.)", + .examples{{"icebergHDFSCluster", "SELECT * FROM 
icebergHDFSCluster(cluster, uri, [format], [structure], [compression_method])", ""}}, + .categories{"DataLake"}}, + .allow_readonly = false}); #endif } +#endif + +#if USE_AWS_S3 +#if USE_PARQUET +void registerTableFunctionDeltaLakeCluster(TableFunctionFactory & factory) +{ + factory.registerFunction( + {.documentation + = {.description = R"(The table function can be used to read the DeltaLake table stored on object store in parallel for many nodes in a specified cluster.)", + .examples{{"deltaLakeCluster", "SELECT * FROM deltaLakeCluster(cluster, url, access_key_id, secret_access_key)", ""}}, + .categories{"DataLake"}}, + .allow_readonly = false}); +} +#endif + +void registerTableFunctionHudiCluster(TableFunctionFactory & factory) +{ + factory.registerFunction( + {.documentation + = {.description = R"(The table function can be used to read the Hudi table stored on object store in parallel for many nodes in a specified cluster.)", + .examples{{"hudiCluster", "SELECT * FROM hudiCluster(cluster, url, access_key_id, secret_access_key)", ""}}, + .categories{"DataLake"}}, + .allow_readonly = false}); +} +#endif + +void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory) +{ + UNUSED(factory); +#if USE_AVRO + registerTableFunctionIcebergCluster(factory); +#endif +#if USE_AWS_S3 +#if USE_PARQUET + registerTableFunctionDeltaLakeCluster(factory); +#endif + registerTableFunctionHudiCluster(factory); +#endif +} + +} diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.h b/src/TableFunctions/TableFunctionObjectStorageCluster.h index 11e6c1fde82..d1fac5fdc2c 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.h +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.h @@ -33,6 +33,36 @@ struct HDFSClusterDefinition static constexpr auto storage_type_name = "HDFSCluster"; }; +struct IcebergS3ClusterDefinition +{ + static constexpr auto name = "icebergS3Cluster"; + static constexpr auto storage_type_name = "IcebergS3Cluster"; +}; + +struct IcebergAzureClusterDefinition +{ + static constexpr auto name = "icebergAzureCluster"; + static constexpr auto storage_type_name = "IcebergAzureCluster"; +}; + +struct IcebergHDFSClusterDefinition +{ + static constexpr auto name = "icebergHDFSCluster"; + static constexpr auto storage_type_name = "IcebergHDFSCluster"; +}; + +struct DeltaLakeClusterDefinition +{ + static constexpr auto name = "deltaLakeCluster"; + static constexpr auto storage_type_name = "DeltaLakeS3Cluster"; +}; + +struct HudiClusterDefinition +{ + static constexpr auto name = "hudiCluster"; + static constexpr auto storage_type_name = "HudiS3Cluster"; +}; + /** * Class implementing s3/hdfs/azureBlobStorageCluster(...) table functions, * which allow to process many files from S3/HDFS/Azure blob storage on a specific cluster. 
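The `name` constants in the cluster definitions above are the SQL-facing function names registered in TableFunctionObjectStorageCluster.cpp earlier in this patch. As a rough sketch of how they are meant to be called, following the example strings in that registration code — the cluster name, URLs, and credentials below are placeholders:

```sql
-- Hypothetical calls; the cluster name, URLs, and credentials are placeholders.
SELECT * FROM deltaLakeCluster('cluster_simple', 'https://example-bucket.s3.amazonaws.com/delta_table/', '<access_key_id>', '<secret_access_key>');
SELECT * FROM hudiCluster('cluster_simple', 'https://example-bucket.s3.amazonaws.com/hudi_table/', '<access_key_id>', '<secret_access_key>');
```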
@@ -79,4 +109,25 @@ using TableFunctionAzureBlobCluster = TableFunctionObjectStorageCluster; #endif + +#if USE_AVRO && USE_AWS_S3 +using TableFunctionIcebergS3Cluster = TableFunctionObjectStorageCluster; +#endif + +#if USE_AVRO && USE_AZURE_BLOB_STORAGE +using TableFunctionIcebergAzureCluster = TableFunctionObjectStorageCluster; +#endif + +#if USE_AVRO && USE_HDFS +using TableFunctionIcebergHDFSCluster = TableFunctionObjectStorageCluster; +#endif + +#if USE_AWS_S3 && USE_PARQUET +using TableFunctionDeltaLakeCluster = TableFunctionObjectStorageCluster; +#endif + +#if USE_AWS_S3 +using TableFunctionHudiCluster = TableFunctionObjectStorageCluster; +#endif + } diff --git a/src/TableFunctions/registerTableFunctions.cpp b/src/TableFunctions/registerTableFunctions.cpp index fbe2c7c59ed..131ca783f73 100644 --- a/src/TableFunctions/registerTableFunctions.cpp +++ b/src/TableFunctions/registerTableFunctions.cpp @@ -66,6 +66,7 @@ void registerTableFunctions(bool use_legacy_mongodb_integration [[maybe_unused]] registerTableFunctionObjectStorage(factory); registerTableFunctionObjectStorageCluster(factory); registerDataLakeTableFunctions(factory); + registerDataLakeClusterTableFunctions(factory); } } diff --git a/src/TableFunctions/registerTableFunctions.h b/src/TableFunctions/registerTableFunctions.h index e22ba7346fa..1168a8ef739 100644 --- a/src/TableFunctions/registerTableFunctions.h +++ b/src/TableFunctions/registerTableFunctions.h @@ -70,6 +70,7 @@ void registerTableFunctionExplain(TableFunctionFactory & factory); void registerTableFunctionObjectStorage(TableFunctionFactory & factory); void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory); void registerDataLakeTableFunctions(TableFunctionFactory & factory); +void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory); void registerTableFunctionTimeSeries(TableFunctionFactory & factory); diff --git a/tests/integration/test_storage_iceberg/configs/config.d/cluster.xml b/tests/integration/test_storage_iceberg/configs/config.d/cluster.xml new file mode 100644 index 00000000000..54c08b27abe --- /dev/null +++ b/tests/integration/test_storage_iceberg/configs/config.d/cluster.xml @@ -0,0 +1,20 @@ + + + + + + node1 + 9000 + + + node2 + 9000 + + + node3 + 9000 + + + + + diff --git a/tests/integration/test_storage_iceberg/configs/config.d/query_log.xml b/tests/integration/test_storage_iceberg/configs/config.d/query_log.xml new file mode 100644 index 00000000000..a63e91f41fb --- /dev/null +++ b/tests/integration/test_storage_iceberg/configs/config.d/query_log.xml @@ -0,0 +1,6 @@ + + + system + query_log
+
+
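These test configs give the integration-test nodes a three-node cluster (referenced as `cluster_simple` by the tests) and enable `system.query_log` so distributed execution can be inspected. A minimal sketch of the kind of query the tests route through that cluster — the bucket path and credentials here are placeholders — looks like:

```sql
-- Sketch only: 'cluster_simple' comes from the test cluster config above; the URL and credentials are placeholders.
SELECT count()
FROM icebergS3Cluster('cluster_simple', 'http://minio1:9001/example-bucket/iceberg_data/default/test_table/', '<access_key_id>', '<secret_access_key>');
```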
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index 36aba550dbd..b36347aa510 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -73,14 +73,38 @@ def started_cluster(): cluster.add_instance( "node1", main_configs=[ + "configs/config.d/query_log.xml", + "configs/config.d/cluster.xml", "configs/config.d/named_collections.xml", "configs/config.d/filesystem_caches.xml", ], user_configs=["configs/users.d/users.xml"], with_minio=True, with_azurite=True, - stay_alive=True, with_hdfs=with_hdfs, + stay_alive=True, + ) + cluster.add_instance( + "node2", + main_configs=[ + "configs/config.d/query_log.xml", + "configs/config.d/cluster.xml", + "configs/config.d/named_collections.xml", + "configs/config.d/filesystem_caches.xml", + ], + user_configs=["configs/users.d/users.xml"], + stay_alive=True, + ) + cluster.add_instance( + "node3", + main_configs=[ + "configs/config.d/query_log.xml", + "configs/config.d/cluster.xml", + "configs/config.d/named_collections.xml", + "configs/config.d/filesystem_caches.xml", + ], + user_configs=["configs/users.d/users.xml"], + stay_alive=True, ) logging.info("Starting cluster...") @@ -182,6 +206,7 @@ def get_creation_expression( cluster, format="Parquet", table_function=False, + run_on_cluster=False, **kwargs, ): if storage_type == "s3": @@ -189,35 +214,56 @@ def get_creation_expression( bucket = kwargs["bucket"] else: bucket = cluster.minio_bucket - print(bucket) - if table_function: - return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')" + + if run_on_cluster: + assert table_function + return f"icebergS3Cluster('cluster_simple', s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')" else: - return f""" - DROP TABLE IF EXISTS {table_name}; - CREATE TABLE {table_name} - ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')""" + if table_function: + return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')" + else: + return f""" + DROP TABLE IF EXISTS {table_name}; + CREATE TABLE {table_name} + ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')""" + elif storage_type == "azure": - if table_function: + if run_on_cluster: + assert table_function return f""" - icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format}) + icebergAzureCluster('cluster_simple', azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format}) """ else: - return f""" - DROP TABLE IF EXISTS {table_name}; - CREATE TABLE {table_name} - ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})""" + if table_function: + return f""" + icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = 
'/iceberg_data/default/{table_name}/', format={format}) + """ + else: + return f""" + DROP TABLE IF EXISTS {table_name}; + CREATE TABLE {table_name} + ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})""" + elif storage_type == "hdfs": - if table_function: + if run_on_cluster: + assert table_function return f""" - icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/') + icebergHDFSCluster('cluster_simple', hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/') """ else: - return f""" - DROP TABLE IF EXISTS {table_name}; - CREATE TABLE {table_name} - ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');""" + if table_function: + return f""" + icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/') + """ + else: + return f""" + DROP TABLE IF EXISTS {table_name}; + CREATE TABLE {table_name} + ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');""" + elif storage_type == "local": + assert not run_on_cluster + if table_function: return f""" icebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format}) @@ -227,6 +273,7 @@ def get_creation_expression( DROP TABLE IF EXISTS {table_name}; CREATE TABLE {table_name} ENGINE=IcebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format});""" + else: raise Exception(f"Unknown iceberg storage type: {storage_type}") @@ -492,6 +539,108 @@ def test_types(started_cluster, format_version, storage_type): ) +@pytest.mark.parametrize("format_version", ["1", "2"]) +@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs"]) +def test_cluster_table_function(started_cluster, format_version, storage_type): + if is_arm() and storage_type == "hdfs": + pytest.skip("Disabled test IcebergHDFS for aarch64") + + instance = started_cluster.instances["node1"] + spark = started_cluster.spark_session + + TABLE_NAME = ( + "test_iceberg_cluster_" + + format_version + + "_" + + storage_type + + "_" + + get_uuid_str() + ) + + def add_df(mode): + write_iceberg_from_df( + spark, + generate_data(spark, 0, 100), + TABLE_NAME, + mode=mode, + format_version=format_version, + ) + + files = default_upload_directory( + started_cluster, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + f"/iceberg_data/default/{TABLE_NAME}/", + ) + + logging.info(f"Adding another dataframe. result files: {files}") + + return files + + files = add_df(mode="overwrite") + for i in range(1, len(started_cluster.instances)): + files = add_df(mode="append") + + logging.info(f"Setup complete. 
files: {files}") + assert len(files) == 5 + 4 * (len(started_cluster.instances) - 1) + + clusters = instance.query(f"SELECT * FROM system.clusters") + logging.info(f"Clusters setup: {clusters}") + + # Regular Query only node1 + table_function_expr = get_creation_expression( + storage_type, TABLE_NAME, started_cluster, table_function=True + ) + select_regular = ( + instance.query(f"SELECT * FROM {table_function_expr}").strip().split() + ) + + # Cluster Query with node1 as coordinator + table_function_expr_cluster = get_creation_expression( + storage_type, + TABLE_NAME, + started_cluster, + table_function=True, + run_on_cluster=True, + ) + select_cluster = ( + instance.query(f"SELECT * FROM {table_function_expr_cluster}").strip().split() + ) + + # Simple size check + assert len(select_regular) == 600 + assert len(select_cluster) == 600 + + # Actual check + assert select_cluster == select_regular + + # Check query_log + for replica in started_cluster.instances.values(): + replica.query("SYSTEM FLUSH LOGS") + + for node_name, replica in started_cluster.instances.items(): + cluster_secondary_queries = ( + replica.query( + f""" + SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log + WHERE + type = 'QueryStart' AND + positionCaseInsensitive(query, '{storage_type}Cluster') != 0 AND + position(query, '{TABLE_NAME}') != 0 AND + position(query, 'system.query_log') = 0 AND + NOT is_initial_query + """ + ) + .strip() + .split("\n") + ) + + logging.info( + f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}" + ) + assert len(cluster_secondary_queries) == 1 + + @pytest.mark.parametrize("format_version", ["1", "2"]) @pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs", "local"]) def test_delete_files(started_cluster, format_version, storage_type): diff --git a/utils/check-style/aspell-ignore/en/aspell-dict.txt b/utils/check-style/aspell-ignore/en/aspell-dict.txt index a53104581bb..03e4d149a8d 100644 --- a/utils/check-style/aspell-ignore/en/aspell-dict.txt +++ b/utils/check-style/aspell-ignore/en/aspell-dict.txt @@ -244,7 +244,10 @@ Deduplication DefaultTableEngine DelayedInserts DeliveryTag +Deltalake DeltaLake +deltalakeCluster +deltaLakeCluster Denormalize DestroyAggregatesThreads DestroyAggregatesThreadsActive @@ -377,10 +380,15 @@ Homebrew's HorizontalDivide Hostname HouseOps +hudi Hudi +hudiCluster +HudiCluster HyperLogLog Hypot IANA +icebergCluster +IcebergCluster IDE IDEs IDNA
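For the S3 case, the query-log verification at the end of `test_cluster_table_function` boils down to a check like the following, run on each node after `SYSTEM FLUSH LOGS` — a simplified sketch of the query embedded in the test above:

```sql
-- Each node, including the initiator, should log exactly one secondary (non-initial) cluster query.
SELECT query, type, is_initial_query, read_rows, read_bytes
FROM system.query_log
WHERE type = 'QueryStart'
  AND positionCaseInsensitive(query, 'icebergS3Cluster') != 0
  AND NOT is_initial_query;
```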