Better process of object storage arguments

This commit is contained in:
avogar 2024-09-02 20:11:00 +00:00
parent 0cb8c9f148
commit d281333db2
17 changed files with 437 additions and 133 deletions

View File

@ -24,6 +24,7 @@ namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int LOGICAL_ERROR;
}
const std::unordered_set<std::string_view> required_configuration_keys = {
@ -146,14 +147,13 @@ void StorageAzureConfiguration::fromNamedCollection(const NamedCollection & coll
void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context, bool with_structure)
{
if (engine_args.size() < 3 || engine_args.size() > (with_structure ? 8 : 7))
if (engine_args.size() < 3 || engine_args.size() > getMaxNumberOfArguments(with_structure))
{
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage AzureBlobStorage requires 3 to {} arguments: "
"AzureBlobStorage(connection_string|storage_account_url, container_name, blobpath, "
"[account_name, account_key, format, compression, structure)])",
(with_structure ? 8 : 7));
/// The lower bound in the message matches the `engine_args.size() < 3` check above:
/// every Azure signature needs at least connection string/URL, container name and blob path.
"Storage AzureBlobStorage requires 3 to {} arguments. All supported signatures:\n{}",
getMaxNumberOfArguments(with_structure),
getSignatures(with_structure));
}
for (auto & engine_arg : engine_args)
@ -294,13 +294,8 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
}
else
{
if (args.size() < 3 || args.size() > 8)
{
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage Azure requires 3 to 7 arguments: "
"StorageObjectStorage(connection_string|storage_account_url, container_name, "
"blobpath, [account_name, account_key, format, compression, structure])");
}
if (args.size() < 3 || args.size() > getMaxNumberOfArguments())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 3 to {} arguments in table function azureBlobStorage, got {}", getMaxNumberOfArguments(), args.size());
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);

View File

@ -22,6 +22,29 @@ public:
static constexpr auto type_name = "azure";
static constexpr auto engine_name = "Azure";
/// All possible signatures for Azure engine with structure argument (for example for azureBlobStorage table function).
/// The maximum equals the length of the longest signature listed below:
/// storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure.
static constexpr auto max_number_of_arguments_with_structure = 8;
static constexpr auto signatures_with_structure =
" - connection_string, container_name, blobpath\n"
" - connection_string, container_name, blobpath, structure \n"
" - connection_string, container_name, blobpath, format \n"
" - connection_string, container_name, blobpath, format, compression \n"
" - connection_string, container_name, blobpath, format, compression, structure \n"
" - storage_account_url, container_name, blobpath, account_name, account_key\n"
" - storage_account_url, container_name, blobpath, account_name, account_key, structure\n"
" - storage_account_url, container_name, blobpath, account_name, account_key, format\n"
" - storage_account_url, container_name, blobpath, account_name, account_key, format, compression\n"
" - storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure\n";
/// All possible signatures for Azure engine without structure argument (for example for AzureBlobStorage table engine).
/// The maximum equals the length of the longest signature listed below:
/// storage_account_url, container_name, blobpath, account_name, account_key, format, compression.
static constexpr auto max_number_of_arguments_without_structure = 7;
static constexpr auto signatures_without_structure =
" - connection_string, container_name, blobpath\n"
" - connection_string, container_name, blobpath, format \n"
" - connection_string, container_name, blobpath, format, compression \n"
" - storage_account_url, container_name, blobpath, account_name, account_key\n"
" - storage_account_url, container_name, blobpath, account_name, account_key, format\n"
" - storage_account_url, container_name, blobpath, account_name, account_key, format, compression\n";
StorageAzureConfiguration() = default;
StorageAzureConfiguration(const StorageAzureConfiguration & other);
@ -29,6 +52,9 @@ public:
std::string getTypeName() const override { return type_name; }
std::string getEngineName() const override { return engine_name; }
/// Signature list for the requested mode: with the structure argument (default) or without it.
std::string getSignatures(bool with_structure = true) const override
{
    if (with_structure)
        return signatures_with_structure;
    return signatures_without_structure;
}
/// Upper bound on the number of arguments for the requested mode.
size_t getMaxNumberOfArguments(bool with_structure = true) const override
{
    if (with_structure)
        return max_number_of_arguments_with_structure;
    return max_number_of_arguments_without_structure;
}
Path getPath() const override { return blob_path; }
void setPath(const Path & path) override { blob_path = path; }

View File

