Detect format in S3/HDFS/URL table engines

This commit is contained in:
avogar 2022-01-13 16:14:18 +03:00
parent 8390e9ad60
commit 2d7b1bfa5e
8 changed files with 37 additions and 30 deletions

View File

@ -400,7 +400,7 @@ void FormatFactory::registerFileExtension(const String & extension, const String
file_extension_formats[boost::to_lower_copy(extension)] = format_name;
}
String FormatFactory::getFormatFromFileName(String file_name)
String FormatFactory::getFormatFromFileName(String file_name, bool throw_if_not_found)
{
CompressionMethod compression_method = chooseCompressionMethod(file_name, "");
if (CompressionMethod::None != compression_method)
@ -416,7 +416,14 @@ String FormatFactory::getFormatFromFileName(String file_name)
String file_extension = file_name.substr(pos + 1, String::npos);
boost::algorithm::to_lower(file_extension);
return file_extension_formats[file_extension];
auto it = file_extension_formats.find(file_extension);
if (it == file_extension_formats.end())
{
if (throw_if_not_found)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot determine the file format by it's extension");
return "";
}
return it->second;
}
void FormatFactory::registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine)

View File

@ -173,7 +173,7 @@ public:
/// Register file extension for format
void registerFileExtension(const String & extension, const String & format_name);
String getFormatFromFileName(String file_name);
String getFormatFromFileName(String file_name, bool throw_if_not_found = false);
/// Register schema readers for a format by its name.
void registerSchemaReader(const String & name, SchemaReaderCreator schema_reader_creator);

View File

@ -544,17 +544,23 @@ void registerStorageHDFS(StorageFactory & factory)
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() != 2 && engine_args.size() != 3)
if (engine_args.size() < 1 || engine_args.size() > 3)
throw Exception(
"Storage HDFS requires 2 or 3 arguments: url, name of used format and optional compression method.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
"Storage HDFS requires 1, 2 or 3 arguments: url, name of used format (taken from file extension by default) and optional compression method.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.getLocalContext());
String url = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.getLocalContext());
String format_name = "auto";
if (engine_args.size() > 1)
{
engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.getLocalContext());
format_name = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
}
String format_name = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
if (format_name == "auto")
format_name = FormatFactory::instance().getFormatFromFileName(url, true);
String compression_method;
if (engine_args.size() == 3)

View File

@ -687,9 +687,9 @@ StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPt
}
else
{
if (engine_args.size() < 2 || engine_args.size() > 5)
if (engine_args.size() < 1 || engine_args.size() > 5)
throw Exception(
"Storage S3 requires 2 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method].",
"Storage S3 requires 1 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method].",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & engine_arg : engine_args)
@ -707,13 +707,16 @@ StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPt
configuration.compression_method = engine_args.back()->as<ASTLiteral &>().value.safeGet<String>();
configuration.format = engine_args[engine_args.size() - 2]->as<ASTLiteral &>().value.safeGet<String>();
}
else
else if (engine_args.size() != 1)
{
configuration.compression_method = "auto";
configuration.format = engine_args.back()->as<ASTLiteral &>().value.safeGet<String>();
}
}
if (configuration.format == "auto")
configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.url, true);
return configuration;
}

View File

@ -624,20 +624,24 @@ URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, Contex
}
else
{
if (args.size() != 2 && args.size() != 3)
if (args.size() < 1 || args.size() > 3)
throw Exception(
"Storage URL requires 2 or 3 arguments: url, name of used format and optional compression method.",
"Storage URL requires 1, 2 or 3 arguments: url, name of used format (taken from file extension by default) and optional compression method.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, local_context);
configuration.url = args[0]->as<ASTLiteral &>().value.safeGet<String>();
configuration.format = args[1]->as<ASTLiteral &>().value.safeGet<String>();
if (args.size() > 1)
configuration.format = args[1]->as<ASTLiteral &>().value.safeGet<String>();
if (args.size() == 3)
configuration.compression_method = args[2]->as<ASTLiteral &>().value.safeGet<String>();
}
if (configuration.format == "auto")
configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.url, true);
return configuration;
}

View File

@ -65,11 +65,7 @@ void ITableFunctionFileLike::parseArguments(const ASTPtr & ast_function, Context
format = args[1]->as<ASTLiteral &>().value.safeGet<String>();
if (format == "auto")
{
format = FormatFactory::instance().getFormatFromFileName(filename);
if (format.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot determine the file format by it's extension, you should provide the format manually");
}
format = FormatFactory::instance().getFormatFromFileName(filename, true);
if (args.size() <= 2)
{

View File

@ -115,11 +115,7 @@ void TableFunctionS3::parseArguments(const ASTPtr & ast_function, ContextPtr con
}
if (configuration.format == "auto")
{
configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.url);
if (configuration.format.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot determine the file format by it's extension, you should provide the format manually");
}
configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.url, true);
s3_configuration = std::move(configuration);
}

View File

@ -51,15 +51,10 @@ void TableFunctionURL::parseArguments(const ASTPtr & ast_function, ContextPtr co
filename = configuration.url;
format = configuration.format;
if (format == "auto")
format = FormatFactory::instance().getFormatFromFileName(filename, true);
structure = configuration.structure;
compression_method = configuration.compression_method;
if (format == "auto")
{
format = FormatFactory::instance().getFormatFromFileName(configuration.url);
if (format.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot determine the file format by it's extension, you should provide the format manually");
}
}
else
{