Address comments

avogar 2023-06-13 14:43:50 +00:00
parent e4838725e3
commit 2e1f56ae33
8 changed files with 177 additions and 147 deletions

View File

@@ -99,4 +99,4 @@ For partitioning by month, use the `toYYYYMM(date_column)` expression, where `da
- [engine_file_truncate_on_insert](/docs/en/operations/settings/settings.md#engine-file-truncate-on-insert) - allows to truncate file before insert into it. Disabled by default.
- [engine_file_allow_create_multiple_files](/docs/en/operations/settings/settings.md#engine_file_allow_create_multiple_files) - allows to create a new file on each insert if format has suffix. Disabled by default.
- [engine_file_skip_empty_files](/docs/en/operations/settings/settings.md#engine_file_skip_empty_files) - allows to skip empty files while reading. Disabled by default.
- [storage_file_read_method](/docs/en/operations/settings/settings.md#engine-file-emptyif-not-exists) - method of reading data from storage file, one of: read, pread, mmap (only for clickhouse-local). Default value: `pread` for clickhouse-server, `mmap` for clickhouse-local.
- [storage_file_read_method](/docs/en/operations/settings/settings.md#engine-file-emptyif-not-exists) - method of reading data from storage file, one of: `read`, `pread`, `mmap`. The mmap method does not apply to clickhouse-server (it's intended for clickhouse-local). Default value: `pread` for clickhouse-server, `mmap` for clickhouse-local.
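As a point of reference for the three method names, the following is a minimal POSIX sketch (plain system calls, not ClickHouse code; the file path and the 16-byte reads are arbitrary) showing how `read`, `pread`, and `mmap` differ at the OS level:

```cpp
// Minimal POSIX sketch (plain system calls, not ClickHouse code) of the three
// read methods named above; the file path and 16-byte reads are arbitrary.
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <cstdio>
#include <vector>

int main()
{
    const char * path = "/etc/hostname";   // any readable file will do
    int fd = open(path, O_RDONLY);
    if (fd < 0)
        return 1;

    struct stat st{};
    fstat(fd, &st);

    /// read(): consumes the shared file offset, so concurrent readers must coordinate.
    std::vector<char> buf1(16);
    ssize_t n1 = read(fd, buf1.data(), buf1.size());

    /// pread(): reads at an explicit offset and leaves the file offset untouched.
    std::vector<char> buf2(16);
    ssize_t n2 = pread(fd, buf2.data(), buf2.size(), 0);

    /// mmap(): maps the file into memory and lets the kernel page it in on access.
    void * mapped = st.st_size > 0 ? mmap(nullptr, st.st_size, PROT_READ, MAP_PRIVATE, fd, 0) : nullptr;

    std::printf("read=%zd pread=%zd mmap=%s\n", n1, n2, (mapped && mapped != MAP_FAILED) ? "ok" : "skipped");

    if (mapped && mapped != MAP_FAILED)
        munmap(mapped, st.st_size);
    close(fd);
    return 0;
}
```

`pread` reads at an explicit offset without moving the shared file position, while `mmap` maps the whole file into memory; the doc line above only records which default applies to clickhouse-server versus clickhouse-local.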

View File

@@ -3332,7 +3332,7 @@ Enables or disables creating a new file on each insert in file engine tables if
Possible values:
- 0 — `INSERT` query appends new data to the end of the file.
- 1 — `INSERT` query replaces existing content of the file with the new data.
- 1 — `INSERT` query creates a new file.
Default value: `0`.
@@ -3370,7 +3370,7 @@ initial: `data.Parquet.gz` -> `data.1.Parquet.gz` -> `data.2.Parquet.gz`, etc.
Possible values:
- 0 — `INSERT` query appends new data to the end of the file.
- 1 — `INSERT` query replaces existing content of the file with the new data.
- 1 — `INSERT` query creates a new file.
Default value: `0`.
@@ -3402,7 +3402,7 @@ initial: `data.Parquet.gz` -> `data.1.Parquet.gz` -> `data.2.Parquet.gz`, etc.
Possible values:
- 0 — `INSERT` query appends new data to the end of the file.
- 1 — `INSERT` query replaces existing content of the file with the new data.
- 1 — `INSERT` query creates a new file.
Default value: `0`.
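All three hunks settle on the same wording for value `1`. The hunk context also shows the resulting file-name sequence (`data.Parquet.gz` -> `data.1.Parquet.gz` -> `data.2.Parquet.gz`); the following is a small sketch of that naming scheme, written from the behaviour the docs describe rather than from ClickHouse's actual implementation:

```cpp
// Sketch of the documented naming sequence for engine_file_allow_create_multiple_files,
// written from the behaviour described above rather than from ClickHouse's own code:
// data.Parquet.gz -> data.1.Parquet.gz -> data.2.Parquet.gz, ...
#include <filesystem>
#include <iostream>
#include <string>

std::string nextFileName(const std::string & initial)
{
    namespace fs = std::filesystem;
    if (!fs::exists(initial))
        return initial;   /// first insert simply uses the configured name

    /// Split "data.Parquet.gz" into the stem "data" and the suffix ".Parquet.gz",
    /// then probe data.1.Parquet.gz, data.2.Parquet.gz, ... for a free slot.
    auto first_dot = initial.find('.');
    std::string stem = initial.substr(0, first_dot);
    std::string suffix = first_dot == std::string::npos ? "" : initial.substr(first_dot);

    for (size_t index = 1;; ++index)
    {
        std::string candidate = stem + "." + std::to_string(index) + suffix;
        if (!fs::exists(candidate))
            return candidate;
    }
}

int main()
{
    std::cout << nextFileName("data.Parquet.gz") << '\n';
}
```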

View File

@@ -18,7 +18,7 @@ file(path [,format] [,structure] [,compression])
**Parameters**
- `path` — The relative path to the file from [user_files_path](/docs/en/operations/server-configuration-parameters/settings.md/#server_configuration_parameters-user_files_path). Path to file support following globs in read-only mode: `*`, `?`, `{abc,def}` and `{N..M}` where `N`, `M` — numbers, `'abc', 'def'` — strings.
- `path` — The relative path to the file from [user_files_path](/docs/en/operations/server-configuration-parameters/settings.md#server_configuration_parameters-user_files_path). Path to file supports the following globs in read-only mode: `*`, `?`, `{abc,def}` and `{N..M}` where `N`, `M` — numbers, `'abc', 'def'` — strings.
- `format` — The [format](/docs/en/interfaces/formats.md#formats) of the file.
- `structure` — Structure of the table. Format: `'column1_name column1_type, column2_name column2_type, ...'`.
- `compression` — The existing compression type when used in a `SELECT` query, or the desired compression type when used in an `INSERT` query. The supported compression types are `gz`, `br`, `xz`, `zst`, `lz4`, and `bz2`.
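The glob syntax called out for `path` (`*`, `?`, `{abc,def}`, `{N..M}`) is the only non-obvious part of the signature. As an illustration of the syntax only, and not of the matcher ClickHouse actually uses, globs like these can be expanded into a regular expression roughly as follows:

```cpp
// Illustration only: expanding the documented glob syntax into a std::regex.
// This is not ClickHouse's matcher; it just shows what each construct means.
#include <iostream>
#include <regex>
#include <string>

std::string globToRegex(const std::string & glob)
{
    std::string re;
    for (size_t i = 0; i < glob.size(); ++i)
    {
        char c = glob[i];
        if (c == '*')
            re += ".*";                     /// any number of characters
        else if (c == '?')
            re += '.';                      /// exactly one character
        else if (c == '{')
        {
            size_t closing = glob.find('}', i);
            if (closing == std::string::npos)
            {
                re += "\\{";                /// unterminated brace: treat literally
                continue;
            }
            std::string body = glob.substr(i + 1, closing - i - 1);
            size_t range = body.find("..");
            if (range != std::string::npos)
            {
                /// {N..M}: alternation of every number from N to M.
                int from = std::stoi(body.substr(0, range));
                int to = std::stoi(body.substr(range + 2));
                re += '(';
                for (int v = from; v <= to; ++v)
                    re += std::to_string(v) + (v == to ? "" : "|");
                re += ')';
            }
            else
            {
                /// {abc,def}: alternation of the comma-separated strings.
                for (char & ch : body)
                    if (ch == ',')
                        ch = '|';
                re += '(' + body + ')';
            }
            i = closing;
        }
        else if (std::string("\\^$.|+()[]").find(c) != std::string::npos)
        {
            re += '\\';                     /// escape regex metacharacters
            re += c;
        }
        else
            re += c;
    }
    return re;
}

int main()
{
    std::regex matcher(globToRegex("data_{1..3}.{csv,tsv}"));
    std::cout << std::boolalpha << std::regex_match("data_2.csv", matcher) << '\n';   // prints true
}
```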

View File

@@ -140,14 +140,6 @@ namespace
return LSWithRegexpMatching("/", fs, path_from_uri);
}
size_t getFileSize(const String & path_from_uri, const String & uri_without_path, ContextPtr context)
{
HDFSBuilderWrapper builder = createHDFSBuilder(uri_without_path + "/", context->getGlobalContext()->getConfigRef());
HDFSFSPtr fs = createHDFSFS(builder.get());
auto * info = hdfsGetPathInfo(fs.get(), path_from_uri.data());
return info->mSize;
}
}
StorageHDFS::StorageHDFS(
@@ -218,26 +210,36 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
ReadBufferIterator read_buffer_iterator
= [&, my_uri_without_path = uri_without_path, it = paths_with_info.begin(), first = true](
ColumnsDescription & columns) mutable -> std::unique_ptr<ReadBuffer>
ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
{
if (it == paths_with_info.end())
PathWithInfo path_with_info;
std::unique_ptr<ReadBuffer> buf;
while (true)
{
if (first)
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because all files are empty. "
"You must specify table structure manually", format);
return nullptr;
if (it == paths_with_info.end())
{
if (first)
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because all files are empty. "
"You must specify table structure manually", format);
return nullptr;
}
path_with_info = *it++;
if (ctx->getSettingsRef().hdfs_skip_empty_files && path_with_info.info && path_with_info.info->size == 0)
continue;
auto compression = chooseCompressionMethod(path_with_info.path, compression_method);
auto impl = std::make_unique<ReadBufferFromHDFS>(my_uri_without_path, path_with_info.path, ctx->getGlobalContext()->getConfigRef(), ctx->getReadSettings());
const Int64 zstd_window_log_max = ctx->getSettingsRef().zstd_window_log_max;
buf = wrapReadBufferWithCompressionMethod(std::move(impl), compression, static_cast<int>(zstd_window_log_max));
if (!ctx->getSettingsRef().hdfs_skip_empty_files || !buf->eof())
{
first = false;
return buf;
}
}
auto path_with_info = *it++;
if (ctx->getSettingsRef().hdfs_skip_empty_files && path_with_info.info && path_with_info.info->size == 0)
return read_buffer_iterator(columns);
first = false;
auto compression = chooseCompressionMethod(path_with_info.path, compression_method);
auto impl = std::make_unique<ReadBufferFromHDFS>(my_uri_without_path, path_with_info.path, ctx->getGlobalContext()->getConfigRef(), ctx->getReadSettings());
const Int64 zstd_window_log_max = ctx->getSettingsRef().zstd_window_log_max;
return wrapReadBufferWithCompressionMethod(std::move(impl), compression, static_cast<int>(zstd_window_log_max));
};
ColumnsDescription columns;
@@ -362,26 +364,28 @@ HDFSSource::HDFSSource(
bool HDFSSource::initialize()
{
auto path_with_info = (*file_iterator)();
if (path_with_info.path.empty())
return false;
current_path = path_with_info.path;
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(current_path);
if (getContext()->getSettingsRef().hdfs_skip_empty_files)
StorageHDFS::PathWithInfo path_with_info;
bool skip_empty_files = getContext()->getSettingsRef().hdfs_skip_empty_files;
while (true)
{
auto file_size = path_with_info.info ? path_with_info.info->size : getFileSize(path_from_uri, uri_without_path, getContext());
/// If file is empty and hdfs_skip_empty_files=1, skip it and go to the next file.
if (file_size == 0)
return initialize();
}
path_with_info = (*file_iterator)();
if (path_with_info.path.empty())
return false;
auto compression = chooseCompressionMethod(path_from_uri, storage->compression_method);
auto impl = std::make_unique<ReadBufferFromHDFS>(
uri_without_path, path_from_uri, getContext()->getGlobalContext()->getConfigRef(), getContext()->getReadSettings());
const Int64 zstd_window_log_max = getContext()->getSettingsRef().zstd_window_log_max;
read_buf = wrapReadBufferWithCompressionMethod(std::move(impl), compression, static_cast<int>(zstd_window_log_max));
if (path_with_info.info && skip_empty_files && path_with_info.info->size == 0)
continue;
current_path = path_with_info.path;
const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(current_path);
auto compression = chooseCompressionMethod(path_from_uri, storage->compression_method);
auto impl = std::make_unique<ReadBufferFromHDFS>(
uri_without_path, path_from_uri, getContext()->getGlobalContext()->getConfigRef(), getContext()->getReadSettings());
const Int64 zstd_window_log_max = getContext()->getSettingsRef().zstd_window_log_max;
read_buf = wrapReadBufferWithCompressionMethod(std::move(impl), compression, static_cast<int>(zstd_window_log_max));
if (!skip_empty_files || !read_buf->eof())
break;
}
auto input_format = getContext()->getInputFormat(storage->format_name, *read_buf, block_for_format, max_block_size);
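Both HDFS hunks replace a recursive retry (`return read_buffer_iterator(columns);` in schema inference, `return initialize();` in the source) with a loop that advances the iterator itself: a file is skipped either when the listing already reports a zero size or when the opened buffer is immediately at EOF. A condensed, self-contained sketch of that shape, using simplified stand-in types rather than the real StorageHDFS ones:

```cpp
// Condensed sketch of the loop-instead-of-recursion shape used above; the types
// and the openPath() helper are simplified stand-ins, not the StorageHDFS code.
#include <memory>
#include <optional>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

struct PathWithSize
{
    std::string path;
    std::optional<size_t> size;   /// known up front only when the listing provides it
};

std::unique_ptr<std::istringstream> openPath(const PathWithSize & p)
{
    /// Stand-in for building a (possibly decompressing) read buffer over the file.
    return std::make_unique<std::istringstream>(p.path == "empty" ? "" : "data");
}

std::unique_ptr<std::istringstream> nextNonEmptyBuffer(
    std::vector<PathWithSize>::const_iterator & it,
    std::vector<PathWithSize>::const_iterator end,
    bool skip_empty_files,
    bool & first)
{
    while (true)
    {
        if (it == end)
        {
            if (first)
                throw std::runtime_error("all files are empty, specify the structure manually");
            return nullptr;   /// no more files, but the schema was already inferred earlier
        }

        auto path_with_size = *it++;

        /// Cheap skip: the listing already told us the file is empty.
        if (skip_empty_files && path_with_size.size && *path_with_size.size == 0)
            continue;

        /// Otherwise open it and check for EOF, mirroring the buf->eof() test above.
        auto buf = openPath(path_with_size);
        if (!skip_empty_files || buf->peek() != std::char_traits<char>::eof())
        {
            first = false;
            return buf;
        }
    }
}

int main()
{
    std::vector<PathWithSize> paths{{"empty", 0}, {"good", std::nullopt}};
    auto it = paths.cbegin();
    bool first = true;
    return nextNonEmptyBuffer(it, paths.cend(), /*skip_empty_files=*/ true, first) ? 0 : 1;
}
```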

View File

@@ -404,21 +404,26 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
if (context->getSettingsRef().schema_inference_use_cache_for_file)
columns_from_cache = tryGetColumnsFromCache(paths, format, format_settings, context);
ReadBufferIterator read_buffer_iterator = [&, it = paths.begin(), first = true](ColumnsDescription & columns) mutable -> std::unique_ptr<ReadBuffer>
ReadBufferIterator read_buffer_iterator = [&, it = paths.begin(), first = true](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
{
if (it == paths.end())
String path;
struct stat file_stat;
do
{
if (first)
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because all files are empty. You must specify table structure manually",
format);
return nullptr;
}
if (it == paths.end())
{
if (first)
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because all files are empty. You must specify table structure manually",
format);
return nullptr;
}
auto path = *it++;
auto file_stat = getFileStat(path, false, -1, "File");
if (context->getSettingsRef().engine_file_skip_empty_files && file_stat.st_size == 0)
return read_buffer_iterator(columns);
path = *it++;
file_stat = getFileStat(path, false, -1, "File");
}
while (context->getSettingsRef().engine_file_skip_empty_files && file_stat.st_size == 0);
first = false;
return createReadBuffer(path, file_stat, false, -1, compression_method, context);
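The StorageFile version of the same change uses a `do`/`while`: `path` and `file_stat` are declared before the loop so they stay usable afterwards, and the loop keeps advancing while `engine_file_skip_empty_files` is set and `st_size` is zero. A minimal POSIX sketch of just that loop, with arbitrary example paths standing in for the engine's path list:

```cpp
// Minimal sketch (not StorageFile itself) of the do/while shape used above:
// advance through the candidate paths, stat() each one, and keep looping while
// the skip-empty-files setting is on and the file size is zero.
#include <sys/stat.h>
#include <cstdio>
#include <string>
#include <vector>

int main()
{
    std::vector<std::string> paths{"/etc/hostname", "/etc/hosts"};   // arbitrary example paths
    const bool skip_empty_files = true;

    auto it = paths.cbegin();
    std::string path;
    struct stat file_stat{};

    do
    {
        if (it == paths.cend())
        {
            std::puts("all candidate files are empty");
            return 1;
        }
        path = *it++;
        if (stat(path.c_str(), &file_stat) != 0)
            return 2;   /// the real code throws on stat failure
    }
    while (skip_empty_files && file_stat.st_size == 0);

    std::printf("first non-empty file: %s (%lld bytes)\n", path.c_str(), static_cast<long long>(file_stat.st_size));
    return 0;
}
```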

View File

@@ -575,19 +575,21 @@ StorageS3Source::StorageS3Source(
StorageS3Source::ReaderHolder StorageS3Source::createReader()
{
auto [current_key, info] = (*file_iterator)();
if (current_key.empty())
return {};
KeyWithInfo key_with_info;
size_t object_size;
do
{
key_with_info = (*file_iterator)();
if (key_with_info.key.empty())
return {};
size_t object_size = info ? info->size : S3::getObjectSize(*client, bucket, current_key, version_id, request_settings);
object_size = key_with_info.info ? key_with_info.info->size : S3::getObjectSize(*client, bucket, key_with_info.key, version_id, request_settings);
}
while (getContext()->getSettingsRef().s3_skip_empty_files && object_size == 0);
/// If object is empty and s3_skip_empty_files=1, skip it and go to the next key.
if (getContext()->getSettingsRef().s3_skip_empty_files && object_size == 0)
return createReader();
auto compression_method = chooseCompressionMethod(key_with_info.key, compression_hint);
auto compression_method = chooseCompressionMethod(current_key, compression_hint);
auto read_buf = createS3ReadBuffer(current_key, object_size);
auto read_buf = createS3ReadBuffer(key_with_info.key, object_size);
auto input_format = FormatFactory::instance().getInput(
format, *read_buf, sample_block, getContext(), max_block_size,
format_settings, std::nullopt, std::nullopt,
@@ -606,7 +608,7 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()
auto pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
auto current_reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
return ReaderHolder{fs::path(bucket) / current_key, std::move(read_buf), std::move(pipeline), std::move(current_reader)};
return ReaderHolder{fs::path(bucket) / key_with_info.key, std::move(read_buf), std::move(pipeline), std::move(current_reader)};
}
std::future<StorageS3Source::ReaderHolder> StorageS3Source::createReaderAsync()
@@ -1451,41 +1453,53 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
ReadBufferIterator read_buffer_iterator = [&, first = true](ColumnsDescription & cached_columns) mutable -> std::unique_ptr<ReadBuffer>
{
auto [key, info] = (*file_iterator)();
StorageS3Source::KeyWithInfo key_with_info;
std::unique_ptr<ReadBuffer> buf;
if (key.empty())
while (true)
{
if (first)
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because there are no files with provided path "
"in S3 or all files are empty. You must specify table structure manually", configuration.format);
key_with_info = (*file_iterator)();
return nullptr;
}
if (ctx->getSettingsRef().s3_skip_empty_files && info->size == 0)
return read_buffer_iterator(cached_columns);
/// S3 file iterator could get new keys after new iteration, check them in schema cache.
if (ctx->getSettingsRef().schema_inference_use_cache_for_s3 && read_keys.size() > prev_read_keys_size)
{
columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end(), configuration, format_settings, ctx);
prev_read_keys_size = read_keys.size();
if (columns_from_cache)
if (key_with_info.key.empty())
{
cached_columns = *columns_from_cache;
if (first)
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because there are no files with provided path "
"in S3 or all files are empty. You must specify table structure manually",
configuration.format);
return nullptr;
}
}
first = false;
int zstd_window_log_max = static_cast<int>(ctx->getSettingsRef().zstd_window_log_max);
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromS3>(
configuration.client, configuration.url.bucket, key, configuration.url.version_id, configuration.request_settings, ctx->getReadSettings()),
chooseCompressionMethod(key, configuration.compression_method),
zstd_window_log_max);
/// S3 file iterator could get new keys after new iteration, check them in schema cache.
if (ctx->getSettingsRef().schema_inference_use_cache_for_s3 && read_keys.size() > prev_read_keys_size)
{
columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end(), configuration, format_settings, ctx);
prev_read_keys_size = read_keys.size();
if (columns_from_cache)
{
cached_columns = *columns_from_cache;
return nullptr;
}
}
if (ctx->getSettingsRef().s3_skip_empty_files && key_with_info.info && key_with_info.info->size == 0)
continue;
int zstd_window_log_max = static_cast<int>(ctx->getSettingsRef().zstd_window_log_max);
buf = wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromS3>(
configuration.client, configuration.url.bucket, key_with_info.key, configuration.url.version_id, configuration.request_settings, ctx->getReadSettings()),
chooseCompressionMethod(key_with_info.key, configuration.compression_method),
zstd_window_log_max);
if (!ctx->getSettingsRef().s3_skip_empty_files || !buf->eof())
{
first = false;
return buf;
}
}
};
ColumnsDescription columns;
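One S3-specific detail: `createReader()` now takes the object size from the listing (`key_with_info.info->size`) when it is available and only falls back to `S3::getObjectSize()` otherwise, so skipping empty objects does not cost an extra request per key. A tiny sketch of that fallback with hypothetical names:

```cpp
// Sketch (hypothetical names, not the S3 storage code) of the size lookup used
// in createReader() above: trust the size delivered by the listing when present,
// and only fall back to an extra metadata request when it is not.
#include <cstdio>
#include <functional>
#include <optional>
#include <string>

size_t objectSize(const std::optional<size_t> & size_from_listing,
                  const std::string & key,
                  const std::function<size_t(const std::string &)> & fetch_size)
{
    /// A listing that already carries sizes lets empty objects be skipped
    /// without issuing one metadata request per key.
    if (size_from_listing)
        return *size_from_listing;
    return fetch_size(key);
}

int main()
{
    auto fake_head_request = [](const std::string &) -> size_t { return 42; };
    std::printf("%zu %zu\n",
                objectSize(std::optional<size_t>{0}, "a.csv", fake_head_request),   // size known from the listing; caller skips the object
                objectSize(std::nullopt, "b.csv", fake_head_request));              // falls back to the extra request
    return 0;
}
```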

View File

@@ -241,30 +241,34 @@ StorageURLSource::StorageURLSource(
/// Lazy initialization. We should not perform requests in constructor, because we need to do it in query pipeline.
initialize = [=, this]()
{
const auto current_uri_options = (*uri_iterator)();
if (current_uri_options.empty())
return false;
std::vector<String> current_uri_options;
std::pair<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> uri_and_buf;
do
{
current_uri_options = (*uri_iterator)();
if (current_uri_options.empty())
return false;
auto first_option = uri_options.begin();
auto [actual_uri, buf] = getFirstAvailableURIAndReadBuffer(
first_option,
current_uri_options.end(),
context,
params,
http_method,
callback,
timeouts,
credentials,
headers,
glob_url,
current_uri_options.size() == 1);
auto first_option = current_uri_options.cbegin();
uri_and_buf = getFirstAvailableURIAndReadBuffer(
first_option,
current_uri_options.end(),
context,
params,
http_method,
callback,
timeouts,
credentials,
headers,
glob_url,
current_uri_options.size() == 1);
/// If file is empty and engine_url_skip_empty_files=1, skip it and go to the next file.
if (context->getSettingsRef().engine_url_skip_empty_files && getFileSizeFromReadBuffer(*buf) == 0)
return initialize();
/// If file is empty and engine_url_skip_empty_files=1, skip it and go to the next file.
}
while (context->getSettingsRef().engine_url_skip_empty_files && uri_and_buf.second->eof());
curr_uri = actual_uri;
read_buf = std::move(buf);
curr_uri = uri_and_buf.first;
read_buf = std::move(uri_and_buf.second);
try
{
@@ -347,7 +351,7 @@ Chunk StorageURLSource::generate()
return {};
}
std::tuple<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> StorageURLSource::getFirstAvailableURIAndReadBuffer(
std::pair<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> StorageURLSource::getFirstAvailableURIAndReadBuffer(
std::vector<String>::const_iterator & option,
const std::vector<String>::const_iterator & end,
ContextPtr context,
@@ -590,38 +594,41 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
if (context->getSettingsRef().schema_inference_use_cache_for_url)
columns_from_cache = tryGetColumnsFromCache(urls_to_check, headers, credentials, format, format_settings, context);
ReadBufferIterator read_buffer_iterator = [&, it = urls_to_check.cbegin(), first = true](ColumnsDescription & columns) mutable -> std::unique_ptr<ReadBuffer>
ReadBufferIterator read_buffer_iterator = [&, it = urls_to_check.cbegin(), first = true](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
{
if (it == urls_to_check.cend())
std::pair<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> uri_and_buf;
do
{
if (first)
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because all files are empty. "
"You must specify table structure manually", format);
return nullptr;
}
if (it == urls_to_check.cend())
{
if (first)
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because all files are empty. "
"You must specify table structure manually",
format);
return nullptr;
}
auto [_, buf] = StorageURLSource::getFirstAvailableURIAndReadBuffer(
it,
urls_to_check.cend(),
context,
{},
Poco::Net::HTTPRequest::HTTP_GET,
{},
getHTTPTimeouts(context),
credentials,
headers,
false,
false);
uri_and_buf = StorageURLSource::getFirstAvailableURIAndReadBuffer(
it,
urls_to_check.cend(),
context,
{},
Poco::Net::HTTPRequest::HTTP_GET,
{},
getHTTPTimeouts(context),
credentials,
headers,
false,
false);
++it;
if (context->getSettingsRef().engine_url_skip_empty_files && buf_factory->getFileSize() == 0)
return read_buffer_iterator(columns);
++it;
} while (context->getSettingsRef().engine_url_skip_empty_files && uri_and_buf.second->eof());
first = false;
return wrapReadBufferWithCompressionMethod(
std::move(buf),
std::move(uri_and_buf.second),
compression_method,
static_cast<int>(context->getSettingsRef().zstd_window_log_max));
};
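In both URL hunks the structured binding `auto [actual_uri, buf] = ...` is replaced by a `std::pair` named `uri_and_buf` declared before the loop: a structured binding is scoped to the block it appears in, so a result that must survive the `do`/`while` (and feed `curr_uri`/`read_buf` afterwards) has to live in a variable declared up front. The helper's return type also changes from `std::tuple` to `std::pair`, which gives the named `.first`/`.second` accessors used after the loop. A short sketch of that pattern with illustrative names:

```cpp
// Sketch of the declaration pattern adopted above; openNextOption() and the
// string-based "buffer" are illustrative stand-ins, not the StorageURL code.
#include <iostream>
#include <memory>
#include <string>
#include <utility>

std::pair<std::string, std::unique_ptr<std::string>> openNextOption(int attempt)
{
    /// Stand-in for getFirstAvailableURIAndReadBuffer(): returns a URI plus an
    /// opened "buffer"; the first attempt yields empty content on purpose.
    auto content = std::make_unique<std::string>(attempt == 0 ? "" : "payload");
    return {"https://example.invalid/file" + std::to_string(attempt), std::move(content)};
}

int main()
{
    const bool skip_empty_files = true;

    std::pair<std::string, std::unique_ptr<std::string>> uri_and_buf;   /// declared before the loop on purpose
    int attempt = 0;
    do
    {
        uri_and_buf = openNextOption(attempt++);
    }
    while (skip_empty_files && uri_and_buf.second->empty());   /// eof() check in the original

    std::cout << "reading from " << uri_and_buf.first << '\n';
    return 0;
}
```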

View File

@@ -183,7 +183,7 @@ public:
static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);
static std::tuple<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> getFirstAvailableURIAndReadBuffer(
static std::pair<Poco::URI, std::unique_ptr<ReadWriteBufferFromHTTP>> getFirstAvailableURIAndReadBuffer(
std::vector<String>::const_iterator & option,
const std::vector<String>::const_iterator & end,
ContextPtr context,