#include #include #include #include #include namespace DB { #if USE_LIBARCHIVE namespace ErrorCodes { extern const int CANNOT_UNPACK_ARCHIVE; extern const int LOGICAL_ERROR; extern const int CANNOT_READ_ALL_DATA; extern const int UNSUPPORTED_METHOD; } class LibArchiveReader::StreamInfo { public: explicit StreamInfo(std::unique_ptr read_buffer_) : read_buffer(std::move(read_buffer_)) { } static ssize_t read([[maybe_unused]] struct archive * a, void * client_data, const void ** buff) { auto * read_stream = reinterpret_cast(client_data); *buff = reinterpret_cast(read_stream->buf); return read_stream->read_buffer->read(read_stream->buf, DBMS_DEFAULT_BUFFER_SIZE); } std::unique_ptr read_buffer; char buf[DBMS_DEFAULT_BUFFER_SIZE]; }; class LibArchiveReader::Handle { public: explicit Handle(std::string path_to_archive_, bool lock_on_reading_) : path_to_archive(std::move(path_to_archive_)), lock_on_reading(lock_on_reading_) { current_archive = openWithPath(path_to_archive); } explicit Handle(std::string path_to_archive_, bool lock_on_reading_, const ReadArchiveFunction & archive_read_function_) : path_to_archive(std::move(path_to_archive_)), archive_read_function(archive_read_function_), lock_on_reading(lock_on_reading_) { read_stream = std::make_unique(archive_read_function()); current_archive = openWithReader(read_stream.get()); } Handle(const Handle &) = delete; Handle(Handle && other) noexcept : read_stream(std::move(other.read_stream)) , current_archive(other.current_archive) , current_entry(other.current_entry) , archive_read_function(std::move(other.archive_read_function)) , lock_on_reading(other.lock_on_reading) { other.current_archive = nullptr; other.current_entry = nullptr; } ~Handle() { close(current_archive); } bool locateFile(const std::string & filename) { return locateFile([&](const std::string & file) { return file == filename; }); } bool locateFile(NameFilter filter) { resetFileInfo(); int err = ARCHIVE_OK; while (true) { err = readNextHeader(current_archive, ¤t_entry); if (err == ARCHIVE_RETRY) continue; if (err != ARCHIVE_OK) break; if (filter(archive_entry_pathname(current_entry))) { valid = true; return true; } } checkError(err); valid = false; return false; } bool nextFile() { resetFileInfo(); int err = ARCHIVE_OK; do { err = readNextHeader(current_archive, ¤t_entry); } while (err == ARCHIVE_RETRY); checkError(err); valid = err == ARCHIVE_OK; return valid; } std::vector getAllFiles(NameFilter filter) { std::unique_ptr rs = archive_read_function ? std::make_unique(archive_read_function()) : nullptr; auto * archive = rs ? openWithReader(rs.get()) : openWithPath(path_to_archive); SCOPE_EXIT(close(archive);); Entry entry = nullptr; std::vector files; int error = readNextHeader(archive, &entry); while (error == ARCHIVE_OK || error == ARCHIVE_RETRY) { chassert(entry != nullptr); std::string name = archive_entry_pathname(entry); if (!filter || filter(name)) files.push_back(std::move(name)); error = readNextHeader(archive, &entry); } checkError(error); return files; } const String & getFileName() const { chassert(current_entry); if (!valid) throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "No current file"); if (!file_name) file_name.emplace(archive_entry_pathname(current_entry)); return *file_name; } const FileInfo & getFileInfo() const { chassert(current_entry); if (!valid) throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "No current file"); if (!file_info) { file_info.emplace(); file_info->uncompressed_size = archive_entry_size(current_entry); file_info->compressed_size = archive_entry_size(current_entry); file_info->is_encrypted = false; } return *file_info; } la_ssize_t readData(void * buf, size_t len) { return archive_read_data(current_archive, buf, len); } const char * getArchiveError() { return archive_error_string(current_archive); } private: using Archive = struct archive *; using Entry = struct archive_entry *; void checkError(int error) const { if (error == ARCHIVE_FATAL) throw Exception( ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Failed to read archive while fetching all files: {}", archive_error_string(current_archive)); } void resetFileInfo() { file_name.reset(); file_info.reset(); } Archive openWithReader(StreamInfo * read_stream_) { auto * archive = archive_read_new(); try { // Support for bzip2, gzip, lzip, xz, zstd and lz4 archive_read_support_filter_bzip2(archive); archive_read_support_filter_gzip(archive); archive_read_support_filter_xz(archive); archive_read_support_filter_lz4(archive); archive_read_support_filter_zstd(archive); // Support tar, 7zip and zip archive_read_support_format_tar(archive); archive_read_support_format_7zip(archive); archive_read_support_format_zip(archive); if (archive_read_open_filename(archive, path_to_archive.c_str(), 10240) != ARCHIVE_OK) throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't open archive {}: {}", quoteString(path_to_archive), archive_error_string(archive)); } catch (...) { close(archive); throw; } return archive; } Archive openWithPath(const String & path_to_archive_) { auto * archive = archive_read_new(); try { archive_read_support_filter_all(archive); archive_read_support_format_all(archive); if (archive_read_open_filename(archive, path_to_archive_.c_str(), 10240) != ARCHIVE_OK) throw Exception( ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't open archive {}: {}", quoteString(path_to_archive), archive_error_string(archive)); } catch (...) { close(archive); throw; } return archive; } static void close(Archive archive) { if (archive) { archive_read_close(archive); archive_read_free(archive); } } int readNextHeader(struct archive * archive, struct archive_entry ** entry) const { std::unique_lock lock(Handle::read_lock, std::defer_lock); if (lock_on_reading) lock.lock(); return archive_read_next_header(archive, entry); } String path_to_archive; std::unique_ptr read_stream; Archive current_archive; Entry current_entry = nullptr; bool valid = true; IArchiveReader::ReadArchiveFunction archive_read_function; /// for some archive types when we are reading headers static variables are used /// which are not thread-safe const bool lock_on_reading; static inline std::mutex read_lock; mutable std::optional file_name; mutable std::optional file_info; }; class LibArchiveReader::FileEnumeratorImpl : public FileEnumerator { public: explicit FileEnumeratorImpl(Handle handle_) : handle(std::move(handle_)) { } const String & getFileName() const override { return handle.getFileName(); } const FileInfo & getFileInfo() const override { return handle.getFileInfo(); } bool nextFile() override { return handle.nextFile(); } /// Releases owned handle to pass it to a read buffer. Handle releaseHandle() && { return std::move(handle); } private: Handle handle; }; class LibArchiveReader::ReadBufferFromLibArchive : public ReadBufferFromFileBase { public: explicit ReadBufferFromLibArchive(Handle handle_, std::string path_to_archive_) : ReadBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0) , handle(std::move(handle_)) , path_to_archive(std::move(path_to_archive_)) { } off_t seek(off_t /* off */, int /* whence */) override { throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Seek is not supported when reading from archive"); } bool checkIfActuallySeekable() override { return false; } off_t getPosition() override { throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "getPosition not supported when reading from archive"); } String getFileName() const override { return handle.getFileName(); } size_t getFileSize() override { return handle.getFileInfo().uncompressed_size; } Handle releaseHandle() && { return std::move(handle); } private: bool nextImpl() override { auto bytes_read = handle.readData(internal_buffer.begin(), internal_buffer.size()); if (bytes_read < 0) throw Exception( ErrorCodes::CANNOT_READ_ALL_DATA, "Failed to read file {} from {}: {}", handle.getFileName(), path_to_archive, handle.getArchiveError()); if (!bytes_read) return false; total_bytes_read += bytes; working_buffer = internal_buffer; working_buffer.resize(bytes_read); return true; } Handle handle; const String path_to_archive; size_t total_bytes_read = 0; }; LibArchiveReader::LibArchiveReader(std::string archive_name_, bool lock_on_reading_, std::string path_to_archive_) : archive_name(std::move(archive_name_)), lock_on_reading(lock_on_reading_), path_to_archive(std::move(path_to_archive_)) { } LibArchiveReader::LibArchiveReader( std::string archive_name_, bool lock_on_reading_, std::string path_to_archive_, const ReadArchiveFunction & archive_read_function_) : archive_name(std::move(archive_name_)) , lock_on_reading(lock_on_reading_) , path_to_archive(std::move(path_to_archive_)) , archive_read_function(archive_read_function_) { } LibArchiveReader::~LibArchiveReader() = default; const std::string & LibArchiveReader::getPath() const { return path_to_archive; } bool LibArchiveReader::fileExists(const String & filename) { Handle handle = acquireHandle(); return handle.locateFile(filename); } LibArchiveReader::FileInfo LibArchiveReader::getFileInfo(const String & filename) { Handle handle = acquireHandle(); if (!handle.locateFile(filename)) throw Exception( ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: File {} was not found in archive", path_to_archive, quoteString(filename)); return handle.getFileInfo(); } std::unique_ptr LibArchiveReader::firstFile() { Handle handle = acquireHandle(); if (!handle.nextFile()) return nullptr; return std::make_unique(std::move(handle)); } std::unique_ptr LibArchiveReader::readFile(const String & filename, bool throw_on_not_found) { Handle handle = acquireHandle(); if (!handle.locateFile(filename)) { if (throw_on_not_found) throw Exception( ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: File {} was not found in archive", path_to_archive, quoteString(filename)); return nullptr; } return std::make_unique(std::move(handle), path_to_archive); } std::unique_ptr LibArchiveReader::readFile(NameFilter filter, bool throw_on_not_found) { Handle handle = acquireHandle(); if (!handle.locateFile(filter)) { if (throw_on_not_found) throw Exception( ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: No file satisfying filter in archive", path_to_archive); return nullptr; } return std::make_unique(std::move(handle), path_to_archive); } std::unique_ptr LibArchiveReader::readFile(std::unique_ptr enumerator) { if (!dynamic_cast(enumerator.get())) throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong enumerator passed to readFile()"); auto enumerator_impl = std::unique_ptr(static_cast(enumerator.release())); auto handle = std::move(*enumerator_impl).releaseHandle(); return std::make_unique(std::move(handle), path_to_archive); } std::unique_ptr LibArchiveReader::nextFile(std::unique_ptr read_buffer) { if (!dynamic_cast(read_buffer.get())) throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong ReadBuffer passed to nextFile()"); auto read_buffer_from_libarchive = std::unique_ptr(static_cast(read_buffer.release())); auto handle = std::move(*read_buffer_from_libarchive).releaseHandle(); if (!handle.nextFile()) return nullptr; return std::make_unique(std::move(handle)); } std::unique_ptr LibArchiveReader::currentFile(std::unique_ptr read_buffer) { if (!dynamic_cast(read_buffer.get())) throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong ReadBuffer passed to nextFile()"); auto read_buffer_from_libarchive = std::unique_ptr(static_cast(read_buffer.release())); auto handle = std::move(*read_buffer_from_libarchive).releaseHandle(); return std::make_unique(std::move(handle)); } std::vector LibArchiveReader::getAllFiles() { return getAllFiles({}); } std::vector LibArchiveReader::getAllFiles(NameFilter filter) { Handle handle = acquireHandle(); return handle.getAllFiles(filter); } void LibArchiveReader::setPassword([[maybe_unused]] const String & password_) { if (password_.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not set password to {} archive", archive_name); } LibArchiveReader::Handle LibArchiveReader::acquireHandle() { std::lock_guard lock{mutex}; return archive_read_function ? Handle{path_to_archive, lock_on_reading, archive_read_function} : Handle{path_to_archive, lock_on_reading}; } #endif }