Add support for file globs

This commit is contained in:
Antonio Andelic 2023-08-02 07:40:59 +00:00
parent 2430ebabf5
commit 9423976b7a
6 changed files with 139 additions and 21 deletions

View File

@ -45,16 +45,20 @@ public:
/// Starts enumerating files in the archive. /// Starts enumerating files in the archive.
virtual std::unique_ptr<FileEnumerator> firstFile() = 0; virtual std::unique_ptr<FileEnumerator> firstFile() = 0;
using NameFilter = std::function<bool(const std::string &)>;
/// Starts reading a file from the archive. The function returns a read buffer, /// Starts reading a file from the archive. The function returns a read buffer,
/// you can read that buffer to extract uncompressed data from the archive. /// you can read that buffer to extract uncompressed data from the archive.
/// Several read buffers can be used at the same time in parallel. /// Several read buffers can be used at the same time in parallel.
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(const String & filename) = 0; virtual std::unique_ptr<ReadBufferFromFileBase> readFile(const String & filename) = 0;
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(NameFilter filter) = 0;
/// It's possible to convert a file enumerator to a read buffer and vice versa. /// It's possible to convert a file enumerator to a read buffer and vice versa.
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(std::unique_ptr<FileEnumerator> enumerator) = 0; virtual std::unique_ptr<ReadBufferFromFileBase> readFile(std::unique_ptr<FileEnumerator> enumerator) = 0;
virtual std::unique_ptr<FileEnumerator> nextFile(std::unique_ptr<ReadBuffer> read_buffer) = 0; virtual std::unique_ptr<FileEnumerator> nextFile(std::unique_ptr<ReadBuffer> read_buffer) = 0;
virtual std::vector<std::string> getAllFiles() = 0; virtual std::vector<std::string> getAllFiles() = 0;
virtual std::vector<std::string> getAllFiles(NameFilter filter) = 0;
/// Sets password used to decrypt files in the archive. /// Sets password used to decrypt files in the archive.
virtual void setPassword(const String & /* password */) {} virtual void setPassword(const String & /* password */) {}

View File

