Add support for reading and writing backups as a

tar archive using libarchive.
This commit is contained in:
Joshua Hildred 2024-01-19 07:05:36 -08:00
parent fb099bbd62
commit eb4ec0e871
14 changed files with 651 additions and 75 deletions

View File

@ -927,7 +927,7 @@ void BackupImpl::writeFile(const BackupFileInfo & info, BackupEntryPtr entry)
const auto write_info_to_archive = [&](const auto & file_name) const auto write_info_to_archive = [&](const auto & file_name)
{ {
auto out = archive_writer->writeFile(file_name); auto out = archive_writer->writeFile(file_name, info.size);
auto read_buffer = entry->getReadBuffer(writer->getReadSettings()); auto read_buffer = entry->getReadBuffer(writer->getReadSettings());
if (info.base_size != 0) if (info.base_size != 0)
read_buffer->seek(info.base_size, SEEK_SET); read_buffer->seek(info.base_size, SEEK_SET);

View File

@ -22,6 +22,9 @@ public:
/// of the function `writeFile()` should be destroyed before next call of `writeFile()`. /// of the function `writeFile()` should be destroyed before next call of `writeFile()`.
virtual std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & filename) = 0; virtual std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & filename) = 0;
virtual std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & filename, const size_t & size) = 0;
/// Returns true if there is an active instance of WriteBuffer returned by writeFile(). /// Returns true if there is an active instance of WriteBuffer returned by writeFile().
/// This function should be used mostly for debugging purposes. /// This function should be used mostly for debugging purposes.
virtual bool isWritingFile() const = 0; virtual bool isWritingFile() const = 0;

View File

