Fix comments

avogar 2023-06-15 12:59:46 +00:00
parent 7aea4a1f10
commit 870f3d1270
4 changed files with 37 additions and 23 deletions

src/Storages/HDFS/StorageHDFS.cpp

@@ -213,7 +213,6 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
           ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
     {
         PathWithInfo path_with_info;
-        std::unique_ptr<ReadBuffer> buf;
         while (true)
         {
             if (it == paths_with_info.end())
@@ -231,13 +230,11 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
             auto compression = chooseCompressionMethod(path_with_info.path, compression_method);
             auto impl = std::make_unique<ReadBufferFromHDFS>(my_uri_without_path, path_with_info.path, ctx->getGlobalContext()->getConfigRef(), ctx->getReadSettings());
-            const Int64 zstd_window_log_max = ctx->getSettingsRef().zstd_window_log_max;
-            buf = wrapReadBufferWithCompressionMethod(std::move(impl), compression, static_cast<int>(zstd_window_log_max));
-            if (!ctx->getSettingsRef().hdfs_skip_empty_files || !buf->eof())
+            if (!ctx->getSettingsRef().hdfs_skip_empty_files || !impl->eof())
             {
+                const Int64 zstd_window_log_max = ctx->getSettingsRef().zstd_window_log_max;
                 first = false;
-                return buf;
+                return wrapReadBufferWithCompressionMethod(std::move(impl), compression, static_cast<int>(zstd_window_log_max));
             }
         }
     };
@@ -364,11 +361,10 @@ HDFSSource::HDFSSource(
 bool HDFSSource::initialize()
 {
-    StorageHDFS::PathWithInfo path_with_info;
     bool skip_empty_files = getContext()->getSettingsRef().hdfs_skip_empty_files;
     while (true)
     {
-        path_with_info = (*file_iterator)();
+        auto path_with_info = (*file_iterator)();
         if (path_with_info.path.empty())
             return false;
@@ -381,10 +377,12 @@ bool HDFSSource::initialize()
         auto compression = chooseCompressionMethod(path_from_uri, storage->compression_method);
         auto impl = std::make_unique<ReadBufferFromHDFS>(
             uri_without_path, path_from_uri, getContext()->getGlobalContext()->getConfigRef(), getContext()->getReadSettings());
-        const Int64 zstd_window_log_max = getContext()->getSettingsRef().zstd_window_log_max;
-        read_buf = wrapReadBufferWithCompressionMethod(std::move(impl), compression, static_cast<int>(zstd_window_log_max));
-        if (!skip_empty_files || !read_buf->eof())
+        if (!skip_empty_files || !impl->eof())
+        {
+            const Int64 zstd_window_log_max = getContext()->getSettingsRef().zstd_window_log_max;
+            read_buf = wrapReadBufferWithCompressionMethod(std::move(impl), compression, static_cast<int>(zstd_window_log_max));
             break;
+        }
     }
 
     auto input_format = getContext()->getInputFormat(storage->format_name, *read_buf, block_for_format, max_block_size);
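
Both HDFS hunks apply the same reordering: probe the raw ReadBufferFromHDFS with eof() first, and only construct the compression wrapper once the file is known to be non-empty (or when empty files are not skipped). A minimal standalone sketch of that pattern follows; ReadBuffer, RawFileBuffer, CompressedWrapper, and nextNonEmptyBuffer are illustrative stand-ins, not ClickHouse's real types.

    #include <iostream>
    #include <memory>
    #include <string>
    #include <vector>

    // Illustrative stand-ins for ClickHouse's ReadBuffer hierarchy (assumed names, not the real API).
    struct ReadBuffer
    {
        virtual ~ReadBuffer() = default;
        virtual bool eof() = 0;
        virtual std::string readAll() = 0;
    };

    struct RawFileBuffer : ReadBuffer
    {
        std::string data;
        size_t pos = 0;
        explicit RawFileBuffer(std::string d) : data(std::move(d)) {}
        bool eof() override { return pos >= data.size(); }   // cheap check on the raw bytes
        std::string readAll() override { pos = data.size(); return data; }
    };

    // Stands in for the buffer returned by wrapReadBufferWithCompressionMethod.
    struct CompressedWrapper : ReadBuffer
    {
        std::unique_ptr<ReadBuffer> nested;
        explicit CompressedWrapper(std::unique_ptr<ReadBuffer> n) : nested(std::move(n)) {}
        bool eof() override { return nested->eof(); }        // a real wrapper may have to decompress here
        std::string readAll() override { return nested->readAll(); }
    };

    // The commit's pattern: test eof() on the raw buffer, wrap only what survives.
    std::unique_ptr<ReadBuffer> nextNonEmptyBuffer(const std::vector<std::string> & files, size_t & it, bool skip_empty_files)
    {
        while (it < files.size())
        {
            auto impl = std::make_unique<RawFileBuffer>(files[it++]);
            if (!skip_empty_files || !impl->eof())
                return std::make_unique<CompressedWrapper>(std::move(impl));
            // Empty file: fall through to the next one; no wrapper was built.
        }
        return nullptr; // iterator exhausted
    }

    int main()
    {
        std::vector<std::string> files = {"", "payload", ""};
        size_t it = 0;
        while (auto buf = nextNonEmptyBuffer(files, it, /*skip_empty_files=*/true))
            std::cout << buf->readAll() << '\n'; // prints only "payload"
    }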

src/Storages/StorageS3.cpp

@@ -1453,12 +1453,9 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
     ReadBufferIterator read_buffer_iterator = [&, first = true](ColumnsDescription & cached_columns) mutable -> std::unique_ptr<ReadBuffer>
     {
-        StorageS3Source::KeyWithInfo key_with_info;
-        std::unique_ptr<ReadBuffer> buf;
         while (true)
         {
-            key_with_info = (*file_iterator)();
+            auto key_with_info = (*file_iterator)();
             if (key_with_info.key.empty())
             {
@@ -1488,16 +1485,11 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
                 continue;
 
             int zstd_window_log_max = static_cast<int>(ctx->getSettingsRef().zstd_window_log_max);
-            buf = wrapReadBufferWithCompressionMethod(
-                std::make_unique<ReadBufferFromS3>(
-                    configuration.client, configuration.url.bucket, key_with_info.key, configuration.url.version_id, configuration.request_settings, ctx->getReadSettings()),
-                chooseCompressionMethod(key_with_info.key, configuration.compression_method),
-                zstd_window_log_max);
-            if (!ctx->getSettingsRef().s3_skip_empty_files || !buf->eof())
+            auto impl = std::make_unique<ReadBufferFromS3>(configuration.client, configuration.url.bucket, key_with_info.key, configuration.url.version_id, configuration.request_settings, ctx->getReadSettings());
+            if (!ctx->getSettingsRef().s3_skip_empty_files || !impl->eof())
             {
                 first = false;
-                return buf;
+                return wrapReadBufferWithCompressionMethod(std::move(impl), chooseCompressionMethod(key_with_info.key, configuration.compression_method), zstd_window_log_max);
             }
         }
     };
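
The S3 schema-inference iterator gets the same treatment as the HDFS one: the raw ReadBufferFromS3 is now probed with eof() before any compression wrapper exists, so empty objects are skipped without constructing a decompressor, and the former multi-line wrapReadBufferWithCompressionMethod call collapses into a single return in the non-empty branch.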

src/Storages/StorageURL.cpp

@@ -368,6 +368,7 @@ std::pair<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> StorageURLSource:
     ReadSettings read_settings = context->getReadSettings();
     size_t options = std::distance(option, end);
+    std::pair<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> last_skipped_empty_res;
     for (; option != end; ++option)
     {
         bool skip_url_not_found_error = glob_url && read_settings.http_skip_not_found_url_for_globs && option == std::prev(end);
@@ -397,6 +398,12 @@ std::pair<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> StorageURLSource:
                 /* use_external_buffer */ false,
                 /* skip_url_not_found_error */ skip_url_not_found_error);
 
+            if (context->getSettingsRef().engine_url_skip_empty_files && res->eof() && option != std::prev(end))
+            {
+                last_skipped_empty_res = {request_uri, std::move(res)};
+                continue;
+            }
+
             return std::make_tuple(request_uri, std::move(res));
         }
         catch (...)
catch (...)
@@ -413,6 +420,11 @@ std::pair<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> StorageURLSource:
         }
     }
 
+    /// If all options are unreachable except empty ones that we skipped,
+    /// return last empty result. It will be skipped later.
+    if (last_skipped_empty_res.second)
+        return last_skipped_empty_res;
+
     throw Exception(ErrorCodes::NETWORK_ERROR, "All uri ({}) options are unreachable: {}", options, first_exception_message);
 }

tests/integration/test_storage_s3/test.py

@@ -1768,3 +1768,15 @@ def test_skip_empty_files(started_cluster):
     test("s3", "s3_skip_empty_files")
     test("url", "engine_url_skip_empty_files")
 
+    res = instance.query(
+        f"select * from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/skip_empty_files{{1|2}}.parquet') settings engine_url_skip_empty_files=1"
+    )
+
+    assert int(res) == 0
+
+    res = instance.query(
+        f"select * from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/skip_empty_files{{11|1|22}}.parquet') settings engine_url_skip_empty_files=1"
+    )
+
+    assert len(res.strip()) == 0
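
The two new queries exercise both sides of the URL fallback: the {1|2} glob mixes an empty file with a non-empty one and still yields the non-empty file's single row (the value 0), while in the {11|1|22} glob only the empty file exists, so every option is either unreachable or skipped and the remembered empty result comes back, producing no rows. Which files are empty or absent is inferred from the assertions; the fixtures are presumably set up earlier in this test.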