@ -24,6 +24,7 @@ namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int LOGICAL_ERROR;
}
StorageHDFSConfiguration::StorageHDFSConfiguration(const StorageHDFSConfiguration & other)
@ -83,12 +84,13 @@ StorageObjectStorage::QuerySettings StorageHDFSConfiguration::getQuerySettings(c
void StorageHDFSConfiguration::fromAST(ASTs & args, ContextPtr context, bool with_structure)
{
const size_t max_args_num = with_structure ? 4 : 3;
if (args.empty() || args.size() > max_args_num)
{
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Expected not more than {} arguments", max_args_num);
}
if (args.empty() || args.size() > getMaxNumberOfArguments(with_structure))
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage HDFS requires 1 to {} arguments. All supported signatures:\n{}",
getMaxNumberOfArguments(with_structure),
getSignatures(with_structure));
std::string url_str;
url_str = checkAndGetLiteralArgument<String>(args[0], "url");
@ -184,11 +186,8 @@ void StorageHDFSConfiguration::addStructureAndFormatToArgsIfNeeded(
else
{
size_t count = args.size();
if (count == 0 || count > 4)
{
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Expected 1 to 4 arguments in table function, got {}", count);
}
if (count == 0 || count > getMaxNumberOfArguments())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 1 to {} arguments in table function hdfs, got {}", getMaxNumberOfArguments(), count);
auto format_literal = std::make_shared<ASTLiteral>(format_);
auto structure_literal = std::make_shared<ASTLiteral>(structure_);

View File

@ -16,6 +16,20 @@ public:
static constexpr auto type_name = "hdfs";
static constexpr auto engine_name = "HDFS";
/// All possible signatures for HDFS engine with structure argument (for example for hdfs table function).
/// The maximum below equals the length of the longest signature in the list.
static constexpr auto max_number_of_arguments_with_structure = 4;
static constexpr auto signatures_with_structure =
" - uri\n"
" - uri, format\n"
" - uri, format, structure\n"
" - uri, format, structure, compression_method\n";
/// All possible signatures for HDFS engine without structure argument (for example for HDFS table engine).
/// The maximum below equals the length of the longest signature in the list.
static constexpr auto max_number_of_arguments_without_structure = 3;
static constexpr auto signatures_without_structure =
" - uri\n"
" - uri, format\n"
" - uri, format, compression_method\n";
StorageHDFSConfiguration() = default;
StorageHDFSConfiguration(const StorageHDFSConfiguration & other);
@ -23,6 +37,9 @@ public:
std::string getTypeName() const override { return type_name; }
std::string getEngineName() const override { return engine_name; }
/// Signature list for the requested mode: with the structure argument (default) or without it.
std::string getSignatures(bool with_structure = true) const override
{
    if (with_structure)
        return signatures_with_structure;
    return signatures_without_structure;
}
/// Upper bound on the number of arguments for the requested mode.
size_t getMaxNumberOfArguments(bool with_structure = true) const override
{
    if (with_structure)
        return max_number_of_arguments_with_structure;
    return max_number_of_arguments_without_structure;
}
Path getPath() const override { return path; }
void setPath(const Path & path_) override { path = path_; }

View File

@ -26,11 +26,11 @@ void StorageLocalConfiguration::fromNamedCollection(const NamedCollection & coll
void StorageLocalConfiguration::fromAST(ASTs & args, ContextPtr context, bool with_structure)
{
const size_t max_args_num = with_structure ? 4 : 3;
if (args.empty() || args.size() > max_args_num)
{
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Expected not more than {} arguments", max_args_num);
}
if (args.empty() || args.size() > getMaxNumberOfArguments(with_structure))
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage Local requires 1 to {} arguments. All supported signatures:\n{}",
getMaxNumberOfArguments(with_structure),
getSignatures(with_structure));
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);

View File

@ -19,6 +19,20 @@ public:
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
static constexpr auto type_name = "local";
/// All possible signatures for Local engine with structure argument (for example for local table function).
/// The maximum below equals the length of the longest signature in the list.
static constexpr auto max_number_of_arguments_with_structure = 4;
static constexpr auto signatures_with_structure =
" - path\n"
" - path, format\n"
" - path, format, structure\n"
" - path, format, structure, compression_method\n";
/// All possible signatures for Local engine without structure argument (for example for Local table engine).
/// The maximum below equals the length of the longest signature in the list.
static constexpr auto max_number_of_arguments_without_structure = 3;
static constexpr auto signatures_without_structure =
" - path\n"
" - path, format\n"
" - path, format, compression_method\n";
StorageLocalConfiguration() = default;
StorageLocalConfiguration(const StorageLocalConfiguration & other) = default;
@ -26,6 +40,9 @@ public:
std::string getTypeName() const override { return type_name; }
std::string getEngineName() const override { return "Local"; }
/// Signature list for the requested mode: with the structure argument (default) or without it.
std::string getSignatures(bool with_structure = true) const override
{
    if (with_structure)
        return signatures_with_structure;
    return signatures_without_structure;
}
/// Upper bound on the number of arguments for the requested mode.
size_t getMaxNumberOfArguments(bool with_structure = true) const override
{
    if (with_structure)
        return max_number_of_arguments_with_structure;
    return max_number_of_arguments_without_structure;
}
Path getPath() const override { return path; }
void setPath(const Path & path_) override { path = path_; }

View File

@ -170,21 +170,20 @@ void StorageS3Configuration::fromNamedCollection(const NamedCollection & collect
void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_structure)
{
/// Supported signatures: S3('url') S3('url', 'format') S3('url', 'format', 'compression') S3('url', NOSIGN) S3('url', NOSIGN, 'format') S3('url', NOSIGN, 'format', 'compression') S3('url', 'aws_access_key_id', 'aws_secret_access_key') S3('url', 'aws_access_key_id', 'aws_secret_access_key', 'session_token') S3('url', 'aws_access_key_id', 'aws_secret_access_key', 'format') S3('url', 'aws_access_key_id', 'aws_secret_access_key', 'session_token', 'format') S3('url', 'aws_access_key_id', 'aws_secret_access_key', 'format', 'compression')
/// S3('url', 'aws_access_key_id', 'aws_secret_access_key', 'session_token', 'format', 'compression')
/// with optional headers() function
size_t count = StorageURL::evalArgsAndCollectHeaders(args, headers_from_ast, context);
if (count == 0 || count > (with_structure ? 7 : 6))
if (count == 0 || count > getMaxNumberOfArguments(with_structure))
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage S3 requires 1 to 5 arguments: "
"url, [NOSIGN | access_key_id, secret_access_key], name of used format and [compression_method]");
"Storage S3 requires 1 to {} arguments. All supported signatures:\n{}",
getMaxNumberOfArguments(with_structure),
getSignatures(with_structure));
std::unordered_map<std::string_view, size_t> engine_args_to_idx;
bool no_sign_request = false;
/// For 2 arguments we support 2 possible variants:
/// When adding new arguments in the signature don't forget to update addStructureAndFormatToArgsIfNeeded as well.
/// For 2 arguments we support:
/// - s3(source, format)
/// - s3(source, NOSIGN)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN or not.
@ -196,10 +195,15 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_
else
engine_args_to_idx = {{"format", 1}};
}
/// For 3 arguments we support 2 possible variants:
/// For 3 arguments we support:
/// if with_structure == 0:
/// - s3(source, NOSIGN, format)
/// - s3(source, format, compression_method)
/// - s3(source, access_key_id, secret_access_key)
/// if with_structure == 1:
/// - s3(source, NOSIGN, format)
/// - s3(source, format, structure)
/// - s3(source, access_key_id, secret_access_key)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN or format name.
else if (count == 3)
{
@ -219,7 +223,7 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_
else
engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}};
}
/// For 4 arguments we support 3 possible variants:
/// For 4 arguments we support:
/// if with_structure == 0:
/// - s3(source, access_key_id, secret_access_key, session_token)
/// - s3(source, access_key_id, secret_access_key, format)
@ -229,7 +233,7 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_
/// - s3(source, access_key_id, secret_access_key, format),
/// - s3(source, access_key_id, secret_access_key, session_token)
/// - s3(source, NOSIGN, format, structure)
/// We can distinguish them by looking at the 2-nd argument: check if it's a NOSIGN or not.
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN, a format name or something else.
else if (count == 4)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "access_key_id/NOSIGN");
@ -258,7 +262,7 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_
}
}
}
/// For 5 arguments we support 2 possible variants:
/// For 5 arguments we support:
/// if with_structure == 0:
/// - s3(source, access_key_id, secret_access_key, session_token, format)
/// - s3(source, access_key_id, secret_access_key, format, compression)
@ -302,13 +306,16 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_
}
}
}
/// For 6 arguments we support:
/// if with_structure == 0:
/// - s3(source, access_key_id, secret_access_key, session_token, format, compression_method)
/// if with_structure == 1:
/// - s3(source, access_key_id, secret_access_key, format, structure, compression_method)
/// - s3(source, access_key_id, secret_access_key, session_token, format, structure)
else if (count == 6)
{
if (with_structure)
{
/// - s3(source, access_key_id, secret_access_key, format, structure, compression_method)
/// - s3(source, access_key_id, secret_access_key, session_token, format, structure)
/// We can distinguish them by looking at the 4-th argument: check if it's a format name or not
auto fourth_arg = checkAndGetLiteralArgument<String>(args[3], "format/session_token");
if (fourth_arg == "auto" || FormatFactory::instance().exists(fourth_arg))
{
@ -324,6 +331,7 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_
engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}, {"compression_method", 5}};
}
}
/// s3(source, access_key_id, secret_access_key, session_token, format, structure, compression_method)
else if (with_structure && count == 7)
{
engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}, {"structure", 5}, {"compression_method", 6}};
@ -390,8 +398,8 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
HTTPHeaderEntries tmp_headers;
size_t count = StorageURL::evalArgsAndCollectHeaders(args, tmp_headers, context);
if (count == 0 || count > 6)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 1 to 6 arguments in table function, got {}", count);
if (count == 0 || count > getMaxNumberOfArguments())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 1 to {} arguments in table function s3, got {}", getMaxNumberOfArguments(), count);
auto format_literal = std::make_shared<ASTLiteral>(format_);
auto structure_literal = std::make_shared<ASTLiteral>(structure_);
@ -403,7 +411,8 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
/// s3(s3_url, format) or s3(s3_url, NOSIGN)
/// s3(s3_url, format) or
/// s3(s3_url, NOSIGN)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN or not.
else if (count == 2)
{
@ -445,6 +454,7 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
}
/// s3(source, format, structure, compression_method) or
/// s3(source, access_key_id, secret_access_key, format) or
/// s3(source, access_key_id, secret_access_key, session_token) or
/// s3(source, NOSIGN, format, structure)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN, format name or neither.
else if (count == 4)
@ -466,18 +476,28 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
}
else
{
if (checkAndGetLiteralArgument<String>(args[3], "format") == "auto")
args[3] = format_literal;
args.push_back(structure_literal);
auto fourth_arg = checkAndGetLiteralArgument<String>(args[3], "format/session_token");
if (fourth_arg == "auto" || FormatFactory::instance().exists(fourth_arg))
{
if (checkAndGetLiteralArgument<String>(args[3], "format") == "auto")
args[3] = format_literal;
args.push_back(structure_literal);
}
else
{
args.push_back(format_literal);
args.push_back(structure_literal);
}
}
}
/// s3(source, access_key_id, secret_access_key, format, structure) or
/// s3(source, access_key_id, secret_access_key, session_token, format) or
/// s3(source, NOSIGN, format, structure, compression_method)
/// We can distinguish them by looking at the 2-nd argument: check if it's a NOSIGN keyword name or not.
else if (count == 5)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/NOSIGN");
if (boost::iequals(sedond_arg, "NOSIGN"))
if (boost::iequals(second_arg, "NOSIGN"))
{
if (checkAndGetLiteralArgument<String>(args[2], "format") == "auto")
args[2] = format_literal;
@ -485,20 +505,50 @@ void StorageS3Configuration::addStructureAndFormatToArgsIfNeeded(
args[3] = structure_literal;
}
else
{
auto fourth_arg = checkAndGetLiteralArgument<String>(args[3], "format/session_token");
if (fourth_arg == "auto" || FormatFactory::instance().exists(fourth_arg))
{
if (checkAndGetLiteralArgument<String>(args[3], "format") == "auto")
args[3] = format_literal;
if (checkAndGetLiteralArgument<String>(args[4], "structure") == "auto")
args[4] = structure_literal;
}
else
{
if (checkAndGetLiteralArgument<String>(args[4], "format") == "auto")
args[4] = format_literal;
args.push_back(structure_literal);
}
}
}
/// s3(source, access_key_id, secret_access_key, format, structure, compression) or
/// s3(source, access_key_id, secret_access_key, session_token, format, structure)
else if (count == 6)
{
    /// The 4-th argument tells the two 6-argument signatures apart:
    /// a format name (or "auto") means there is no session_token.
    auto fourth_arg = checkAndGetLiteralArgument<String>(args[3], "format/session_token");
    if (fourth_arg == "auto" || FormatFactory::instance().exists(fourth_arg))
    {
        /// s3(source, access_key_id, secret_access_key, format, structure, compression)
        if (checkAndGetLiteralArgument<String>(args[3], "format") == "auto")
            args[3] = format_literal;
        if (checkAndGetLiteralArgument<String>(args[4], "structure") == "auto")
            args[4] = structure_literal;
    }
    else
    {
        /// s3(source, access_key_id, secret_access_key, session_token, format, structure)
        if (checkAndGetLiteralArgument<String>(args[4], "format") == "auto")
            args[4] = format_literal;
        /// args[5] is the structure, so it is read under the "structure" argument name.
        if (checkAndGetLiteralArgument<String>(args[5], "structure") == "auto")
            args[5] = structure_literal;
    }
}
/// s3(source, access_key_id, secret_access_key, format, structure, compression)
else if (count == 6)
/// s3(source, access_key_id, secret_access_key, session_token, format, structure, compression_method)
else if (count == 7)
{
if (checkAndGetLiteralArgument<String>(args[3], "format") == "auto")
args[3] = format_literal;
if (checkAndGetLiteralArgument<String>(args[4], "structure") == "auto")
args[4] = structure_literal;
/// With 7 arguments the format is at index 4 and the structure at index 5;
/// each is replaced only when the user left it as "auto".
if (checkAndGetLiteralArgument<String>(args[4], "format") == "auto")
args[4] = format_literal;
if (checkAndGetLiteralArgument<String>(args[5], "structure") == "auto")
args[5] = structure_literal;
}
}
}

