Merge pull request #34465 from Avogar/fix-url-globs

Improve schema inference with globs in File/S3/HDFS/URL engines
Kruglov Pavel 2022-02-17 13:33:27 +03:00 committed by GitHub
commit 6dcb766879
6 changed files with 178 additions and 113 deletions
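Summary of the change: each of these schema-inference paths used to build a read buffer from a single matched file (the first path, key or URL) and gave up as soon as inference on it failed. The hunks below make every engine walk the glob-expanded candidates, retry schema inference per file, collect the per-file error messages, and raise CANNOT_EXTRACT_TABLE_STRUCTURE only after every candidate has been tried. They also track whether the lazy read-buffer factory was ever invoked: a failure thrown before the factory ran (for example a format with no schema-inference support at all) would repeat identically for every file, so it is rethrown instead of retried. The sketch below is a minimal, self-contained illustration of that shared control flow; Schema, readSchemaFromFormatStub and inferSchemaWithRetries are hypothetical stand-ins rather than ClickHouse APIs, and std::runtime_error takes the place of ClickHouse's Exception machinery.

#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for a ClickHouse ColumnsDescription.
struct Schema { std::string description; };

// Hypothetical stand-in for readSchemaFromFormat(): it calls the factory lazily and may
// throw either before doing so (format-level failure) or afterwards (file-level failure).
Schema readSchemaFromFormatStub(const std::function<std::string()> & read_buffer_creator)
{
    // Toy behavior: pretend files whose name contains "bad" are unparsable.
    std::string data = read_buffer_creator();
    if (data.find("bad") != std::string::npos)
        throw std::runtime_error("cannot infer schema from " + data);
    return Schema{"c1 Nullable(Float64)"};
}

Schema inferSchemaWithRetries(const std::vector<std::string> & candidate_paths)
{
    std::string exception_messages;
    bool read_buffer_creator_was_used = false;

    for (const auto & path : candidate_paths)
    {
        // Lazy factory for the current file; the flag records that inference got far
        // enough to actually open this file.
        auto read_buffer_creator = [&]() -> std::string
        {
            read_buffer_creator_was_used = true;
            return path;   // a real implementation would return a read buffer here
        };

        try
        {
            // The first candidate that yields a schema wins.
            return readSchemaFromFormatStub(read_buffer_creator);
        }
        catch (const std::exception & e)
        {
            // With a single candidate there is nothing to fall back to, and a failure thrown
            // before the file was even opened would repeat for every candidate: rethrow.
            if (candidate_paths.size() == 1 || !read_buffer_creator_was_used)
                throw;

            // Otherwise remember this file's error and try the next matched file.
            exception_messages += std::string(e.what()) + "\n";
        }
    }

    // Mirrors the new "All attempts to extract table structure ... failed" exceptions.
    throw std::runtime_error("All attempts to extract table structure failed. Errors:\n" + exception_messages);
}

int main()
{
    Schema s = inferSchemaWithRetries({"data1.bad", "data2.jsoncompacteachrow"});
    return s.description.empty() ? 1 : 0;
}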


@@ -172,23 +172,43 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
const String & compression_method,
ContextPtr ctx)
{
auto read_buffer_creator = [&]()
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(uri);
auto paths = getPathsList(path_from_uri, uri, ctx);
std::string exception_messages;
bool read_buffer_creator_was_used = false;
for (const auto & path : paths)
{
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(uri);
auto paths = getPathsList(path_from_uri, uri, ctx);
if (paths.empty())
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because there are no files in HDFS with provided path. You must "
"specify table structure manually",
format);
auto read_buffer_creator = [&, uri_without_path = uri_without_path]()
{
read_buffer_creator_was_used = true;
auto compression = chooseCompressionMethod(paths[0], compression_method);
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromHDFS>(uri_without_path, paths[0], ctx->getGlobalContext()->getConfigRef()), compression);
};
if (paths.empty())
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because there are no files in HDFS with provided path. You must "
"specify table structure manually",
format);
return readSchemaFromFormat(format, std::nullopt, read_buffer_creator, ctx);
auto compression = chooseCompressionMethod(path, compression_method);
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromHDFS>(uri_without_path, path, ctx->getGlobalContext()->getConfigRef()), compression);
};
try
{
return readSchemaFromFormat(format, std::nullopt, read_buffer_creator, ctx);
}
catch (...)
{
if (paths.size() == 1 || !read_buffer_creator_was_used)
throw;
exception_messages += getCurrentExceptionMessage(false) + "\n";
}
}
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "All attempts to extract table structure from hdfs files failed. Errors:\n{}", exception_messages);
}
class HDFSSource::DisclosedGlobIterator::Impl
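Note on the HDFS hunk above: the lambda's capture list changes from [&] to [&, uri_without_path = uri_without_path]. uri_without_path is introduced by a structured binding, and C++17 as published does not let a lambda capture a structured binding (some compilers reject it outright; the rule was only relaxed in C++20), so an explicit init-capture that copies the binding is the usual portable workaround. A tiny sketch of the idiom; splitUri is a hypothetical stand-in for getPathFromUriAndUriWithoutPath:

#include <string>
#include <utility>

// Hypothetical helper: split "hdfs://host:port/path" into the path and the authority part.
std::pair<std::string, std::string> splitUri(const std::string & uri)
{
    auto pos = uri.find('/', uri.find("//") + 2);
    return {uri.substr(pos), uri.substr(0, pos)};
}

int main()
{
    const auto [path, uri_without_path] = splitUri("hdfs://hdfs1:9000/data1.jsoncompacteachrow");

    // auto broken = [&] { return uri_without_path; };   // capturing a structured binding:
    //                                                    // rejected by some C++17 compilers
    auto ok = [uri_without_path = uri_without_path] { return uri_without_path; };   // init-capture copies it

    return (ok() == "hdfs://hdfs1:9000" && path == "/data1.jsoncompacteachrow") ? 0 : 1;
}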


@@ -273,22 +273,38 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
return ColumnsDescription(source->getOutputs().front().getHeader().getNamesAndTypesList());
}
auto read_buffer_creator = [&]()
std::string exception_messages;
bool read_buffer_creator_was_used = false;
for (const auto & path : paths)
{
String path;
auto it = std::find_if(paths.begin(), paths.end(), [](const String & p){ return std::filesystem::exists(p); });
if (it == paths.end())
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because there are no files with provided path. You must specify "
"table structure manually",
format);
auto read_buffer_creator = [&]()
{
read_buffer_creator_was_used = true;
path = *it;
return createReadBuffer(path, false, "File", -1, compression_method, context);
};
if (!std::filesystem::exists(path))
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because there are no files with provided path. You must specify "
"table structure manually",
format);
return readSchemaFromFormat(format, format_settings, read_buffer_creator, context);
return createReadBuffer(path, false, "File", -1, compression_method, context);
};
try
{
return readSchemaFromFormat(format, format_settings, read_buffer_creator, context);
}
catch (...)
{
if (paths.size() == 1 || !read_buffer_creator_was_used)
throw;
exception_messages += getCurrentExceptionMessage(false) + "\n";
}
}
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "All attempts to extract table structure from files failed. Errors:\n{}", exception_messages);
}
bool StorageFile::isColumnOriented() const


@@ -888,23 +888,44 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
ContextPtr ctx)
{
std::vector<String> keys = {client_auth.uri.key};
auto read_buffer_creator = [&]()
auto file_iterator = createFileIterator(client_auth, keys, is_key_with_globs, distributed_processing, ctx);
std::string current_key;
std::string exception_messages;
bool read_buffer_creator_was_used = false;
do
{
auto file_iterator = createFileIterator(client_auth, keys, is_key_with_globs, distributed_processing, ctx);
String current_key = (*file_iterator)();
if (current_key.empty())
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because there are no files with provided path in S3. You must specify "
"table structure manually",
format);
current_key = (*file_iterator)();
auto read_buffer_creator = [&]()
{
read_buffer_creator_was_used = true;
if (current_key.empty())
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because there are no files with provided path in S3. You must specify "
"table structure manually",
format);
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromS3>(client_auth.client, client_auth.uri.bucket, current_key, max_single_read_retries, ctx->getReadSettings()),
chooseCompressionMethod(current_key, compression_method));
};
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromS3>(
client_auth.client, client_auth.uri.bucket, current_key, max_single_read_retries, ctx->getReadSettings()),
chooseCompressionMethod(current_key, compression_method));
};
return readSchemaFromFormat(format, format_settings, read_buffer_creator, ctx);
try
{
return readSchemaFromFormat(format, format_settings, read_buffer_creator, ctx);
}
catch (...)
{
if (!is_key_with_globs || !read_buffer_creator_was_used)
throw;
exception_messages += getCurrentExceptionMessage(false) + "\n";
}
} while (!current_key.empty());
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "All attempts to extract table structure from s3 files failed. Errors:\n{}", exception_messages);
}
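Note on the S3 hunk above: instead of inferring from just the first key, the keys now come from a createFileIterator(...) object, a pull-style iterator whose call operator yields the next matched key and returns an empty string once exhausted, so the retry loop is a do/while that keeps pulling keys until that sentinel. A toy illustration of the consumption pattern; KeyIterator is a hypothetical stand-in for the real iterator:

#include <cstddef>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the object returned by createFileIterator():
// each call yields the next matched key, and an empty string once exhausted.
class KeyIterator
{
public:
    explicit KeyIterator(std::vector<std::string> keys_) : keys(std::move(keys_)) {}

    std::string operator()()
    {
        if (index >= keys.size())
            return {};   // empty string acts as the end-of-stream sentinel
        return keys[index++];
    }

private:
    std::vector<std::string> keys;
    std::size_t index = 0;
};

int main()
{
    KeyIterator file_iterator({"test1.jsoncompacteachrow", "test2.jsoncompacteachrow"});

    std::string current_key;
    do
    {
        current_key = file_iterator();
        if (!current_key.empty())
            std::cout << "would try schema inference on: " << current_key << '\n';
    } while (!current_key.empty());
    // When the loop falls through, the patch raises the aggregated
    // "All attempts to extract table structure from s3 files failed" error.
}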


@@ -37,6 +37,7 @@ namespace ErrorCodes
extern const int NETWORK_ERROR;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
}
@@ -173,41 +174,10 @@ namespace
if (uri_options.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got empty url list");
if (uri_options.size() > 1)
{
read_buf = getFirstAvailableURLReadBuffer(
uri_options, context, params, http_method,
callback, timeouts, compression_method, credentials, headers);
}
else
{
ReadSettings read_settings = context->getReadSettings();
bool skip_url_not_found_error = glob_url && read_settings.http_skip_not_found_url_for_globs;
auto request_uri = Poco::URI(uri_options[0]);
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);
setCredentials(credentials, request_uri);
read_buf = wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>(
request_uri,
http_method,
callback,
timeouts,
credentials,
context->getSettingsRef().max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
read_settings,
headers,
ReadWriteBufferFromHTTP::Range{},
context->getRemoteHostFilter(),
/* delay_initiliazation */true,
/* use_external_buffer */false,
/* skip_url_not_found_error */skip_url_not_found_error),
chooseCompressionMethod(request_uri.getPath(), compression_method));
}
auto first_option = uri_options.begin();
read_buf = getFirstAvailableURLReadBuffer(
first_option, uri_options.end(), context, params, http_method,
callback, timeouts, compression_method, credentials, headers, glob_url, uri_options.size() == 1);
auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size, format_settings);
QueryPipelineBuilder builder;
@@ -258,7 +228,8 @@ namespace
}
static std::unique_ptr<ReadBuffer> getFirstAvailableURLReadBuffer(
const std::vector<String> & urls,
std::vector<String>::const_iterator & option,
const std::vector<String>::const_iterator & end,
ContextPtr context,
const URIParams & params,
const String & http_method,
@@ -266,14 +237,17 @@ namespace
const ConnectionTimeouts & timeouts,
const String & compression_method,
Poco::Net::HTTPBasicCredentials & credentials,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers)
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
bool glob_url,
bool delay_initialization)
{
String first_exception_message;
ReadSettings read_settings = context->getReadSettings();
for (auto option = urls.begin(); option != urls.end(); ++option)
size_t options = std::distance(option, end);
for (; option != end; ++option)
{
bool skip_url_not_found_error = read_settings.http_skip_not_found_url_for_globs && option == std::prev(urls.end());
bool skip_url_not_found_error = glob_url && read_settings.http_skip_not_found_url_for_globs && option == std::prev(end);
auto request_uri = Poco::URI(*option);
for (const auto & [param, value] : params)
@@ -296,7 +270,7 @@ namespace
headers,
ReadWriteBufferFromHTTP::Range{},
context->getRemoteHostFilter(),
/* delay_initiliazation */false,
delay_initialization,
/* use_external_buffer */false,
/* skip_url_not_found_error */skip_url_not_found_error),
chooseCompressionMethod(request_uri.getPath(), compression_method));
@@ -306,14 +280,14 @@ namespace
if (first_exception_message.empty())
first_exception_message = getCurrentExceptionMessage(false);
if (urls.size() == 1)
if (options == 1)
throw;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
throw Exception(ErrorCodes::NETWORK_ERROR, "All uri ({}) options are unreachable: {}", urls.size(), first_exception_message);
throw Exception(ErrorCodes::NETWORK_ERROR, "All uri ({}) options are unreachable: {}", options, first_exception_message);
}
private:
@@ -449,13 +423,11 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
const std::optional<FormatSettings> & format_settings,
ContextPtr context)
{
ReadBufferCreator read_buffer_creator;
Poco::Net::HTTPBasicCredentials credentials;
std::vector<String> urls_to_check;
if (urlWithGlobs(uri))
{
std::vector<String> urls_to_check;
size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses);
for (const auto & description : uri_descriptions)
@@ -463,11 +435,24 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
auto options = parseRemoteDescription(description, 0, description.size(), '|', max_addresses);
urls_to_check.insert(urls_to_check.end(), options.begin(), options.end());
}
}
else
{
urls_to_check = {uri};
}
read_buffer_creator = [&, urls_to_check]()
String exception_messages;
bool read_buffer_creator_was_used = false;
std::vector<String>::const_iterator option = urls_to_check.begin();
do
{
auto read_buffer_creator = [&]()
{
read_buffer_creator_was_used = true;
return StorageURLSource::getFirstAvailableURLReadBuffer(
urls_to_check,
option,
urls_to_check.end(),
context,
{},
Poco::Net::HTTPRequest::HTTP_GET,
@@ -475,35 +460,26 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
ConnectionTimeouts::getHTTPTimeouts(context),
compression_method,
credentials,
headers);
headers,
false,
false);
};
}
else
{
read_buffer_creator = [&]()
try
{
auto parsed_uri = Poco::URI(uri);
StorageURLSource::setCredentials(credentials, parsed_uri);
return readSchemaFromFormat(format, format_settings, read_buffer_creator, context);
}
catch (...)
{
if (urls_to_check.size() == 1 || !read_buffer_creator_was_used)
throw;
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>(
parsed_uri,
Poco::Net::HTTPRequest::HTTP_GET,
nullptr,
ConnectionTimeouts::getHTTPTimeouts(context),
credentials,
context->getSettingsRef().max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
headers,
ReadWriteBufferFromHTTP::Range{},
context->getRemoteHostFilter(),
/* delay_initiliazation */true),
chooseCompressionMethod(parsed_uri.getPath(), compression_method));
};
}
exception_messages += getCurrentExceptionMessage(false) + "\n";
}
return readSchemaFromFormat(format, format_settings, read_buffer_creator, context);
} while (++option < urls_to_check.end());
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "All attempts to extract table structure from urls failed. Errors:\n{}", exception_messages);
}
Pipe IStorageURLBase::read(
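Note on the URL hunks: getFirstAvailableURLReadBuffer now takes a mutable iterator plus an end iterator instead of the whole url vector, advances that iterator as it skips unreachable options, and its old urls.size() == 1 check becomes std::distance(option, end) == 1. Passing the position by reference apparently lets the schema-inference loop (do { ... } while (++option < urls_to_check.end())) resume from the first URL that has not been tried yet rather than restarting from the beginning on every retry. A miniature of that contract; firstReachableUrl and the reachable list are hypothetical:

#include <algorithm>
#include <iostream>
#include <iterator>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical miniature of the new contract: the current position is passed by
// reference, so the caller can see how far the helper got before it succeeded or gave up.
std::string firstReachableUrl(std::vector<std::string>::const_iterator & option,
                              std::vector<std::string>::const_iterator end,
                              const std::vector<std::string> & reachable)
{
    for (; option != end; ++option)
    {
        if (std::find(reachable.begin(), reachable.end(), *option) != reachable.end())
            return *option;   // success: *option is the URL that worked
        // unreachable: fall through and try the next option
    }
    throw std::runtime_error("all remaining url options are unreachable");
}

int main()
{
    std::vector<std::string> urls = {"http://a/data.csv", "http://b/data.csv", "http://c/data.csv"};
    std::vector<std::string> reachable = {"http://b/data.csv"};

    auto option = urls.cbegin();
    std::cout << firstReachableUrl(option, urls.cend(), reachable) << '\n';   // http://b/data.csv
    // option now points at the URL that succeeded, so a retrying caller such as the
    // schema-inference loop can continue from the next, untried one.
    std::cout << *std::next(option) << '\n';                                  // http://c/data.csv
}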


@@ -410,6 +410,17 @@ def test_format_detection(started_cluster):
assert(int(result) == 1)
def test_schema_inference_with_globs(started_cluster):
node1.query(f"insert into table function hdfs('hdfs://hdfs1:9000/data1.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select NULL")
node1.query(f"insert into table function hdfs('hdfs://hdfs1:9000/data2.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select 0")
result = node1.query(f"desc hdfs('hdfs://hdfs1:9000/data*.jsoncompacteachrow')")
assert(result.strip() == 'c1\tNullable(Float64)')
result = node1.query(f"select * from hdfs('hdfs://hdfs1:9000/data*.jsoncompacteachrow')")
assert(sorted(result.split()) == ['0', '\\N'])
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")


@@ -994,6 +994,27 @@ def test_format_detection(started_cluster):
assert(int(result) == 1)
def test_schema_inference_from_globs(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
instance.query(f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test1.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select NULL")
instance.query(f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test2.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select 0")
url_filename = "test{1,2}.jsoncompacteachrow"
result = instance.query(f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}')")
assert(result.strip() == 'c1\tNullable(Float64)')
result = instance.query(f"select * from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}')")
assert(sorted(result.split()) == ['0', '\\N'])
result = instance.query(f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test*.jsoncompacteachrow')")
assert(result.strip() == 'c1\tNullable(Float64)')
result = instance.query(f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test*.jsoncompacteachrow')")
assert(sorted(result.split()) == ['0', '\\N'])
def test_signatures(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]