@ -49,7 +49,12 @@ public:
} }
} }
bool locateFile(const String & filename) bool locateFile(const std::string & filename)
{
return locateFile([&](const std::string & file) { return file == filename; });
}
bool locateFile(NameFilter filter)
{ {
resetFileInfo(); resetFileInfo();
int err = ARCHIVE_OK; int err = ARCHIVE_OK;
@ -63,7 +68,7 @@ public:
if (err != ARCHIVE_OK) if (err != ARCHIVE_OK)
break; break;
if (archive_entry_pathname(current_entry) == filename) if (filter(archive_entry_pathname(current_entry)))
return true; return true;
} }
@ -95,7 +100,7 @@ public:
return archive; return archive;
} }
std::vector<std::string> getAllFiles() std::vector<std::string> getAllFiles(NameFilter filter)
{ {
auto * archive = open(path_to_archive); auto * archive = open(path_to_archive);
auto * entry = archive_entry_new(); auto * entry = archive_entry_new();
@ -104,7 +109,10 @@ public:
int error = archive_read_next_header(archive, &entry); int error = archive_read_next_header(archive, &entry);
while (error == ARCHIVE_OK || error == ARCHIVE_RETRY) while (error == ARCHIVE_OK || error == ARCHIVE_RETRY)
{ {
files.push_back(archive_entry_pathname(entry)); std::string name = archive_entry_pathname(entry);
if (!filter || filter(name))
files.push_back(std::move(name));
error = archive_read_next_header(archive, &entry); error = archive_read_next_header(archive, &entry);
} }
@ -262,9 +270,15 @@ std::unique_ptr<typename LibArchiveReader<ArchiveInfo>::FileEnumerator> LibArchi
template <typename ArchiveInfo> template <typename ArchiveInfo>
std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader<ArchiveInfo>::readFile(const String & filename) std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader<ArchiveInfo>::readFile(const String & filename)
{
return readFile([&](const std::string & file) { return file == filename; });
}
template <typename ArchiveInfo>
std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader<ArchiveInfo>::readFile(NameFilter filter)
{ {
Handle handle(path_to_archive); Handle handle(path_to_archive);
handle.locateFile(filename); handle.locateFile(filter);
return std::make_unique<ReadBufferFromLibArchive>(std::move(handle), path_to_archive); return std::make_unique<ReadBufferFromLibArchive>(std::move(handle), path_to_archive);
} }
@ -292,9 +306,15 @@ LibArchiveReader<ArchiveInfo>::nextFile(std::unique_ptr<ReadBuffer> read_buffer)
template <typename ArchiveInfo> template <typename ArchiveInfo>
std::vector<std::string> LibArchiveReader<ArchiveInfo>::getAllFiles() std::vector<std::string> LibArchiveReader<ArchiveInfo>::getAllFiles()
{
return getAllFiles({});
}
template <typename ArchiveInfo>
std::vector<std::string> LibArchiveReader<ArchiveInfo>::getAllFiles(NameFilter filter)
{ {
Handle handle(path_to_archive); Handle handle(path_to_archive);
return handle.getAllFiles(); return handle.getAllFiles(filter);
} }
template <typename ArchiveInfo> template <typename ArchiveInfo>

View File

@ -40,12 +40,14 @@ public:
/// you can read that buffer to extract uncompressed data from the archive. /// you can read that buffer to extract uncompressed data from the archive.
/// Several read buffers can be used at the same time in parallel. /// Several read buffers can be used at the same time in parallel.
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & filename) override; std::unique_ptr<ReadBufferFromFileBase> readFile(const String & filename) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(NameFilter filter) override;
/// It's possible to convert a file enumerator to a read buffer and vice versa. /// It's possible to convert a file enumerator to a read buffer and vice versa.
std::unique_ptr<ReadBufferFromFileBase> readFile(std::unique_ptr<FileEnumerator> enumerator) override; std::unique_ptr<ReadBufferFromFileBase> readFile(std::unique_ptr<FileEnumerator> enumerator) override;
std::unique_ptr<FileEnumerator> nextFile(std::unique_ptr<ReadBuffer> read_buffer) override; std::unique_ptr<FileEnumerator> nextFile(std::unique_ptr<ReadBuffer> read_buffer) override;
std::vector<std::string> getAllFiles() override; std::vector<std::string> getAllFiles() override;
std::vector<std::string> getAllFiles(NameFilter filter) override;
/// Sets password used to decrypt the contents of the files in the archive. /// Sets password used to decrypt the contents of the files in the archive.
void setPassword(const String & password_) override; void setPassword(const String & password_) override;

View File

@ -86,6 +86,26 @@ public:
file_name = file_name_; file_name = file_name_;
} }
void locateFile(NameFilter filter)
{
int err = unzGoToFirstFile(raw_handle);
if (err == UNZ_END_OF_LIST_OF_FILE)
showError("No file was found satisfying the filter");
do
{
checkResult(err);
resetFileInfo();
retrieveFileInfo();
if (filter(getFileName()))
return;
err = unzGoToNextFile(raw_handle);
} while (err != UNZ_END_OF_LIST_OF_FILE);
showError("No file was found satisfying the filter");
}
bool tryLocateFile(const String & file_name_) bool tryLocateFile(const String & file_name_)
{ {
resetFileInfo(); resetFileInfo();
@ -132,7 +152,7 @@ public:
return *file_info; return *file_info;
} }
std::vector<std::string> getAllFiles() std::vector<std::string> getAllFiles(NameFilter filter)
{ {
std::vector<std::string> files; std::vector<std::string> files;
resetFileInfo(); resetFileInfo();
@ -145,7 +165,8 @@ public:
checkResult(err); checkResult(err);
resetFileInfo(); resetFileInfo();
retrieveFileInfo(); retrieveFileInfo();
files.push_back(*file_name); if (!filter || filter(getFileName()))
files.push_back(*file_name);
err = unzGoToNextFile(raw_handle); err = unzGoToNextFile(raw_handle);
} while (err != UNZ_END_OF_LIST_OF_FILE); } while (err != UNZ_END_OF_LIST_OF_FILE);
@ -512,6 +533,13 @@ std::unique_ptr<ReadBufferFromFileBase> ZipArchiveReader::readFile(const String
return std::make_unique<ReadBufferFromZipArchive>(std::move(handle)); return std::make_unique<ReadBufferFromZipArchive>(std::move(handle));
} }
std::unique_ptr<ReadBufferFromFileBase> ZipArchiveReader::readFile(NameFilter filter)
{
auto handle = acquireHandle();
handle.locateFile(filter);
return std::make_unique<ReadBufferFromZipArchive>(std::move(handle));
}
std::unique_ptr<ReadBufferFromFileBase> ZipArchiveReader::readFile(std::unique_ptr<FileEnumerator> enumerator) std::unique_ptr<ReadBufferFromFileBase> ZipArchiveReader::readFile(std::unique_ptr<FileEnumerator> enumerator)
{ {
if (!dynamic_cast<FileEnumeratorImpl *>(enumerator.get())) if (!dynamic_cast<FileEnumeratorImpl *>(enumerator.get()))
@ -533,9 +561,14 @@ std::unique_ptr<ZipArchiveReader::FileEnumerator> ZipArchiveReader::nextFile(std
} }
std::vector<std::string> ZipArchiveReader::getAllFiles() std::vector<std::string> ZipArchiveReader::getAllFiles()
{
return getAllFiles({});
}
std::vector<std::string> ZipArchiveReader::getAllFiles(NameFilter filter)
{ {
auto handle = acquireHandle(); auto handle = acquireHandle();
return handle.getAllFiles(); return handle.getAllFiles(filter);
} }
void ZipArchiveReader::setPassword(const String & password_) void ZipArchiveReader::setPassword(const String & password_)

View File

@ -42,12 +42,14 @@ public:
/// you can read that buffer to extract uncompressed data from the archive. /// you can read that buffer to extract uncompressed data from the archive.
/// Several read buffers can be used at the same time in parallel. /// Several read buffers can be used at the same time in parallel.
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & filename) override; std::unique_ptr<ReadBufferFromFileBase> readFile(const String & filename) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(NameFilter filter) override;
/// It's possible to convert a file enumerator to a read buffer and vice versa. /// It's possible to convert a file enumerator to a read buffer and vice versa.
std::unique_ptr<ReadBufferFromFileBase> readFile(std::unique_ptr<FileEnumerator> enumerator) override; std::unique_ptr<ReadBufferFromFileBase> readFile(std::unique_ptr<FileEnumerator> enumerator) override;
std::unique_ptr<FileEnumerator> nextFile(std::unique_ptr<ReadBuffer> read_buffer) override; std::unique_ptr<FileEnumerator> nextFile(std::unique_ptr<ReadBuffer> read_buffer) override;
std::vector<std::string> getAllFiles() override; std::vector<std::string> getAllFiles() override;
std::vector<std::string> getAllFiles(NameFilter filter) override;
/// Sets password used to decrypt the contents of the files in the archive. /// Sets password used to decrypt the contents of the files in the archive.
void setPassword(const String & password_) override; void setPassword(const String & password_) override;

