From 9423976b7a88e23bd98078760a23cb86bcb179a3 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Wed, 2 Aug 2023 07:40:59 +0000 Subject: [PATCH] Add support for file globs --- src/IO/Archives/IArchiveReader.h | 4 ++ src/IO/Archives/LibArchiveReader.cpp | 32 ++++++++--- src/IO/Archives/LibArchiveReader.h | 2 + src/IO/Archives/ZipArchiveReader.cpp | 39 ++++++++++++-- src/IO/Archives/ZipArchiveReader.h | 2 + src/Storages/StorageFile.cpp | 81 +++++++++++++++++++++++----- 6 files changed, 139 insertions(+), 21 deletions(-) diff --git a/src/IO/Archives/IArchiveReader.h b/src/IO/Archives/IArchiveReader.h index 0b08a29099c..03e5392e970 100644 --- a/src/IO/Archives/IArchiveReader.h +++ b/src/IO/Archives/IArchiveReader.h @@ -45,16 +45,20 @@ public: /// Starts enumerating files in the archive. virtual std::unique_ptr firstFile() = 0; + using NameFilter = std::function; + /// Starts reading a file from the archive. The function returns a read buffer, /// you can read that buffer to extract uncompressed data from the archive. /// Several read buffers can be used at the same time in parallel. virtual std::unique_ptr readFile(const String & filename) = 0; + virtual std::unique_ptr readFile(NameFilter filter) = 0; /// It's possible to convert a file enumerator to a read buffer and vice versa. virtual std::unique_ptr readFile(std::unique_ptr enumerator) = 0; virtual std::unique_ptr nextFile(std::unique_ptr read_buffer) = 0; virtual std::vector getAllFiles() = 0; + virtual std::vector getAllFiles(NameFilter filter) = 0; /// Sets password used to decrypt files in the archive. virtual void setPassword(const String & /* password */) {} diff --git a/src/IO/Archives/LibArchiveReader.cpp b/src/IO/Archives/LibArchiveReader.cpp index f3657d5908b..0e0d035d98b 100644 --- a/src/IO/Archives/LibArchiveReader.cpp +++ b/src/IO/Archives/LibArchiveReader.cpp @@ -49,7 +49,12 @@ public: } } - bool locateFile(const String & filename) + bool locateFile(const std::string & filename) + { + return locateFile([&](const std::string & file) { return file == filename; }); + } + + bool locateFile(NameFilter filter) { resetFileInfo(); int err = ARCHIVE_OK; @@ -63,7 +68,7 @@ public: if (err != ARCHIVE_OK) break; - if (archive_entry_pathname(current_entry) == filename) + if (filter(archive_entry_pathname(current_entry))) return true; } @@ -95,7 +100,7 @@ public: return archive; } - std::vector getAllFiles() + std::vector getAllFiles(NameFilter filter) { auto * archive = open(path_to_archive); auto * entry = archive_entry_new(); @@ -104,7 +109,10 @@ public: int error = archive_read_next_header(archive, &entry); while (error == ARCHIVE_OK || error == ARCHIVE_RETRY) { - files.push_back(archive_entry_pathname(entry)); + std::string name = archive_entry_pathname(entry); + if (!filter || filter(name)) + files.push_back(std::move(name)); + error = archive_read_next_header(archive, &entry); } @@ -262,9 +270,15 @@ std::unique_ptr::FileEnumerator> LibArchi template std::unique_ptr LibArchiveReader::readFile(const String & filename) +{ + return readFile([&](const std::string & file) { return file == filename; }); +} + +template +std::unique_ptr LibArchiveReader::readFile(NameFilter filter) { Handle handle(path_to_archive); - handle.locateFile(filename); + handle.locateFile(filter); return std::make_unique(std::move(handle), path_to_archive); } @@ -292,9 +306,15 @@ LibArchiveReader::nextFile(std::unique_ptr read_buffer) template std::vector LibArchiveReader::getAllFiles() +{ + return getAllFiles({}); +} + +template +std::vector LibArchiveReader::getAllFiles(NameFilter filter) { Handle handle(path_to_archive); - return handle.getAllFiles(); + return handle.getAllFiles(filter); } template diff --git a/src/IO/Archives/LibArchiveReader.h b/src/IO/Archives/LibArchiveReader.h index 596010c7fbd..86127fa6953 100644 --- a/src/IO/Archives/LibArchiveReader.h +++ b/src/IO/Archives/LibArchiveReader.h @@ -40,12 +40,14 @@ public: /// you can read that buffer to extract uncompressed data from the archive. /// Several read buffers can be used at the same time in parallel. std::unique_ptr readFile(const String & filename) override; + std::unique_ptr readFile(NameFilter filter) override; /// It's possible to convert a file enumerator to a read buffer and vice versa. std::unique_ptr readFile(std::unique_ptr enumerator) override; std::unique_ptr nextFile(std::unique_ptr read_buffer) override; std::vector getAllFiles() override; + std::vector getAllFiles(NameFilter filter) override; /// Sets password used to decrypt the contents of the files in the archive. void setPassword(const String & password_) override; diff --git a/src/IO/Archives/ZipArchiveReader.cpp b/src/IO/Archives/ZipArchiveReader.cpp index bcb99553eae..84a8001e70e 100644 --- a/src/IO/Archives/ZipArchiveReader.cpp +++ b/src/IO/Archives/ZipArchiveReader.cpp @@ -86,6 +86,26 @@ public: file_name = file_name_; } + void locateFile(NameFilter filter) + { + int err = unzGoToFirstFile(raw_handle); + if (err == UNZ_END_OF_LIST_OF_FILE) + showError("No file was found satisfying the filter"); + + do + { + checkResult(err); + resetFileInfo(); + retrieveFileInfo(); + if (filter(getFileName())) + return; + + err = unzGoToNextFile(raw_handle); + } while (err != UNZ_END_OF_LIST_OF_FILE); + + showError("No file was found satisfying the filter"); + } + bool tryLocateFile(const String & file_name_) { resetFileInfo(); @@ -132,7 +152,7 @@ public: return *file_info; } - std::vector getAllFiles() + std::vector getAllFiles(NameFilter filter) { std::vector files; resetFileInfo(); @@ -145,7 +165,8 @@ public: checkResult(err); resetFileInfo(); retrieveFileInfo(); - files.push_back(*file_name); + if (!filter || filter(getFileName())) + files.push_back(*file_name); err = unzGoToNextFile(raw_handle); } while (err != UNZ_END_OF_LIST_OF_FILE); @@ -512,6 +533,13 @@ std::unique_ptr ZipArchiveReader::readFile(const String return std::make_unique(std::move(handle)); } +std::unique_ptr ZipArchiveReader::readFile(NameFilter filter) +{ + auto handle = acquireHandle(); + handle.locateFile(filter); + return std::make_unique(std::move(handle)); +} + std::unique_ptr ZipArchiveReader::readFile(std::unique_ptr enumerator) { if (!dynamic_cast(enumerator.get())) @@ -533,9 +561,14 @@ std::unique_ptr ZipArchiveReader::nextFile(std } std::vector ZipArchiveReader::getAllFiles() +{ + return getAllFiles({}); +} + +std::vector ZipArchiveReader::getAllFiles(NameFilter filter) { auto handle = acquireHandle(); - return handle.getAllFiles(); + return handle.getAllFiles(filter); } void ZipArchiveReader::setPassword(const String & password_) diff --git a/src/IO/Archives/ZipArchiveReader.h b/src/IO/Archives/ZipArchiveReader.h index 164518b1a37..0b5fa572860 100644 --- a/src/IO/Archives/ZipArchiveReader.h +++ b/src/IO/Archives/ZipArchiveReader.h @@ -42,12 +42,14 @@ public: /// you can read that buffer to extract uncompressed data from the archive. /// Several read buffers can be used at the same time in parallel. std::unique_ptr readFile(const String & filename) override; + std::unique_ptr readFile(NameFilter filter) override; /// It's possible to convert a file enumerator to a read buffer and vice versa. std::unique_ptr readFile(std::unique_ptr enumerator) override; std::unique_ptr nextFile(std::unique_ptr read_buffer) override; std::vector getAllFiles() override; + std::vector getAllFiles(NameFilter filter) override; /// Sets password used to decrypt the contents of the files in the archive. void setPassword(const String & password_) override; diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index 37998f37c3f..3d87793d06c 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -57,7 +57,6 @@ #include #include - namespace ProfileEvents { extern const Event CreatedReadBufferOrdinary; @@ -387,7 +386,23 @@ std::unique_ptr createReadBuffer( if (!path_to_archive.empty()) { auto reader = createArchiveReader(path_to_archive); - return reader->readFile(current_path); + + if (current_path.find_first_of("*?{") != std::string::npos) + { + auto matcher = std::make_shared(makeRegexpPatternFromGlobs(current_path)); + if (!matcher->ok()) + throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP, + "Cannot compile regex from glob ({}): {}", current_path, matcher->error()); + + return reader->readFile([matcher = std::move(matcher)](const std::string & path) + { + return re2::RE2::FullMatch(path, *matcher); + }); + } + else + { + return reader->readFile(current_path); + } } if (use_table_fd) @@ -529,14 +544,30 @@ ColumnsDescription StorageFile::getTableStructureFromFile( } else { - read_buffer_iterator = [&, path_it = paths.begin(), archive_it = paths_to_archive.begin()](ColumnsDescription &) mutable -> std::unique_ptr + read_buffer_iterator = [&, path_it = paths.begin(), archive_it = paths_to_archive.begin(), first = true](ColumnsDescription &) mutable -> std::unique_ptr { - if (archive_it == paths_to_archive.end()) - return nullptr; + String path; + struct stat file_stat; + do + { + if (archive_it == paths_to_archive.end()) + { + if (first) + throw Exception( + ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, + "Cannot extract table structure from {} format file, because all files are empty. You must specify table structure manually", + format); + return nullptr; + } - auto file_stat = getFileStat(*archive_it, false, -1, "File"); + path = *archive_it++; + file_stat = getFileStat(path, false, -1, "File"); + } + while (context->getSettingsRef().engine_file_skip_empty_files && file_stat.st_size == 0); + + first = false; + return createReadBuffer(*path_it, file_stat, false, -1, compression_method, context, path); - return createReadBuffer(*path_it, file_stat, false, -1, compression_method, context, *archive_it); }; } @@ -1012,13 +1043,39 @@ Pipe StorageFile::read( if (!paths_to_archive.empty()) { + if (paths.size() != 1) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Multiple paths defined for reading from archive"); + + const auto & path = paths[0]; + + IArchiveReader::NameFilter filter; + if (path.find_first_of("*?{") != std::string::npos) + { + auto matcher = std::make_shared(makeRegexpPatternFromGlobs(path)); + if (!matcher->ok()) + throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP, + "Cannot compile regex from glob ({}): {}", path, matcher->error()); + + filter = [matcher](const std::string & p) + { + return re2::RE2::FullMatch(p, *matcher); + }; + } + for (size_t i = 0; i < paths_to_archive.size(); ++i) { - const auto & path_to_archive = paths_to_archive[i]; - auto archive_reader = createArchiveReader(path_to_archive); - auto files = archive_reader->getAllFiles(); - for (auto & file : files) - files_in_archive.push_back({i, std::move(file)}); + if (filter) + { + const auto & path_to_archive = paths_to_archive[i]; + auto archive_reader = createArchiveReader(path_to_archive); + auto files = archive_reader->getAllFiles(filter); + for (auto & file : files) + files_in_archive.push_back({i, std::move(file)}); + } + else + { + files_in_archive.push_back({i, path}); + } } }