View File

@ -14,8 +14,48 @@ class StorageS3Configuration : public StorageObjectStorage::Configuration
public:
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
/// NOTE(review): `xx` is not referenced anywhere in the visible code and has no descriptive name —
/// confirm whether it is needed before keeping it.
static constexpr auto xx =
42;
static constexpr auto type_name = "s3";
static constexpr auto namespace_name = "bucket";
/// All possible signatures for S3 storage with structure argument (for example for s3 table function).
/// The maximum below equals the length of the longest signature in the list;
/// the trailing sentence about headers() is part of the text returned to the user.
static constexpr auto max_number_of_arguments_with_structure = 7;
static constexpr auto signatures_with_structure =
" - url\n"
" - url, NOSIGN\n"
" - url, format\n"
" - url, NOSIGN, format\n"
" - url, format, structure\n"
" - url, NOSIGN, format, structure\n"
" - url, format, structure, compression_method\n"
" - url, NOSIGN, format, structure, compression_method\n"
" - url, access_key_id, secret_access_key\n"
" - url, access_key_id, secret_access_key, session_token\n"
" - url, access_key_id, secret_access_key, format\n"
" - url, access_key_id, secret_access_key, session_token, format\n"
" - url, access_key_id, secret_access_key, format, structure\n"
" - url, access_key_id, secret_access_key, session_token, format, structure\n"
" - url, access_key_id, secret_access_key, format, structure, compression_method\n"
" - url, access_key_id, secret_access_key, session_token, format, structure, compression_method\n"
"All signatures supports optional headers (specified as `headers('name'='value', 'name2'='value2')`)";
/// All possible signatures for S3 storage without structure argument (for example for S3 table engine).
/// The maximum below equals the length of the longest signature in the list.
static constexpr auto max_number_of_arguments_without_structure = 6;
static constexpr auto signatures_without_structure =
" - url\n"
" - url, NOSIGN\n"
" - url, format\n"
" - url, NOSIGN, format\n"
" - url, format, compression_method\n"
" - url, NOSIGN, format, compression_method\n"
" - url, access_key_id, secret_access_key\n"
" - url, access_key_id, secret_access_key, session_token\n"
" - url, access_key_id, secret_access_key, format\n"
" - url, access_key_id, secret_access_key, session_token, format\n"
" - url, access_key_id, secret_access_key, format, compression_method\n"
" - url, access_key_id, secret_access_key, session_token, format, compression_method\n"
"All signatures supports optional headers (specified as `headers('name'='value', 'name2'='value2')`)";
StorageS3Configuration() = default;
StorageS3Configuration(const StorageS3Configuration & other);
@ -24,6 +64,9 @@ public:
std::string getEngineName() const override { return url.storage_name; }
std::string getNamespaceType() const override { return namespace_name; }
/// Signature list for the requested mode: with the structure argument (default) or without it.
std::string getSignatures(bool with_structure = true) const override
{
    if (with_structure)
        return signatures_with_structure;
    return signatures_without_structure;
}
/// Upper bound on the number of arguments for the requested mode.
size_t getMaxNumberOfArguments(bool with_structure = true) const override
{
    if (with_structure)
        return max_number_of_arguments_with_structure;
    return max_number_of_arguments_without_structure;
}
Path getPath() const override { return url.key; }
void setPath(const Path & path) override { url.key = path; }

