Fix StorageURL forgetting headers on server restart

This commit is contained in:
Michael Kolupaev 2024-01-18 01:56:21 +00:00
parent 6a75641048
commit fd361273f0
8 changed files with 83 additions and 101 deletions

View File

@ -31,6 +31,7 @@ public:
struct Arguments
{
const String & engine_name;
/// Mutable to allow replacing constant expressions with literals, and other transformations.
ASTs & engine_args;
ASTStorage * storage_def;
const ASTCreateQuery & query;

View File

@ -1488,25 +1488,13 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
/// S3('url', 'aws_access_key_id', 'aws_secret_access_key', 'session_token', 'format', 'compression')
/// with optional headers() function
if (engine_args.empty() || engine_args.size() > 6)
size_t count = StorageURL::evalArgsAndCollectHeaders(engine_args, configuration.headers_from_ast, local_context);
if (count == 0 || count > 6)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage S3 requires 1 to 5 arguments: "
"url, [NOSIGN | access_key_id, secret_access_key], name of used format and [compression_method]");
auto * header_it = StorageURL::collectHeaders(engine_args, configuration.headers_from_ast, local_context);
if (header_it != engine_args.end())
engine_args.erase(header_it);
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, local_context);
/// Size -> argument indexes
static std::unordered_map<size_t, std::unordered_map<std::string_view, size_t>> size_to_engine_args
{
{1, {{}}},
{6, {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}, {"compression_method", 5}}}
};
std::unordered_map<std::string_view, size_t> engine_args_to_idx;
bool no_sign_request = false;
@ -1514,7 +1502,7 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
/// - s3(source, format)
/// - s3(source, NOSIGN)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN or not.
if (engine_args.size() == 2)
if (count == 2)
{
auto second_arg = checkAndGetLiteralArgument<String>(engine_args[1], "format/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
@ -1524,10 +1512,10 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
}
/// For 3 arguments we support 2 possible variants:
/// - s3(source, format, compression_method)
/// - s3(source, access_key_id, access_key_id)
/// - s3(source, access_key_id, secret_access_key)
/// - s3(source, NOSIGN, format)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN or format name.
else if (engine_args.size() == 3)
else if (count == 3)
{
auto second_arg = checkAndGetLiteralArgument<String>(engine_args[1], "format/access_key_id/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
@ -1545,7 +1533,7 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
/// - s3(source, access_key_id, secret_access_key, format)
/// - s3(source, NOSIGN, format, compression_method)
/// We can distinguish them by looking at the 2-nd argument: check if it's a NOSIGN or not.
else if (engine_args.size() == 4)
else if (count == 4)
{
auto second_arg = checkAndGetLiteralArgument<String>(engine_args[1], "access_key_id/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
@ -1569,7 +1557,7 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
/// For 5 arguments we support 2 possible variants:
/// - s3(source, access_key_id, secret_access_key, session_token, format)
/// - s3(source, access_key_id, secret_access_key, format, compression)
else if (engine_args.size() == 5)
else if (count == 5)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "session_token/format");
if (fourth_arg == "auto" || FormatFactory::instance().getAllFormats().contains(fourth_arg))
@ -1581,9 +1569,9 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}};
}
}
else
else if (count == 6)
{
engine_args_to_idx = size_to_engine_args[engine_args.size()];
engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}, {"compression_method", 5}};
}
/// This argument is always the first

View File