@ -5,7 +5,6 @@
#include <IO/Archives/ArchiveUtils.h> #include <IO/Archives/ArchiveUtils.h>
#include <mutex>
namespace DB namespace DB
{ {
@ -14,35 +13,60 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int CANNOT_UNPACK_ARCHIVE; extern const int CANNOT_UNPACK_ARCHIVE;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
extern const int CANNOT_READ_ALL_DATA; extern const int CANNOT_READ_ALL_DATA;
extern const int UNSUPPORTED_METHOD; extern const int UNSUPPORTED_METHOD;
} }
class LibArchiveReader::StreamInfo
{
public:
explicit StreamInfo(std::unique_ptr<SeekableReadBuffer> read_buffer_)
: read_buffer(std::move(read_buffer_))
{
}
static ssize_t read([[maybe_unused]] struct archive * a, void * client_data, const void ** buff)
{
auto * read_stream = reinterpret_cast<StreamInfo *>(client_data);
*buff = reinterpret_cast<void *>(read_stream->buf);
return read_stream->read_buffer->read(read_stream->buf, DBMS_DEFAULT_BUFFER_SIZE);
}
std::unique_ptr<SeekableReadBuffer> read_buffer;
char buf[DBMS_DEFAULT_BUFFER_SIZE];
};
class LibArchiveReader::Handle class LibArchiveReader::Handle
{ {
public: public:
explicit Handle(std::string path_to_archive_, bool lock_on_reading_) explicit Handle(std::string path_to_archive_, bool lock_on_reading_)
: path_to_archive(path_to_archive_), lock_on_reading(lock_on_reading_) : path_to_archive(path_to_archive_), lock_on_reading(lock_on_reading_)
{ {
current_archive = open(path_to_archive); current_archive = openWithPath(path_to_archive);
}
explicit Handle(std::string path_to_archive_, bool lock_on_reading_, const ReadArchiveFunction & archive_read_function_)
: path_to_archive(path_to_archive_), archive_read_function(archive_read_function_), lock_on_reading(lock_on_reading_)
{
read_stream = std::make_unique<StreamInfo>(archive_read_function());
current_archive = openWithReader(&(*read_stream));
} }
Handle(const Handle &) = delete; Handle(const Handle &) = delete;
Handle(Handle && other) noexcept Handle(Handle && other) noexcept
: current_archive(other.current_archive) : current_archive(other.current_archive)
, current_entry(other.current_entry) , current_entry(other.current_entry)
, read_stream(std::move(other.read_stream))
, archive_read_function(std::move(other.archive_read_function))
, lock_on_reading(other.lock_on_reading) , lock_on_reading(other.lock_on_reading)
{ {
other.current_archive = nullptr; other.current_archive = nullptr;
other.current_entry = nullptr; other.current_entry = nullptr;
} }
~Handle() ~Handle() { close(current_archive); }
{
close(current_archive);
}
bool locateFile(const std::string & filename) bool locateFile(const std::string & filename)
{ {
@ -64,10 +88,14 @@ public:
break; break;
if (filter(archive_entry_pathname(current_entry))) if (filter(archive_entry_pathname(current_entry)))
{
valid = true;
return true; return true;
}
} }
checkError(err); checkError(err);
valid = false;
return false; return false;
} }
@ -81,12 +109,24 @@ public:
} while (err == ARCHIVE_RETRY); } while (err == ARCHIVE_RETRY);
checkError(err); checkError(err);
return err == ARCHIVE_OK; valid = err == ARCHIVE_OK;
return valid;
} }
std::vector<std::string> getAllFiles(NameFilter filter) std::vector<std::string> getAllFiles(NameFilter filter)
{ {
auto * archive = open(path_to_archive); struct archive * archive;
std::unique_ptr<LibArchiveReader::StreamInfo> rs;
if(archive_read_function)
{
read_stream = std::make_unique<StreamInfo>(archive_read_function());
archive = openWithReader(&(*rs));
}
else
{
archive = openWithPath(path_to_archive);
}
SCOPE_EXIT( SCOPE_EXIT(
close(archive); close(archive);
); );
@ -94,7 +134,7 @@ public:
struct archive_entry * entry = nullptr; struct archive_entry * entry = nullptr;
std::vector<std::string> files; std::vector<std::string> files;
int error = readNextHeader(archive, &entry); int error = readNextHeader(current_archive, &entry);
while (error == ARCHIVE_OK || error == ARCHIVE_RETRY) while (error == ARCHIVE_OK || error == ARCHIVE_RETRY)
{ {
chassert(entry != nullptr); chassert(entry != nullptr);
@ -102,7 +142,7 @@ public:
if (!filter || filter(name)) if (!filter || filter(name))
files.push_back(std::move(name)); files.push_back(std::move(name));
error = readNextHeader(archive, &entry); error = readNextHeader(current_archive, &entry);
} }
checkError(error); checkError(error);
@ -112,6 +152,8 @@ public:
const String & getFileName() const const String & getFileName() const
{ {
chassert(current_entry); chassert(current_entry);
if (!valid)
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "No current file");
if (!file_name) if (!file_name)
file_name.emplace(archive_entry_pathname(current_entry)); file_name.emplace(archive_entry_pathname(current_entry));
@ -121,6 +163,8 @@ public:
const FileInfo & getFileInfo() const const FileInfo & getFileInfo() const
{ {
chassert(current_entry); chassert(current_entry);
if (!valid)
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "No current file");
if (!file_info) if (!file_info)
{ {
file_info.emplace(); file_info.emplace();
@ -134,6 +178,8 @@ public:
struct archive * current_archive; struct archive * current_archive;
struct archive_entry * current_entry = nullptr; struct archive_entry * current_entry = nullptr;
bool valid = true;
private: private:
void checkError(int error) const void checkError(int error) const
{ {
@ -147,7 +193,16 @@ private:
file_info.reset(); file_info.reset();
} }
static struct archive * open(const String & path_to_archive) static struct archive * openWithReader(StreamInfo * read_stream_)
{
auto * a = archive_read_new();
archive_read_support_filter_all(a);
archive_read_support_format_all(a);
archive_read_open(a, read_stream_, nullptr, StreamInfo::read, nullptr);
return a;
}
static struct archive * openWithPath(const String & path_to_archive)
{ {
auto * archive = archive_read_new(); auto * archive = archive_read_new();
try try
@ -194,6 +249,8 @@ private:
} }
const String path_to_archive; const String path_to_archive;
std::unique_ptr<StreamInfo> read_stream = nullptr;
const IArchiveReader::ReadArchiveFunction archive_read_function;
/// for some archive types when we are reading headers static variables are used /// for some archive types when we are reading headers static variables are used
/// which are not thread-safe /// which are not thread-safe
@ -207,7 +264,7 @@ private:
class LibArchiveReader::FileEnumeratorImpl : public FileEnumerator class LibArchiveReader::FileEnumeratorImpl : public FileEnumerator
{ {
public: public:
explicit FileEnumeratorImpl(Handle handle_) : handle(std::move(handle_)) {} explicit FileEnumeratorImpl(Handle handle_) : handle(std::move(handle_)) { }
const String & getFileName() const override { return handle.getFileName(); } const String & getFileName() const override { return handle.getFileName(); }
const FileInfo & getFileInfo() const override { return handle.getFileInfo(); } const FileInfo & getFileInfo() const override { return handle.getFileInfo(); }
@ -215,6 +272,7 @@ public:
/// Releases owned handle to pass it to a read buffer. /// Releases owned handle to pass it to a read buffer.
Handle releaseHandle() && { return std::move(handle); } Handle releaseHandle() && { return std::move(handle); }
private: private:
Handle handle; Handle handle;
}; };
@ -226,13 +284,13 @@ public:
: ReadBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0) : ReadBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0)
, handle(std::move(handle_)) , handle(std::move(handle_))
, path_to_archive(std::move(path_to_archive_)) , path_to_archive(std::move(path_to_archive_))
{} {
}
off_t seek(off_t /* off */, int /* whence */) override off_t seek(off_t /* off */, int /* whence */) override
{ {
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Seek is not supported when reading from archive"); throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Seek is not supported when reading from archive");
} }
bool checkIfActuallySeekable() override { return false; } bool checkIfActuallySeekable() override { return false; }
off_t getPosition() override off_t getPosition() override
@ -240,14 +298,13 @@ public:
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "getPosition not supported when reading from archive"); throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "getPosition not supported when reading from archive");
} }
off_t getPosition() override { throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "getPosition not supported when reading from archive"); }
String getFileName() const override { return handle.getFileName(); } String getFileName() const override { return handle.getFileName(); }
size_t getFileSize() override { return handle.getFileInfo().uncompressed_size; } size_t getFileSize() override { return handle.getFileInfo().uncompressed_size; }
Handle releaseHandle() && Handle releaseHandle() && { return std::move(handle); }
{
return std::move(handle);
}
private: private:
bool nextImpl() override bool nextImpl() override
@ -274,7 +331,17 @@ private:
LibArchiveReader::LibArchiveReader(std::string archive_name_, bool lock_on_reading_, std::string path_to_archive_) LibArchiveReader::LibArchiveReader(std::string archive_name_, bool lock_on_reading_, std::string path_to_archive_)
: archive_name(std::move(archive_name_)), lock_on_reading(lock_on_reading_), path_to_archive(std::move(path_to_archive_)) : archive_name(std::move(archive_name_)), lock_on_reading(lock_on_reading_), path_to_archive(std::move(path_to_archive_))
{} {
}
LibArchiveReader::LibArchiveReader(
std::string archive_name_, bool lock_on_reading_, std::string path_to_archive_, const ReadArchiveFunction & archive_read_function_)
: archive_name(std::move(archive_name_))
, lock_on_reading(lock_on_reading_)
, path_to_archive(std::move(path_to_archive_))
, archive_read_function(archive_read_function_)
{
}
LibArchiveReader::~LibArchiveReader() = default; LibArchiveReader::~LibArchiveReader() = default;
@ -285,21 +352,21 @@ const std::string & LibArchiveReader::getPath() const
bool LibArchiveReader::fileExists(const String & filename) bool LibArchiveReader::fileExists(const String & filename)
{ {
Handle handle(path_to_archive, lock_on_reading); Handle handle = acquireHandle();
return handle.locateFile(filename); return handle.locateFile(filename);
} }
LibArchiveReader::FileInfo LibArchiveReader::getFileInfo(const String & filename) LibArchiveReader::FileInfo LibArchiveReader::getFileInfo(const String & filename)
{ {
Handle handle(path_to_archive, lock_on_reading); Handle handle = acquireHandle();
if (!handle.locateFile(filename)) if (!handle.locateFile(filename))
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: file not found", path_to_archive); throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: File {} was not found in archive", path_to_archive, quoteString(filename));
return handle.getFileInfo(); return handle.getFileInfo();
} }
std::unique_ptr<LibArchiveReader::FileEnumerator> LibArchiveReader::firstFile() std::unique_ptr<LibArchiveReader::FileEnumerator> LibArchiveReader::firstFile()
{ {
Handle handle(path_to_archive, lock_on_reading); Handle handle = acquireHandle();
if (!handle.nextFile()) if (!handle.nextFile())
return nullptr; return nullptr;
@ -308,17 +375,25 @@ std::unique_ptr<LibArchiveReader::FileEnumerator> LibArchiveReader::firstFile()
std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader::readFile(const String & filename, bool throw_on_not_found) std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader::readFile(const String & filename, bool throw_on_not_found)
{ {
return readFile([&](const std::string & file) { return file == filename; }, throw_on_not_found); Handle handle = acquireHandle();
if (!handle.locateFile(filename))
{
if (throw_on_not_found)
throw Exception(
ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: File {} was not found in archive", path_to_archive, quoteString(filename));
return nullptr;
}
return std::make_unique<ReadBufferFromLibArchive>(std::move(handle), path_to_archive);
} }
std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader::readFile(NameFilter filter, bool throw_on_not_found) std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader::readFile(NameFilter filter, bool throw_on_not_found)
{ {
Handle handle(path_to_archive, lock_on_reading); Handle handle = acquireHandle();
if (!handle.locateFile(filter)) if (!handle.locateFile(filter))
{ {
if (throw_on_not_found) if (throw_on_not_found)
throw Exception( throw Exception(
ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: no file found satisfying the filter", path_to_archive); ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: No file satisfying filter in archive", path_to_archive);
return nullptr; return nullptr;
} }
return std::make_unique<ReadBufferFromLibArchive>(std::move(handle), path_to_archive); return std::make_unique<ReadBufferFromLibArchive>(std::move(handle), path_to_archive);
@ -337,7 +412,8 @@ std::unique_ptr<LibArchiveReader::FileEnumerator> LibArchiveReader::nextFile(std
{ {
if (!dynamic_cast<ReadBufferFromLibArchive *>(read_buffer.get())) if (!dynamic_cast<ReadBufferFromLibArchive *>(read_buffer.get()))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong ReadBuffer passed to nextFile()"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong ReadBuffer passed to nextFile()");
auto read_buffer_from_libarchive = std::unique_ptr<ReadBufferFromLibArchive>(static_cast<ReadBufferFromLibArchive *>(read_buffer.release())); auto read_buffer_from_libarchive
= std::unique_ptr<ReadBufferFromLibArchive>(static_cast<ReadBufferFromLibArchive *>(read_buffer.release()));
auto handle = std::move(*read_buffer_from_libarchive).releaseHandle(); auto handle = std::move(*read_buffer_from_libarchive).releaseHandle();
if (!handle.nextFile()) if (!handle.nextFile())
return nullptr; return nullptr;
@ -360,13 +436,23 @@ std::vector<std::string> LibArchiveReader::getAllFiles()
std::vector<std::string> LibArchiveReader::getAllFiles(NameFilter filter) std::vector<std::string> LibArchiveReader::getAllFiles(NameFilter filter)
{ {
Handle handle(path_to_archive, lock_on_reading); Handle handle = acquireHandle();
return handle.getAllFiles(filter); return handle.getAllFiles(filter);
} }
void LibArchiveReader::setPassword(const String & /*password_*/) void LibArchiveReader::setPassword([[maybe_unused]] const String & password_)
{ {
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not set password to {} archive", archive_name); if (password_ != "")
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not set password to {} archive", archive_name);
}
LibArchiveReader::Handle LibArchiveReader::acquireHandle()
{
std::lock_guard lock{mutex};
if (archive_read_function)
return Handle{path_to_archive, lock_on_reading, archive_read_function};
else
return Handle{path_to_archive, lock_on_reading};
} }
#endif #endif

View File

@ -1,7 +1,7 @@
#pragma once #pragma once
#include "config.h" #include "config.h"
#include <mutex>
#include <IO/Archives/IArchiveReader.h> #include <IO/Archives/IArchiveReader.h>
@ -52,20 +52,31 @@ protected:
/// Constructs an archive's reader that will read from a file in the local filesystem. /// Constructs an archive's reader that will read from a file in the local filesystem.
LibArchiveReader(std::string archive_name_, bool lock_on_reading_, std::string path_to_archive_); LibArchiveReader(std::string archive_name_, bool lock_on_reading_, std::string path_to_archive_);
LibArchiveReader(std::string archive_name_, bool lock_on_reading_, std::string path_to_archive_, const ReadArchiveFunction & archive_read_function_);
private: private:
class ReadBufferFromLibArchive; class ReadBufferFromLibArchive;
class Handle; class Handle;
class FileEnumeratorImpl; class FileEnumeratorImpl;
class StreamInfo;
Handle acquireHandle();
const std::string archive_name; const std::string archive_name;
const bool lock_on_reading; const bool lock_on_reading;
const String path_to_archive; const String path_to_archive;
const ReadArchiveFunction archive_read_function;
mutable std::mutex mutex;
}; };
class TarArchiveReader : public LibArchiveReader class TarArchiveReader : public LibArchiveReader
{ {
public: public:
explicit TarArchiveReader(std::string path_to_archive) : LibArchiveReader("tar", /*lock_on_reading_=*/ true, std::move(path_to_archive)) { } explicit TarArchiveReader(std::string path_to_archive) : LibArchiveReader("tar", /*lock_on_reading_=*/ true, std::move(path_to_archive)) { }
explicit TarArchiveReader(std::string path_to_archive, const ReadArchiveFunction & archive_read_function ) : LibArchiveReader("tar", /*lock_on_reading_=*/ true, std::move(path_to_archive), archive_read_function) { }
}; };
class SevenZipArchiveReader : public LibArchiveReader class SevenZipArchiveReader : public LibArchiveReader

View File

@ -0,0 +1,267 @@
#include <IO/Archives/LibArchiveWriter.h>
#include <filesystem>
#include <IO/WriteBufferFromFileBase.h>
#include <Common/quoteString.h>
#include <Common/scope_guard_safe.h>
#include <mutex>
#if USE_LIBARCHIVE
// this implemation follows the ZipArchiveWriter implemation as closely as possible.
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PACK_ARCHIVE;
extern const int LOGICAL_ERROR;
extern const int CANNOT_READ_ALL_DATA;
extern const int UNSUPPORTED_METHOD;
extern const int NOT_IMPLEMENTED;
}
namespace
{
void checkResultCodeImpl(int code, const String & filename)
{
if (code == ARCHIVE_OK)
return;
String message = "LibArchive Code = " + std::to_string(code);
throw Exception(ErrorCodes::CANNOT_PACK_ARCHIVE, "Couldn't pack archive: {}, filename={}", message, quoteString(filename));
}
}
// this is a thin wrapper for libarchive to be able to write the archive to a WriteBuffer
class LibArchiveWriter::StreamInfo
{
public:
explicit StreamInfo(std::unique_ptr<WriteBuffer> archive_write_buffer_) : archive_write_buffer(std::move(archive_write_buffer_)) { }
static ssize_t memory_write([[maybe_unused]] struct archive * a, void * client_data, const void * buff, size_t length)
{
auto * stream_info = reinterpret_cast<StreamInfo *>(client_data);
stream_info->archive_write_buffer->write(reinterpret_cast<const char *>(buff), length);
return length;
}
std::unique_ptr<WriteBuffer> archive_write_buffer;
};
class LibArchiveWriter::WriteBufferFromLibArchive : public WriteBufferFromFileBase
{
public:
WriteBufferFromLibArchive(std::shared_ptr<LibArchiveWriter> archive_writer_, const String & filename_, const size_t & size_)
: WriteBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0), archive_writer(archive_writer_), filename(filename_), size(size_)
{
startWritingFile();
a = archive_writer_->getArchive();
entry = nullptr;
}
~WriteBufferFromLibArchive() override
{
try
{
closeFile(/* throw_if_error= */ false);
endWritingFile();
}
catch (...)
{
tryLogCurrentException("WriteBufferFromTarArchive");
}
}
void finalizeImpl() override
{
next();
closeFile(/* throw_if_error=*/true);
endWritingFile();
}
void sync() override { next(); }
std::string getFileName() const override { return filename; }
private:
void nextImpl() override
{
if (!offset())
return;
if (entry == nullptr)
writeEntry();
ssize_t to_write = offset();
ssize_t written = archive_write_data(a, working_buffer.begin(), offset());
if (written != to_write)
{
throw Exception(
ErrorCodes::CANNOT_PACK_ARCHIVE,
"Couldn't pack tar archive: Failed to write all bytes, {} of {} , filename={}",
written,
to_write,
quoteString(filename));
}
}
void writeEntry()
{
expected_size = getSize();
entry = archive_entry_new();
archive_entry_set_pathname(entry, filename.c_str());
archive_entry_set_size(entry, expected_size);
archive_entry_set_filetype(entry, static_cast<__LA_MODE_T>(0100000));
archive_entry_set_perm(entry, 0644);
checkResult(archive_write_header(a, entry));
}
size_t getSize() const
{
if (size)
return size;
else
return offset();
}
void closeFile([[maybe_unused]] bool throw_if_error)
{
if (entry)
{
archive_entry_free(entry);
entry = nullptr;
}
if (throw_if_error and bytes != expected_size)
{
throw Exception(ErrorCodes::CANNOT_PACK_ARCHIVE, "Couldn't pack tar archive: Wrote {} of expected {} , filename={}", bytes, expected_size, quoteString(filename));
}
}
void endWritingFile()
{
if (auto archive_writer_ptr = archive_writer.lock())
archive_writer_ptr->endWritingFile();
}
void startWritingFile()
{
if (auto archive_writer_ptr = archive_writer.lock())
archive_writer_ptr->startWritingFile();
}
void checkResult(int code) { checkResultCodeImpl(code, filename); }
std::weak_ptr<LibArchiveWriter> archive_writer;
const String filename;
struct archive_entry * entry;
struct archive * a;
size_t size;
size_t expected_size;
};
LibArchiveWriter::LibArchiveWriter(const String & path_to_archive_) : path_to_archive(path_to_archive_)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not Implemented");
}
LibArchiveWriter::LibArchiveWriter(const String & path_to_archive_, std::unique_ptr<WriteBuffer> archive_write_buffer_)
: path_to_archive(path_to_archive_)
{
a = archive_write_new();
archive_write_set_format_pax_restricted(a);
//this allows use to write directly to a writer buffer rather than an intermediate buffer in LibArchive
//archive_write_set_bytes_per_block(a, 0);
if (archive_write_buffer_)
{
stream_info = std::make_unique<StreamInfo>(std::move(archive_write_buffer_));
archive_write_open2(a, &(*stream_info), nullptr, &StreamInfo::memory_write, nullptr, nullptr);
}
else
{
archive_write_open_filename(a, path_to_archive.c_str());
}
}
LibArchiveWriter::~LibArchiveWriter()
{
if (!finalized)
{
if (!std::uncaught_exceptions() && std::current_exception() == nullptr)
chassert(false && "TarArchiveWriter is not finalized in destructor.");
}
if (a)
archive_write_free(a);
}
std::unique_ptr<WriteBufferFromFileBase> LibArchiveWriter::writeFile(const String & filename, const size_t & size)
{
return std::make_unique<WriteBufferFromLibArchive>(std::static_pointer_cast<LibArchiveWriter>(shared_from_this()), filename, size);
}
std::unique_ptr<WriteBufferFromFileBase> LibArchiveWriter::writeFile(const String & filename)
{
return std::make_unique<WriteBufferFromLibArchive>(std::static_pointer_cast<LibArchiveWriter>(shared_from_this()), filename, 0);
}
bool LibArchiveWriter::isWritingFile() const
{
std::lock_guard lock{mutex};
return is_writing_file;
}
void LibArchiveWriter::endWritingFile()
{
std::lock_guard lock{mutex};
is_writing_file = false;
}
void LibArchiveWriter::startWritingFile()
{
std::lock_guard lock{mutex};
if (is_writing_file)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot write two files to a tar archive in parallel");
is_writing_file = true;
}
void LibArchiveWriter::finalize()
{
std::lock_guard lock{mutex};
if (finalized)
return;
if (a)
archive_write_close(a);
if (stream_info)
{
stream_info->archive_write_buffer->finalize();
stream_info.reset();
}
finalized = true;
}
void LibArchiveWriter::setCompression(const String & compression_method_, int compression_level)
{
if (compression_method_.size() == 0 and compression_level == 1)
{
return;
}
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Tar archives are currenly supported without compression");
}
void LibArchiveWriter::setPassword([[maybe_unused]] const String & password_)
{
if (password_ == "")
return;
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Setting a password is not currently supported for tar archives");
}
struct archive * LibArchiveWriter::getArchive()
{
std::lock_guard lock{mutex};
return a;
}
}
#endif

View File

@ -0,0 +1,76 @@
#pragma once
#include "config.h"
#if USE_LIBARCHIVE
# include <IO/Archives/ArchiveUtils.h>
# include <IO/Archives/IArchiveWriter.h>
# include <IO/WriteBufferFromFileBase.h>
# include <base/defines.h>
namespace DB
{
class WriteBufferFromFileBase;
/// Interface for writing an archive.
class LibArchiveWriter : public IArchiveWriter
{
public:
/// Constructs an archive that will be written as a file in the local filesystem.
[[noreturn]] explicit LibArchiveWriter(const String & path_to_archive_);
/// Constructs an archive that will be written as a file in the local filesystem.
explicit LibArchiveWriter(const String & path_to_archive_, std::unique_ptr<WriteBuffer> archive_write_buffer_);
/// Call finalize() before destructing IArchiveWriter.
~LibArchiveWriter() override;
/// Starts writing a file to the archive. The function returns a write buffer,
/// any data written to that buffer will be compressed and then put to the archive.
/// You can keep only one such buffer at a time, a buffer returned by previous call
/// of the function `writeFile()` should be destroyed before next call of `writeFile()`.
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & filename) override;
/// LibArchive needs to know the size of the file being written. If the file size is not
/// passed in the the archive writer tries to infer the size by looking at the available
/// data in the buffer, if next is called before all data is written to the buffer
/// an exception is thrown.
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & filename, const size_t & size) override;
/// Returns true if there is an active instance of WriteBuffer returned by writeFile().
/// This function should be used mostly for debugging purposes.
bool isWritingFile() const override;
/// Finalizes writing of the archive. This function must be always called at the end of writing.
/// (Unless an error appeared and the archive is in fact no longer needed.)
void finalize() override;
static constexpr const int kDefaultCompressionLevel = -1;
/// Sets compression method and level.
/// Changing them will affect next file in the archive.
void setCompression(const String & /* compression_method */, int /* compression_level */ = kDefaultCompressionLevel) override;
/// Sets password. If the password is not empty it will enable encryption in the archive.
void setPassword(const String & /* password */) override;
private:
class WriteBufferFromLibArchive;
class StreamInfo;
struct archive * getArchive();
void startWritingFile();
void endWritingFile();
String path_to_archive;
std::unique_ptr<StreamInfo> stream_info TSA_GUARDED_BY(mutex) = nullptr;
struct archive * a TSA_GUARDED_BY(mutex) = nullptr;
bool is_writing_file TSA_GUARDED_BY(mutex) = false;
bool finalized TSA_GUARDED_BY(mutex) = false;
mutable std::mutex mutex;
};
}
#endif

