Compare commits

...

23 Commits

Author SHA1 Message Date
Pavel Kruglov
2678f62717
Merge 130a1c52a2 into 44b4bd38b9 2024-11-21 03:02:11 +01:00
Mikhail Artemenko
44b4bd38b9
Merge pull request #72045 from ClickHouse/issues/70174/cluster_versions
Enable cluster table functions for DataLake Storages
2024-11-20 21:22:37 +00:00
Shichao Jin
40c7d5fd1a
Merge pull request #71894 from udiz/fix-arrayWithConstant-size-estimation
Fix: arrayWithConstant size estimation using row's element size
2024-11-20 19:56:27 +00:00
Mikhail Artemenko
4ccebd9a24 fix syntax for iceberg in docs 2024-11-20 11:15:39 +00:00
Mikhail Artemenko
99177c0daf remove icebergCluster alias 2024-11-20 11:15:12 +00:00
Pavel Kruglov
130a1c52a2
Update 03273_format_inference_create_query_s3_url.sql 2024-11-20 12:11:51 +01:00
avogar
c185b5bc38 Fix style 2024-11-19 22:57:53 +00:00
avogar
1fc4757c08 Better naming 2024-11-19 22:27:29 +00:00
avogar
90b7ae871e Add inferred format name to create query in File/S3/URL/HDFS/Azure engines 2024-11-19 22:23:05 +00:00
Mikhail Artemenko
0951991c1d update aspell-dict.txt 2024-11-19 13:10:42 +00:00
Mikhail Artemenko
19aec5e572 Merge branch 'issues/70174/cluster_versions' of github.com:ClickHouse/ClickHouse into issues/70174/cluster_versions 2024-11-19 12:51:56 +00:00
Mikhail Artemenko
a367de9977 add docs 2024-11-19 12:49:59 +00:00
Mikhail Artemenko
6894e280b2 fix pr issues 2024-11-19 12:34:42 +00:00
Mikhail Artemenko
39ebe113d9 Merge branch 'master' into issues/70174/cluster_versions 2024-11-19 11:28:46 +00:00
udiz
239bbaa133 use length 2024-11-19 00:00:43 +00:00
udiz
07fac5808d format null on test 2024-11-18 23:08:48 +00:00
udiz
ed95e0781f test uses less memory 2024-11-18 22:48:38 +00:00
robot-clickhouse
014608fb6b Automatic style fix 2024-11-18 17:51:51 +00:00
Mikhail Artemenko
a29ded4941 add test for iceberg 2024-11-18 17:39:46 +00:00
Mikhail Artemenko
d2efae7511 enable cluster versions for datalake storages 2024-11-18 17:35:21 +00:00
udiz
6879aa130a newline 2024-11-13 22:47:54 +00:00
udiz
43f3c886a2 add test 2024-11-13 22:46:36 +00:00
udiz
c383a743f7 arrayWithConstant size estimation using single value size 2024-11-13 20:02:31 +00:00
46 changed files with 806 additions and 97 deletions

View File

@ -49,4 +49,4 @@ LIMIT 2
**See Also**
- [DeltaLake engine](/docs/en/engines/table-engines/integrations/deltalake.md)
- [DeltaLake cluster table function](/docs/en/sql-reference/table-functions/deltalakeCluster.md)

View File

@ -0,0 +1,30 @@
---
slug: /en/sql-reference/table-functions/deltalakeCluster
sidebar_position: 46
sidebar_label: deltaLakeCluster
title: "deltaLakeCluster Table Function"
---
This is an extension to the [deltaLake](/docs/en/sql-reference/table-functions/deltalake.md) table function.
Allows processing files from [Delta Lake](https://github.com/delta-io/delta) tables in Amazon S3 in parallel from many nodes in a specified cluster. On the initiator, it creates a connection to all nodes in the cluster and dispatches each file dynamically. Each worker node asks the initiator for the next task and processes it; this repeats until all tasks are finished.
**Syntax**
``` sql
deltaLakeCluster(cluster_name, url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
```
**Arguments**
- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
- All other arguments are described in the equivalent [deltaLake](/docs/en/sql-reference/table-functions/deltalake.md) table function.
**Returned value**
A table with the specified structure for reading data from the specified Delta Lake table in S3 across the cluster.
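**Examples**

A minimal usage sketch, mirroring the `icebergS3Cluster` example later in this diff; the cluster name, URL, and credentials are placeholders:

``` sql
SELECT * FROM deltaLakeCluster('cluster_simple', 'http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')
```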
**See Also**
- [deltaLake engine](/docs/en/engines/table-engines/integrations/deltalake.md)
- [deltaLake table function](/docs/en/sql-reference/table-functions/deltalake.md)

View File

@ -29,4 +29,4 @@ A table with the specified structure for reading data in the specified Hudi table in S3.
**See Also**
- [Hudi engine](/docs/en/engines/table-engines/integrations/hudi.md)
- [Hudi cluster table function](/docs/en/sql-reference/table-functions/hudiCluster.md)

View File

@ -0,0 +1,30 @@
---
slug: /en/sql-reference/table-functions/hudiCluster
sidebar_position: 86
sidebar_label: hudiCluster
title: "hudiCluster Table Function"
---
This is an extension to the [hudi](/docs/en/sql-reference/table-functions/hudi.md) table function.
Allows processing files from Apache [Hudi](https://hudi.apache.org/) tables in Amazon S3 in parallel from many nodes in a specified cluster. On the initiator, it creates a connection to all nodes in the cluster and dispatches each file dynamically. Each worker node asks the initiator for the next task and processes it; this repeats until all tasks are finished.
**Syntax**
``` sql
hudiCluster(cluster_name, url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
```
**Arguments**
- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
- All other arguments are described in the equivalent [hudi](/docs/en/sql-reference/table-functions/hudi.md) table function.
**Returned value**
A table with the specified structure for reading data from the specified Hudi table in S3 across the cluster.
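**Examples**

A minimal usage sketch, analogous to the `icebergS3Cluster` example later in this diff; the cluster name, URL, and credentials are placeholders:

``` sql
SELECT * FROM hudiCluster('cluster_simple', 'http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')
```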
**See Also**
- [Hudi engine](/docs/en/engines/table-engines/integrations/hudi.md)
- [Hudi table function](/docs/en/sql-reference/table-functions/hudi.md)

View File

@ -72,3 +72,4 @@ Table function `iceberg` is an alias to `icebergS3` now.
**See Also**
- [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md)
- [Iceberg cluster table function](/docs/en/sql-reference/table-functions/icebergCluster.md)

View File

@ -0,0 +1,43 @@
---
slug: /en/sql-reference/table-functions/icebergCluster
sidebar_position: 91
sidebar_label: icebergCluster
title: "icebergCluster Table Function"
---
This is an extension to the [iceberg](/docs/en/sql-reference/table-functions/iceberg.md) table function.
Allows processing files from Apache [Iceberg](https://iceberg.apache.org/) tables in parallel from many nodes in a specified cluster. On the initiator, it creates a connection to all nodes in the cluster and dispatches each file dynamically. Each worker node asks the initiator for the next task and processes it; this repeats until all tasks are finished.
**Syntax**
``` sql
icebergS3Cluster(cluster_name, url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,compression_method])
icebergS3Cluster(cluster_name, named_collection[, option=value [,..]])
icebergAzureCluster(cluster_name, connection_string|storage_account_url, container_name, blobpath, [,account_name], [,account_key] [,format] [,compression_method])
icebergAzureCluster(cluster_name, named_collection[, option=value [,..]])
icebergHDFSCluster(cluster_name, path_to_table, [,format] [,compression_method])
icebergHDFSCluster(cluster_name, named_collection[, option=value [,..]])
```
**Arguments**
- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
- All other arguments are described in the equivalent [iceberg](/docs/en/sql-reference/table-functions/iceberg.md) table function.
**Returned value**
A table with the specified structure for reading data from the specified Iceberg table across the cluster.
**Examples**
```sql
SELECT * FROM icebergS3Cluster('cluster_simple', 'http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')
```
**See Also**
- [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md)
- [Iceberg table function](/docs/en/sql-reference/table-functions/iceberg.md)

View File

@ -62,16 +62,17 @@ public:
for (size_t i = 0; i < num_rows; ++i)
{
auto array_size = col_num->getInt(i);
auto element_size = col_value->byteSizeAt(i);
if (unlikely(array_size < 0))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} cannot be negative: while executing function {}", array_size, getName());
Int64 estimated_size = 0;
if (unlikely(common::mulOverflow(array_size, col_value->byteSize(), estimated_size)))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} with element size {} bytes is too large: while executing function {}", array_size, col_value->byteSize(), getName());
if (unlikely(common::mulOverflow(array_size, element_size, estimated_size)))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} with element size {} bytes is too large: while executing function {}", array_size, element_size, getName());
if (unlikely(estimated_size > max_array_size_in_columns_bytes))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} with element size {} bytes is too large: while executing function {}", array_size, col_value->byteSize(), getName());
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Array size {} with element size {} bytes is too large: while executing function {}", array_size, element_size, getName());
offset += array_size;
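The fix estimates memory per row with `byteSizeAt(i)` instead of charging every row the whole column's `byteSize()`, so a constant replicated over many rows is no longer over-counted. The new test later in this diff captures both sides of the behavior:

```sql
-- Still rejected: one genuinely oversized array.
SELECT arrayWithConstant(96142475, ['qMUF']); -- { serverError TOO_LARGE_ARRAY_SIZE }
-- Accepted after the fix: each row's element is small.
SELECT length(arrayWithConstant(10000000, materialize([[[[[[[[[['Hello world']]]]]]]]]])));
```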

View File

@ -1187,6 +1187,22 @@ namespace
source_ast->children.push_back(source_ast->elements);
dict.set(dict.source, source_ast);
}
ASTs * getEngineArgsFromCreateQuery(ASTCreateQuery & create_query)
{
ASTStorage * storage_def = create_query.storage;
if (!storage_def)
return nullptr;
if (!storage_def->engine)
return nullptr;
const ASTFunction & engine_def = *storage_def->engine;
if (!engine_def.arguments)
return nullptr;
return &engine_def.arguments->children;
}
}
void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
@ -1876,7 +1892,11 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
mode);
/// If schema was inferred during storage creation, add columns description to create query.
addColumnsDescriptionToCreateQueryIfNecessary(query_ptr->as<ASTCreateQuery &>(), res);
auto & create_query = query_ptr->as<ASTCreateQuery &>();
addColumnsDescriptionToCreateQueryIfNecessary(create_query, res);
/// Add any inferred engine args if needed. For example, data format for engines File/S3/URL/etc
if (auto * engine_args = getEngineArgsFromCreateQuery(create_query))
res->addInferredEngineArgsToCreateQuery(*engine_args, getContext());
}
validateVirtualColumns(*res);
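The observable effect, taken from the reference output added later in this diff (local test endpoint):

```sql
-- The format is omitted, so it is inferred once at CREATE time:
CREATE TABLE test ENGINE = S3('http://localhost:11111/test/json_data');
-- The inferred name is persisted into the stored definition,
-- so inference does not run again on server restart:
SHOW CREATE TABLE test;
-- CREATE TABLE default.test (`a` Nullable(Int64))
-- ENGINE = S3('http://localhost:11111/test/json_data', 'JSON')
```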

View File

@ -284,6 +284,10 @@ public:
/// Returns hints for serialization of columns according to statistics accumulated by storage.
virtual SerializationInfoByName getSerializationHints() const { return {}; }
/// Add engine args that were inferred during storage creation to create query to avoid the same
/// inference on server restart. For example - data format inference in File/URL/S3/etc engines.
virtual void addInferredEngineArgsToCreateQuery(ASTs & /*args*/, const ContextPtr & /*context*/) const {}
private:
StorageID storage_id;

View File

@ -283,7 +283,7 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
}
void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
ASTs & args, const String & structure_, const String & format_, ContextPtr context)
ASTs & args, const String & structure_, const String & format_, ContextPtr context, bool with_structure)
{
if (auto collection = tryGetNamedCollectionWithOverrides(args, context))
{
@ -295,7 +295,7 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
auto format_equal_func = makeASTFunction("equals", std::move(format_equal_func_args));
args.push_back(format_equal_func);
}
if (collection->getOrDefault<String>("structure", "auto") == "auto")
if (with_structure && collection->getOrDefault<String>("structure", "auto") == "auto")
{
ASTs structure_equal_func_args = {std::make_shared<ASTIdentifier>("structure"), std::make_shared<ASTLiteral>(structure_)};
auto structure_equal_func = makeASTFunction("equals", std::move(structure_equal_func_args));
@ -319,9 +319,12 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
if (args.size() == 3)
{
args.push_back(format_literal);
/// Add compression = "auto" before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
if (with_structure)
{
/// Add compression = "auto" before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
}
/// (connection_string, container_name, blobpath, structure) or
/// (connection_string, container_name, blobpath, format)
@ -334,12 +337,15 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
{
if (fourth_arg == "auto")
args[3] = format_literal;
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
if (with_structure)
{
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
}
/// (..., structure) -> (..., format, compression, structure)
else
else if (with_structure)
{
auto structure_arg = args.back();
args[3] = format_literal;
@ -362,15 +368,19 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
{
if (fourth_arg == "auto")
args[3] = format_literal;
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
/// (..., account_name, account_key) -> (..., account_name, account_key, format, compression, structure)
else
{
args.push_back(format_literal);
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
if (with_structure)
{
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
}
}
/// (connection_string, container_name, blobpath, format, compression, structure) or
@ -386,7 +396,7 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
{
if (fourth_arg == "auto")
args[3] = format_literal;
if (checkAndGetLiteralArgument<String>(args[5], "structure") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[5], "structure") == "auto")
args[5] = structure_literal;
}
/// (..., account_name, account_key, format) -> (..., account_name, account_key, format, compression, structure)
@ -394,12 +404,15 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
{
if (sixth_arg == "auto")
args[5] = format_literal;
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
if (with_structure)
{
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
}
/// (..., account_name, account_key, structure) -> (..., account_name, account_key, format, compression, structure)
else
else if (with_structure)
{
auto structure_arg = args.back();
args[5] = format_literal;
@ -417,14 +430,15 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
/// (..., format, compression) -> (..., format, compression, structure)
if (checkAndGetLiteralArgument<String>(args[5], "format") == "auto")
args[5] = format_literal;
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
/// (storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure)
else if (args.size() == 8)
{
if (checkAndGetLiteralArgument<String>(args[5], "format") == "auto")
args[5] = format_literal;
if (checkAndGetLiteralArgument<String>(args[7], "structure") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[7], "structure") == "auto")
args[7] = structure_literal;
}
}

View File

@ -76,7 +76,8 @@ public:
ASTs & args,
const String & structure_,
const String & format_,
ContextPtr context) override;
ContextPtr context,
bool with_structure) override;
protected:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;

View File

@ -175,7 +175,8 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded(
ASTs & args,
const String & structure_,
const String & format_,
ContextPtr context)
ContextPtr context,
bool with_structure)
{
if (auto collection = tryGetNamedCollectionWithOverrides(args, context))
{
@ -187,7 +188,7 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded(
auto format_equal_func = makeASTFunction("equals", std::move(format_equal_func_args));
args.push_back(format_equal_func);
}
if (collection->getOrDefault<String>("structure", "auto") == "auto")
if (with_structure && collection->getOrDefault<String>("structure", "auto") == "auto")
{
ASTs structure_equal_func_args = {std::make_shared<ASTIdentifier>("structure"), std::make_shared<ASTLiteral>(structure_)};
auto structure_equal_func = makeASTFunction("equals", std::move(structure_equal_func_args));
@ -210,23 +211,26 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded(
if (count == 1)
{
/// Add format=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
args.push_back(format_literal);
if (with_structure)
args.push_back(structure_literal);
}
/// hdfs(url, format)
else if (count == 2)
{
if (checkAndGetLiteralArgument<String>(args[1], "format") == "auto")
args.back() = format_literal;
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
/// hdfs(url, format, structure)
/// hdfs(url, format, structure, compression_method)
/// hdfs(url, format, compression_method)
else if (count >= 3)
{
if (checkAndGetLiteralArgument<String>(args[1], "format") == "auto")
args[1] = format_literal;
if (checkAndGetLiteralArgument<String>(args[2], "structure") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[2], "structure") == "auto")
args[2] = structure_literal;
}
}

View File

@ -62,7 +62,8 @@ public:
ASTs & args,
const String & structure_,
const String & format_,
ContextPtr context) override;
ContextPtr context,
bool with_structure) override;
private:
void fromNamedCollection(const NamedCollection &, ContextPtr context) override;

View File

@ -59,7 +59,7 @@ public:
ObjectStoragePtr createObjectStorage(ContextPtr, bool) override { return std::make_shared<LocalObjectStorage>("/"); }
void addStructureAndFormatToArgsIfNeeded(ASTs &, const String &, const String &, ContextPtr) override { }
void addStructureAndFormatToArgsIfNeeded(ASTs &, const String &, const String &, ContextPtr, bool) override { }
private:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;

View File

@ -395,7 +395,7 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_
}
void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
ASTs & args, const String & structure_, const String & format_, ContextPtr context)
ASTs & args, const String & structure_, const String & format_, ContextPtr context, bool with_structure)
{
if (auto collection = tryGetNamedCollectionWithOverrides(args, context))
{
@ -407,7 +407,7 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
auto format_equal_func = makeASTFunction("equals", std::move(format_equal_func_args));
args.push_back(format_equal_func);
}
if (collection->getOrDefault<String>("structure", "auto") == "auto")
if (with_structure && collection->getOrDefault<String>("structure", "auto") == "auto")
{
ASTs structure_equal_func_args = {std::make_shared<ASTIdentifier>("structure"), std::make_shared<ASTLiteral>(structure_)};
auto structure_equal_func = makeASTFunction("equals", std::move(structure_equal_func_args));
@ -429,8 +429,9 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
if (count == 1)
{
/// Add format=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
args.push_back(format_literal);
if (with_structure)
args.push_back(structure_literal);
}
/// s3(s3_url, format) or
/// s3(s3_url, NOSIGN)
@ -444,11 +445,13 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
else if (checkAndGetLiteralArgument<String>(args[1], "format") == "auto")
args[1] = format_literal;
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
/// s3(source, format, structure) or
/// s3(source, access_key_id, secret_access_key) or
/// s3(source, NOSIGN, format)
/// s3(source, NOSIGN, format) or
/// s3(source, format, compression_method)
/// We can distinguish them by looking at the second argument: check if it's NOSIGN, a format name, or neither.
else if (count == 3)
{
@ -457,26 +460,29 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
{
if (checkAndGetLiteralArgument<String>(args[2], "format") == "auto")
args[2] = format_literal;
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
else if (second_arg == "auto" || FormatFactory::instance().exists(second_arg))
{
if (second_arg == "auto")
args[1] = format_literal;
if (checkAndGetLiteralArgument<String>(args[2], "structure") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[2], "structure") == "auto")
args[2] = structure_literal;
}
else
{
/// Add format and structure arguments.
args.push_back(format_literal);
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
}
/// s3(source, format, structure, compression_method) or
/// s3(source, access_key_id, secret_access_key, format) or
/// s3(source, access_key_id, secret_access_key, session_token) or
/// s3(source, NOSIGN, format, structure)
/// s3(source, NOSIGN, format, structure) or
/// s3(source, NOSIGN, format, compression_method)
/// We can distinguish them by looking at the second argument: check if it's NOSIGN, a format name, or neither.
else if (count == 4)
{
@ -485,14 +491,14 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
{
if (checkAndGetLiteralArgument<String>(args[2], "format") == "auto")
args[2] = format_literal;
if (checkAndGetLiteralArgument<String>(args[3], "structure") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[3], "structure") == "auto")
args[3] = structure_literal;
}
else if (second_arg == "auto" || FormatFactory::instance().exists(second_arg))
{
if (second_arg == "auto")
args[1] = format_literal;
if (checkAndGetLiteralArgument<String>(args[2], "structure") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[2], "structure") == "auto")
args[2] = structure_literal;
}
else
@ -502,18 +508,21 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
{
if (checkAndGetLiteralArgument<String>(args[3], "format") == "auto")
args[3] = format_literal;
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
else
{
args.push_back(format_literal);
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
}
}
/// s3(source, access_key_id, secret_access_key, format, structure) or
/// s3(source, access_key_id, secret_access_key, session_token, format) or
/// s3(source, NOSIGN, format, structure, compression_method)
/// s3(source, NOSIGN, format, structure, compression_method) or
/// s3(source, access_key_id, secret_access_key, format, compression)
/// We can distinguish them by looking at the second argument: check if it's the NOSIGN keyword or not.
else if (count == 5)
{
@ -522,7 +531,7 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
{
if (checkAndGetLiteralArgument<String>(args[2], "format") == "auto")
args[2] = format_literal;
if (checkAndGetLiteralArgument<String>(args[2], "structure") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[2], "structure") == "auto")
args[3] = structure_literal;
}
else
@ -532,19 +541,21 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
{
if (checkAndGetLiteralArgument<String>(args[3], "format") == "auto")
args[3] = format_literal;
if (checkAndGetLiteralArgument<String>(args[4], "structure") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[4], "structure") == "auto")
args[4] = structure_literal;
}
else
{
if (checkAndGetLiteralArgument<String>(args[4], "format") == "auto")
args[4] = format_literal;
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
}
}
/// s3(source, access_key_id, secret_access_key, format, structure, compression) or
/// s3(source, access_key_id, secret_access_key, session_token, format, structure)
/// s3(source, access_key_id, secret_access_key, session_token, format, structure) or
/// s3(source, access_key_id, secret_access_key, session_token, format, compression_method)
else if (count == 6)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(args[3], "format/session_token");
@ -552,14 +563,14 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
{
if (checkAndGetLiteralArgument<String>(args[3], "format") == "auto")
args[3] = format_literal;
if (checkAndGetLiteralArgument<String>(args[4], "structure") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[4], "structure") == "auto")
args[4] = structure_literal;
}
else
{
if (checkAndGetLiteralArgument<String>(args[4], "format") == "auto")
args[4] = format_literal;
if (checkAndGetLiteralArgument<String>(args[5], "format") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[5], "format") == "auto")
args[5] = structure_literal;
}
}
@ -568,7 +579,7 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
{
if (checkAndGetLiteralArgument<String>(args[4], "format") == "auto")
args[4] = format_literal;
if (checkAndGetLiteralArgument<String>(args[5], "format") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[5], "format") == "auto")
args[5] = structure_literal;
}
}
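A sketch of what the rewrite produces for the three-argument `s3(url, access_key_id, secret_access_key)` case; the endpoint, credentials, format, and structure below are illustrative, not taken from this diff:

```sql
-- Query as written by the user:
SELECT * FROM s3('http://minio1:9001/data/*.json', 'key', 'secret');
-- with_structure = true (query forwarded to cluster workers):
--   s3('http://minio1:9001/data/*.json', 'key', 'secret', 'JSONEachRow', 'x Nullable(String)')
-- with_structure = false (persisting inferred engine args in CREATE):
--   s3('http://minio1:9001/data/*.json', 'key', 'secret', 'JSONEachRow')
```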

View File

@ -91,7 +91,8 @@ public:
ASTs & args,
const String & structure,
const String & format,
ContextPtr context) override;
ContextPtr context,
bool with_structure) override;
private:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;

View File

@ -483,6 +483,11 @@ std::pair<ColumnsDescription, std::string> StorageObjectStorage::resolveSchemaAn
return std::pair(columns, format);
}
void StorageObjectStorage::addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const
{
configuration->addStructureAndFormatToArgsIfNeeded(args, "", configuration->format, context, /*with_structure=*/false);
}
}
SchemaCache & StorageObjectStorage::getSchemaCache(const ContextPtr & context, const std::string & storage_type_name)
{
if (storage_type_name == "s3")

View File

@ -122,6 +122,8 @@ public:
std::string & sample_path,
const ContextPtr & context);
void addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const override;
protected:
String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context);
@ -179,7 +181,7 @@ public:
/// Add/replace structure and format arguments in the AST arguments if they have 'auto' values.
virtual void addStructureAndFormatToArgsIfNeeded(
ASTs & args, const String & structure_, const String & format_, ContextPtr context) = 0;
ASTs & args, const String & structure_, const String & format_, ContextPtr context, bool with_structure) = 0;
bool withPartitionWildcard() const;
bool withGlobs() const { return isPathWithGlobs() || isNamespaceWithGlobs(); }

View File

@ -107,7 +107,7 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
ASTPtr cluster_name_arg = args.front();
args.erase(args.begin());
configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context);
configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context, /*with_structure=*/true);
args.insert(args.begin(), cluster_name_arg);
}

View File

@ -2113,6 +2113,11 @@ void StorageFile::truncate(
}
}
void StorageFile::addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const
{
if (checkAndGetLiteralArgument<String>(evaluateConstantExpressionOrIdentifierAsLiteral(args[0], context), "format") == "auto")
args[0] = std::make_shared<ASTLiteral>(format_name);
}
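For the File engine only the format argument is inferable, so the override rewrites `args[0]` when it is `auto`. A minimal sketch (the path and inferred format name are illustrative; the bash test later in this diff exercises the same flow):

```sql
CREATE TABLE test ENGINE = File(auto, './data.jsonl');
-- 'auto' is replaced by the inferred format name in the stored definition:
SHOW CREATE TABLE test;
-- ... ENGINE = File('JSONEachRow', './data.jsonl')
```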
void registerStorageFile(StorageFactory & factory)
{

View File

@ -139,6 +139,8 @@ public:
bool supportsTrivialCountOptimization(const StorageSnapshotPtr &, ContextPtr) const override { return true; }
void addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const override;
protected:
friend class StorageFileSource;
friend class StorageFileSink;

View File

@ -38,6 +38,8 @@
#include <Common/logger_useful.h>
#include <Common/re2.h>
#include <TableFunctions/TableFunctionURL.h>
#include <Formats/SchemaInferenceUtils.h>
#include <Core/FormatFactorySettings.h>
#include <Core/ServerSettings.h>
@ -1570,6 +1572,11 @@ void StorageURL::processNamedCollectionResult(Configuration & configuration, con
configuration.structure = collection.getOrDefault<String>("structure", "auto");
}
void StorageURL::addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const
{
TableFunctionURL::updateStructureAndFormatArgumentsIfNeeded(args, "", format_name, context, /*with_structure=*/false);
}
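StorageURL reuses the table-function rewrite with an empty structure and `with_structure=false`, so only the format is appended to the engine args. From the reference output added later in this diff:

```sql
CREATE TABLE test ENGINE = URL('http://localhost:11111/test/json_data');
SHOW CREATE TABLE test;
-- CREATE TABLE default.test (`a` Nullable(Int64))
-- ENGINE = URL('http://localhost:11111/test/json_data', 'JSON')
```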
StorageURL::Configuration StorageURL::getConfiguration(ASTs & args, const ContextPtr & local_context)
{
StorageURL::Configuration configuration;

View File

@ -305,6 +305,8 @@ public:
bool supportsDynamicSubcolumns() const override { return true; }
void addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const override;
static FormatSettings getFormatSettingsFromArgs(const StorageFactory::Arguments & args);
struct Configuration : public StatelessTableEngineConfiguration

View File

@ -31,7 +31,7 @@ public:
ASTPtr cluster_name_arg = args.front();
args.erase(args.begin());
Base::updateStructureAndFormatArgumentsIfNeeded(args, structure_, format_, context);
Base::updateStructureAndFormatArgumentsIfNeeded(args, structure_, format_, context, /*with_structure=*/true);
args.insert(args.begin(), cluster_name_arg);
}

View File

@ -88,7 +88,7 @@ void ITableFunctionFileLike::parseArgumentsImpl(ASTs & args, const ContextPtr &
compression_method = checkAndGetLiteralArgument<String>(args[3], "compression_method");
}
void ITableFunctionFileLike::updateStructureAndFormatArgumentsIfNeeded(ASTs & args, const String & structure, const String & format, const ContextPtr & context)
void ITableFunctionFileLike::updateStructureAndFormatArgumentsIfNeeded(ASTs & args, const String & structure, const String & format, const ContextPtr & context, bool with_structure)
{
if (args.empty() || args.size() > getMaxNumberOfArguments())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 1 to {} arguments in table function, got {}", getMaxNumberOfArguments(), args.size());
@ -103,21 +103,23 @@ void ITableFunctionFileLike::updateStructureAndFormatArgumentsIfNeeded(ASTs & ar
if (args.size() == 1)
{
args.push_back(format_literal);
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
/// f(filename, format)
else if (args.size() == 2)
{
if (checkAndGetLiteralArgument<String>(args[1], "format") == "auto")
args.back() = format_literal;
args.push_back(structure_literal);
if (with_structure)
args.push_back(structure_literal);
}
/// f(filename, format, structure) or f(filename, format, structure, compression)
/// f(filename, format, structure) or f(filename, format, structure, compression) or f(filename, format, compression)
else if (args.size() >= 3)
{
if (checkAndGetLiteralArgument<String>(args[1], "format") == "auto")
args[1] = format_literal;
if (checkAndGetLiteralArgument<String>(args[2], "structure") == "auto")
if (with_structure && checkAndGetLiteralArgument<String>(args[2], "structure") == "auto")
args[2] = structure_literal;
}
}

View File

@ -35,7 +35,7 @@ public:
static size_t getMaxNumberOfArguments() { return max_number_of_arguments; }
static void updateStructureAndFormatArgumentsIfNeeded(ASTs & args, const String & structure, const String & format, const ContextPtr &);
static void updateStructureAndFormatArgumentsIfNeeded(ASTs & args, const String & structure, const String & format, const ContextPtr &, bool with_structure);
protected:

View File

@ -226,6 +226,26 @@ template class TableFunctionObjectStorage<HDFSClusterDefinition, StorageHDFSConfiguration>;
#endif
template class TableFunctionObjectStorage<LocalDefinition, StorageLocalConfiguration>;
#if USE_AVRO && USE_AWS_S3
template class TableFunctionObjectStorage<IcebergS3ClusterDefinition, StorageS3IcebergConfiguration>;
#endif
#if USE_AVRO && USE_AZURE_BLOB_STORAGE
template class TableFunctionObjectStorage<IcebergAzureClusterDefinition, StorageAzureIcebergConfiguration>;
#endif
#if USE_AVRO && USE_HDFS
template class TableFunctionObjectStorage<IcebergHDFSClusterDefinition, StorageHDFSIcebergConfiguration>;
#endif
#if USE_PARQUET && USE_AWS_S3
template class TableFunctionObjectStorage<DeltaLakeClusterDefinition, StorageS3DeltaLakeConfiguration>;
#endif
#if USE_AWS_S3
template class TableFunctionObjectStorage<HudiClusterDefinition, StorageS3HudiConfiguration>;
#endif
#if USE_AVRO
void registerTableFunctionIceberg(TableFunctionFactory & factory)
{

View File

@ -139,7 +139,7 @@ public:
const String & format,
const ContextPtr & context)
{
Configuration().addStructureAndFormatToArgsIfNeeded(args, structure, format, context);
Configuration().addStructureAndFormatToArgsIfNeeded(args, structure, format, context, /*with_structure=*/true);
}
protected:

View File

@ -96,7 +96,7 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory)
{
.documentation = {
.description=R"(The table function can be used to read the data stored on HDFS in parallel for many nodes in a specified cluster.)",
.examples{{"HDFSCluster", "SELECT * FROM HDFSCluster(cluster_name, uri, format)", ""}}},
.examples{{"HDFSCluster", "SELECT * FROM HDFSCluster(cluster, uri, format)", ""}}},
.allow_readonly = false
}
);
@ -105,15 +105,77 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory)
UNUSED(factory);
}
#if USE_AVRO
void registerTableFunctionIcebergCluster(TableFunctionFactory & factory)
{
UNUSED(factory);
#if USE_AWS_S3
template class TableFunctionObjectStorageCluster<S3ClusterDefinition, StorageS3Configuration>;
factory.registerFunction<TableFunctionIcebergS3Cluster>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store in parallel for many nodes in a specified cluster.)",
.examples{{"icebergS3Cluster", "SELECT * FROM icebergS3Cluster(cluster, url, [, NOSIGN | access_key_id, secret_access_key, [session_token]], format, [,compression])", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
#endif
#if USE_AZURE_BLOB_STORAGE
template class TableFunctionObjectStorageCluster<AzureClusterDefinition, StorageAzureConfiguration>;
factory.registerFunction<TableFunctionIcebergAzureCluster>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store in parallel for many nodes in a specified cluster.)",
.examples{{"icebergAzureCluster", "SELECT * FROM icebergAzureCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression])", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
#endif
#if USE_HDFS
template class TableFunctionObjectStorageCluster<HDFSClusterDefinition, StorageHDFSConfiguration>;
factory.registerFunction<TableFunctionIcebergHDFSCluster>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on HDFS virtual filesystem in parallel for many nodes in a specified cluster.)",
.examples{{"icebergHDFSCluster", "SELECT * FROM icebergHDFSCluster(cluster, uri, [format], [structure], [compression_method])", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
#endif
}
#endif
#if USE_AWS_S3
#if USE_PARQUET
void registerTableFunctionDeltaLakeCluster(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionDeltaLakeCluster>(
{.documentation
= {.description = R"(The table function can be used to read the DeltaLake table stored on object store in parallel for many nodes in a specified cluster.)",
.examples{{"deltaLakeCluster", "SELECT * FROM deltaLakeCluster(cluster, url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
}
#endif
void registerTableFunctionHudiCluster(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionHudiCluster>(
{.documentation
= {.description = R"(The table function can be used to read the Hudi table stored on object store in parallel for many nodes in a specified cluster.)",
.examples{{"hudiCluster", "SELECT * FROM hudiCluster(cluster, url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
}
#endif
void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory)
{
UNUSED(factory);
#if USE_AVRO
registerTableFunctionIcebergCluster(factory);
#endif
#if USE_AWS_S3
#if USE_PARQUET
registerTableFunctionDeltaLakeCluster(factory);
#endif
registerTableFunctionHudiCluster(factory);
#endif
}
}

View File

@ -33,6 +33,36 @@ struct HDFSClusterDefinition
static constexpr auto storage_type_name = "HDFSCluster";
};
struct IcebergS3ClusterDefinition
{
static constexpr auto name = "icebergS3Cluster";
static constexpr auto storage_type_name = "IcebergS3Cluster";
};
struct IcebergAzureClusterDefinition
{
static constexpr auto name = "icebergAzureCluster";
static constexpr auto storage_type_name = "IcebergAzureCluster";
};
struct IcebergHDFSClusterDefinition
{
static constexpr auto name = "icebergHDFSCluster";
static constexpr auto storage_type_name = "IcebergHDFSCluster";
};
struct DeltaLakeClusterDefinition
{
static constexpr auto name = "deltaLakeCluster";
static constexpr auto storage_type_name = "DeltaLakeS3Cluster";
};
struct HudiClusterDefinition
{
static constexpr auto name = "hudiCluster";
static constexpr auto storage_type_name = "HudiS3Cluster";
};
/**
* Class implementing s3/hdfs/azureBlobStorageCluster(...) table functions,
* which allow to process many files from S3/HDFS/Azure blob storage on a specific cluster.
@ -79,4 +109,25 @@ using TableFunctionAzureBlobCluster = TableFunctionObjectStorageCluster<AzureClusterDefinition, StorageAzureConfiguration>;
#if USE_HDFS
using TableFunctionHDFSCluster = TableFunctionObjectStorageCluster<HDFSClusterDefinition, StorageHDFSConfiguration>;
#endif
#if USE_AVRO && USE_AWS_S3
using TableFunctionIcebergS3Cluster = TableFunctionObjectStorageCluster<IcebergS3ClusterDefinition, StorageS3IcebergConfiguration>;
#endif
#if USE_AVRO && USE_AZURE_BLOB_STORAGE
using TableFunctionIcebergAzureCluster = TableFunctionObjectStorageCluster<IcebergAzureClusterDefinition, StorageAzureIcebergConfiguration>;
#endif
#if USE_AVRO && USE_HDFS
using TableFunctionIcebergHDFSCluster = TableFunctionObjectStorageCluster<IcebergHDFSClusterDefinition, StorageHDFSIcebergConfiguration>;
#endif
#if USE_AWS_S3 && USE_PARQUET
using TableFunctionDeltaLakeCluster = TableFunctionObjectStorageCluster<DeltaLakeClusterDefinition, StorageS3DeltaLakeConfiguration>;
#endif
#if USE_AWS_S3
using TableFunctionHudiCluster = TableFunctionObjectStorageCluster<HudiClusterDefinition, StorageS3HudiConfiguration>;
#endif
}

View File

@ -77,7 +77,7 @@ void TableFunctionURL::parseArgumentsImpl(ASTs & args, const ContextPtr & contex
}
}
void TableFunctionURL::updateStructureAndFormatArgumentsIfNeeded(ASTs & args, const String & structure_, const String & format_, const ContextPtr & context)
void TableFunctionURL::updateStructureAndFormatArgumentsIfNeeded(ASTs & args, const String & structure_, const String & format_, const ContextPtr & context, bool with_structure)
{
if (auto collection = tryGetNamedCollectionWithOverrides(args, context))
{
@ -89,7 +89,7 @@ void TableFunctionURL::updateStructureAndFormatArgumentsIfNeeded(ASTs & args, co
auto format_equal_func = makeASTFunction("equals", std::move(format_equal_func_args));
args.push_back(format_equal_func);
}
if (collection->getOrDefault<String>("structure", "auto") == "auto")
if (with_structure && collection->getOrDefault<String>("structure", "auto") == "auto")
{
ASTs structure_equal_func_args = {std::make_shared<ASTIdentifier>("structure"), std::make_shared<ASTLiteral>(structure_)};
auto structure_equal_func = makeASTFunction("equals", std::move(structure_equal_func_args));
@ -109,7 +109,7 @@ void TableFunctionURL::updateStructureAndFormatArgumentsIfNeeded(ASTs & args, co
args.pop_back();
}
ITableFunctionFileLike::updateStructureAndFormatArgumentsIfNeeded(args, structure_, format_, context);
ITableFunctionFileLike::updateStructureAndFormatArgumentsIfNeeded(args, structure_, format_, context, with_structure);
if (headers_ast)
args.push_back(headers_ast);

View File

@ -34,7 +34,7 @@ public:
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
static void updateStructureAndFormatArgumentsIfNeeded(ASTs & args, const String & structure_, const String & format_, const ContextPtr & context);
static void updateStructureAndFormatArgumentsIfNeeded(ASTs & args, const String & structure_, const String & format_, const ContextPtr & context, bool with_structure);
protected:
void parseArguments(const ASTPtr & ast, ContextPtr context) override;

View File

@ -66,6 +66,7 @@ void registerTableFunctions(bool use_legacy_mongodb_integration [[maybe_unused]]
registerTableFunctionObjectStorage(factory);
registerTableFunctionObjectStorageCluster(factory);
registerDataLakeTableFunctions(factory);
registerDataLakeClusterTableFunctions(factory);
}
}

View File

@ -70,6 +70,7 @@ void registerTableFunctionExplain(TableFunctionFactory & factory);
void registerTableFunctionObjectStorage(TableFunctionFactory & factory);
void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory);
void registerDataLakeTableFunctions(TableFunctionFactory & factory);
void registerDataLakeClusterTableFunctions(TableFunctionFactory & factory);
void registerTableFunctionTimeSeries(TableFunctionFactory & factory);

View File

@ -1314,6 +1314,7 @@ def test_size_virtual_column(cluster):
def test_format_detection(cluster):
node = cluster.instances["node"]
connection_string = cluster.env_variables["AZURITE_CONNECTION_STRING"]
storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
account_name = "devstoreaccount1"
account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
@ -1381,6 +1382,84 @@ def test_format_detection(cluster):
assert result == expected_result
azure_query(
node,
f"create table test_format_detection engine=AzureBlobStorage('{connection_string}', 'cont', 'test_format_detection1')",
)
result = azure_query(
node,
f"show create table test_format_detection",
)
assert (
result
== f"CREATE TABLE default.test_format_detection\\n(\\n `x` Nullable(String),\\n `y` Nullable(String)\\n)\\nENGINE = AzureBlobStorage(\\'{connection_string}\\', \\'cont\\', \\'test_format_detection1\\', \\'JSON\\')\n"
)
azure_query(
node,
f"create or replace table test_format_detection engine=AzureBlobStorage('{connection_string}', 'cont', 'test_format_detection1', auto)",
)
result = azure_query(
node,
f"show create table test_format_detection",
)
assert (
result
== f"CREATE TABLE default.test_format_detection\\n(\\n `x` Nullable(String),\\n `y` Nullable(String)\\n)\\nENGINE = AzureBlobStorage(\\'{connection_string}\\', \\'cont\\', \\'test_format_detection1\\', \\'JSON\\')\n"
)
azure_query(
node,
f"create or replace table test_format_detection engine=AzureBlobStorage('{connection_string}', 'cont', 'test_format_detection1', auto, 'none')",
)
result = azure_query(
node,
f"show create table test_format_detection",
)
assert (
result
== f"CREATE TABLE default.test_format_detection\\n(\\n `x` Nullable(String),\\n `y` Nullable(String)\\n)\\nENGINE = AzureBlobStorage(\\'{connection_string}\\', \\'cont\\', \\'test_format_detection1\\', \\'JSON\\', \\'none\\')\n"
)
azure_query(
node,
f"create or replace table test_format_detection engine=AzureBlobStorage('{storage_account_url}', 'cont', 'test_format_detection1', '{account_name}', '{account_key}')",
)
result = azure_query(
node,
f"show create table test_format_detection",
)
assert (
result
== f"CREATE TABLE default.test_format_detection\\n(\\n `x` Nullable(String),\\n `y` Nullable(String)\\n)\\nENGINE = AzureBlobStorage(\\'{storage_account_url}\\', \\'cont\\', \\'test_format_detection1\\', \\'{account_name}\\', \\'{account_key}\\', \\'JSON\\')\n"
)
azure_query(
node,
f"create or replace table test_format_detection engine=AzureBlobStorage('{storage_account_url}', 'cont', 'test_format_detection1', '{account_name}', '{account_key}', auto)",
)
result = azure_query(
node,
f"show create table test_format_detection",
)
assert (
result
== f"CREATE TABLE default.test_format_detection\\n(\\n `x` Nullable(String),\\n `y` Nullable(String)\\n)\\nENGINE = AzureBlobStorage(\\'{storage_account_url}\\', \\'cont\\', \\'test_format_detection1\\', \\'{account_name}\\', \\'{account_key}\\', \\'JSON\\')\n"
)
azure_query(
node,
f"create or replace table test_format_detection engine=AzureBlobStorage('{storage_account_url}', 'cont', 'test_format_detection1', '{account_name}', '{account_key}', auto, 'none')",
)
result = azure_query(
node,
f"show create table test_format_detection",
)
assert (
result
== f"CREATE TABLE default.test_format_detection\\n(\\n `x` Nullable(String),\\n `y` Nullable(String)\\n)\\nENGINE = AzureBlobStorage(\\'{storage_account_url}\\', \\'cont\\', \\'test_format_detection1\\', \\'{account_name}\\', \\'{account_key}\\', \\'JSON\\', \\'none\\')\n"
)
def test_write_to_globbed_partitioned_path(cluster):
node = cluster.instances["node"]

View File

@ -636,7 +636,7 @@ def test_multiple_inserts(started_cluster):
node1.query(f"drop table test_multiple_inserts")
def test_format_detection(started_cluster):
def test_format_detection_from_file_name(started_cluster):
node1.query(
f"create table arrow_table (x UInt64) engine=HDFS('hdfs://hdfs1:9000/data.arrow')"
)
@ -1222,6 +1222,35 @@ def test_format_detection(started_cluster):
assert expected_result == result
node.query(
f"create table test_format_detection engine=HDFS('hdfs://hdfs1:9000/{dir}/test_format_detection1')"
)
result = node.query(f"show create table test_format_detection")
assert (
result
== f"CREATE TABLE default.test_format_detection\\n(\\n `x` Nullable(String),\\n `y` Nullable(String)\\n)\\nENGINE = HDFS(\\'hdfs://hdfs1:9000/{dir}/test_format_detection1\\', \\'JSON\\')\n"
)
node.query("drop table test_format_detection")
node.query(
f"create table test_format_detection engine=HDFS('hdfs://hdfs1:9000/{dir}/test_format_detection1', auto)"
)
result = node.query(f"show create table test_format_detection")
assert (
result
== f"CREATE TABLE default.test_format_detection\\n(\\n `x` Nullable(String),\\n `y` Nullable(String)\\n)\\nENGINE = HDFS(\\'hdfs://hdfs1:9000/{dir}/test_format_detection1\\', \\'JSON\\')\n"
)
node.query("drop table test_format_detection")
node.query(
f"create table test_format_detection engine=HDFS('hdfs://hdfs1:9000/{dir}/test_format_detection1', auto, 'none')"
)
result = node.query(f"show create table test_format_detection")
assert (
result
== f"CREATE TABLE default.test_format_detection\\n(\\n `x` Nullable(String),\\n `y` Nullable(String)\\n)\\nENGINE = HDFS(\\'hdfs://hdfs1:9000/{dir}/test_format_detection1\\', \\'JSON\\', \\'none\\')\n"
)
def test_write_to_globbed_partitioned_path(started_cluster):
node = started_cluster.instances["node1"]

View File

@ -0,0 +1,20 @@
<clickhouse>
<remote_servers>
<cluster_simple>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</cluster_simple>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,6 @@
<clickhouse>
<query_log>
<database>system</database>
<table>query_log</table>
</query_log>
</clickhouse>

View File

@ -73,14 +73,38 @@ def started_cluster():
cluster.add_instance(
"node1",
main_configs=[
"configs/config.d/query_log.xml",
"configs/config.d/cluster.xml",
"configs/config.d/named_collections.xml",
"configs/config.d/filesystem_caches.xml",
],
user_configs=["configs/users.d/users.xml"],
with_minio=True,
with_azurite=True,
stay_alive=True,
with_hdfs=with_hdfs,
stay_alive=True,
)
cluster.add_instance(
"node2",
main_configs=[
"configs/config.d/query_log.xml",
"configs/config.d/cluster.xml",
"configs/config.d/named_collections.xml",
"configs/config.d/filesystem_caches.xml",
],
user_configs=["configs/users.d/users.xml"],
stay_alive=True,
)
cluster.add_instance(
"node3",
main_configs=[
"configs/config.d/query_log.xml",
"configs/config.d/cluster.xml",
"configs/config.d/named_collections.xml",
"configs/config.d/filesystem_caches.xml",
],
user_configs=["configs/users.d/users.xml"],
stay_alive=True,
)
logging.info("Starting cluster...")
@ -182,6 +206,7 @@ def get_creation_expression(
cluster,
format="Parquet",
table_function=False,
run_on_cluster=False,
**kwargs,
):
if storage_type == "s3":
@ -189,35 +214,56 @@ def get_creation_expression(
bucket = kwargs["bucket"]
else:
bucket = cluster.minio_bucket
print(bucket)
if table_function:
return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
if run_on_cluster:
assert table_function
return f"icebergS3Cluster('cluster_simple', s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
if table_function:
return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
elif storage_type == "azure":
if table_function:
if run_on_cluster:
assert table_function
return f"""
icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
icebergAzureCluster('cluster_simple', azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
"""
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""
if table_function:
return f"""
icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
"""
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""
elif storage_type == "hdfs":
if table_function:
if run_on_cluster:
assert table_function
return f"""
icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
icebergHDFSCluster('cluster_simple', hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
"""
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');"""
if table_function:
return f"""
icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
"""
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');"""
elif storage_type == "local":
assert not run_on_cluster
if table_function:
return f"""
icebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format})
@ -227,6 +273,7 @@ def get_creation_expression(
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format});"""
else:
raise Exception(f"Unknown iceberg storage type: {storage_type}")
@ -492,6 +539,108 @@ def test_types(started_cluster, format_version, storage_type):
)
@pytest.mark.parametrize("format_version", ["1", "2"])
@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs"])
def test_cluster_table_function(started_cluster, format_version, storage_type):
if is_arm() and storage_type == "hdfs":
pytest.skip("Disabled test IcebergHDFS for aarch64")
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
TABLE_NAME = (
"test_iceberg_cluster_"
+ format_version
+ "_"
+ storage_type
+ "_"
+ get_uuid_str()
)
def add_df(mode):
write_iceberg_from_df(
spark,
generate_data(spark, 0, 100),
TABLE_NAME,
mode=mode,
format_version=format_version,
)
files = default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
f"/iceberg_data/default/{TABLE_NAME}/",
)
logging.info(f"Adding another dataframe. result files: {files}")
return files
files = add_df(mode="overwrite")
for i in range(1, len(started_cluster.instances)):
files = add_df(mode="append")
logging.info(f"Setup complete. files: {files}")
assert len(files) == 5 + 4 * (len(started_cluster.instances) - 1)
clusters = instance.query(f"SELECT * FROM system.clusters")
logging.info(f"Clusters setup: {clusters}")
# Regular Query only node1
table_function_expr = get_creation_expression(
storage_type, TABLE_NAME, started_cluster, table_function=True
)
select_regular = (
instance.query(f"SELECT * FROM {table_function_expr}").strip().split()
)
# Cluster Query with node1 as coordinator
table_function_expr_cluster = get_creation_expression(
storage_type,
TABLE_NAME,
started_cluster,
table_function=True,
run_on_cluster=True,
)
select_cluster = (
instance.query(f"SELECT * FROM {table_function_expr_cluster}").strip().split()
)
# Simple size check
assert len(select_regular) == 600
assert len(select_cluster) == 600
# Actual check
assert select_cluster == select_regular
# Check query_log
for replica in started_cluster.instances.values():
replica.query("SYSTEM FLUSH LOGS")
for node_name, replica in started_cluster.instances.items():
cluster_secondary_queries = (
replica.query(
f"""
SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log
WHERE
type = 'QueryStart' AND
positionCaseInsensitive(query, '{storage_type}Cluster') != 0 AND
position(query, '{TABLE_NAME}') != 0 AND
position(query, 'system.query_log') = 0 AND
NOT is_initial_query
"""
)
.strip()
.split("\n")
)
logging.info(
f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}"
)
assert len(cluster_secondary_queries) == 1
@pytest.mark.parametrize("format_version", ["1", "2"])
@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs", "local"])
def test_delete_files(started_cluster, format_version, storage_type):

View File

@ -1,3 +1,6 @@
SELECT arrayWithConstant(96142475, ['qMUF']); -- { serverError TOO_LARGE_ARRAY_SIZE }
SELECT arrayWithConstant(100000000, materialize([[[[[[[[[['Hello, world!']]]]]]]]]])); -- { serverError TOO_LARGE_ARRAY_SIZE }
SELECT length(arrayWithConstant(10000000, materialize([[[[[[[[[['Hello world']]]]]]]]]])));
CREATE TEMPORARY TABLE args (value Array(Int)) ENGINE=Memory AS SELECT [1, 1, 1, 1] as value FROM numbers(1, 100);
SELECT length(arrayWithConstant(1000000, value)) FROM args FORMAT NULL;

View File

@ -0,0 +1,14 @@
CREATE TABLE default.test\n(\n `a` Nullable(Int64)\n)\nENGINE = S3(\'http://localhost:11111/test/json_data\', \'JSON\')
CREATE TABLE default.test\n(\n `a` Nullable(Int64)\n)\nENGINE = S3(\'http://localhost:11111/test/json_data\', \'NOSIGN\', \'JSON\')
CREATE TABLE default.test\n(\n `a` Nullable(Int64)\n)\nENGINE = S3(\'http://localhost:11111/test/json_data\', \'JSON\')
CREATE TABLE default.test\n(\n `a` Nullable(Int64)\n)\nENGINE = S3(\'http://localhost:11111/test/json_data\', \'JSON\', \'none\')
CREATE TABLE default.test\n(\n `a` Nullable(Int64)\n)\nENGINE = S3(\'http://localhost:11111/test/json_data\', \'NOSIGN\', \'JSON\')
CREATE TABLE default.test\n(\n `a` Nullable(Int64)\n)\nENGINE = S3(\'http://localhost:11111/test/json_data\', \'test\', \'[HIDDEN]\', \'JSON\')
CREATE TABLE default.test\n(\n `a` Nullable(Int64)\n)\nENGINE = S3(\'http://localhost:11111/test/json_data\', \'NOSIGN\', \'JSON\', \'none\')
CREATE TABLE default.test\n(\n `a` Nullable(Int64)\n)\nENGINE = S3(\'http://localhost:11111/test/json_data\', \'test\', \'[HIDDEN]\', \'\', \'JSON\')
CREATE TABLE default.test\n(\n `a` Nullable(Int64)\n)\nENGINE = S3(\'http://localhost:11111/test/json_data\', \'test\', \'[HIDDEN]\', \'\', \'JSON\')
CREATE TABLE default.test\n(\n `a` Nullable(Int64)\n)\nENGINE = S3(\'http://localhost:11111/test/json_data\', \'test\', \'[HIDDEN]\', \'JSON\', \'none\')
CREATE TABLE default.test\n(\n `a` Nullable(Int64)\n)\nENGINE = S3(\'http://localhost:11111/test/json_data\', \'test\', \'[HIDDEN]\', \'\', \'JSON\', \'none\')
CREATE TABLE default.test\n(\n `a` Nullable(Int64)\n)\nENGINE = URL(\'http://localhost:11111/test/json_data\', \'JSON\')
CREATE TABLE default.test\n(\n `a` Nullable(Int64)\n)\nENGINE = URL(\'http://localhost:11111/test/json_data\', \'JSON\')
CREATE TABLE default.test\n(\n `a` Nullable(Int64)\n)\nENGINE = URL(\'http://localhost:11111/test/json_data\', \'JSON\', \'none\')

View File

@ -0,0 +1,59 @@
-- Tags: no-fasttest
drop table if exists test;
create table test engine=S3('http://localhost:11111/test/json_data');
show create table test;
drop table test;
create table test engine=S3('http://localhost:11111/test/json_data', NOSIGN);
show create table test;
drop table test;
create table test engine=S3('http://localhost:11111/test/json_data', auto);
show create table test;
drop table test;
create table test engine=S3('http://localhost:11111/test/json_data', auto, 'none');
show create table test;
drop table test;
create table test engine=S3('http://localhost:11111/test/json_data', NOSIGN, auto);
show create table test;
drop table test;
create table test engine=S3('http://localhost:11111/test/json_data', 'test', 'testtest');
show create table test;
drop table test;
create table test engine=S3('http://localhost:11111/test/json_data', NOSIGN, auto, 'none');
show create table test;
drop table test;
create table test engine=S3('http://localhost:11111/test/json_data', 'test', 'testtest', '');
show create table test;
drop table test;
create table test engine=S3('http://localhost:11111/test/json_data', 'test', 'testtest', '', auto);
show create table test;
drop table test;
create table test engine=S3('http://localhost:11111/test/json_data', 'test', 'testtest', auto, 'none');
show create table test;
drop table test;
create table test engine=S3('http://localhost:11111/test/json_data', 'test', 'testtest', '', auto, 'none');
show create table test;
drop table test;
create table test engine=URL('http://localhost:11111/test/json_data');
show create table test;
drop table test;
create table test engine=URL('http://localhost:11111/test/json_data', auto);
show create table test;
drop table test;
create table test engine=URL('http://localhost:11111/test/json_data', auto, 'none');
show create table test;
drop table test;

View File

@ -0,0 +1,19 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_LOCAL -q "select 42 as a format JSONEachRow" > data_$CLICKHOUSE_TEST_UNIQUE_NAME
$CLICKHOUSE_LOCAL -nm -q "
create table test engine=File(auto, './data_$CLICKHOUSE_TEST_UNIQUE_NAME');
show create table test;
drop table test;
create table test engine=File(auto, './data_$CLICKHOUSE_TEST_UNIQUE_NAME', 'none');
show create table test;
drop table test;
" | grep "JSON" -c
rm data_$CLICKHOUSE_TEST_UNIQUE_NAME

View File

@ -0,0 +1 @@
{"a" : 42}

View File

@ -244,7 +244,10 @@ Deduplication
DefaultTableEngine
DelayedInserts
DeliveryTag
Deltalake
DeltaLake
deltalakeCluster
deltaLakeCluster
Denormalize
DestroyAggregatesThreads
DestroyAggregatesThreadsActive
@ -377,10 +380,15 @@ Homebrew's
HorizontalDivide
Hostname
HouseOps
hudi
Hudi
hudiCluster
HudiCluster
HyperLogLog
Hypot
IANA
icebergCluster
IcebergCluster
IDE
IDEs
IDNA