View File

@ -170,6 +170,10 @@ public:
/// buckets in S3. If object storage doesn't have any namespaces return empty string.
virtual std::string getNamespaceType() const { return "namespace"; }
/// Return a string listing all supported argument signatures for this storage.
/// `with_structure` selects the list that includes the structure argument.
virtual std::string getSignatures(bool with_structure = true) const = 0;
/// Return the maximum number of arguments this storage accepts,
/// counted with or without the structure argument depending on `with_structure`.
virtual size_t getMaxNumberOfArguments(bool with_structure = true) const = 0;
virtual Path getPath() const = 0;
virtual void setPath(const Path & path) = 0;

View File

@ -23,7 +23,6 @@ class ITableFunctionCluster : public Base
{
public:
String getName() const override = 0;
String getSignature() const override = 0;
static void updateStructureAndFormatArgumentsIfNeeded(ASTs & args, const String & structure_, const String & format_, const ContextPtr & context)
{
@ -46,7 +45,11 @@ protected:
void parseArgumentsImpl(ASTs & args, const ContextPtr & context) override
{
if (args.empty())
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "The signature of table function {} shall be the following:\n{}", getName(), getSignature());
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"The function {} should have arguments. The first argument must be the cluster name and the rest are the arguments of "
"corresponding table function",
getName());
/// Evaluate only first argument, everything else will be done Base class
args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(args[0], context);

View File

@ -57,7 +57,7 @@ void ITableFunctionFileLike::parseArguments(const ASTPtr & ast_function, Context
void ITableFunctionFileLike::parseArgumentsImpl(ASTs & args, const ContextPtr & context)
{
if (args.empty() || args.size() > 4)
if (args.empty() || args.size() > getMaxNumberOfArguments())
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "The signature of table function {} shall be the following:\n{}", getName(), getSignature());
for (auto & arg : args)

View File

@ -15,6 +15,7 @@ class Context;
class ITableFunctionFileLike : public ITableFunction
{
public:
static constexpr auto max_number_of_arguments = 4;
static constexpr auto signature = " - filename\n"
" - filename, format\n"
" - filename, format, structure\n"
@ -32,7 +33,7 @@ public:
NameSet getVirtualsToCheckBeforeUsingStructureHint() const override;
static size_t getMaxNumberOfArguments() { return 4; }
static size_t getMaxNumberOfArguments() { return max_number_of_arguments; }
static void updateStructureAndFormatArgumentsIfNeeded(ASTs & args, const String & structure, const String & format, const ContextPtr &);

View File

@ -23,83 +23,42 @@ struct AzureDefinition
{
static constexpr auto name = "azureBlobStorage";
static constexpr auto storage_type_name = "Azure";
static constexpr auto signature = " - connection_string, container_name, blobpath\n"
" - connection_string, container_name, blobpath, structure \n"
" - connection_string, container_name, blobpath, format \n"
" - connection_string, container_name, blobpath, format, compression \n"
" - connection_string, container_name, blobpath, format, compression, structure \n"
" - storage_account_url, container_name, blobpath, account_name, account_key\n"
" - storage_account_url, container_name, blobpath, account_name, account_key, structure\n"
" - storage_account_url, container_name, blobpath, account_name, account_key, format\n"
" - storage_account_url, container_name, blobpath, account_name, account_key, format, compression\n"
" - storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure\n";
static constexpr auto max_number_of_arguments = 8;
};
struct S3Definition
{
static constexpr auto name = "s3";
static constexpr auto storage_type_name = "S3";
static constexpr auto signature = " - url\n"
" - url, format\n"
" - url, format, structure\n"
" - url, format, structure, compression_method\n"
" - url, access_key_id, secret_access_key\n"
" - url, access_key_id, secret_access_key, session_token\n"
" - url, access_key_id, secret_access_key, format\n"
" - url, access_key_id, secret_access_key, session_token, format\n"
" - url, access_key_id, secret_access_key, format, structure\n"
" - url, access_key_id, secret_access_key, session_token, format, structure\n"
" - url, access_key_id, secret_access_key, format, structure, compression_method\n"
" - url, access_key_id, secret_access_key, session_token, format, structure, compression_method\n"
"All signatures supports optional headers (specified as `headers('name'='value', 'name2'='value2')`)";
static constexpr auto max_number_of_arguments = 8;
};
struct GCSDefinition
{
static constexpr auto name = "gcs";
static constexpr auto storage_type_name = "GCS";
static constexpr auto signature = S3Definition::signature;
static constexpr auto max_number_of_arguments = S3Definition::max_number_of_arguments;
};
struct COSNDefinition
{
static constexpr auto name = "cosn";
static constexpr auto storage_type_name = "COSN";
static constexpr auto signature = S3Definition::signature;
static constexpr auto max_number_of_arguments = S3Definition::max_number_of_arguments;
};
struct OSSDefinition
{
static constexpr auto name = "oss";
static constexpr auto storage_type_name = "OSS";
static constexpr auto signature = S3Definition::signature;
static constexpr auto max_number_of_arguments = S3Definition::max_number_of_arguments;
};
struct HDFSDefinition
{
static constexpr auto name = "hdfs";
static constexpr auto storage_type_name = "HDFS";
static constexpr auto signature = " - uri\n"
" - uri, format\n"
" - uri, format, structure\n"
" - uri, format, structure, compression_method\n";
static constexpr auto max_number_of_arguments = 4;
};
struct LocalDefinition
{
static constexpr auto name = "local";
static constexpr auto storage_type_name = "Local";
static constexpr auto signature = " - path\n"
" - path, format\n"
" - path, format, structure\n"
" - path, format, structure, compression_method\n";
static constexpr auto max_number_of_arguments = 4;
};
template <typename Definition, typename Configuration>
@ -107,14 +66,9 @@ class TableFunctionObjectStorage : public ITableFunction
{
public:
static constexpr auto name = Definition::name;
static constexpr auto signature = Definition::signature;
static size_t getMaxNumberOfArguments() { return Definition::max_number_of_arguments; }
String getName() const override { return name; }
virtual String getSignature() const { return signature; }
bool hasStaticStructure() const override { return configuration->structure != "auto"; }
bool needStructureHint() const override { return configuration->structure == "auto"; }

View File

@ -19,40 +19,22 @@ struct AzureClusterDefinition
{
static constexpr auto name = "azureBlobStorageCluster";
static constexpr auto storage_type_name = "AzureBlobStorageCluster";
static constexpr auto signature = " - cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure]";
static constexpr auto max_number_of_arguments = AzureDefinition::max_number_of_arguments + 1;
};
struct S3ClusterDefinition
{
static constexpr auto name = "s3Cluster";
static constexpr auto storage_type_name = "S3Cluster";
static constexpr auto signature = " - cluster, url\n"
" - cluster, url, format\n"
" - cluster, url, format, structure\n"
" - cluster, url, access_key_id, secret_access_key\n"
" - cluster, url, format, structure, compression_method\n"
" - cluster, url, access_key_id, secret_access_key, format\n"
" - cluster, url, access_key_id, secret_access_key, format, structure\n"
" - cluster, url, access_key_id, secret_access_key, format, structure, compression_method\n"
" - cluster, url, access_key_id, secret_access_key, session_token, format, structure, compression_method\n"
"All signatures supports optional headers (specified as `headers('name'='value', 'name2'='value2')`)";
static constexpr auto max_number_of_arguments = S3Definition::max_number_of_arguments + 1;
};
struct HDFSClusterDefinition
{
static constexpr auto name = "hdfsCluster";
static constexpr auto storage_type_name = "HDFSCluster";
static constexpr auto signature = " - cluster_name, uri\n"
" - cluster_name, uri, format\n"
" - cluster_name, uri, format, structure\n"
" - cluster_name, uri, format, structure, compression_method\n";
static constexpr auto max_number_of_arguments = HDFSDefinition::max_number_of_arguments + 1;
};
/**
* Class implementing s3/hdfs/azureBlobStorage)Cluster(...) table functions,
* Class implementing s3/hdfs/azureBlobStorageCluster(...) table functions,
* which allow to process many files from S3/HDFS/Azure blob storage on a specific cluster.
* On initiator it creates a connection to _all_ nodes in cluster, discloses asterisks
* in file path and dispatch each file dynamically.
@ -64,10 +46,8 @@ class TableFunctionObjectStorageCluster : public ITableFunctionCluster<TableFunc
{
public:
static constexpr auto name = Definition::name;
static constexpr auto signature = Definition::signature;
String getName() const override { return name; }
String getSignature() const override { return signature; }
protected:
using Base = TableFunctionObjectStorage<Definition, Configuration>;

View File

@ -460,12 +460,19 @@ def test_cluster_format_detection(started_cluster):
assert result == expected_result
def test_cluster_default_expression(started_cluster):
node = started_cluster.instances["s0_0_0"]
node.query("insert into function s3('http://minio1:9001/root/data/data1', 'minio', 'minio123', JSONEachRow) select 1 as id settings s3_truncate_on_insert=1")
node.query("insert into function s3('http://minio1:9001/root/data/data2', 'minio', 'minio123', JSONEachRow) select * from numbers(0) settings s3_truncate_on_insert=1")
node.query("insert into function s3('http://minio1:9001/root/data/data3', 'minio', 'minio123', JSONEachRow) select 2 as id settings s3_truncate_on_insert=1")
node.query(
"insert into function s3('http://minio1:9001/root/data/data1', 'minio', 'minio123', JSONEachRow) select 1 as id settings s3_truncate_on_insert=1"
)
node.query(
"insert into function s3('http://minio1:9001/root/data/data2', 'minio', 'minio123', JSONEachRow) select * from numbers(0) settings s3_truncate_on_insert=1"
)
node.query(
"insert into function s3('http://minio1:9001/root/data/data3', 'minio', 'minio123', JSONEachRow) select 2 as id settings s3_truncate_on_insert=1"
)
expected_result = node.query(
"SELECT * FROM s3('http://minio1:9001/root/data/data{1,2,3}', 'minio', 'minio123', 'JSONEachRow', 'id UInt32, date Date DEFAULT 18262') order by id"

View File

@ -190,3 +190,195 @@
20 21 22
23 24 25
26 27 28
0 0 0
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
20 21 22
23 24 25
26 27 28
0 0 0
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
20 21 22
23 24 25
26 27 28
0 0 0
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
20 21 22
23 24 25
26 27 28
0 0 0
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
20 21 22
23 24 25
26 27 28
0 0 0
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
20 21 22
23 24 25
26 27 28
0 0 0
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
20 21 22
23 24 25
26 27 28
0 0 0
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
20 21 22
23 24 25
26 27 28
0 0 0
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
20 21 22
23 24 25
26 27 28
0 0 0
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
20 21 22
23 24 25
26 27 28
0 0 0
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
20 21 22
23 24 25
26 27 28
0 0 0
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
20 21 22
23 24 25
26 27 28
0 0 0
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
20 21 22
23 24 25
26 27 28
0 0 0
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
20 21 22
23 24 25
26 27 28
0 0 0
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
20 21 22
23 24 25
26 27 28
0 0 0
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
20 21 22
23 24 25
26 27 28
0 0 0
0 0 0
0 0 0
1 2 3
4 5 6
7 8 9
10 11 12
13 14 15
16 17 18
20 21 22
23 24 25
26 27 28

View File

@ -2,21 +2,37 @@
-- Tag no-fasttest: Depends on AWS
-- s3(): read the same three TSV files ({a,b,c}.tsv) with different positional-argument
-- combinations: URL only, NOSIGN, format ('TSV'), structure, compression ('auto'), and
-- explicit credentials ('test'/'testtest') with and without an extra empty-string argument
-- after the secret key (presumably session_token; confirm against the argument parser).
-- Every query sorts by c1, c2, c3.
select * from s3('http://localhost:11111/test/{a,b,c}.tsv') ORDER BY c1, c2, c3;
select * from s3('http://localhost:11111/test/{a,b,c}.tsv', NOSIGN) ORDER BY c1, c2, c3;
select * from s3('http://localhost:11111/test/{a,b,c}.tsv', 'TSV') ORDER BY c1, c2, c3;
select * from s3('http://localhost:11111/test/{a,b,c}.tsv', NOSIGN, 'TSV') ORDER BY c1, c2, c3;
select * from s3('http://localhost:11111/test/{a,b,c}.tsv', 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64') ORDER BY c1, c2, c3;
select * from s3('http://localhost:11111/test/{a,b,c}.tsv', 'test', 'testtest') ORDER BY c1, c2, c3;
select * from s3('http://localhost:11111/test/{a,b,c}.tsv', NOSIGN, 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64') ORDER BY c1, c2, c3;
select * from s3('http://localhost:11111/test/{a,b,c}.tsv', 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64', 'auto') ORDER BY c1, c2, c3;
select * from s3('http://localhost:11111/test/{a,b,c}.tsv', NOSIGN, 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64', 'auto') ORDER BY c1, c2, c3;
select * from s3('http://localhost:11111/test/{a,b,c}.tsv', 'test', 'testtest') ORDER BY c1, c2, c3;
select * from s3('http://localhost:11111/test/{a,b,c}.tsv', 'test', 'testtest', '') ORDER BY c1, c2, c3;
select * from s3('http://localhost:11111/test/{a,b,c}.tsv', 'test', 'testtest', 'TSV') ORDER BY c1, c2, c3;
select * from s3('http://localhost:11111/test/{a,b,c}.tsv', 'test', 'testtest', '', 'TSV') ORDER BY c1, c2, c3;
select * from s3('http://localhost:11111/test/{a,b,c}.tsv', 'test', 'testtest', 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64') ORDER BY c1, c2, c3;
select * from s3('http://localhost:11111/test/{a,b,c}.tsv', 'test', 'testtest', '', 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64') ORDER BY c1, c2, c3;
select * from s3('http://localhost:11111/test/{a,b,c}.tsv', 'test', 'testtest', 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64', 'auto') ORDER BY c1, c2, c3;
select * from s3('http://localhost:11111/test/{a,b,c}.tsv', 'test', 'testtest', '', 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64', 'auto') ORDER BY c1, c2, c3;
-- s3Cluster(): the same argument combinations as the s3() queries, each prefixed with the
-- cluster name 'test_cluster_two_shards_localhost': URL only, NOSIGN, format ('TSV'),
-- structure, compression ('auto'), and explicit credentials ('test'/'testtest') with and
-- without an extra empty-string argument after the secret key (presumably session_token;
-- confirm against the argument parser). Every query sorts by c1, c2, c3.
select * from s3Cluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv') ORDER BY c1, c2, c3;
select * from s3Cluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv', NOSIGN) ORDER BY c1, c2, c3;
select * from s3Cluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv', 'TSV') ORDER BY c1, c2, c3;
select * from s3Cluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv', NOSIGN, 'TSV') ORDER BY c1, c2, c3;
select * from s3Cluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv', 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64') ORDER BY c1, c2, c3;
select * from s3Cluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv', 'test', 'testtest') ORDER BY c1, c2, c3;
select * from s3Cluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv', NOSIGN, 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64') ORDER BY c1, c2, c3;
select * from s3Cluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv', 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64', 'auto') ORDER BY c1, c2, c3;
select * from s3Cluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv', NOSIGN, 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64', 'auto') ORDER BY c1, c2, c3;
select * from s3Cluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv', 'test', 'testtest') ORDER BY c1, c2, c3;
select * from s3Cluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv', 'test', 'testtest', '') ORDER BY c1, c2, c3;
select * from s3Cluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv', 'test', 'testtest', 'TSV') ORDER BY c1, c2, c3;
select * from s3Cluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv', 'test', 'testtest', '', 'TSV') ORDER BY c1, c2, c3;
select * from s3Cluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv', 'test', 'testtest', 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64') ORDER BY c1, c2, c3;
select * from s3Cluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv', 'test', 'testtest', '', 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64') ORDER BY c1, c2, c3;
select * from s3Cluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv', 'test', 'testtest', 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64', 'auto') ORDER BY c1, c2, c3;
select * from s3Cluster('test_cluster_two_shards_localhost', 'http://localhost:11111/test/{a,b,c}.tsv', 'test', 'testtest', '', 'TSV', 'c1 UInt64, c2 UInt64, c3 UInt64', 'auto') ORDER BY c1, c2, c3;