View File

@ -274,6 +274,11 @@ std::unique_ptr<WriteBufferFromFileBase> ZipArchiveWriter::writeFile(const Strin
return std::make_unique<WriteBufferFromZipArchive>(std::static_pointer_cast<ZipArchiveWriter>(shared_from_this()), filename); return std::make_unique<WriteBufferFromZipArchive>(std::static_pointer_cast<ZipArchiveWriter>(shared_from_this()), filename);
} }
std::unique_ptr<WriteBufferFromFileBase> ZipArchiveWriter::writeFile(const String & filename, [[maybe_unused]] const size_t & size)
{
return ZipArchiveWriter::writeFile(filename);
}
bool ZipArchiveWriter::isWritingFile() const bool ZipArchiveWriter::isWritingFile() const
{ {
std::lock_guard lock{mutex}; std::lock_guard lock{mutex};

View File

@ -32,6 +32,9 @@ public:
/// of the function `writeFile()` should be destroyed before next call of `writeFile()`. /// of the function `writeFile()` should be destroyed before next call of `writeFile()`.
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & filename) override; std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & filename) override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & filename, const size_t & size) override;
/// Returns true if there is an active instance of WriteBuffer returned by writeFile(). /// Returns true if there is an active instance of WriteBuffer returned by writeFile().
/// This function should be used mostly for debugging purposes. /// This function should be used mostly for debugging purposes.
bool isWritingFile() const override; bool isWritingFile() const override;

