From eb4ec0e871e966742812a65f5142310b7d3c0602 Mon Sep 17 00:00:00 2001 From: Joshua Hildred Date: Fri, 19 Jan 2024 07:05:36 -0800 Subject: [PATCH] Add support for reading and writing backups as a tar archive using libarchive. --- src/Backups/BackupImpl.cpp | 2 +- src/IO/Archives/IArchiveWriter.h | 3 + src/IO/Archives/LibArchiveReader.cpp | 156 +++++++--- src/IO/Archives/LibArchiveReader.h | 13 +- src/IO/Archives/LibArchiveWriter.cpp | 267 ++++++++++++++++++ src/IO/Archives/LibArchiveWriter.h | 76 +++++ src/IO/Archives/ZipArchiveWriter.cpp | 5 + src/IO/Archives/ZipArchiveWriter.h | 3 + src/IO/Archives/createArchiveReader.cpp | 2 +- src/IO/Archives/createArchiveWriter.cpp | 21 +- .../hasRegisteredArchiveFileExtension.cpp | 2 +- .../tests/gtest_archive_reader_and_writer.cpp | 120 ++++++-- .../test_backup_restore_new/test.py | 51 ++++ .../test_backup_restore_s3/test.py | 5 + 14 files changed, 651 insertions(+), 75 deletions(-) create mode 100644 src/IO/Archives/LibArchiveWriter.cpp create mode 100644 src/IO/Archives/LibArchiveWriter.h diff --git a/src/Backups/BackupImpl.cpp b/src/Backups/BackupImpl.cpp index 42fdb719149..8c0989b8202 100644 --- a/src/Backups/BackupImpl.cpp +++ b/src/Backups/BackupImpl.cpp @@ -927,7 +927,7 @@ void BackupImpl::writeFile(const BackupFileInfo & info, BackupEntryPtr entry) const auto write_info_to_archive = [&](const auto & file_name) { - auto out = archive_writer->writeFile(file_name); + auto out = archive_writer->writeFile(file_name, info.size); auto read_buffer = entry->getReadBuffer(writer->getReadSettings()); if (info.base_size != 0) read_buffer->seek(info.base_size, SEEK_SET); diff --git a/src/IO/Archives/IArchiveWriter.h b/src/IO/Archives/IArchiveWriter.h index cccc6dc953b..3dce83629fb 100644 --- a/src/IO/Archives/IArchiveWriter.h +++ b/src/IO/Archives/IArchiveWriter.h @@ -22,6 +22,9 @@ public: /// of the function `writeFile()` should be destroyed before next call of `writeFile()`. 
virtual std::unique_ptr writeFile(const String & filename) = 0; + virtual std::unique_ptr writeFile(const String & filename, const size_t & size) = 0; + + /// Returns true if there is an active instance of WriteBuffer returned by writeFile(). /// This function should be used mostly for debugging purposes. virtual bool isWritingFile() const = 0; diff --git a/src/IO/Archives/LibArchiveReader.cpp b/src/IO/Archives/LibArchiveReader.cpp index 22300ed5444..83f34f31568 100644 --- a/src/IO/Archives/LibArchiveReader.cpp +++ b/src/IO/Archives/LibArchiveReader.cpp @@ -5,7 +5,6 @@ #include -#include namespace DB { @@ -14,35 +13,60 @@ namespace DB namespace ErrorCodes { - extern const int CANNOT_UNPACK_ARCHIVE; - extern const int LOGICAL_ERROR; - extern const int CANNOT_READ_ALL_DATA; - extern const int UNSUPPORTED_METHOD; +extern const int CANNOT_UNPACK_ARCHIVE; +extern const int LOGICAL_ERROR; +extern const int CANNOT_READ_ALL_DATA; +extern const int UNSUPPORTED_METHOD; } +class LibArchiveReader::StreamInfo +{ +public: + explicit StreamInfo(std::unique_ptr read_buffer_) + : read_buffer(std::move(read_buffer_)) + { + } + + static ssize_t read([[maybe_unused]] struct archive * a, void * client_data, const void ** buff) + { + auto * read_stream = reinterpret_cast(client_data); + *buff = reinterpret_cast(read_stream->buf); + return read_stream->read_buffer->read(read_stream->buf, DBMS_DEFAULT_BUFFER_SIZE); + } + + std::unique_ptr read_buffer; + char buf[DBMS_DEFAULT_BUFFER_SIZE]; +}; + class LibArchiveReader::Handle { public: explicit Handle(std::string path_to_archive_, bool lock_on_reading_) - : path_to_archive(path_to_archive_), lock_on_reading(lock_on_reading_) + : path_to_archive(path_to_archive_), lock_on_reading(lock_on_reading_) { - current_archive = open(path_to_archive); + current_archive = openWithPath(path_to_archive); + } + explicit Handle(std::string path_to_archive_, bool lock_on_reading_, const ReadArchiveFunction & archive_read_function_) + : 
path_to_archive(path_to_archive_), archive_read_function(archive_read_function_), lock_on_reading(lock_on_reading_) + { + read_stream = std::make_unique(archive_read_function()); + current_archive = openWithReader(&(*read_stream)); } Handle(const Handle &) = delete; Handle(Handle && other) noexcept : current_archive(other.current_archive) , current_entry(other.current_entry) + , read_stream(std::move(other.read_stream)) + , archive_read_function(std::move(other.archive_read_function)) , lock_on_reading(other.lock_on_reading) + { other.current_archive = nullptr; other.current_entry = nullptr; } - ~Handle() - { - close(current_archive); - } + ~Handle() { close(current_archive); } bool locateFile(const std::string & filename) { @@ -64,10 +88,14 @@ public: break; if (filter(archive_entry_pathname(current_entry))) + { + valid = true; return true; + } } checkError(err); + valid = false; return false; } @@ -81,12 +109,24 @@ public: } while (err == ARCHIVE_RETRY); checkError(err); - return err == ARCHIVE_OK; + valid = err == ARCHIVE_OK; + return valid; } std::vector getAllFiles(NameFilter filter) { - auto * archive = open(path_to_archive); + struct archive * archive; + std::unique_ptr rs; + if(archive_read_function) + { + read_stream = std::make_unique(archive_read_function()); + archive = openWithReader(&(*rs)); + } + else + { + archive = openWithPath(path_to_archive); + } + SCOPE_EXIT( close(archive); ); @@ -94,7 +134,7 @@ public: struct archive_entry * entry = nullptr; std::vector files; - int error = readNextHeader(archive, &entry); + int error = readNextHeader(current_archive, &entry); while (error == ARCHIVE_OK || error == ARCHIVE_RETRY) { chassert(entry != nullptr); @@ -102,7 +142,7 @@ public: if (!filter || filter(name)) files.push_back(std::move(name)); - error = readNextHeader(archive, &entry); + error = readNextHeader(current_archive, &entry); } checkError(error); @@ -112,6 +152,8 @@ public: const String & getFileName() const { chassert(current_entry); + if 
(!valid) + throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "No current file"); if (!file_name) file_name.emplace(archive_entry_pathname(current_entry)); @@ -121,6 +163,8 @@ public: const FileInfo & getFileInfo() const { chassert(current_entry); + if (!valid) + throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "No current file"); if (!file_info) { file_info.emplace(); @@ -134,6 +178,8 @@ public: struct archive * current_archive; struct archive_entry * current_entry = nullptr; + bool valid = true; + private: void checkError(int error) const { @@ -147,7 +193,16 @@ private: file_info.reset(); } - static struct archive * open(const String & path_to_archive) + static struct archive * openWithReader(StreamInfo * read_stream_) + { + auto * a = archive_read_new(); + archive_read_support_filter_all(a); + archive_read_support_format_all(a); + archive_read_open(a, read_stream_, nullptr, StreamInfo::read, nullptr); + return a; + } + + static struct archive * openWithPath(const String & path_to_archive) { auto * archive = archive_read_new(); try @@ -194,6 +249,8 @@ private: } const String path_to_archive; + std::unique_ptr read_stream = nullptr; + const IArchiveReader::ReadArchiveFunction archive_read_function; /// for some archive types when we are reading headers static variables are used /// which are not thread-safe @@ -207,7 +264,7 @@ private: class LibArchiveReader::FileEnumeratorImpl : public FileEnumerator { public: - explicit FileEnumeratorImpl(Handle handle_) : handle(std::move(handle_)) {} + explicit FileEnumeratorImpl(Handle handle_) : handle(std::move(handle_)) { } const String & getFileName() const override { return handle.getFileName(); } const FileInfo & getFileInfo() const override { return handle.getFileInfo(); } @@ -215,6 +272,7 @@ public: /// Releases owned handle to pass it to a read buffer. 
Handle releaseHandle() && { return std::move(handle); } + private: Handle handle; }; @@ -226,13 +284,13 @@ public: : ReadBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0) , handle(std::move(handle_)) , path_to_archive(std::move(path_to_archive_)) - {} + { + } off_t seek(off_t /* off */, int /* whence */) override { throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Seek is not supported when reading from archive"); } - bool checkIfActuallySeekable() override { return false; } off_t getPosition() override @@ -240,14 +298,13 @@ public: throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "getPosition not supported when reading from archive"); } + off_t getPosition() override { throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "getPosition not supported when reading from archive"); } + String getFileName() const override { return handle.getFileName(); } size_t getFileSize() override { return handle.getFileInfo().uncompressed_size; } - Handle releaseHandle() && - { - return std::move(handle); - } + Handle releaseHandle() && { return std::move(handle); } private: bool nextImpl() override @@ -274,7 +331,17 @@ private: LibArchiveReader::LibArchiveReader(std::string archive_name_, bool lock_on_reading_, std::string path_to_archive_) : archive_name(std::move(archive_name_)), lock_on_reading(lock_on_reading_), path_to_archive(std::move(path_to_archive_)) -{} +{ +} + +LibArchiveReader::LibArchiveReader( + std::string archive_name_, bool lock_on_reading_, std::string path_to_archive_, const ReadArchiveFunction & archive_read_function_) + : archive_name(std::move(archive_name_)) + , lock_on_reading(lock_on_reading_) + , path_to_archive(std::move(path_to_archive_)) + , archive_read_function(archive_read_function_) +{ +} LibArchiveReader::~LibArchiveReader() = default; @@ -285,21 +352,21 @@ const std::string & LibArchiveReader::getPath() const bool LibArchiveReader::fileExists(const String & filename) { - Handle handle(path_to_archive, lock_on_reading); + Handle handle = 
acquireHandle(); return handle.locateFile(filename); } LibArchiveReader::FileInfo LibArchiveReader::getFileInfo(const String & filename) { - Handle handle(path_to_archive, lock_on_reading); + Handle handle = acquireHandle(); if (!handle.locateFile(filename)) - throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: file not found", path_to_archive); + throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: File {} was not found in archive", path_to_archive, quoteString(filename)); return handle.getFileInfo(); } std::unique_ptr LibArchiveReader::firstFile() { - Handle handle(path_to_archive, lock_on_reading); + Handle handle = acquireHandle(); if (!handle.nextFile()) return nullptr; @@ -308,17 +375,25 @@ std::unique_ptr LibArchiveReader::firstFile() std::unique_ptr LibArchiveReader::readFile(const String & filename, bool throw_on_not_found) { - return readFile([&](const std::string & file) { return file == filename; }, throw_on_not_found); + Handle handle = acquireHandle(); + if (!handle.locateFile(filename)) + { + if (throw_on_not_found) + throw Exception( + ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: File {} was not found in archive", path_to_archive, quoteString(filename)); + return nullptr; + } + return std::make_unique(std::move(handle), path_to_archive); } std::unique_ptr LibArchiveReader::readFile(NameFilter filter, bool throw_on_not_found) { - Handle handle(path_to_archive, lock_on_reading); + Handle handle = acquireHandle(); if (!handle.locateFile(filter)) { if (throw_on_not_found) throw Exception( - ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: no file found satisfying the filter", path_to_archive); + ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: No file satisfying filter in archive", path_to_archive); return nullptr; } return std::make_unique(std::move(handle), path_to_archive); @@ -337,7 +412,8 @@ std::unique_ptr LibArchiveReader::nextFile(std { if 
(!dynamic_cast(read_buffer.get())) throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong ReadBuffer passed to nextFile()"); - auto read_buffer_from_libarchive = std::unique_ptr(static_cast(read_buffer.release())); + auto read_buffer_from_libarchive + = std::unique_ptr(static_cast(read_buffer.release())); auto handle = std::move(*read_buffer_from_libarchive).releaseHandle(); if (!handle.nextFile()) return nullptr; @@ -360,13 +436,23 @@ std::vector LibArchiveReader::getAllFiles() std::vector LibArchiveReader::getAllFiles(NameFilter filter) { - Handle handle(path_to_archive, lock_on_reading); + Handle handle = acquireHandle(); return handle.getAllFiles(filter); } -void LibArchiveReader::setPassword(const String & /*password_*/) +void LibArchiveReader::setPassword([[maybe_unused]] const String & password_) { - throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not set password to {} archive", archive_name); + if (password_ != "") + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not set password to {} archive", archive_name); +} + +LibArchiveReader::Handle LibArchiveReader::acquireHandle() +{ + std::lock_guard lock{mutex}; + if (archive_read_function) + return Handle{path_to_archive, lock_on_reading, archive_read_function}; + else + return Handle{path_to_archive, lock_on_reading}; } #endif diff --git a/src/IO/Archives/LibArchiveReader.h b/src/IO/Archives/LibArchiveReader.h index c4b08d8ddf7..cca72e89d4d 100644 --- a/src/IO/Archives/LibArchiveReader.h +++ b/src/IO/Archives/LibArchiveReader.h @@ -1,7 +1,7 @@ #pragma once #include "config.h" - +#include #include @@ -52,20 +52,31 @@ protected: /// Constructs an archive's reader that will read from a file in the local filesystem. 
LibArchiveReader(std::string archive_name_, bool lock_on_reading_, std::string path_to_archive_); + LibArchiveReader(std::string archive_name_, bool lock_on_reading_, std::string path_to_archive_, const ReadArchiveFunction & archive_read_function_); + private: class ReadBufferFromLibArchive; class Handle; class FileEnumeratorImpl; + class StreamInfo; + + Handle acquireHandle(); const std::string archive_name; const bool lock_on_reading; const String path_to_archive; + const ReadArchiveFunction archive_read_function; + mutable std::mutex mutex; + }; class TarArchiveReader : public LibArchiveReader { public: explicit TarArchiveReader(std::string path_to_archive) : LibArchiveReader("tar", /*lock_on_reading_=*/ true, std::move(path_to_archive)) { } + + explicit TarArchiveReader(std::string path_to_archive, const ReadArchiveFunction & archive_read_function ) : LibArchiveReader("tar", /*lock_on_reading_=*/ true, std::move(path_to_archive), archive_read_function) { } + }; class SevenZipArchiveReader : public LibArchiveReader diff --git a/src/IO/Archives/LibArchiveWriter.cpp b/src/IO/Archives/LibArchiveWriter.cpp new file mode 100644 index 00000000000..c9a34cd014b --- /dev/null +++ b/src/IO/Archives/LibArchiveWriter.cpp @@ -0,0 +1,267 @@ +#include + +#include +#include +#include +#include + +#include + +#if USE_LIBARCHIVE + +// This implementation follows the ZipArchiveWriter implementation as closely as possible. 
+ +namespace DB +{ +namespace ErrorCodes +{ +extern const int CANNOT_PACK_ARCHIVE; +extern const int LOGICAL_ERROR; +extern const int CANNOT_READ_ALL_DATA; +extern const int UNSUPPORTED_METHOD; +extern const int NOT_IMPLEMENTED; +} + +namespace +{ +void checkResultCodeImpl(int code, const String & filename) +{ + if (code == ARCHIVE_OK) + return; + String message = "LibArchive Code = " + std::to_string(code); + throw Exception(ErrorCodes::CANNOT_PACK_ARCHIVE, "Couldn't pack archive: {}, filename={}", message, quoteString(filename)); +} +} + +// this is a thin wrapper for libarchive to be able to write the archive to a WriteBuffer +class LibArchiveWriter::StreamInfo +{ +public: + explicit StreamInfo(std::unique_ptr archive_write_buffer_) : archive_write_buffer(std::move(archive_write_buffer_)) { } + + + static ssize_t memory_write([[maybe_unused]] struct archive * a, void * client_data, const void * buff, size_t length) + { + auto * stream_info = reinterpret_cast(client_data); + stream_info->archive_write_buffer->write(reinterpret_cast(buff), length); + return length; + } + + std::unique_ptr archive_write_buffer; +}; + +class LibArchiveWriter::WriteBufferFromLibArchive : public WriteBufferFromFileBase +{ +public: + WriteBufferFromLibArchive(std::shared_ptr archive_writer_, const String & filename_, const size_t & size_) + : WriteBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0), archive_writer(archive_writer_), filename(filename_), size(size_) + { + startWritingFile(); + a = archive_writer_->getArchive(); + entry = nullptr; + } + + + ~WriteBufferFromLibArchive() override + { + try + { + closeFile(/* throw_if_error= */ false); + endWritingFile(); + } + catch (...) 
+ { + tryLogCurrentException("WriteBufferFromTarArchive"); + } + } + + void finalizeImpl() override + { + next(); + closeFile(/* throw_if_error=*/true); + endWritingFile(); + } + + void sync() override { next(); } + std::string getFileName() const override { return filename; } + + +private: + void nextImpl() override + { + if (!offset()) + return; + if (entry == nullptr) + writeEntry(); + ssize_t to_write = offset(); + ssize_t written = archive_write_data(a, working_buffer.begin(), offset()); + if (written != to_write) + { + throw Exception( + ErrorCodes::CANNOT_PACK_ARCHIVE, + "Couldn't pack tar archive: Failed to write all bytes, {} of {} , filename={}", + written, + to_write, + quoteString(filename)); + } + } + + + void writeEntry() + { + expected_size = getSize(); + entry = archive_entry_new(); + archive_entry_set_pathname(entry, filename.c_str()); + archive_entry_set_size(entry, expected_size); + archive_entry_set_filetype(entry, static_cast<__LA_MODE_T>(0100000)); + archive_entry_set_perm(entry, 0644); + checkResult(archive_write_header(a, entry)); + } + + size_t getSize() const + { + if (size) + return size; + else + return offset(); + } + + void closeFile([[maybe_unused]] bool throw_if_error) + { + if (entry) + { + archive_entry_free(entry); + entry = nullptr; + } + if (throw_if_error and bytes != expected_size) + { + throw Exception(ErrorCodes::CANNOT_PACK_ARCHIVE, "Couldn't pack tar archive: Wrote {} of expected {} , filename={}", bytes, expected_size, quoteString(filename)); + } + } + + void endWritingFile() + { + if (auto archive_writer_ptr = archive_writer.lock()) + archive_writer_ptr->endWritingFile(); + } + + void startWritingFile() + { + if (auto archive_writer_ptr = archive_writer.lock()) + archive_writer_ptr->startWritingFile(); + } + + void checkResult(int code) { checkResultCodeImpl(code, filename); } + + std::weak_ptr archive_writer; + const String filename; + struct archive_entry * entry; + struct archive * a; + size_t size; + size_t 
expected_size; +}; + +LibArchiveWriter::LibArchiveWriter(const String & path_to_archive_) : path_to_archive(path_to_archive_) +{ + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not Implemented"); +} + +LibArchiveWriter::LibArchiveWriter(const String & path_to_archive_, std::unique_ptr archive_write_buffer_) + : path_to_archive(path_to_archive_) +{ + a = archive_write_new(); + archive_write_set_format_pax_restricted(a); + //this allows us to write directly to a write buffer rather than an intermediate buffer in LibArchive + //archive_write_set_bytes_per_block(a, 0); + if (archive_write_buffer_) + { + stream_info = std::make_unique(std::move(archive_write_buffer_)); + archive_write_open2(a, &(*stream_info), nullptr, &StreamInfo::memory_write, nullptr, nullptr); + } + else + { + archive_write_open_filename(a, path_to_archive.c_str()); + } +} + + +LibArchiveWriter::~LibArchiveWriter() +{ + if (!finalized) + { + if (!std::uncaught_exceptions() && std::current_exception() == nullptr) + chassert(false && "TarArchiveWriter is not finalized in destructor."); + } + + if (a) + archive_write_free(a); +} + +std::unique_ptr LibArchiveWriter::writeFile(const String & filename, const size_t & size) +{ + return std::make_unique(std::static_pointer_cast(shared_from_this()), filename, size); +} + +std::unique_ptr LibArchiveWriter::writeFile(const String & filename) +{ + return std::make_unique(std::static_pointer_cast(shared_from_this()), filename, 0); +} + +bool LibArchiveWriter::isWritingFile() const +{ + std::lock_guard lock{mutex}; + return is_writing_file; +} + +void LibArchiveWriter::endWritingFile() +{ + std::lock_guard lock{mutex}; + is_writing_file = false; +} + +void LibArchiveWriter::startWritingFile() +{ + std::lock_guard lock{mutex}; + if (is_writing_file) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot write two files to a tar archive in parallel"); + is_writing_file = true; +} + +void LibArchiveWriter::finalize() +{ + std::lock_guard lock{mutex}; + if 
(finalized) + return; + if (a) + archive_write_close(a); + if (stream_info) + { + stream_info->archive_write_buffer->finalize(); + stream_info.reset(); + } + finalized = true; +} + +void LibArchiveWriter::setCompression(const String & compression_method_, int compression_level) +{ + if (compression_method_.size() == 0 and compression_level == 1) + { + return; + } + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Tar archives are currenly supported without compression"); +} + +void LibArchiveWriter::setPassword([[maybe_unused]] const String & password_) +{ + if (password_ == "") + return; + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Setting a password is not currently supported for tar archives"); +} + +struct archive * LibArchiveWriter::getArchive() +{ + std::lock_guard lock{mutex}; + return a; +} +} +#endif diff --git a/src/IO/Archives/LibArchiveWriter.h b/src/IO/Archives/LibArchiveWriter.h new file mode 100644 index 00000000000..982265cb2f1 --- /dev/null +++ b/src/IO/Archives/LibArchiveWriter.h @@ -0,0 +1,76 @@ +#pragma once + +#include "config.h" + +#if USE_LIBARCHIVE +# include +# include +# include +# include + + +namespace DB +{ +class WriteBufferFromFileBase; + +/// Interface for writing an archive. +class LibArchiveWriter : public IArchiveWriter +{ +public: + /// Constructs an archive that will be written as a file in the local filesystem. + [[noreturn]] explicit LibArchiveWriter(const String & path_to_archive_); + + /// Constructs an archive that will be written as a file in the local filesystem. + explicit LibArchiveWriter(const String & path_to_archive_, std::unique_ptr archive_write_buffer_); + + /// Call finalize() before destructing IArchiveWriter. + ~LibArchiveWriter() override; + + /// Starts writing a file to the archive. The function returns a write buffer, + /// any data written to that buffer will be compressed and then put to the archive. 
+ /// You can keep only one such buffer at a time, a buffer returned by previous call + /// of the function `writeFile()` should be destroyed before next call of `writeFile()`. + std::unique_ptr writeFile(const String & filename) override; + /// LibArchive needs to know the size of the file being written. If the file size is not + /// passed in, the archive writer tries to infer the size by looking at the available + /// data in the buffer; if next() is called before all data is written to the buffer, + /// an exception is thrown. + std::unique_ptr writeFile(const String & filename, const size_t & size) override; + + + /// Returns true if there is an active instance of WriteBuffer returned by writeFile(). + /// This function should be used mostly for debugging purposes. + bool isWritingFile() const override; + + /// Finalizes writing of the archive. This function must be always called at the end of writing. + /// (Unless an error appeared and the archive is in fact no longer needed.) + void finalize() override; + + static constexpr const int kDefaultCompressionLevel = -1; + + /// Sets compression method and level. + /// Changing them will affect next file in the archive. + void setCompression(const String & /* compression_method */, int /* compression_level */ = kDefaultCompressionLevel) override; + + /// Sets password. If the password is not empty it will enable encryption in the archive. 
+ void setPassword(const String & /* password */) override; + +private: + class WriteBufferFromLibArchive; + class StreamInfo; + + struct archive * getArchive(); + void startWritingFile(); + void endWritingFile(); + + String path_to_archive; + std::unique_ptr stream_info TSA_GUARDED_BY(mutex) = nullptr; + struct archive * a TSA_GUARDED_BY(mutex) = nullptr; + bool is_writing_file TSA_GUARDED_BY(mutex) = false; + bool finalized TSA_GUARDED_BY(mutex) = false; + mutable std::mutex mutex; +}; + +} + +#endif diff --git a/src/IO/Archives/ZipArchiveWriter.cpp b/src/IO/Archives/ZipArchiveWriter.cpp index 8cb4a2e0bd6..521e1e4e74c 100644 --- a/src/IO/Archives/ZipArchiveWriter.cpp +++ b/src/IO/Archives/ZipArchiveWriter.cpp @@ -274,6 +274,11 @@ std::unique_ptr ZipArchiveWriter::writeFile(const Strin return std::make_unique(std::static_pointer_cast(shared_from_this()), filename); } +std::unique_ptr ZipArchiveWriter::writeFile(const String & filename, [[maybe_unused]] const size_t & size) +{ + return ZipArchiveWriter::writeFile(filename); +} + bool ZipArchiveWriter::isWritingFile() const { std::lock_guard lock{mutex}; diff --git a/src/IO/Archives/ZipArchiveWriter.h b/src/IO/Archives/ZipArchiveWriter.h index 891da1a2e75..0b8260b1f2f 100644 --- a/src/IO/Archives/ZipArchiveWriter.h +++ b/src/IO/Archives/ZipArchiveWriter.h @@ -32,6 +32,9 @@ public: /// of the function `writeFile()` should be destroyed before next call of `writeFile()`. std::unique_ptr writeFile(const String & filename) override; + std::unique_ptr writeFile(const String & filename, const size_t & size) override; + + /// Returns true if there is an active instance of WriteBuffer returned by writeFile(). /// This function should be used mostly for debugging purposes. 
bool isWritingFile() const override; diff --git a/src/IO/Archives/createArchiveReader.cpp b/src/IO/Archives/createArchiveReader.cpp index 0c998971de1..03623c78624 100644 --- a/src/IO/Archives/createArchiveReader.cpp +++ b/src/IO/Archives/createArchiveReader.cpp @@ -48,7 +48,7 @@ std::shared_ptr createArchiveReader( tar_extensions.begin(), tar_extensions.end(), [&](const auto extension) { return path_to_archive.ends_with(extension); })) { #if USE_LIBARCHIVE - return std::make_shared(path_to_archive); + return std::make_shared(path_to_archive, archive_read_function); #else throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "libarchive library is disabled"); #endif diff --git a/src/IO/Archives/createArchiveWriter.cpp b/src/IO/Archives/createArchiveWriter.cpp index 807fe66e6a9..2fb56e14bd3 100644 --- a/src/IO/Archives/createArchiveWriter.cpp +++ b/src/IO/Archives/createArchiveWriter.cpp @@ -1,5 +1,6 @@ -#include +#include #include +#include #include #include @@ -8,8 +9,8 @@ namespace DB { namespace ErrorCodes { - extern const int CANNOT_PACK_ARCHIVE; - extern const int SUPPORT_IS_DISABLED; +extern const int CANNOT_PACK_ARCHIVE; +extern const int SUPPORT_IS_DISABLED; } @@ -19,9 +20,8 @@ std::shared_ptr createArchiveWriter(const String & path_to_archi } -std::shared_ptr createArchiveWriter( - const String & path_to_archive, - [[maybe_unused]] std::unique_ptr archive_write_buffer) +std::shared_ptr +createArchiveWriter(const String & path_to_archive, [[maybe_unused]] std::unique_ptr archive_write_buffer) { if (path_to_archive.ends_with(".zip") || path_to_archive.ends_with(".zipx")) { @@ -29,6 +29,15 @@ std::shared_ptr createArchiveWriter( return std::make_shared(path_to_archive, std::move(archive_write_buffer)); #else throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "minizip library is disabled"); +#endif + } + //todo add support for extensions, e.g. .gz + else if (path_to_archive.ends_with(".tar")) + { +#if USE_LIBARCHIVE + return std::make_shared(path_to_archive, 
std::move(archive_write_buffer)); +#else + throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "libarchive library is disabled"); #endif } else diff --git a/src/IO/Archives/hasRegisteredArchiveFileExtension.cpp b/src/IO/Archives/hasRegisteredArchiveFileExtension.cpp index 6b2ef29d054..3ac9bba5e5d 100644 --- a/src/IO/Archives/hasRegisteredArchiveFileExtension.cpp +++ b/src/IO/Archives/hasRegisteredArchiveFileExtension.cpp @@ -6,7 +6,7 @@ namespace DB bool hasRegisteredArchiveFileExtension(const String & path) { - return path.ends_with(".zip") || path.ends_with(".zipx"); + return path.ends_with(".zip") || path.ends_with(".zipx") || path.ends_with(".tar"); } } diff --git a/src/IO/tests/gtest_archive_reader_and_writer.cpp b/src/IO/tests/gtest_archive_reader_and_writer.cpp index 37fbdff901a..e5909ce5869 100644 --- a/src/IO/tests/gtest_archive_reader_and_writer.cpp +++ b/src/IO/tests/gtest_archive_reader_and_writer.cpp @@ -7,9 +7,11 @@ #include #include #include +#include #include #include #include +#include #include #include #include @@ -335,41 +337,51 @@ TEST_P(ArchiveReaderAndWriterTest, InMemory) TEST_P(ArchiveReaderAndWriterTest, Password) -{ - /// Make an archive. - std::string_view contents = "The contents of a.txt"; +{ + auto writer = createArchiveWriter(getPathToArchive()); + //don't support passwords for tar archives + if(getPathToArchive().ends_with(".tar")) { - auto writer = createArchiveWriter(getPathToArchive()); - writer->setPassword("Qwe123"); - { - auto out = writer->writeFile("a.txt"); - writeString(contents, *out); - out->finalize(); - } + expectException(ErrorCodes::NOT_IMPLEMENTED, "Setting a password is not currently supported for tar archives", + [&]{ writer->setPassword("a.txt"); }); writer->finalize(); } - - /// Read the archive. - auto reader = createArchiveReader(getPathToArchive()); - - /// Try to read without a password. 
- expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Password is required", - [&]{ reader->readFile("a.txt", /*throw_on_not_found=*/true); }); - + else { - /// Try to read with a wrong password. - reader->setPassword("123Qwe"); - expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Wrong password", + /// Make an archive. + std::string_view contents = "The contents of a.txt"; + { + writer->setPassword("Qwe123"); + { + auto out = writer->writeFile("a.txt"); + writeString(contents, *out); + out->finalize(); + } + writer->finalize(); + } + + /// Read the archive. + auto reader = createArchiveReader(getPathToArchive()); + + /// Try to read without a password. + expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Password is required", [&]{ reader->readFile("a.txt", /*throw_on_not_found=*/true); }); - } - { - /// Reading with the right password is successful. - reader->setPassword("Qwe123"); - auto in = reader->readFile("a.txt", /*throw_on_not_found=*/true); - String str; - readStringUntilEOF(str, *in); - EXPECT_EQ(str, contents); + { + /// Try to read with a wrong password. + reader->setPassword("123Qwe"); + expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Wrong password", + [&]{ reader->readFile("a.txt", /*throw_on_not_found=*/true); }); + } + + { + /// Reading with the right password is successful. + reader->setPassword("Qwe123"); + auto in = reader->readFile("a.txt", /*throw_on_not_found=*/true); + String str; + readStringUntilEOF(str, *in); + EXPECT_EQ(str, contents); + } } } @@ -380,6 +392,53 @@ TEST_P(ArchiveReaderAndWriterTest, ArchiveNotExist) [&]{ createArchiveReader(getPathToArchive()); }); } + +TEST_P(ArchiveReaderAndWriterTest, LargeFile) +{ + /// Make an archive. 
+ std::string_view contents = "The contents of a.txt\n"; + int times = 100000000; + { + auto writer = createArchiveWriter(getPathToArchive()); + { + auto out = writer->writeFile("a.txt", times * contents.size()); + for(int i = 0; i < times; i++) + { + writeString(contents, *out); + } + out->finalize(); + } + writer->finalize(); + } + + /// Read the archive. + auto reader = createArchiveReader(getPathToArchive()); + + ASSERT_TRUE(reader->fileExists("a.txt")); + + auto file_info = reader->getFileInfo("a.txt"); + EXPECT_EQ(file_info.uncompressed_size, contents.size() * times); + EXPECT_GT(file_info.compressed_size, 0); + + { + auto in = reader->readFile("a.txt", /*throw_on_not_found=*/true); + for(int i = 0; i < times; i++) + { + ASSERT_TRUE(checkString(String(contents), *in)); + } + } + + { + /// Use an enumerator. + auto enumerator = reader->firstFile(); + ASSERT_NE(enumerator, nullptr); + EXPECT_EQ(enumerator->getFileName(), "a.txt"); + EXPECT_EQ(enumerator->getFileInfo().uncompressed_size, contents.size() * times); + EXPECT_GT(enumerator->getFileInfo().compressed_size, 0); + EXPECT_FALSE(enumerator->nextFile()); + } +} + TEST(TarArchiveReaderTest, FileExists) { String archive_path = "archive.tar"; String filename = "file.txt"; @@ -508,7 +567,8 @@ namespace { const char * supported_archive_file_exts[] = { - ".zip" + ".zip", + ".tar" }; } diff --git a/tests/integration/test_backup_restore_new/test.py b/tests/integration/test_backup_restore_new/test.py index 06560bda2b6..ba69f7bda0d 100644 --- a/tests/integration/test_backup_restore_new/test.py +++ b/tests/integration/test_backup_restore_new/test.py @@ -591,6 +591,57 @@ def test_zip_archive_with_bad_compression_method(): ) +def test_tar_archive(): + backup_name = f"Disk('backups', 'archive.tar')" + create_and_fill_table() + + assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" + instance.query(f"BACKUP TABLE test.table TO {backup_name}") + + assert 
os.path.isfile(get_path_to_backup(backup_name)) + + instance.query("DROP TABLE test.table") + assert instance.query("EXISTS test.table") == "0\n" + + instance.query(f"RESTORE TABLE test.table FROM {backup_name}") + assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" + + +def test_tar_archive_with_password(): + backup_name = f"Disk('backups', 'archive_with_password.tar')" + create_and_fill_table() + + assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" + + expected_error = "Setting a password is not currently supported for tar archives" + assert expected_error in instance.query_and_get_error( + f"BACKUP TABLE test.table TO {backup_name} SETTINGS id='tar_archive_with_password', password='password123'" + ) + assert ( + instance.query( + "SELECT status FROM system.backups WHERE id='tar_archive_with_password'" + ) + == "BACKUP_FAILED\n" + ) + + +def test_tar_archive_with_bad_compression_method(): + backup_name = f"Disk('backups', 'archive_with_bad_compression_method.tar')" + create_and_fill_table() + + assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" + + expected_error = "Tar archives are currenly supported without compression" + assert expected_error in instance.query_and_get_error( + f"BACKUP TABLE test.table TO {backup_name} SETTINGS id='tar_archive_with_bad_compression_method', compression_method='foobar'" + ) + assert ( + instance.query( + "SELECT status FROM system.backups WHERE id='tar_archive_with_bad_compression_method'" + ) + == "BACKUP_FAILED\n" + ) + def test_async(): create_and_fill_table() assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" diff --git a/tests/integration/test_backup_restore_s3/test.py b/tests/integration/test_backup_restore_s3/test.py index 634f14621c0..f30db01550f 100644 --- a/tests/integration/test_backup_restore_s3/test.py +++ b/tests/integration/test_backup_restore_s3/test.py @@ -453,6 +453,11 @@ def 
test_backup_to_zip(): backup_destination = f"S3('http://minio1:9001/root/data/backups/{backup_name}.zip', 'minio', 'minio123')" check_backup_and_restore(storage_policy, backup_destination) +def test_backup_to_tar(): + storage_policy = "default" + backup_name = new_backup_name() + backup_destination = f"S3('http://minio1:9001/root/data/backups/{backup_name}.tar', 'minio', 'minio123')" + check_backup_and_restore(storage_policy, backup_destination) def test_user_specific_auth(start_cluster): def create_user(user):