@ -1324,7 +1324,7 @@ FormatSettings StorageURL::getFormatSettingsFromArgs(const StorageFactory::Argum
return format_settings;
}
ASTs::iterator StorageURL::collectHeaders(
size_t StorageURL::evalArgsAndCollectHeaders(
ASTs & url_function_args, HTTPHeaderEntries & header_entries, ContextPtr context)
{
ASTs::iterator headers_it = url_function_args.end();
@ -1382,7 +1382,11 @@ ASTs::iterator StorageURL::collectHeaders(
(*arg_it) = evaluateConstantExpressionOrIdentifierAsLiteral((*arg_it), context);
}
return headers_it;
if (headers_it == url_function_args.end())
return url_function_args.size();
std::rotate(headers_it, std::next(headers_it), url_function_args.end());
return url_function_args.size() - 1;
}
void StorageURL::processNamedCollectionResult(Configuration & configuration, const NamedCollection & collection)
@ -1412,21 +1416,19 @@ StorageURL::Configuration StorageURL::getConfiguration(ASTs & args, ContextPtr l
if (auto named_collection = tryGetNamedCollectionWithOverrides(args, local_context))
{
StorageURL::processNamedCollectionResult(configuration, *named_collection);
collectHeaders(args, configuration.headers, local_context);
evalArgsAndCollectHeaders(args, configuration.headers, local_context);
}
else
{
if (args.empty() || args.size() > 3)
size_t count = evalArgsAndCollectHeaders(args, configuration.headers, local_context);
if (count == 0 || count > 3)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, bad_arguments_error_message);
auto * header_it = collectHeaders(args, configuration.headers, local_context);
if (header_it != args.end())
args.erase(header_it);
configuration.url = checkAndGetLiteralArgument<String>(args[0], "url");
if (args.size() > 1)
if (count > 1)
configuration.format = checkAndGetLiteralArgument<String>(args[1], "format");
if (args.size() == 3)
if (count == 3)
configuration.compression_method = checkAndGetLiteralArgument<String>(args[2], "compression_method");
}

View File

@ -294,7 +294,10 @@ public:
static Configuration getConfiguration(ASTs & args, ContextPtr context);
static ASTs::iterator collectHeaders(ASTs & url_function_args, HTTPHeaderEntries & header_entries, ContextPtr context);
/// Does evaluateConstantExpressionOrIdentifierAsLiteral() on all arguments.
/// If `headers(...)` argument is present, parses it and moves it to the end of the array.
/// Returns number of arguments excluding `headers(...)`.
static size_t evalArgsAndCollectHeaders(ASTs & url_function_args, HTTPHeaderEntries & header_entries, ContextPtr context);
static void processNamedCollectionResult(Configuration & configuration, const NamedCollection & collection);
};

View File