View File

@ -48,7 +48,7 @@ std::shared_ptr<IArchiveReader> createArchiveReader(
tar_extensions.begin(), tar_extensions.end(), [&](const auto extension) { return path_to_archive.ends_with(extension); })) tar_extensions.begin(), tar_extensions.end(), [&](const auto extension) { return path_to_archive.ends_with(extension); }))
{ {
#if USE_LIBARCHIVE #if USE_LIBARCHIVE
return std::make_shared<TarArchiveReader>(path_to_archive); return std::make_shared<TarArchiveReader>(path_to_archive, archive_read_function);
#else #else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "libarchive library is disabled"); throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "libarchive library is disabled");
#endif #endif

View File

@ -1,5 +1,6 @@
#include <IO/Archives/createArchiveWriter.h> #include <IO/Archives/LibArchiveWriter.h>
#include <IO/Archives/ZipArchiveWriter.h> #include <IO/Archives/ZipArchiveWriter.h>
#include <IO/Archives/createArchiveWriter.h>
#include <IO/WriteBuffer.h> #include <IO/WriteBuffer.h>
#include <Common/Exception.h> #include <Common/Exception.h>
@ -8,8 +9,8 @@ namespace DB
{ {
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int CANNOT_PACK_ARCHIVE; extern const int CANNOT_PACK_ARCHIVE;
extern const int SUPPORT_IS_DISABLED; extern const int SUPPORT_IS_DISABLED;
} }
@ -19,9 +20,8 @@ std::shared_ptr<IArchiveWriter> createArchiveWriter(const String & path_to_archi
} }
std::shared_ptr<IArchiveWriter> createArchiveWriter( std::shared_ptr<IArchiveWriter>
const String & path_to_archive, createArchiveWriter(const String & path_to_archive, [[maybe_unused]] std::unique_ptr<WriteBuffer> archive_write_buffer)
[[maybe_unused]] std::unique_ptr<WriteBuffer> archive_write_buffer)
{ {
if (path_to_archive.ends_with(".zip") || path_to_archive.ends_with(".zipx")) if (path_to_archive.ends_with(".zip") || path_to_archive.ends_with(".zipx"))
{ {
@ -29,6 +29,15 @@ std::shared_ptr<IArchiveWriter> createArchiveWriter(
return std::make_shared<ZipArchiveWriter>(path_to_archive, std::move(archive_write_buffer)); return std::make_shared<ZipArchiveWriter>(path_to_archive, std::move(archive_write_buffer));
#else #else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "minizip library is disabled"); throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "minizip library is disabled");
#endif
}
//todo add support for extentsions i.e .gz
else if (path_to_archive.ends_with(".tar"))
{
#if USE_LIBARCHIVE
return std::make_shared<LibArchiveWriter>(path_to_archive, std::move(archive_write_buffer));
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "libarchive library is disabled");
#endif #endif
} }
else else

View File

@ -6,7 +6,7 @@ namespace DB
bool hasRegisteredArchiveFileExtension(const String & path) bool hasRegisteredArchiveFileExtension(const String & path)
{ {
return path.ends_with(".zip") || path.ends_with(".zipx"); return path.ends_with(".zip") || path.ends_with(".zipx") || path.ends_with(".tar");
} }
} }

View File

@ -7,9 +7,11 @@
#include <IO/Archives/createArchiveReader.h> #include <IO/Archives/createArchiveReader.h>
#include <IO/Archives/createArchiveWriter.h> #include <IO/Archives/createArchiveWriter.h>
#include <IO/ReadBufferFromFileBase.h> #include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromString.h> #include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFileBase.h> #include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Common/Exception.h> #include <Common/Exception.h>
@ -335,41 +337,51 @@ TEST_P(ArchiveReaderAndWriterTest, InMemory)
TEST_P(ArchiveReaderAndWriterTest, Password) TEST_P(ArchiveReaderAndWriterTest, Password)
{ {
/// Make an archive. auto writer = createArchiveWriter(getPathToArchive());
std::string_view contents = "The contents of a.txt"; //don't support passwords for tar archives
if(getPathToArchive().ends_with(".tar"))
{ {
auto writer = createArchiveWriter(getPathToArchive()); expectException(ErrorCodes::NOT_IMPLEMENTED, "Setting a password is not currently supported for tar archives",
writer->setPassword("Qwe123"); [&]{ writer->setPassword("a.txt"); });
{
auto out = writer->writeFile("a.txt");
writeString(contents, *out);
out->finalize();
}
writer->finalize(); writer->finalize();
} }
else
/// Read the archive.
auto reader = createArchiveReader(getPathToArchive());
/// Try to read without a password.
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Password is required",
[&]{ reader->readFile("a.txt", /*throw_on_not_found=*/true); });
{ {
/// Try to read with a wrong password. /// Make an archive.
reader->setPassword("123Qwe"); std::string_view contents = "The contents of a.txt";
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Wrong password", {
writer->setPassword("Qwe123");
{
auto out = writer->writeFile("a.txt");
writeString(contents, *out);
out->finalize();
}
writer->finalize();
}
/// Read the archive.
auto reader = createArchiveReader(getPathToArchive());
/// Try to read without a password.
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Password is required",
[&]{ reader->readFile("a.txt", /*throw_on_not_found=*/true); }); [&]{ reader->readFile("a.txt", /*throw_on_not_found=*/true); });
}
{ {
/// Reading with the right password is successful. /// Try to read with a wrong password.
reader->setPassword("Qwe123"); reader->setPassword("123Qwe");
auto in = reader->readFile("a.txt", /*throw_on_not_found=*/true); expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Wrong password",
String str; [&]{ reader->readFile("a.txt", /*throw_on_not_found=*/true); });
readStringUntilEOF(str, *in); }
EXPECT_EQ(str, contents);
{
/// Reading with the right password is successful.
reader->setPassword("Qwe123");
auto in = reader->readFile("a.txt", /*throw_on_not_found=*/true);
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents);
}
} }
} }
@ -380,6 +392,53 @@ TEST_P(ArchiveReaderAndWriterTest, ArchiveNotExist)
[&]{ createArchiveReader(getPathToArchive()); }); [&]{ createArchiveReader(getPathToArchive()); });
} }
TEST_P(ArchiveReaderAndWriterTest, LargeFile)
{
/// Make an archive.
std::string_view contents = "The contents of a.txt\n";
int times = 100000000;
{
auto writer = createArchiveWriter(getPathToArchive());
{
auto out = writer->writeFile("a.txt", times * contents.size());
for(int i = 0; i < times; i++)
{
writeString(contents, *out);
}
out->finalize();
}
writer->finalize();
}
/// Read the archive.
auto reader = createArchiveReader(getPathToArchive());
ASSERT_TRUE(reader->fileExists("a.txt"));
auto file_info = reader->getFileInfo("a.txt");
EXPECT_EQ(file_info.uncompressed_size, contents.size() * times);
EXPECT_GT(file_info.compressed_size, 0);
{
auto in = reader->readFile("a.txt", /*throw_on_not_found=*/true);
for(int i = 0; i < times; i++)
{
ASSERT_TRUE(checkString(String(contents), *in));
}
}
{
/// Use an enumerator.
auto enumerator = reader->firstFile();
ASSERT_NE(enumerator, nullptr);
EXPECT_EQ(enumerator->getFileName(), "a.txt");
EXPECT_EQ(enumerator->getFileInfo().uncompressed_size, contents.size() * times);
EXPECT_GT(enumerator->getFileInfo().compressed_size, 0);
EXPECT_FALSE(enumerator->nextFile());
}
}
TEST(TarArchiveReaderTest, FileExists) { TEST(TarArchiveReaderTest, FileExists) {
String archive_path = "archive.tar"; String archive_path = "archive.tar";
String filename = "file.txt"; String filename = "file.txt";
@ -508,7 +567,8 @@ namespace
{ {
const char * supported_archive_file_exts[] = const char * supported_archive_file_exts[] =
{ {
".zip" ".zip",
".tar"
}; };
} }

View File

@ -591,6 +591,57 @@ def test_zip_archive_with_bad_compression_method():
) )
def test_tar_archive():
backup_name = f"Disk('backups', 'archive.tar')"
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
assert os.path.isfile(get_path_to_backup(backup_name))
instance.query("DROP TABLE test.table")
assert instance.query("EXISTS test.table") == "0\n"
instance.query(f"RESTORE TABLE test.table FROM {backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
def test_tar_archive_with_password():
backup_name = f"Disk('backups', 'archive_with_password.tar')"
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
expected_error = "Setting a password is not currently supported for tar archives"
assert expected_error in instance.query_and_get_error(
f"BACKUP TABLE test.table TO {backup_name} SETTINGS id='tar_archive_with_password', password='password123'"
)
assert (
instance.query(
"SELECT status FROM system.backups WHERE id='tar_archive_with_password'"
)
== "BACKUP_FAILED\n"
)
def test_tar_archive_with_bad_compression_method():
backup_name = f"Disk('backups', 'archive_with_bad_compression_method.tar')"
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
expected_error = "Tar archives are currenly supported without compression"
assert expected_error in instance.query_and_get_error(
f"BACKUP TABLE test.table TO {backup_name} SETTINGS id='tar_archive_with_bad_compression_method', compression_method='foobar'"
)
assert (
instance.query(
"SELECT status FROM system.backups WHERE id='tar_archive_with_bad_compression_method'"
)
== "BACKUP_FAILED\n"
)
def test_async(): def test_async():
create_and_fill_table() create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"

View File

@ -453,6 +453,11 @@ def test_backup_to_zip():
backup_destination = f"S3('http://minio1:9001/root/data/backups/{backup_name}.zip', 'minio', 'minio123')" backup_destination = f"S3('http://minio1:9001/root/data/backups/{backup_name}.zip', 'minio', 'minio123')"
check_backup_and_restore(storage_policy, backup_destination) check_backup_and_restore(storage_policy, backup_destination)
def test_backup_to_tar():
storage_policy = "default"
backup_name = new_backup_name()
backup_destination = f"S3('http://minio1:9001/root/data/backups/{backup_name}.tar', 'minio', 'minio123')"
check_backup_and_restore(storage_policy, backup_destination)
def test_user_specific_auth(start_cluster): def test_user_specific_auth(start_cluster):
def create_user(user): def create_user(user):