Improve code for reading from archives

This commit is contained in:
Antonio Andelic 2023-09-05 08:37:02 +00:00
parent b2821106fd
commit a51c333c5f

View File

@ -520,7 +520,6 @@ namespace
std::unique_ptr<ReadBuffer> next() override
{
std::unique_ptr<ReadBuffer> read_buf;
struct stat file_stat;
while (true)
{
if (current_archive_index == archive_info.paths_to_archives.size())
@ -535,6 +534,7 @@ namespace
}
const auto & archive = archive_info.paths_to_archives[current_archive_index];
struct stat file_stat;
file_stat = getFileStat(archive, false, -1, "File");
if (file_stat.st_size == 0)
{
@ -554,30 +554,6 @@ namespace
auto archive_reader = createArchiveReader(archive);
auto try_get_columns_from_schema_cache = [&](const std::string & full_path) -> std::optional<ColumnsDescription>
{
auto context = getContext();
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_file)
return std::nullopt;
auto & schema_cache = StorageFile::getSchemaCache(context);
auto get_last_mod_time = [&]() -> std::optional<time_t>
{
if (0 != stat(archive_reader->getPath().c_str(), &file_stat))
return std::nullopt;
return file_stat.st_mtime;
};
auto cache_key = getKeyForSchemaCache(full_path, format, format_settings, context);
auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time);
if (columns)
return columns;
return std::nullopt;
};
if (archive_info.isSingleFileRead())
{
read_buf = archive_reader->readFile(archive_info.path_in_archive, false);
@ -586,7 +562,7 @@ namespace
continue;
last_read_file_path = processed_files.emplace_back(fmt::format("{}::{}", archive_reader->getPath(), archive_info.path_in_archive));
columns_from_cache = try_get_columns_from_schema_cache(last_read_file_path);
columns_from_cache = tryGetColumnsFromSchemaCache(archive, last_read_file_path);
if (columns_from_cache)
return nullptr;
@ -596,19 +572,9 @@ namespace
auto file_enumerator = archive_reader->firstFile();
if (!file_enumerator)
{
if (getContext()->getSettingsRef().engine_file_skip_empty_files)
{
read_files_from_archive.clear();
++current_archive_index;
continue;
}
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because the archive {} has no files. "
"You must specify table structure manually",
format,
archive);
read_files_from_archive.clear();
++current_archive_index;
continue;
}
const auto * filename = &file_enumerator->getFileName();
@ -631,7 +597,7 @@ namespace
}
last_read_file_path = processed_files.emplace_back(fmt::format("{}::{}", archive_reader->getPath(), *filename));
columns_from_cache = try_get_columns_from_schema_cache(last_read_file_path);
columns_from_cache = tryGetColumnsFromSchemaCache(archive, last_read_file_path);
if (columns_from_cache)
return nullptr;
@ -647,6 +613,11 @@ namespace
return read_buf;
}
/// Returns a copy of `columns_from_cache`, the member that next() assigns from the
/// schema cache lookup for the archive entry it processed most recently.
/// The result is empty if that lookup produced nothing (or no lookup has stored a value).
std::optional<ColumnsDescription> getCachedColumns() override
{
return columns_from_cache;
}
void setNumRowsToLastFile(size_t num_rows) override
{
if (!getContext()->getSettingsRef().use_cache_for_count_from_files)
@ -657,8 +628,33 @@ namespace
}
std::vector<std::string> processed_files;
std::optional<ColumnsDescription> columns_from_cache;
private:
/// Looks up the schema stored for `full_path` in the StorageFile schema cache.
///
/// Returns std::nullopt when the `schema_inference_use_cache_for_file` setting is off
/// or when the cache lookup yields nothing. The callback handed to the cache stat()s
/// `archive_path` and reports its st_mtime, or std::nullopt if stat() fails.
std::optional<ColumnsDescription> tryGetColumnsFromSchemaCache(const std::string & archive_path, const std::string & full_path)
{
    auto context = getContext();
    const bool cache_enabled = context->getSettingsRef().schema_inference_use_cache_for_file;
    if (!cache_enabled)
        return std::nullopt;

    auto & schema_cache = StorageFile::getSchemaCache(context);

    /// Modification time of the archive itself; the stat buffer lives only for one call.
    auto archive_mtime = [&archive_path]() -> std::optional<time_t>
    {
        struct stat archive_stat;
        if (stat(archive_path.c_str(), &archive_stat) != 0)
            return std::nullopt;

        return archive_stat.st_mtime;
    };

    auto cache_key = getKeyForSchemaCache(full_path, format, format_settings, context);
    if (auto cached_columns = schema_cache.tryGetColumns(cache_key, archive_mtime))
        return cached_columns;

    return std::nullopt;
}
const StorageFile::ArchiveInfo & archive_info;
size_t current_archive_index = 0;
@ -668,9 +664,41 @@ namespace
std::string last_read_file_path;
std::optional<ColumnsDescription> columns_from_cache;
String format;
const std::optional<FormatSettings> & format_settings;
};
/// Tries to find an already cached schema for `archive_info.path_in_archive` in any of the archives.
///
/// For every archive in `archive_info.paths_to_archives` the string "<archive>::<path_in_archive>"
/// is appended to `paths_for_schema_cache`, regardless of whether a schema was already found.
/// The schema cache is queried archive by archive until the first hit; that hit is kept and
/// no further lookups are made, so a later miss can no longer overwrite an earlier hit.
///
/// @param archive_info            archives to inspect and the path inside them.
/// @param paths_for_schema_cache  output: receives one "<archive>::<path>" entry per archive.
/// @param format                  format name used to build the cache key.
/// @param format_settings         optional format settings used to build the cache key.
/// @param context                 context used to obtain the schema cache and to build the cache key.
/// @return columns of the first archive with a cached schema, or std::nullopt if none has one.
std::optional<ColumnsDescription> tryGetColumnsFromCacheForArchives(
    const StorageFile::ArchiveInfo & archive_info,
    std::vector<std::string> & paths_for_schema_cache,
    const String & format,
    const std::optional<FormatSettings> & format_settings,
    const ContextPtr & context)
{
    std::optional<ColumnsDescription> columns_from_cache;

    /// One entry is appended per archive, so grow the vector once up front.
    paths_for_schema_cache.reserve(paths_for_schema_cache.size() + archive_info.paths_to_archives.size());

    for (const auto & archive : archive_info.paths_to_archives)
    {
        const auto & full_path = paths_for_schema_cache.emplace_back(fmt::format("{}::{}", archive, archive_info.path_in_archive));

        /// Keep the first hit: the path is still recorded above, but the cache is not
        /// queried again, otherwise a miss for a later archive would discard the result.
        if (columns_from_cache)
            continue;

        auto & schema_cache = StorageFile::getSchemaCache(context);

        /// Reports the archive's modification time, or std::nullopt if stat() fails.
        auto get_last_mod_time = [&archive]() -> std::optional<time_t>
        {
            struct stat file_stat{};
            if (0 != stat(archive.c_str(), &file_stat))
                return std::nullopt;

            return file_stat.st_mtime;
        };

        auto cache_key = getKeyForSchemaCache(full_path, format, format_settings, context);
        columns_from_cache = schema_cache.tryGetColumns(cache_key, get_last_mod_time);
    }

    return columns_from_cache;
}
}
ColumnsDescription StorageFile::getTableStructureFromFileDescriptor(ContextPtr context)
@ -724,95 +752,46 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
"You must specify table structure manually", format);
ColumnsDescription columns;
if (archive_info)
std::vector<std::string> archive_paths_for_schema_cache;
std::optional<ColumnsDescription> columns_from_cache;
if (context->getSettingsRef().schema_inference_use_cache_for_file)
{
std::vector<std::string> paths_for_schema_cache;
std::optional<ColumnsDescription> columns_from_cache;
if (context->getSettingsRef().schema_inference_use_cache_for_file)
{
paths_for_schema_cache.reserve(archive_info->paths_to_archives.size());
struct stat file_stat{};
for (const auto & archive : archive_info->paths_to_archives)
{
const auto & full_path = paths_for_schema_cache.emplace_back(fmt::format("{}::{}", archive, archive_info->path_in_archive));
if (!columns_from_cache)
{
auto & schema_cache = getSchemaCache(context);
auto get_last_mod_time = [&]() -> std::optional<time_t>
{
if (0 != stat(archive.c_str(), &file_stat))
return std::nullopt;
return file_stat.st_mtime;
};
auto cache_key = getKeyForSchemaCache(full_path, format, format_settings, context);
columns_from_cache = schema_cache.tryGetColumns(cache_key, get_last_mod_time);
}
}
}
if (columns_from_cache)
{
columns = std::move(*columns_from_cache);
}
if (archive_info)
columns_from_cache = tryGetColumnsFromCacheForArchives(*archive_info, archive_paths_for_schema_cache, format, format_settings, context);
else
{
ReadBufferFromArchiveIterator read_buffer_iterator(*archive_info, format, format_settings, context);
try
{
columns = readSchemaFromFormat(
format,
format_settings,
read_buffer_iterator,
/*retry=*/archive_info->paths_to_archives.size() > 1 || !archive_info->isSingleFileRead(),
context);
}
catch (const DB::Exception & e)
{
/// maybe we found something in cache while iterating files
if (e.code() == ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE)
{
if (read_buffer_iterator.columns_from_cache)
columns = std::move(*read_buffer_iterator.columns_from_cache);
else
throw;
}
else
{
throw;
}
}
columns_from_cache = tryGetColumnsFromCache(paths, format, format_settings, context);
}
for (auto & file : read_buffer_iterator.processed_files)
paths_for_schema_cache.push_back(std::move(file));
}
if (context->getSettingsRef().schema_inference_use_cache_for_file)
addColumnsToCache(paths_for_schema_cache, columns, format, format_settings, context);
if (columns_from_cache)
{
columns = std::move(*columns_from_cache);
}
else
{
std::optional<ColumnsDescription> columns_from_cache;
if (context->getSettingsRef().schema_inference_use_cache_for_file)
columns_from_cache = tryGetColumnsFromCache(paths, format, format_settings, context);
if (columns_from_cache)
if (archive_info)
{
columns = *columns_from_cache;
ReadBufferFromArchiveIterator read_buffer_iterator(*archive_info, format, format_settings, context);
columns = readSchemaFromFormat(
format,
format_settings,
read_buffer_iterator,
/*retry=*/archive_info->paths_to_archives.size() > 1 || !archive_info->isSingleFileRead(),
context);
for (auto & file : read_buffer_iterator.processed_files)
archive_paths_for_schema_cache.push_back(std::move(file));
}
else
{
ReadBufferFromFileIterator read_buffer_iterator(paths, format, compression_method, format_settings, context);
columns = readSchemaFromFormat(format, format_settings, read_buffer_iterator, paths.size() > 1, context);
}
if (context->getSettingsRef().schema_inference_use_cache_for_file)
addColumnsToCache(archive_info ? archive_info->paths_to_archives : paths, columns, format, format_settings, context);
}
if (context->getSettingsRef().schema_inference_use_cache_for_file)
addColumnsToCache(archive_info.has_value() ? archive_paths_for_schema_cache : paths, columns, format, format_settings, context);
return columns;
}