@ -67,23 +67,11 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
else
{
auto * header_it = StorageURL::collectHeaders(args, configuration.headers_from_ast, context);
if (header_it != args.end())
args.erase(header_it);
size_t count = StorageURL::evalArgsAndCollectHeaders(args, configuration.headers_from_ast, context);
if (args.empty() || args.size() > 7)
if (count == 0 || count > 7)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "The signature of table function {} shall be the following:\n{}", getName(), getSignature());
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
/// Size -> argument indexes
static std::unordered_map<size_t, std::unordered_map<std::string_view, size_t>> size_to_args
{
{1, {{}}},
{7, {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}, {"structure", 5}, {"compression_method", 6}}}
};
std::unordered_map<std::string_view, size_t> args_to_idx;
bool no_sign_request = false;
@ -92,7 +80,7 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
/// - s3(source, format)
/// - s3(source, NOSIGN)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN or not.
if (args.size() == 2)
if (count == 2)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
@ -102,10 +90,10 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
}
/// For 3 arguments we support 3 possible variants:
/// - s3(source, format, structure)
/// - s3(source, access_key_id, access_key_id)
/// - s3(source, access_key_id, secret_access_key)
/// - s3(source, NOSIGN, format)
/// We can distinguish them by looking at the 2-nd argument: check if it's a format name or not.
else if (args.size() == 3)
else if (count == 3)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/access_key_id/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
@ -120,11 +108,11 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
}
/// For 4 arguments we support 4 possible variants:
/// - s3(source, format, structure, compression_method),
/// - s3(source, access_key_id, access_key_id, format),
/// - s3(source, access_key_id, access_key_id, session_token)
/// - s3(source, access_key_id, secret_access_key, format),
/// - s3(source, access_key_id, secret_access_key, session_token)
/// - s3(source, NOSIGN, format, structure)
/// We can distinguish them by looking at the 2-nd and 4-th argument: check if it's a format name or not.
else if (args.size() == 4)
else if (count == 4)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/access_key_id/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
@ -150,12 +138,12 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
}
}
/// For 5 arguments we support 3 possible variants:
/// - s3(source, access_key_id, access_key_id, format, structure)
/// - s3(source, access_key_id, access_key_id, session_token, format)
/// - s3(source, access_key_id, secret_access_key, format, structure)
/// - s3(source, access_key_id, secret_access_key, session_token, format)
/// - s3(source, NOSIGN, format, structure, compression_method)
/// We can distinguish them by looking at the 2-nd argument: check if it's a NOSIGN keyword name or no,
/// and by the 4-th argument, check if it's a format name or not
else if (args.size() == 5)
else if (count == 5)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "NOSIGN/access_key_id");
if (boost::iequals(second_arg, "NOSIGN"))
@ -177,10 +165,10 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
}
}
// For 6 arguments we support 2 possible variants:
/// - s3(source, access_key_id, access_key_id, format, structure, compression_method)
/// - s3(source, access_key_id, access_key_id, session_token, format, structure)
/// - s3(source, access_key_id, secret_access_key, format, structure, compression_method)
/// - s3(source, access_key_id, secret_access_key, session_token, format, structure)
/// We can distinguish them by looking at the 4-th argument: check if it's a format name or not
else if (args.size() == 6)
else if (count == 6)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(args[3], "format/session_token");
if (fourth_arg == "auto" || FormatFactory::instance().getAllFormats().contains(fourth_arg))
@ -192,9 +180,9 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}, {"structure", 5}};
}
}
else
else if (count == 7)
{
args_to_idx = size_to_args[args.size()];
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}, {"structure", 5}, {"compression_method", 6}};
}
/// This argument is always the first
@ -262,24 +250,16 @@ void TableFunctionS3::addColumnsStructureToArguments(ASTs & args, const String &
}
else
{
/// If arguments contain headers, just remove it and add to the end of arguments later
/// (header argument can be at any position).
HTTPHeaderEntries tmp_headers;
auto * headers_it = StorageURL::collectHeaders(args, tmp_headers, context);
ASTPtr headers_ast;
if (headers_it != args.end())
{
headers_ast = *headers_it;
args.erase(headers_it);
}
size_t count = StorageURL::evalArgsAndCollectHeaders(args, tmp_headers, context);
if (args.empty() || args.size() > getMaxNumberOfArguments())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 1 to {} arguments in table function, got {}", getMaxNumberOfArguments(), args.size());
if (count == 0 || count > getMaxNumberOfArguments())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 1 to {} arguments in table function, got {}", getMaxNumberOfArguments(), count);
auto structure_literal = std::make_shared<ASTLiteral>(structure);
/// s3(s3_url)
if (args.size() == 1)
if (count == 1)
{
/// Add format=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
@ -287,7 +267,7 @@ void TableFunctionS3::addColumnsStructureToArguments(ASTs & args, const String &
}
/// s3(s3_url, format) or s3(s3_url, NOSIGN)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN or not.
else if (args.size() == 2)
else if (count == 2)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/NOSIGN");
/// If there is NOSIGN, add format=auto before structure.
@ -296,10 +276,10 @@ void TableFunctionS3::addColumnsStructureToArguments(ASTs & args, const String &
args.push_back(structure_literal);
}
/// s3(source, format, structure) or
/// s3(source, access_key_id, access_key_id) or
/// s3(source, access_key_id, secret_access_key) or
/// s3(source, NOSIGN, format)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN, format name or neither.
else if (args.size() == 3)
else if (count == 3)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
@ -308,7 +288,7 @@ void TableFunctionS3::addColumnsStructureToArguments(ASTs & args, const String &
}
else if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
{
args.back() = structure_literal;
args[count - 1] = structure_literal;
}
else
{
@ -318,48 +298,45 @@ void TableFunctionS3::addColumnsStructureToArguments(ASTs & args, const String &
}
}
/// s3(source, format, structure, compression_method) or
/// s3(source, access_key_id, access_key_id, format) or
/// s3(source, access_key_id, secret_access_key, format) or
/// s3(source, NOSIGN, format, structure)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN, format name or neither.
else if (args.size() == 4)
else if (count == 4)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
{
args.back() = structure_literal;
args[count - 1] = structure_literal;
}
else if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
{
args[args.size() - 2] = structure_literal;
args[count - 2] = structure_literal;
}
else
{
args.push_back(structure_literal);
}
}
/// s3(source, access_key_id, access_key_id, format, structure) or
/// s3(source, access_key_id, secret_access_key, format, structure) or
/// s3(source, NOSIGN, format, structure, compression_method)
/// We can distinguish them by looking at the 2-nd argument: check if it's a NOSIGN keyword name or not.
else if (args.size() == 5)
else if (count == 5)
{
auto sedond_arg = checkAndGetLiteralArgument<String>(args[1], "format/NOSIGN");
if (boost::iequals(sedond_arg, "NOSIGN"))
{
args[args.size() - 2] = structure_literal;
args[count - 2] = structure_literal;
}
else
{
args.back() = structure_literal;
args[count - 1] = structure_literal;
}
}
/// s3(source, access_key_id, access_key_id, format, structure, compression)
else if (args.size() == 6)
/// s3(source, access_key_id, secret_access_key, format, structure, compression)
else if (count == 6)
{
args[args.size() - 2] = structure_literal;
args[count - 2] = structure_literal;
}
if (headers_ast)
args.push_back(headers_ast);
}
}

