Apply PR comments

Antonio Andelic 2023-08-30 08:40:55 +00:00
parent ddb58217d4
commit f406019413
4 changed files with 103 additions and 64 deletions

src/Storages/StorageFile.cpp

@@ -49,7 +49,6 @@
#include <Common/filesystemHelpers.h>
#include <Common/logger_useful.h>
#include <Common/ProfileEvents.h>
#include "IO/ReadBufferFromFileBase.h"
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
@@ -540,7 +539,10 @@ namespace
if (file_stat.st_size == 0)
{
if (getContext()->getSettingsRef().engine_file_skip_empty_files)
{
++current_archive_index;
continue;
}
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
@@ -552,7 +554,7 @@
auto archive_reader = createArchiveReader(archive);
auto check_schema_cache = [&](const std::string & full_path) -> std::optional<ColumnsDescription>
auto try_get_columns_from_schema_cache = [&](const std::string & full_path) -> std::optional<ColumnsDescription>
{
auto context = getContext();
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_file)
@@ -576,27 +578,27 @@ namespace
return std::nullopt;
};
if (archive_info.readSingleFile())
if (archive_info.isSingleFileRead())
{
read_buf = archive_reader->readFile(archive_info.path_in_archive, false);
++current_archive_index;
if (!read_buf)
continue;
const auto & full_path = processed_files.emplace_back(fmt::format("{}::{}", archive_reader->getPath(), archive_info.path_in_archive));
columns_from_cache = check_schema_cache(full_path);
last_read_file_path = processed_files.emplace_back(fmt::format("{}::{}", archive_reader->getPath(), archive_info.path_in_archive));
columns_from_cache = try_get_columns_from_schema_cache(last_read_file_path);
if (columns_from_cache)
return nullptr;
}
else
{
auto & read_files = read_files_from_archive[current_archive_index];
auto file_enumerator = archive_reader->firstFile();
if (!file_enumerator)
{
if (getContext()->getSettingsRef().engine_file_skip_empty_files)
{
read_files_from_archive.clear();
++current_archive_index;
continue;
}
@@ -610,7 +612,7 @@ namespace
}
const auto * filename = &file_enumerator->getFileName();
while (read_files.contains(*filename) || !archive_info.filter(*filename))
while (read_files_from_archive.contains(*filename) || !archive_info.filter(*filename))
{
if (!file_enumerator->nextFile())
{
@@ -623,19 +625,19 @@
if (!archive_reader)
{
read_files_from_archive.clear();
++current_archive_index;
continue;
}
const auto & full_path = processed_files.emplace_back(fmt::format("{}::{}", archive_reader->getPath(), *filename));
read_files.insert(*filename);
read_buf = archive_reader->readFile(std::move(file_enumerator));
columns_from_cache = check_schema_cache(full_path);
last_read_file_path = processed_files.emplace_back(fmt::format("{}::{}", archive_reader->getPath(), *filename));
columns_from_cache = try_get_columns_from_schema_cache(last_read_file_path);
if (columns_from_cache)
return nullptr;
read_files_from_archive.insert(*filename);
read_buf = archive_reader->readFile(std::move(file_enumerator));
}
break;
@@ -645,8 +647,13 @@ namespace
return read_buf;
}
void setNumRowsToLastFile(size_t /*num_rows*/) override
void setNumRowsToLastFile(size_t num_rows) override
{
if (!getContext()->getSettingsRef().use_cache_for_count_from_files)
return;
auto key = getKeyForSchemaCache(last_read_file_path, format, format_settings, getContext());
StorageFile::getSchemaCache(getContext()).addNumRows(key, num_rows);
}
std::vector<std::string> processed_files;
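
For illustration, a minimal standalone sketch of the row-count caching idea behind setNumRowsToLastFile; the mock cache and helper names below are invented for the example and are not the real SchemaCache API:

#include <cstddef>
#include <iostream>
#include <optional>
#include <string>
#include <unordered_map>

// Mock cache keyed by the full file path (e.g. "archive.zip::data.csv"):
// after a file has been fully read, remember its row count so that a
// later count-only query can be answered without re-reading the file.
std::unordered_map<std::string, size_t> num_rows_cache;

void setNumRowsForFile(const std::string & full_path, size_t num_rows)
{
    num_rows_cache[full_path] = num_rows;
}

std::optional<size_t> tryGetNumRows(const std::string & full_path)
{
    auto it = num_rows_cache.find(full_path);
    if (it == num_rows_cache.end())
        return std::nullopt;
    return it->second;
}

int main()
{
    setNumRowsForFile("archive.zip::data.csv", 1000);
    if (auto rows = tryGetNumRows("archive.zip::data.csv"))
        std::cout << *rows << '\n'; // prints 1000
}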
@@ -655,10 +662,12 @@
const StorageFile::ArchiveInfo & archive_info;
size_t current_archive_index = 0;
std::unordered_map<size_t, std::unordered_set<std::string>> read_files_from_archive;
std::unordered_set<std::string> read_files_from_archive;
bool is_first = true;
std::string last_read_file_path;
String format;
const std::optional<FormatSettings> & format_settings;
};
@@ -758,7 +767,7 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
format,
format_settings,
read_buffer_iterator,
/*retry=*/archive_info->paths_to_archives.size() > 1 || !archive_info->readSingleFile(),
/*retry=*/archive_info->paths_to_archives.size() > 1 || !archive_info->isSingleFileRead(),
context);
}
catch (const DB::Exception & e)
@@ -963,7 +972,7 @@ public:
String next()
{
const auto & fs = readFromArchive() ? archive_info->paths_to_archives : files;
const auto & fs = isReadFromArchive() ? archive_info->paths_to_archives : files;
auto current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= fs.size())
@@ -972,7 +981,7 @@ public:
return fs[current_index];
}
bool readFromArchive() const
bool isReadFromArchive() const
{
return archive_info.has_value();
}
@@ -982,9 +991,9 @@ public:
return archive_info->filter(path);
}
bool readSingleFileFromArchive() const
bool isSingleFileReadFromArchive() const
{
return archive_info->readSingleFile();
return archive_info->isSingleFileRead();
}
const String & getFileNameInArchive()
@@ -1101,6 +1110,32 @@ public:
return storage->getName();
}
bool tryGetCountFromCache(const struct stat & file_stat)
{
if (!context->getSettingsRef().use_cache_for_count_from_files)
return false;
auto num_rows_from_cache = tryGetNumRowsFromCache(current_path, file_stat.st_mtime);
if (!num_rows_from_cache)
return false;
/// We should not return a single chunk with the whole row count,
/// because there is a chance that this chunk will be materialized later
/// (which can cause memory problems even with default values in columns or when virtual columns are requested).
/// Instead, we use a special ConstChunkGenerator that generates chunks
/// with max_block_size rows until the total number of rows is reached.
auto const_chunk_generator = std::make_shared<ConstChunkGenerator>(block_for_format, *num_rows_from_cache, max_block_size);
QueryPipelineBuilder builder;
builder.init(Pipe(const_chunk_generator));
builder.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExtractColumnsTransform>(header, requested_columns);
});
pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
return true;
}
Chunk generate() override
{
while (!finished_generate)
@@ -1110,22 +1145,27 @@ public:
{
if (!storage->use_table_fd)
{
if (files_iterator->readFromArchive())
if (files_iterator->isReadFromArchive())
{
if (files_iterator->readSingleFileFromArchive())
struct stat file_stat;
if (files_iterator->isSingleFileReadFromArchive())
{
auto archive = files_iterator->next();
if (archive.empty())
return {};
struct stat file_stat = getFileStat(archive, storage->use_table_fd, storage->table_fd, storage->getName());
file_stat = getFileStat(archive, storage->use_table_fd, storage->table_fd, storage->getName());
if (context->getSettingsRef().engine_file_skip_empty_files && file_stat.st_size == 0)
continue;
archive_reader = createArchiveReader(archive);
filename = files_iterator->getFileNameInArchive();
filename_override = files_iterator->getFileNameInArchive();
read_buf = archive_reader->readFile(filename, /*throw_on_not_found=*/false);
current_path = fmt::format("{}::{}", archive_reader->getPath(), *filename_override);
if (need_only_count && tryGetCountFromCache(file_stat))
continue;
read_buf = archive_reader->readFile(*filename_override, /*throw_on_not_found=*/false);
if (!read_buf)
continue;
}
@@ -1139,7 +1179,7 @@ public:
if (archive.empty())
return {};
struct stat file_stat = getFileStat(archive, storage->use_table_fd, storage->table_fd, storage->getName());
file_stat = getFileStat(archive, storage->use_table_fd, storage->table_fd, storage->getName());
if (context->getSettingsRef().engine_file_skip_empty_files && file_stat.st_size == 0)
continue;
@@ -1160,7 +1200,7 @@ public:
if (file_found)
{
filename = file_enumerator->getFileName();
filename_override = file_enumerator->getFileName();
break;
}
@@ -1168,19 +1208,18 @@ public:
}
chassert(file_enumerator);
current_path = fmt::format("{}::{}", archive_reader->getPath(), *filename_override);
if (need_only_count && tryGetCountFromCache(file_stat))
continue;
read_buf = archive_reader->readFile(std::move(file_enumerator));
}
current_path = fmt::format("{}::{}", archive_reader->getPath(), filename);
}
else
{
current_path = files_iterator->next();
if (current_path.empty())
return {};
size_t last_slash_pos = current_path.find_last_of('/');
filename = current_path.substr(last_slash_pos + 1);
}
/// Special case for distributed format. Defaults are not needed here.
@@ -1200,28 +1239,8 @@ public:
if (context->getSettingsRef().engine_file_skip_empty_files && file_stat.st_size == 0)
continue;
if (need_only_count && context->getSettingsRef().use_cache_for_count_from_files)
{
auto num_rows_from_cache = tryGetNumRowsFromCache(current_path, file_stat.st_mtime);
if (num_rows_from_cache)
{
/// We should not return single chunk with all number of rows,
/// because there is a chance that this chunk will be materialized later
/// (it can cause memory problems even with default values in columns or when virtual columns are requested).
/// Instead, we use special ConstChunkGenerator that will generate chunks
/// with max_block_size rows until total number of rows is reached.
auto const_chunk_generator = std::make_shared<ConstChunkGenerator>(block_for_format, *num_rows_from_cache, max_block_size);
QueryPipelineBuilder builder;
builder.init(Pipe(const_chunk_generator));
builder.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExtractColumnsTransform>(header, requested_columns);
});
pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
continue;
}
}
if (need_only_count && tryGetCountFromCache(file_stat))
continue;
read_buf = createReadBuffer(current_path, file_stat, storage->use_table_fd, storage->table_fd, storage->compression_method, context);
}
@@ -1277,7 +1296,8 @@ public:
progress(num_rows, chunk_size ? chunk_size : chunk.bytes());
/// Enrich with virtual columns.
VirtualColumnUtils::addRequestedPathAndFileVirtualsToChunk(chunk, requested_virtual_columns, current_path);
VirtualColumnUtils::addRequestedPathAndFileVirtualsToChunk(
chunk, requested_virtual_columns, current_path, filename_override.has_value() ? &filename_override.value() : nullptr);
return chunk;
}
@@ -1295,8 +1315,18 @@ public:
pipeline.reset();
input_format.reset();
if (files_iterator->readFromArchive() && !files_iterator->readSingleFileFromArchive())
file_enumerator = archive_reader->nextFile(std::move(read_buf));
if (files_iterator->isReadFromArchive() && !files_iterator->isSingleFileReadFromArchive())
{
if (file_enumerator)
{
if (!file_enumerator->nextFile())
file_enumerator = nullptr;
}
else
{
file_enumerator = archive_reader->nextFile(std::move(read_buf));
}
}
read_buf.reset();
}
@@ -1328,7 +1358,7 @@ private:
StorageSnapshotPtr storage_snapshot;
FilesIteratorPtr files_iterator;
String current_path;
String filename;
std::optional<String> filename_override;
Block sample_block;
std::unique_ptr<ReadBuffer> read_buf;
InputFormatPtr input_format;
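
As a self-contained sketch of the chunking behaviour described in the tryGetCountFromCache comment above (plain C++ with made-up values, not the real ConstChunkGenerator):

#include <algorithm>
#include <cstddef>
#include <iostream>

int main()
{
    // Pretend values: the cached row count and the pipeline block size.
    size_t num_rows_from_cache = 250000;
    size_t max_block_size = 65536;

    // Emit the total in bounded chunks instead of one huge chunk,
    // mirroring how a ConstChunkGenerator-style source avoids
    // materializing all rows at once.
    for (size_t remaining = num_rows_from_cache; remaining > 0;)
    {
        size_t chunk_rows = std::min(remaining, max_block_size);
        remaining -= chunk_rows;
        std::cout << "chunk: " << chunk_rows << " rows\n";
    }
}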

src/Storages/StorageFile.h

@@ -90,7 +90,7 @@ public:
std::string path_in_archive; // used when reading a single file from archive
IArchiveReader::NameFilter filter = {}; // used when files inside archive are defined with a glob
bool readSingleFile() const
bool isSingleFileRead() const
{
return !filter;
}
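
To make the renamed predicate concrete, a simplified mock of the ArchiveInfo shape above; the struct below is invented for illustration (C++20 for ends_with) and is not the real header:

#include <functional>
#include <iostream>
#include <string>

struct ArchiveInfoMock
{
    std::string path_in_archive; // used when reading a single file from archive
    std::function<bool(const std::string &)> filter; // installed only for globs inside the archive

    // An exact "archive :: file" request installs no filter,
    // so "single file read" is simply "no filter present".
    bool isSingleFileRead() const { return !filter; }
};

int main()
{
    ArchiveInfoMock exact;
    exact.path_in_archive = "data.csv";

    ArchiveInfoMock glob;
    glob.filter = [](const std::string & name) { return name.ends_with(".csv"); };

    std::cout << exact.isSingleFileRead() << ' ' << glob.isSingleFileRead() << '\n'; // prints: 1 0
}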

src/Storages/VirtualColumnUtils.cpp

@@ -340,7 +340,8 @@ ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const
return block.getByName("_idx").column;
}
void addRequestedPathAndFileVirtualsToChunk(Chunk & chunk, const NamesAndTypesList & requested_virtual_columns, const String & path)
void addRequestedPathAndFileVirtualsToChunk(
Chunk & chunk, const NamesAndTypesList & requested_virtual_columns, const String & path, const String * filename)
{
for (const auto & virtual_column : requested_virtual_columns)
{
@@ -350,9 +351,16 @@ void addRequestedPathAndFileVirtualsToChunk(Chunk & chunk, const NamesAndTypesLi
}
else if (virtual_column.name == "_file")
{
size_t last_slash_pos = path.find_last_of('/');
auto file_name = path.substr(last_slash_pos + 1);
chunk.addColumn(virtual_column.type->createColumnConst(chunk.getNumRows(), file_name));
if (filename)
{
chunk.addColumn(virtual_column.type->createColumnConst(chunk.getNumRows(), *filename));
}
else
{
size_t last_slash_pos = path.find_last_of('/');
auto filename_from_path = path.substr(last_slash_pos + 1);
chunk.addColumn(virtual_column.type->createColumnConst(chunk.getNumRows(), filename_from_path));
}
}
}
}
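
A standalone sketch of the _file resolution rule implemented above, using an invented helper name: an explicitly passed filename (used for paths like "archive.zip::dir/a.csv") wins, otherwise the basename after the last '/' is used:

#include <iostream>
#include <string>

std::string resolveFileVirtual(const std::string & path, const std::string * filename)
{
    if (filename)
        return *filename; // name inside the archive, supplied by the caller
    size_t last_slash_pos = path.find_last_of('/');
    return path.substr(last_slash_pos + 1); // npos + 1 == 0, so a bare name is returned whole
}

int main()
{
    std::string in_archive = "dir/a.csv";
    std::cout << resolveFileVirtual("/data/archive.zip::dir/a.csv", &in_archive) << '\n'; // dir/a.csv
    std::cout << resolveFileVirtual("/data/plain.csv", nullptr) << '\n';                  // plain.csv
}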

src/Storages/VirtualColumnUtils.h

@@ -67,7 +67,8 @@ void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & pa
sources = std::move(filtered_sources);
}
void addRequestedPathAndFileVirtualsToChunk(Chunk & chunk, const NamesAndTypesList & requested_virtual_columns, const String & path);
void addRequestedPathAndFileVirtualsToChunk(
Chunk & chunk, const NamesAndTypesList & requested_virtual_columns, const String & path, const String * filename = nullptr);
}
}
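
Finally, a hedged sketch of why the defaulted pointer matters: pre-existing call sites keep compiling unchanged. Chunk and NamesAndTypesList below are stand-in stubs for the example, not the real ClickHouse types:

#include <iostream>
#include <string>

struct Chunk {};
struct NamesAndTypesList {};
using String = std::string;

// Same shape as the declaration above, with stub types: the new
// argument defaults to nullptr, so old three-argument calls still work.
void addRequestedPathAndFileVirtualsToChunk(
    Chunk &, const NamesAndTypesList &, const String & path, const String * filename = nullptr)
{
    std::cout << path << " -> _file = " << (filename ? *filename : "<derived from path>") << '\n';
}

int main()
{
    Chunk chunk;
    NamesAndTypesList virtuals;
    String in_archive = "dir/a.csv";
    addRequestedPathAndFileVirtualsToChunk(chunk, virtuals, "/data/plain.csv");                       // old call shape still compiles
    addRequestedPathAndFileVirtualsToChunk(chunk, virtuals, "/data/archive.zip::dir/a.csv", &in_archive);
}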