View File

@ -57,7 +57,6 @@
#include <cmath> #include <cmath>
#include <algorithm> #include <algorithm>
namespace ProfileEvents namespace ProfileEvents
{ {
extern const Event CreatedReadBufferOrdinary; extern const Event CreatedReadBufferOrdinary;
@ -387,7 +386,23 @@ std::unique_ptr<ReadBuffer> createReadBuffer(
if (!path_to_archive.empty()) if (!path_to_archive.empty())
{ {
auto reader = createArchiveReader(path_to_archive); auto reader = createArchiveReader(path_to_archive);
return reader->readFile(current_path);
if (current_path.find_first_of("*?{") != std::string::npos)
{
auto matcher = std::make_shared<re2::RE2>(makeRegexpPatternFromGlobs(current_path));
if (!matcher->ok())
throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
"Cannot compile regex from glob ({}): {}", current_path, matcher->error());
return reader->readFile([matcher = std::move(matcher)](const std::string & path)
{
return re2::RE2::FullMatch(path, *matcher);
});
}
else
{
return reader->readFile(current_path);
}
} }
if (use_table_fd) if (use_table_fd)
@ -529,14 +544,30 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
} }
else else
{ {
read_buffer_iterator = [&, path_it = paths.begin(), archive_it = paths_to_archive.begin()](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer> read_buffer_iterator = [&, path_it = paths.begin(), archive_it = paths_to_archive.begin(), first = true](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
{ {
if (archive_it == paths_to_archive.end()) String path;
return nullptr; struct stat file_stat;
do
{
if (archive_it == paths_to_archive.end())
{
if (first)
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because all files are empty. You must specify table structure manually",
format);
return nullptr;
}
auto file_stat = getFileStat(*archive_it, false, -1, "File"); path = *archive_it++;
file_stat = getFileStat(path, false, -1, "File");
}
while (context->getSettingsRef().engine_file_skip_empty_files && file_stat.st_size == 0);
first = false;
return createReadBuffer(*path_it, file_stat, false, -1, compression_method, context, path);
return createReadBuffer(*path_it, file_stat, false, -1, compression_method, context, *archive_it);
}; };
} }
@ -1012,13 +1043,39 @@ Pipe StorageFile::read(
if (!paths_to_archive.empty()) if (!paths_to_archive.empty())
{ {
if (paths.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Multiple paths defined for reading from archive");
const auto & path = paths[0];
IArchiveReader::NameFilter filter;
if (path.find_first_of("*?{") != std::string::npos)
{
auto matcher = std::make_shared<re2::RE2>(makeRegexpPatternFromGlobs(path));
if (!matcher->ok())
throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
"Cannot compile regex from glob ({}): {}", path, matcher->error());
filter = [matcher](const std::string & p)
{
return re2::RE2::FullMatch(p, *matcher);
};
}
for (size_t i = 0; i < paths_to_archive.size(); ++i) for (size_t i = 0; i < paths_to_archive.size(); ++i)
{ {
const auto & path_to_archive = paths_to_archive[i]; if (filter)
auto archive_reader = createArchiveReader(path_to_archive); {
auto files = archive_reader->getAllFiles(); const auto & path_to_archive = paths_to_archive[i];
for (auto & file : files) auto archive_reader = createArchiveReader(path_to_archive);
files_in_archive.push_back({i, std::move(file)}); auto files = archive_reader->getAllFiles(filter);
for (auto & file : files)
files_in_archive.push_back({i, std::move(file)});
}
else
{
files_in_archive.push_back({i, path});
}
} }
} }