View File

@ -57,16 +57,24 @@ void TableFunctionURL::parseArgumentsImpl(ASTs & args, const ContextPtr & contex
if (format == "auto")
format = FormatFactory::instance().getFormatFromFileName(Poco::URI(filename).getPath(), true);
StorageURL::collectHeaders(args, configuration.headers, context);
StorageURL::evalArgsAndCollectHeaders(args, configuration.headers, context);
}
else
{
auto * headers_it = StorageURL::collectHeaders(args, configuration.headers, context);
size_t count = StorageURL::evalArgsAndCollectHeaders(args, configuration.headers, context);
/// ITableFunctionFileLike cannot parse headers argument, so remove it.
if (headers_it != args.end())
args.erase(headers_it);
ASTPtr headers_ast;
if (count != args.size())
{
chassert(count + 1 == args.size());
headers_ast = args.back();
args.pop_back();
}
ITableFunctionFileLike::parseArgumentsImpl(args, context);
if (headers_ast)
args.push_back(headers_ast);
}
}
@ -82,15 +90,15 @@ void TableFunctionURL::addColumnsStructureToArguments(ASTs & args, const String
}
else
{
/// If arguments contain headers, just remove it and add to the end of arguments later
/// (header argument can be at any position).
/// If arguments contain headers, just remove it and add to the end of arguments later.
HTTPHeaderEntries tmp_headers;
auto * headers_it = StorageURL::collectHeaders(args, tmp_headers, context);
size_t count = StorageURL::evalArgsAndCollectHeaders(args, tmp_headers, context);
ASTPtr headers_ast;
if (headers_it != args.end())
if (count != args.size())
{
headers_ast = *headers_it;
args.erase(headers_it);
chassert(count + 1 == args.size());
headers_ast = args.back();
args.pop_back();
}
ITableFunctionFileLike::addColumnsStructureToArguments(args, desired_structure, context);

View File

@ -0,0 +1 @@
CREATE TABLE default.a\n(\n `x` Int64\n)\nENGINE = URL(\'https://example.com/\', \'CSV\', headers(\'foo\' = \'bar\'))

View File

@ -0,0 +1,2 @@
create table a (x Int64) engine URL('https://example.com/', CSV, headers('foo' = 'bar'));
show create a;