Merge pull request #59535 from josh-hildred/tar_support

Add support for reading and writing backups as tar archives
This commit is contained in:
Antonio Andelic 2024-03-01 10:01:59 +01:00 committed by GitHub
commit 5104029f62
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 1052 additions and 145 deletions

View File

@ -157,7 +157,7 @@ if (TARGET ch_contrib::zlib)
endif()
if (TARGET ch_contrib::zstd)
target_compile_definitions(_libarchive PUBLIC HAVE_ZSTD_H=1 HAVE_LIBZSTD=1)
target_compile_definitions(_libarchive PUBLIC HAVE_ZSTD_H=1 HAVE_LIBZSTD=1 HAVE_LIBZSTD_COMPRESSOR=1)
target_link_libraries(_libarchive PRIVATE ch_contrib::zstd)
endif()

View File

@ -168,6 +168,28 @@ RESTORE TABLE test.table PARTITIONS '2', '3'
FROM Disk('backups', 'filename.zip')
```
### Backups as tar archives
Backups can also be stored as tar archives. The functionality is the same as for zip, except that a password is not supported.
Write a backup as a tar:
```
BACKUP TABLE test.table TO Disk('backups', '1.tar')
```
Corresponding restore:
```
RESTORE TABLE test.table FROM Disk('backups', '1.tar')
```
To change the compression method, the correct file suffix should be appended to the backup name. I.E to compress the tar archive using gzip:
```
BACKUP TABLE test.table TO Disk('backups', '1.tar.gz')
```
The supported compression file suffixes are `tar.gz`, `.tgz` `tar.bz2`, `tar.lzma`, `.tar.zst`, `.tzst` and `.tar.xz`.
### Check the status of backups
The backup command returns an `id` and `status`, and that `id` can be used to get the status of the backup. This is very useful to check the progress of long ASYNC backups. The example below shows a failure that happened when trying to overwrite an existing backup file:

View File

@ -927,7 +927,7 @@ void BackupImpl::writeFile(const BackupFileInfo & info, BackupEntryPtr entry)
const auto write_info_to_archive = [&](const auto & file_name)
{
auto out = archive_writer->writeFile(file_name);
auto out = archive_writer->writeFile(file_name, info.size);
auto read_buffer = entry->getReadBuffer(writer->getReadSettings());
if (info.base_size != 0)
read_buffer->seek(info.base_size, SEEK_SET);

View File

@ -22,6 +22,8 @@ public:
/// of the function `writeFile()` should be destroyed before next call of `writeFile()`.
virtual std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & filename) = 0;
virtual std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & filename, size_t size) = 0;
/// Returns true if there is an active instance of WriteBuffer returned by writeFile().
/// This function should be used mostly for debugging purposes.
virtual bool isWritingFile() const = 0;

View File

@ -1,11 +1,9 @@
#include <IO/Archives/ArchiveUtils.h>
#include <IO/Archives/LibArchiveReader.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Common/quoteString.h>
#include <Common/scope_guard_safe.h>
#include <IO/Archives/ArchiveUtils.h>
#include <mutex>
namespace DB
{
@ -14,35 +12,58 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_UNPACK_ARCHIVE;
extern const int LOGICAL_ERROR;
extern const int CANNOT_READ_ALL_DATA;
extern const int UNSUPPORTED_METHOD;
extern const int CANNOT_UNPACK_ARCHIVE;
extern const int LOGICAL_ERROR;
extern const int CANNOT_READ_ALL_DATA;
extern const int UNSUPPORTED_METHOD;
}
class LibArchiveReader::StreamInfo
{
public:
explicit StreamInfo(std::unique_ptr<SeekableReadBuffer> read_buffer_) : read_buffer(std::move(read_buffer_)) { }
static ssize_t read(struct archive *, void * client_data, const void ** buff)
{
auto * read_stream = reinterpret_cast<StreamInfo *>(client_data);
*buff = reinterpret_cast<void *>(read_stream->buf);
return read_stream->read_buffer->read(read_stream->buf, DBMS_DEFAULT_BUFFER_SIZE);
}
std::unique_ptr<SeekableReadBuffer> read_buffer;
char buf[DBMS_DEFAULT_BUFFER_SIZE];
};
class LibArchiveReader::Handle
{
public:
explicit Handle(std::string path_to_archive_, bool lock_on_reading_)
: path_to_archive(path_to_archive_), lock_on_reading(lock_on_reading_)
: path_to_archive(std::move(path_to_archive_)), lock_on_reading(lock_on_reading_)
{
current_archive = open(path_to_archive);
current_archive = openWithPath(path_to_archive);
}
explicit Handle(std::string path_to_archive_, bool lock_on_reading_, const ReadArchiveFunction & archive_read_function_)
: path_to_archive(std::move(path_to_archive_)), archive_read_function(archive_read_function_), lock_on_reading(lock_on_reading_)
{
read_stream = std::make_unique<StreamInfo>(archive_read_function());
current_archive = openWithReader(read_stream.get());
}
Handle(const Handle &) = delete;
Handle(Handle && other) noexcept
: current_archive(other.current_archive)
: read_stream(std::move(other.read_stream))
, current_archive(other.current_archive)
, current_entry(other.current_entry)
, archive_read_function(std::move(other.archive_read_function))
, lock_on_reading(other.lock_on_reading)
{
other.current_archive = nullptr;
other.current_entry = nullptr;
}
~Handle()
{
close(current_archive);
}
~Handle() { close(current_archive); }
bool locateFile(const std::string & filename)
{
@ -64,10 +85,14 @@ public:
break;
if (filter(archive_entry_pathname(current_entry)))
{
valid = true;
return true;
}
}
checkError(err);
valid = false;
return false;
}
@ -81,17 +106,19 @@ public:
} while (err == ARCHIVE_RETRY);
checkError(err);
return err == ARCHIVE_OK;
valid = err == ARCHIVE_OK;
return valid;
}
std::vector<std::string> getAllFiles(NameFilter filter)
{
auto * archive = open(path_to_archive);
SCOPE_EXIT(
close(archive);
);
std::unique_ptr<LibArchiveReader::StreamInfo> rs
= archive_read_function ? std::make_unique<StreamInfo>(archive_read_function()) : nullptr;
auto * archive = rs ? openWithReader(rs.get()) : openWithPath(path_to_archive);
struct archive_entry * entry = nullptr;
SCOPE_EXIT(close(archive););
Entry entry = nullptr;
std::vector<std::string> files;
int error = readNextHeader(archive, &entry);
@ -112,6 +139,8 @@ public:
const String & getFileName() const
{
chassert(current_entry);
if (!valid)
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "No current file");
if (!file_name)
file_name.emplace(archive_entry_pathname(current_entry));
@ -121,6 +150,8 @@ public:
const FileInfo & getFileInfo() const
{
chassert(current_entry);
if (!valid)
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "No current file");
if (!file_info)
{
file_info.emplace();
@ -132,13 +163,21 @@ public:
return *file_info;
}
struct archive * current_archive;
struct archive_entry * current_entry = nullptr;
la_ssize_t readData(void * buf, size_t len) { return archive_read_data(current_archive, buf, len); }
const char * getArchiveError() { return archive_error_string(current_archive); }
private:
using Archive = struct archive *;
using Entry = struct archive_entry *;
void checkError(int error) const
{
if (error == ARCHIVE_FATAL)
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Failed to read archive while fetching all files: {}", archive_error_string(current_archive));
throw Exception(
ErrorCodes::CANNOT_UNPACK_ARCHIVE,
"Failed to read archive while fetching all files: {}",
archive_error_string(current_archive));
}
void resetFileInfo()
@ -147,7 +186,7 @@ private:
file_info.reset();
}
static struct archive * open(const String & path_to_archive)
Archive openWithReader(StreamInfo * read_stream_)
{
auto * archive = archive_read_new();
try
@ -158,13 +197,18 @@ private:
archive_read_support_filter_xz(archive);
archive_read_support_filter_lz4(archive);
archive_read_support_filter_zstd(archive);
archive_read_support_filter_lzma(archive);
// Support tar, 7zip and zip
archive_read_support_format_tar(archive);
archive_read_support_format_7zip(archive);
archive_read_support_format_zip(archive);
if (archive_read_open_filename(archive, path_to_archive.c_str(), 10240) != ARCHIVE_OK)
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't open archive {}: {}", quoteString(path_to_archive), archive_error_string(archive));
if (archive_read_open(archive, read_stream_, nullptr, StreamInfo::read, nullptr) != ARCHIVE_OK)
throw Exception(
ErrorCodes::CANNOT_UNPACK_ARCHIVE,
"Couldn't open archive {}: {}",
quoteString(path_to_archive),
archive_error_string(archive));
}
catch (...)
{
@ -175,7 +219,39 @@ private:
return archive;
}
static void close(struct archive * archive)
Archive openWithPath(const String & path_to_archive_)
{
auto * archive = archive_read_new();
try
{
// Support for bzip2, gzip, lzip, xz, zstd and lz4
archive_read_support_filter_bzip2(archive);
archive_read_support_filter_gzip(archive);
archive_read_support_filter_xz(archive);
archive_read_support_filter_lz4(archive);
archive_read_support_filter_zstd(archive);
archive_read_support_filter_lzma(archive);
// Support tar, 7zip and zip
archive_read_support_format_tar(archive);
archive_read_support_format_7zip(archive);
archive_read_support_format_zip(archive);
if (archive_read_open_filename(archive, path_to_archive_.c_str(), 10240) != ARCHIVE_OK)
throw Exception(
ErrorCodes::CANNOT_UNPACK_ARCHIVE,
"Couldn't open archive {}: {}",
quoteString(path_to_archive),
archive_error_string(archive));
}
catch (...)
{
close(archive);
throw;
}
return archive;
}
static void close(Archive archive)
{
if (archive)
{
@ -193,7 +269,12 @@ private:
return archive_read_next_header(archive, entry);
}
const String path_to_archive;
String path_to_archive;
std::unique_ptr<StreamInfo> read_stream;
Archive current_archive;
Entry current_entry = nullptr;
bool valid = true;
IArchiveReader::ReadArchiveFunction archive_read_function;
/// for some archive types when we are reading headers static variables are used
/// which are not thread-safe
@ -207,7 +288,7 @@ private:
class LibArchiveReader::FileEnumeratorImpl : public FileEnumerator
{
public:
explicit FileEnumeratorImpl(Handle handle_) : handle(std::move(handle_)) {}
explicit FileEnumeratorImpl(Handle handle_) : handle(std::move(handle_)) { }
const String & getFileName() const override { return handle.getFileName(); }
const FileInfo & getFileInfo() const override { return handle.getFileInfo(); }
@ -215,6 +296,7 @@ public:
/// Releases owned handle to pass it to a read buffer.
Handle releaseHandle() && { return std::move(handle); }
private:
Handle handle;
};
@ -226,36 +308,33 @@ public:
: ReadBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0)
, handle(std::move(handle_))
, path_to_archive(std::move(path_to_archive_))
{}
{
}
off_t seek(off_t /* off */, int /* whence */) override
{
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Seek is not supported when reading from archive");
}
bool checkIfActuallySeekable() override { return false; }
off_t getPosition() override
{
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "getPosition not supported when reading from archive");
}
off_t getPosition() override { throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "getPosition not supported when reading from archive"); }
String getFileName() const override { return handle.getFileName(); }
size_t getFileSize() override { return handle.getFileInfo().uncompressed_size; }
Handle releaseHandle() &&
{
return std::move(handle);
}
Handle releaseHandle() && { return std::move(handle); }
private:
bool nextImpl() override
{
auto bytes_read = archive_read_data(handle.current_archive, internal_buffer.begin(), static_cast<int>(internal_buffer.size()));
auto bytes_read = handle.readData(internal_buffer.begin(), internal_buffer.size());
if (bytes_read < 0)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Failed to read file {} from {}: {}", handle.getFileName(), path_to_archive, archive_error_string(handle.current_archive));
throw Exception(
ErrorCodes::CANNOT_READ_ALL_DATA,
"Failed to read file {} from {}: {}",
handle.getFileName(),
path_to_archive,
handle.getArchiveError());
if (!bytes_read)
return false;
@ -274,7 +353,17 @@ private:
LibArchiveReader::LibArchiveReader(std::string archive_name_, bool lock_on_reading_, std::string path_to_archive_)
: archive_name(std::move(archive_name_)), lock_on_reading(lock_on_reading_), path_to_archive(std::move(path_to_archive_))
{}
{
}
LibArchiveReader::LibArchiveReader(
std::string archive_name_, bool lock_on_reading_, std::string path_to_archive_, const ReadArchiveFunction & archive_read_function_)
: archive_name(std::move(archive_name_))
, lock_on_reading(lock_on_reading_)
, path_to_archive(std::move(path_to_archive_))
, archive_read_function(archive_read_function_)
{
}
LibArchiveReader::~LibArchiveReader() = default;
@ -285,21 +374,25 @@ const std::string & LibArchiveReader::getPath() const
bool LibArchiveReader::fileExists(const String & filename)
{
Handle handle(path_to_archive, lock_on_reading);
Handle handle = acquireHandle();
return handle.locateFile(filename);
}
LibArchiveReader::FileInfo LibArchiveReader::getFileInfo(const String & filename)
{
Handle handle(path_to_archive, lock_on_reading);
Handle handle = acquireHandle();
if (!handle.locateFile(filename))
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: file not found", path_to_archive);
throw Exception(
ErrorCodes::CANNOT_UNPACK_ARCHIVE,
"Couldn't unpack archive {}: File {} was not found in archive",
path_to_archive,
quoteString(filename));
return handle.getFileInfo();
}
std::unique_ptr<LibArchiveReader::FileEnumerator> LibArchiveReader::firstFile()
{
Handle handle(path_to_archive, lock_on_reading);
Handle handle = acquireHandle();
if (!handle.nextFile())
return nullptr;
@ -308,17 +401,28 @@ std::unique_ptr<LibArchiveReader::FileEnumerator> LibArchiveReader::firstFile()
std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader::readFile(const String & filename, bool throw_on_not_found)
{
return readFile([&](const std::string & file) { return file == filename; }, throw_on_not_found);
Handle handle = acquireHandle();
if (!handle.locateFile(filename))
{
if (throw_on_not_found)
throw Exception(
ErrorCodes::CANNOT_UNPACK_ARCHIVE,
"Couldn't unpack archive {}: File {} was not found in archive",
path_to_archive,
quoteString(filename));
return nullptr;
}
return std::make_unique<ReadBufferFromLibArchive>(std::move(handle), path_to_archive);
}
std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader::readFile(NameFilter filter, bool throw_on_not_found)
{
Handle handle(path_to_archive, lock_on_reading);
Handle handle = acquireHandle();
if (!handle.locateFile(filter))
{
if (throw_on_not_found)
throw Exception(
ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: no file found satisfying the filter", path_to_archive);
ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: No file satisfying filter in archive", path_to_archive);
return nullptr;
}
return std::make_unique<ReadBufferFromLibArchive>(std::move(handle), path_to_archive);
@ -337,7 +441,8 @@ std::unique_ptr<LibArchiveReader::FileEnumerator> LibArchiveReader::nextFile(std
{
if (!dynamic_cast<ReadBufferFromLibArchive *>(read_buffer.get()))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong ReadBuffer passed to nextFile()");
auto read_buffer_from_libarchive = std::unique_ptr<ReadBufferFromLibArchive>(static_cast<ReadBufferFromLibArchive *>(read_buffer.release()));
auto read_buffer_from_libarchive
= std::unique_ptr<ReadBufferFromLibArchive>(static_cast<ReadBufferFromLibArchive *>(read_buffer.release()));
auto handle = std::move(*read_buffer_from_libarchive).releaseHandle();
if (!handle.nextFile())
return nullptr;
@ -348,7 +453,8 @@ std::unique_ptr<LibArchiveReader::FileEnumerator> LibArchiveReader::currentFile(
{
if (!dynamic_cast<ReadBufferFromLibArchive *>(read_buffer.get()))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong ReadBuffer passed to nextFile()");
auto read_buffer_from_libarchive = std::unique_ptr<ReadBufferFromLibArchive>(static_cast<ReadBufferFromLibArchive *>(read_buffer.release()));
auto read_buffer_from_libarchive
= std::unique_ptr<ReadBufferFromLibArchive>(static_cast<ReadBufferFromLibArchive *>(read_buffer.release()));
auto handle = std::move(*read_buffer_from_libarchive).releaseHandle();
return std::make_unique<FileEnumeratorImpl>(std::move(handle));
}
@ -360,13 +466,22 @@ std::vector<std::string> LibArchiveReader::getAllFiles()
std::vector<std::string> LibArchiveReader::getAllFiles(NameFilter filter)
{
Handle handle(path_to_archive, lock_on_reading);
Handle handle = acquireHandle();
return handle.getAllFiles(filter);
}
void LibArchiveReader::setPassword(const String & /*password_*/)
void LibArchiveReader::setPassword(const String & password_)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not set password to {} archive", archive_name);
if (password_.empty())
return;
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot set password to {} archive", archive_name);
}
LibArchiveReader::Handle LibArchiveReader::acquireHandle()
{
std::lock_guard lock{mutex};
return archive_read_function ? Handle{path_to_archive, lock_on_reading, archive_read_function}
: Handle{path_to_archive, lock_on_reading};
}
#endif

View File

@ -1,8 +1,9 @@
#pragma once
#include "config.h"
#include <mutex>
#include <IO/Archives/IArchiveReader.h>
#include <IO/Archives/LibArchiveReader.h>
#include "config.h"
namespace DB
@ -52,26 +53,44 @@ protected:
/// Constructs an archive's reader that will read from a file in the local filesystem.
LibArchiveReader(std::string archive_name_, bool lock_on_reading_, std::string path_to_archive_);
LibArchiveReader(
std::string archive_name_, bool lock_on_reading_, std::string path_to_archive_, const ReadArchiveFunction & archive_read_function_);
private:
class ReadBufferFromLibArchive;
class Handle;
class FileEnumeratorImpl;
class StreamInfo;
Handle acquireHandle();
const std::string archive_name;
const bool lock_on_reading;
const String path_to_archive;
const ReadArchiveFunction archive_read_function;
mutable std::mutex mutex;
};
class TarArchiveReader : public LibArchiveReader
{
public:
explicit TarArchiveReader(std::string path_to_archive) : LibArchiveReader("tar", /*lock_on_reading_=*/ true, std::move(path_to_archive)) { }
explicit TarArchiveReader(std::string path_to_archive) : LibArchiveReader("tar", /*lock_on_reading_=*/true, std::move(path_to_archive))
{
}
explicit TarArchiveReader(std::string path_to_archive, const ReadArchiveFunction & archive_read_function)
: LibArchiveReader("tar", /*lock_on_reading_=*/true, std::move(path_to_archive), archive_read_function)
{
}
};
class SevenZipArchiveReader : public LibArchiveReader
{
public:
explicit SevenZipArchiveReader(std::string path_to_archive) : LibArchiveReader("7z", /*lock_on_reading_=*/ false, std::move(path_to_archive)) { }
explicit SevenZipArchiveReader(std::string path_to_archive)
: LibArchiveReader("7z", /*lock_on_reading_=*/false, std::move(path_to_archive))
{
}
};
#endif

View File

@ -0,0 +1,248 @@
#include <IO/Archives/LibArchiveWriter.h>
#include <filesystem>
#include <IO/WriteBufferFromFileBase.h>
#include <Common/quoteString.h>
#include <Common/scope_guard_safe.h>
#include <mutex>
#if USE_LIBARCHIVE
// this implemation follows the ZipArchiveWriter implemation as closely as possible.
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PACK_ARCHIVE;
extern const int NOT_IMPLEMENTED;
}
namespace
{
void checkResultCodeImpl(int code, const String & filename)
{
if (code == ARCHIVE_OK)
return;
throw Exception(
ErrorCodes::CANNOT_PACK_ARCHIVE, "Couldn't pack archive: LibArchive Code = {}, filename={}", code, quoteString(filename));
}
}
// this is a thin wrapper for libarchive to be able to write the archive to a WriteBuffer
class LibArchiveWriter::StreamInfo
{
public:
explicit StreamInfo(std::unique_ptr<WriteBuffer> archive_write_buffer_) : archive_write_buffer(std::move(archive_write_buffer_)) { }
static ssize_t memory_write(struct archive *, void * client_data, const void * buff, size_t length)
{
auto * stream_info = reinterpret_cast<StreamInfo *>(client_data);
stream_info->archive_write_buffer->write(reinterpret_cast<const char *>(buff), length);
return length;
}
std::unique_ptr<WriteBuffer> archive_write_buffer;
};
class LibArchiveWriter::WriteBufferFromLibArchive : public WriteBufferFromFileBase
{
public:
WriteBufferFromLibArchive(std::shared_ptr<LibArchiveWriter> archive_writer_, const String & filename_, const size_t & size_)
: WriteBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0), archive_writer(archive_writer_), filename(filename_), size(size_)
{
startWritingFile();
archive = archive_writer_->getArchive();
entry = nullptr;
}
~WriteBufferFromLibArchive() override
{
try
{
closeFile(/* throw_if_error= */ false);
endWritingFile();
}
catch (...)
{
tryLogCurrentException("WriteBufferFromTarArchive");
}
}
void finalizeImpl() override
{
next();
closeFile(/* throw_if_error=*/true);
endWritingFile();
}
void sync() override { next(); }
std::string getFileName() const override { return filename; }
private:
void nextImpl() override
{
if (!offset())
return;
if (entry == nullptr)
writeEntry();
ssize_t to_write = offset();
ssize_t written = archive_write_data(archive, working_buffer.begin(), offset());
if (written != to_write)
{
throw Exception(
ErrorCodes::CANNOT_PACK_ARCHIVE,
"Couldn't pack tar archive: Failed to write all bytes, {} of {}, filename={}",
written,
to_write,
quoteString(filename));
}
}
void writeEntry()
{
expected_size = getSize();
entry = archive_entry_new();
archive_entry_set_pathname(entry, filename.c_str());
archive_entry_set_size(entry, expected_size);
archive_entry_set_filetype(entry, static_cast<__LA_MODE_T>(0100000));
archive_entry_set_perm(entry, 0644);
checkResult(archive_write_header(archive, entry));
}
size_t getSize() const
{
if (size)
return size;
else
return offset();
}
void closeFile(bool throw_if_error)
{
if (entry)
{
archive_entry_free(entry);
entry = nullptr;
}
if (throw_if_error and bytes != expected_size)
{
throw Exception(
ErrorCodes::CANNOT_PACK_ARCHIVE,
"Couldn't pack tar archive: Wrote {} of expected {} , filename={}",
bytes,
expected_size,
quoteString(filename));
}
}
void endWritingFile()
{
if (auto archive_writer_ptr = archive_writer.lock())
archive_writer_ptr->endWritingFile();
}
void startWritingFile()
{
if (auto archive_writer_ptr = archive_writer.lock())
archive_writer_ptr->startWritingFile();
}
void checkResult(int code) { checkResultCodeImpl(code, filename); }
std::weak_ptr<LibArchiveWriter> archive_writer;
const String filename;
Entry entry;
Archive archive;
size_t size;
size_t expected_size;
};
LibArchiveWriter::LibArchiveWriter(const String & path_to_archive_, std::unique_ptr<WriteBuffer> archive_write_buffer_)
: path_to_archive(path_to_archive_)
{
if (archive_write_buffer_)
stream_info = std::make_unique<StreamInfo>(std::move(archive_write_buffer_));
}
void LibArchiveWriter::createArchive()
{
std::lock_guard lock{mutex};
archive = archive_write_new();
setFormatAndSettings();
if (stream_info)
{
//This allows use to write directly to a writebuffer rather than an intermediate buffer in libarchive.
//This has to be set otherwise zstd breaks due to extra bytes being written at the end of the archive.
archive_write_set_bytes_per_block(archive, 0);
archive_write_open2(archive, stream_info.get(), nullptr, &StreamInfo::memory_write, nullptr, nullptr);
}
else
archive_write_open_filename(archive, path_to_archive.c_str());
}
LibArchiveWriter::~LibArchiveWriter()
{
chassert((finalized || std::uncaught_exceptions() || std::current_exception()) && "LibArchiveWriter is not finalized in destructor.");
if (archive)
archive_write_free(archive);
}
std::unique_ptr<WriteBufferFromFileBase> LibArchiveWriter::writeFile(const String & filename, size_t size)
{
return std::make_unique<WriteBufferFromLibArchive>(std::static_pointer_cast<LibArchiveWriter>(shared_from_this()), filename, size);
}
std::unique_ptr<WriteBufferFromFileBase> LibArchiveWriter::writeFile(const String & filename)
{
return std::make_unique<WriteBufferFromLibArchive>(std::static_pointer_cast<LibArchiveWriter>(shared_from_this()), filename, 0);
}
bool LibArchiveWriter::isWritingFile() const
{
std::lock_guard lock{mutex};
return is_writing_file;
}
void LibArchiveWriter::endWritingFile()
{
std::lock_guard lock{mutex};
is_writing_file = false;
}
void LibArchiveWriter::startWritingFile()
{
std::lock_guard lock{mutex};
if (std::exchange(is_writing_file, true))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot write two files to a tar archive in parallel");
}
void LibArchiveWriter::finalize()
{
std::lock_guard lock{mutex};
if (finalized)
return;
if (archive)
archive_write_close(archive);
if (stream_info)
{
stream_info->archive_write_buffer->finalize();
stream_info.reset();
}
finalized = true;
}
void LibArchiveWriter::setPassword(const String & password_)
{
if (password_.empty())
return;
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Setting a password is not currently supported for libarchive");
}
LibArchiveWriter::Archive LibArchiveWriter::getArchive()
{
std::lock_guard lock{mutex};
return archive;
}
}
#endif

View File

@ -0,0 +1,77 @@
#pragma once
#include "config.h"
#if USE_LIBARCHIVE
# include <IO/Archives/ArchiveUtils.h>
# include <IO/Archives/IArchiveWriter.h>
# include <IO/WriteBufferFromFileBase.h>
# include <base/defines.h>
namespace DB
{
class WriteBufferFromFileBase;
/// Interface for writing an archive.
class LibArchiveWriter : public IArchiveWriter
{
public:
/// Constructs an archive that will be written as a file in the local filesystem.
explicit LibArchiveWriter(const String & path_to_archive_, std::unique_ptr<WriteBuffer> archive_write_buffer_);
/// Call finalize() before destructing IArchiveWriter.
~LibArchiveWriter() override;
/// Starts writing a file to the archive. The function returns a write buffer,
/// any data written to that buffer will be compressed and then put to the archive.
/// You can keep only one such buffer at a time, a buffer returned by previous call
/// of the function `writeFile()` should be destroyed before next call of `writeFile()`.
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & filename) override;
/// LibArchive needs to know the size of the file being written. If the file size is not
/// passed in the the archive writer tries to infer the size by looking at the available
/// data in the buffer, if next is called before all data is written to the buffer
/// an exception is thrown.
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & filename, size_t size) override;
/// Returns true if there is an active instance of WriteBuffer returned by writeFile().
/// This function should be used mostly for debugging purposes.
bool isWritingFile() const override;
/// Finalizes writing of the archive. This function must be always called at the end of writing.
/// (Unless an error appeared and the archive is in fact no longer needed.)
void finalize() override;
/// Sets compression method and level.
/// Changing them will affect next file in the archive.
//void setCompression(const String & compression_method_, int compression_level_) override;
/// Sets password. If the password is not empty it will enable encryption in the archive.
void setPassword(const String & password) override;
protected:
using Archive = struct archive *;
using Entry = struct archive_entry *;
/// derived classes must call createArchive. CreateArchive calls setFormatAndSettings.
void createArchive();
virtual void setFormatAndSettings() = 0;
Archive archive = nullptr;
String path_to_archive;
private:
class WriteBufferFromLibArchive;
class StreamInfo;
Archive getArchive();
void startWritingFile();
void endWritingFile();
std::unique_ptr<StreamInfo> stream_info TSA_GUARDED_BY(mutex) = nullptr;
bool is_writing_file TSA_GUARDED_BY(mutex) = false;
bool finalized TSA_GUARDED_BY(mutex) = false;
mutable std::mutex mutex;
};
}
#endif

View File

@ -0,0 +1,42 @@
#include <IO/Archives/TarArchiveWriter.h>
#if USE_LIBARCHIVE
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int CANNOT_PACK_ARCHIVE;
}
void TarArchiveWriter::setCompression(const String & compression_method_, int compression_level_)
{
// throw an error unless setCompression is passed the default value
if (compression_method_.empty() && compression_level_ == -1)
return;
throw Exception(
ErrorCodes::NOT_IMPLEMENTED, "Using compression_method and compression_level options are not supported for tar archives");
}
void TarArchiveWriter::setFormatAndSettings()
{
archive_write_set_format_pax_restricted(archive);
inferCompressionFromPath();
}
void TarArchiveWriter::inferCompressionFromPath()
{
if (path_to_archive.ends_with(".tar.gz") || path_to_archive.ends_with(".tgz"))
archive_write_add_filter_gzip(archive);
else if (path_to_archive.ends_with(".tar.bz2"))
archive_write_add_filter_bzip2(archive);
else if (path_to_archive.ends_with(".tar.lzma"))
archive_write_add_filter_lzma(archive);
else if (path_to_archive.ends_with(".tar.zst") || path_to_archive.ends_with(".tzst"))
archive_write_add_filter_zstd(archive);
else if (path_to_archive.ends_with(".tar.xz"))
archive_write_add_filter_xz(archive);
else if (!path_to_archive.ends_with(".tar"))
throw Exception(ErrorCodes::CANNOT_PACK_ARCHIVE, "Unknown compression format");
}
}
#endif

View File

@ -0,0 +1,26 @@
#pragma once
#include "config.h"
#if USE_LIBARCHIVE
# include <IO/Archives/LibArchiveWriter.h>
namespace DB
{
using namespace std::literals;
class TarArchiveWriter : public LibArchiveWriter
{
public:
explicit TarArchiveWriter(const String & path_to_archive_, std::unique_ptr<WriteBuffer> archive_write_buffer_)
: LibArchiveWriter(path_to_archive_, std::move(archive_write_buffer_))
{
createArchive();
}
void setCompression(const String & compression_method_, int compression_level_) override;
void setFormatAndSettings() override;
void inferCompressionFromPath();
};
}
#endif

View File

@ -274,6 +274,11 @@ std::unique_ptr<WriteBufferFromFileBase> ZipArchiveWriter::writeFile(const Strin
return std::make_unique<WriteBufferFromZipArchive>(std::static_pointer_cast<ZipArchiveWriter>(shared_from_this()), filename);
}
std::unique_ptr<WriteBufferFromFileBase> ZipArchiveWriter::writeFile(const String & filename, [[maybe_unused]] size_t size)
{
return ZipArchiveWriter::writeFile(filename);
}
bool ZipArchiveWriter::isWritingFile() const
{
std::lock_guard lock{mutex};

View File

@ -32,6 +32,9 @@ public:
/// of the function `writeFile()` should be destroyed before next call of `writeFile()`.
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & filename) override;
std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & filename, size_t size) override;
/// Returns true if there is an active instance of WriteBuffer returned by writeFile().
/// This function should be used mostly for debugging purposes.
bool isWritingFile() const override;

View File

@ -1,6 +1,6 @@
#include <IO/Archives/createArchiveReader.h>
#include <IO/Archives/ZipArchiveReader.h>
#include <IO/Archives/LibArchiveReader.h>
#include <IO/Archives/ZipArchiveReader.h>
#include <IO/Archives/createArchiveReader.h>
#include <Common/Exception.h>
@ -8,8 +8,8 @@ namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_UNPACK_ARCHIVE;
extern const int SUPPORT_IS_DISABLED;
extern const int CANNOT_UNPACK_ARCHIVE;
extern const int SUPPORT_IS_DISABLED;
}
@ -25,16 +25,8 @@ std::shared_ptr<IArchiveReader> createArchiveReader(
[[maybe_unused]] size_t archive_size)
{
using namespace std::literals;
static constexpr std::array tar_extensions
{
".tar"sv,
".tar.gz"sv,
".tgz"sv,
".tar.zst"sv,
".tzst"sv,
".tar.xz"sv,
".tar.bz2"sv
};
static constexpr std::array tar_extensions{
".tar"sv, ".tar.gz"sv, ".tgz"sv, ".tar.zst"sv, ".tzst"sv, ".tar.xz"sv, ".tar.bz2"sv, ".tar.lzma"sv};
if (path_to_archive.ends_with(".zip") || path_to_archive.ends_with(".zipx"))
{
@ -48,7 +40,7 @@ std::shared_ptr<IArchiveReader> createArchiveReader(
tar_extensions.begin(), tar_extensions.end(), [&](const auto extension) { return path_to_archive.ends_with(extension); }))
{
#if USE_LIBARCHIVE
return std::make_shared<TarArchiveReader>(path_to_archive);
return std::make_shared<TarArchiveReader>(path_to_archive, archive_read_function);
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "libarchive library is disabled");
#endif

View File

@ -1,5 +1,7 @@
#include <IO/Archives/createArchiveWriter.h>
#include <IO/Archives/LibArchiveWriter.h>
#include <IO/Archives/TarArchiveWriter.h>
#include <IO/Archives/ZipArchiveWriter.h>
#include <IO/Archives/createArchiveWriter.h>
#include <IO/WriteBuffer.h>
#include <Common/Exception.h>
@ -8,8 +10,8 @@ namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PACK_ARCHIVE;
extern const int SUPPORT_IS_DISABLED;
extern const int CANNOT_PACK_ARCHIVE;
extern const int SUPPORT_IS_DISABLED;
}
@ -19,20 +21,30 @@ std::shared_ptr<IArchiveWriter> createArchiveWriter(const String & path_to_archi
}
std::shared_ptr<IArchiveWriter> createArchiveWriter(
const String & path_to_archive,
[[maybe_unused]] std::unique_ptr<WriteBuffer> archive_write_buffer)
std::shared_ptr<IArchiveWriter>
createArchiveWriter(const String & path_to_archive, [[maybe_unused]] std::unique_ptr<WriteBuffer> archive_write_buffer)
{
using namespace std::literals;
static constexpr std::array tar_extensions{
".tar"sv, ".tar.gz"sv, ".tgz"sv, ".tar.bz2"sv, ".tar.lzma"sv, ".tar.zst"sv, ".tzst"sv, ".tar.xz"sv};
if (path_to_archive.ends_with(".zip") || path_to_archive.ends_with(".zipx"))
{
#if USE_MINIZIP
return std::make_shared<ZipArchiveWriter>(path_to_archive, std::move(archive_write_buffer));
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "minizip library is disabled");
#endif
}
else if (std::any_of(
tar_extensions.begin(), tar_extensions.end(), [&](const auto extension) { return path_to_archive.ends_with(extension); }))
{
#if USE_LIBARCHIVE
return std::make_shared<TarArchiveWriter>(path_to_archive, std::move(archive_write_buffer));
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "libarchive library is disabled");
#endif
}
else
throw Exception(ErrorCodes::CANNOT_PACK_ARCHIVE, "Cannot determine the type of archive {}", path_to_archive);
}
}

View File

@ -6,7 +6,10 @@ namespace DB
bool hasRegisteredArchiveFileExtension(const String & path)
{
return path.ends_with(".zip") || path.ends_with(".zipx");
using namespace std::literals;
static constexpr std::array archive_extensions{
".zip"sv, ".zipx"sv, ".tar"sv, ".tar.gz"sv, ".tgz"sv, ".tar.bz2"sv, ".tar.lzma"sv, ".tar.zst"sv, ".tzst"sv, ".tar.xz"sv};
return std::any_of(
archive_extensions.begin(), archive_extensions.end(), [&](const auto extension) { return path.ends_with(extension); });
}
}

View File

@ -1,26 +1,29 @@
#include <gtest/gtest.h>
#include "config.h"
#include <filesystem>
#include <format>
#include <IO/Archives/ArchiveUtils.h>
#include <IO/Archives/IArchiveReader.h>
#include <IO/Archives/IArchiveWriter.h>
#include <IO/Archives/createArchiveReader.h>
#include <IO/Archives/createArchiveWriter.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Common/Exception.h>
#include <Poco/TemporaryFile.h>
#include <filesystem>
#include <Common/Exception.h>
namespace DB::ErrorCodes
{
extern const int CANNOT_UNPACK_ARCHIVE;
extern const int LOGICAL_ERROR;
extern const int CANNOT_UNPACK_ARCHIVE;
extern const int LOGICAL_ERROR;
}
namespace fs = std::filesystem;
@ -49,7 +52,8 @@ bool createArchiveWithFiles(const std::string & archivename, const std::map<std:
archive_write_open_filename(a, archivename.c_str());
for (const auto & [filename, content] : files) {
for (const auto & [filename, content] : files)
{
entry = archive_entry_new();
archive_entry_set_pathname(entry, filename.c_str());
archive_entry_set_size(entry, content.size());
@ -59,12 +63,11 @@ bool createArchiveWithFiles(const std::string & archivename, const std::map<std:
archive_write_data(a, content.c_str(), content.size());
archive_entry_free(entry);
}
archive_write_close(a);
archive_write_free(a);
return true;
}
class ArchiveReaderAndWriterTest : public ::testing::TestWithParam<const char *>
@ -114,11 +117,13 @@ TEST_P(ArchiveReaderAndWriterTest, EmptyArchive)
EXPECT_FALSE(reader->fileExists("nofile.txt"));
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "File 'nofile.txt' was not found in archive",
[&]{ reader->getFileInfo("nofile.txt"); });
expectException(
ErrorCodes::CANNOT_UNPACK_ARCHIVE, "File 'nofile.txt' was not found in archive", [&] { reader->getFileInfo("nofile.txt"); });
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "File 'nofile.txt' was not found in archive",
[&]{ reader->readFile("nofile.txt", /*throw_on_not_found=*/true); });
expectException(
ErrorCodes::CANNOT_UNPACK_ARCHIVE,
"File 'nofile.txt' was not found in archive",
[&] { reader->readFile("nofile.txt", /*throw_on_not_found=*/true); });
EXPECT_EQ(reader->firstFile(), nullptr);
}
@ -182,11 +187,9 @@ TEST_P(ArchiveReaderAndWriterTest, SingleFileInArchive)
auto enumerator = reader->firstFile();
ASSERT_NE(enumerator, nullptr);
EXPECT_FALSE(enumerator->nextFile());
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "No current file",
[&]{ enumerator->getFileName(); });
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "No current file", [&] { enumerator->getFileName(); });
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "No current file",
[&] { reader->readFile(std::move(enumerator)); });
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "No current file", [&] { reader->readFile(std::move(enumerator)); });
}
}
@ -217,6 +220,10 @@ TEST_P(ArchiveReaderAndWriterTest, TwoFilesInArchive)
ASSERT_TRUE(reader->fileExists("a.txt"));
ASSERT_TRUE(reader->fileExists("b/c.txt"));
// Get all files
auto files = reader->getAllFiles();
EXPECT_EQ(files.size(), 2);
EXPECT_EQ(reader->getFileInfo("a.txt").uncompressed_size, a_contents.size());
EXPECT_EQ(reader->getFileInfo("b/c.txt").uncompressed_size, c_contents.size());
@ -272,6 +279,10 @@ TEST_P(ArchiveReaderAndWriterTest, TwoFilesInArchive)
enumerator = reader->nextFile(std::move(in));
EXPECT_EQ(enumerator, nullptr);
}
// Get all files one last time
files = reader->getAllFiles();
EXPECT_EQ(files.size(), 2);
}
@ -301,7 +312,8 @@ TEST_P(ArchiveReaderAndWriterTest, InMemory)
ASSERT_FALSE(fs::exists(getPathToArchive()));
/// Read the archive.
auto read_archive_func = [&]() -> std::unique_ptr<SeekableReadBuffer> { return std::make_unique<ReadBufferFromString>(archive_in_memory); };
auto read_archive_func
= [&]() -> std::unique_ptr<SeekableReadBuffer> { return std::make_unique<ReadBufferFromString>(archive_in_memory); };
auto reader = createArchiveReader(getPathToArchive(), read_archive_func, archive_in_memory.size());
ASSERT_TRUE(reader->fileExists("a.txt"));
@ -334,16 +346,163 @@ TEST_P(ArchiveReaderAndWriterTest, InMemory)
}
TEST_P(ArchiveReaderAndWriterTest, ManyFilesInMemory)
{
String archive_in_memory;
int files = 1000;
size_t times = 1;
/// Make an archive.
{
auto writer = createArchiveWriter(getPathToArchive(), std::make_unique<WriteBufferFromString>(archive_in_memory));
{
for (int i = 0; i < files; i++)
{
auto filename = std::format("{}.txt", i);
auto contents = std::format("The contents of {}.txt", i);
auto out = writer->writeFile(filename, times * contents.size());
for (int j = 0; j < times; j++)
writeString(contents, *out);
out->finalize();
}
}
writer->finalize();
}
/// The created archive is really in memory.
ASSERT_FALSE(fs::exists(getPathToArchive()));
/// Read the archive.
auto read_archive_func
= [&]() -> std::unique_ptr<SeekableReadBuffer> { return std::make_unique<ReadBufferFromString>(archive_in_memory); };
auto reader = createArchiveReader(getPathToArchive(), read_archive_func, archive_in_memory.size());
for (int i = 0; i < files; i++)
{
auto filename = std::format("{}.txt", i);
auto contents = std::format("The contents of {}.txt", i);
ASSERT_TRUE(reader->fileExists(filename));
EXPECT_EQ(reader->getFileInfo(filename).uncompressed_size, times * contents.size());
{
auto in = reader->readFile(filename, /*throw_on_not_found=*/true);
for (int j = 0; j < times; j++)
ASSERT_TRUE(checkString(String(contents), *in));
}
}
}
TEST_P(ArchiveReaderAndWriterTest, Password)
{
auto writer = createArchiveWriter(getPathToArchive());
//don't support passwords for tar archives
if (getPathToArchive().ends_with(".tar") || getPathToArchive().ends_with(".tar.gz") || getPathToArchive().ends_with(".tar.bz2")
|| getPathToArchive().ends_with(".tar.lzma") || getPathToArchive().ends_with(".tar.zst") || getPathToArchive().ends_with(".tar.xz"))
{
expectException(
ErrorCodes::NOT_IMPLEMENTED,
"Setting a password is not currently supported for libarchive",
[&] { writer->setPassword("a.txt"); });
writer->finalize();
}
else
{
/// Make an archive.
std::string_view contents = "The contents of a.txt";
{
writer->setPassword("Qwe123");
{
auto out = writer->writeFile("a.txt");
writeString(contents, *out);
out->finalize();
}
writer->finalize();
}
/// Read the archive.
auto reader = createArchiveReader(getPathToArchive());
/// Try to read without a password.
expectException(
ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Password is required", [&] { reader->readFile("a.txt", /*throw_on_not_found=*/true); });
{
/// Try to read with a wrong password.
reader->setPassword("123Qwe");
expectException(
ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Wrong password", [&] { reader->readFile("a.txt", /*throw_on_not_found=*/true); });
}
{
/// Reading with the right password is successful.
reader->setPassword("Qwe123");
auto in = reader->readFile("a.txt", /*throw_on_not_found=*/true);
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents);
}
}
}
TEST_P(ArchiveReaderAndWriterTest, ArchiveNotExist)
{
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't open", [&] { createArchiveReader(getPathToArchive()); });
}
TEST_P(ArchiveReaderAndWriterTest, ManyFilesOnDisk)
{
int files = 1000;
size_t times = 1;
/// Make an archive.
std::string_view contents = "The contents of a.txt";
{
auto writer = createArchiveWriter(getPathToArchive());
writer->setPassword("Qwe123");
{
auto out = writer->writeFile("a.txt");
writeString(contents, *out);
for (int i = 0; i < files; i++)
{
auto filename = std::format("{}.txt", i);
auto contents = std::format("The contents of {}.txt", i);
auto out = writer->writeFile(filename, times * contents.size());
for (int j = 0; j < times; j++)
writeString(contents, *out);
out->finalize();
}
}
writer->finalize();
}
/// The created archive is really in memory.
ASSERT_TRUE(fs::exists(getPathToArchive()));
/// Read the archive.
auto reader = createArchiveReader(getPathToArchive());
for (int i = 0; i < files; i++)
{
auto filename = std::format("{}.txt", i);
auto contents = std::format("The contents of {}.txt", i);
ASSERT_TRUE(reader->fileExists(filename));
EXPECT_EQ(reader->getFileInfo(filename).uncompressed_size, times * contents.size());
{
auto in = reader->readFile(filename, /*throw_on_not_found=*/true);
for (int j = 0; j < times; j++)
ASSERT_TRUE(checkString(String(contents), *in));
}
}
}
TEST_P(ArchiveReaderAndWriterTest, LargeFile)
{
/// Make an archive.
std::string_view contents = "The contents of a.txt\n";
int times = 10000000;
{
auto writer = createArchiveWriter(getPathToArchive());
{
auto out = writer->writeFile("a.txt", times * contents.size());
for (int i = 0; i < times; i++)
writeString(contents, *out);
out->finalize();
}
writer->finalize();
@ -352,35 +511,31 @@ TEST_P(ArchiveReaderAndWriterTest, Password)
/// Read the archive.
auto reader = createArchiveReader(getPathToArchive());
/// Try to read without a password.
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Password is required",
[&]{ reader->readFile("a.txt", /*throw_on_not_found=*/true); });
ASSERT_TRUE(reader->fileExists("a.txt"));
auto file_info = reader->getFileInfo("a.txt");
EXPECT_EQ(file_info.uncompressed_size, contents.size() * times);
EXPECT_GT(file_info.compressed_size, 0);
{
/// Try to read with a wrong password.
reader->setPassword("123Qwe");
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Wrong password",
[&]{ reader->readFile("a.txt", /*throw_on_not_found=*/true); });
}
{
/// Reading with the right password is successful.
reader->setPassword("Qwe123");
auto in = reader->readFile("a.txt", /*throw_on_not_found=*/true);
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents);
for (int i = 0; i < times; i++)
ASSERT_TRUE(checkString(String(contents), *in));
}
{
/// Use an enumerator.
auto enumerator = reader->firstFile();
ASSERT_NE(enumerator, nullptr);
EXPECT_EQ(enumerator->getFileName(), "a.txt");
EXPECT_EQ(enumerator->getFileInfo().uncompressed_size, contents.size() * times);
EXPECT_GT(enumerator->getFileInfo().compressed_size, 0);
EXPECT_FALSE(enumerator->nextFile());
}
}
TEST_P(ArchiveReaderAndWriterTest, ArchiveNotExist)
TEST(TarArchiveReaderTest, FileExists)
{
expectException(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't open",
[&]{ createArchiveReader(getPathToArchive()); });
}
TEST(TarArchiveReaderTest, FileExists) {
String archive_path = "archive.tar";
String filename = "file.txt";
String contents = "test";
@ -391,7 +546,8 @@ TEST(TarArchiveReaderTest, FileExists) {
fs::remove(archive_path);
}
TEST(TarArchiveReaderTest, ReadFile) {
TEST(TarArchiveReaderTest, ReadFile)
{
String archive_path = "archive.tar";
String filename = "file.txt";
String contents = "test";
@ -405,7 +561,8 @@ TEST(TarArchiveReaderTest, ReadFile) {
fs::remove(archive_path);
}
TEST(TarArchiveReaderTest, ReadTwoFiles) {
TEST(TarArchiveReaderTest, ReadTwoFiles)
{
String archive_path = "archive.tar";
String file1 = "file1.txt";
String contents1 = "test1";
@ -421,14 +578,15 @@ TEST(TarArchiveReaderTest, ReadTwoFiles) {
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents1);
in = reader->readFile(file2, /*throw_on_not_found=*/true);
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents2);
fs::remove(archive_path);
}
TEST(TarArchiveReaderTest, CheckFileInfo) {
TEST(TarArchiveReaderTest, CheckFileInfo)
{
String archive_path = "archive.tar";
String filename = "file.txt";
String contents = "test";
@ -441,7 +599,8 @@ TEST(TarArchiveReaderTest, CheckFileInfo) {
fs::remove(archive_path);
}
TEST(SevenZipArchiveReaderTest, FileExists) {
TEST(SevenZipArchiveReaderTest, FileExists)
{
String archive_path = "archive.7z";
String filename = "file.txt";
String contents = "test";
@ -452,7 +611,8 @@ TEST(SevenZipArchiveReaderTest, FileExists) {
fs::remove(archive_path);
}
TEST(SevenZipArchiveReaderTest, ReadFile) {
TEST(SevenZipArchiveReaderTest, ReadFile)
{
String archive_path = "archive.7z";
String filename = "file.txt";
String contents = "test";
@ -466,7 +626,8 @@ TEST(SevenZipArchiveReaderTest, ReadFile) {
fs::remove(archive_path);
}
TEST(SevenZipArchiveReaderTest, CheckFileInfo) {
TEST(SevenZipArchiveReaderTest, CheckFileInfo)
{
String archive_path = "archive.7z";
String filename = "file.txt";
String contents = "test";
@ -479,7 +640,8 @@ TEST(SevenZipArchiveReaderTest, CheckFileInfo) {
fs::remove(archive_path);
}
TEST(SevenZipArchiveReaderTest, ReadTwoFiles) {
TEST(SevenZipArchiveReaderTest, ReadTwoFiles)
{
String archive_path = "archive.7z";
String file1 = "file1.txt";
String contents1 = "test1";
@ -495,23 +657,28 @@ TEST(SevenZipArchiveReaderTest, ReadTwoFiles) {
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents1);
in = reader->readFile(file2, /*throw_on_not_found=*/true);
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents2);
fs::remove(archive_path);
}
#if USE_MINIZIP
namespace
{
const char * supported_archive_file_exts[] =
{
".zip"
};
const char * supported_archive_file_exts[] = {
#if USE_MINIZIP
".zip",
#endif
#if USE_LIBARCHIVE
".tar",
".tar.gz",
".tar.bz2",
".tar.lzma",
".tar.zst",
".tar.xz",
#endif
};
}
INSTANTIATE_TEST_SUITE_P(All, ArchiveReaderAndWriterTest, ::testing::ValuesIn(supported_archive_file_exts));
#endif

View File

@ -591,6 +591,138 @@ def test_zip_archive_with_bad_compression_method():
)
def test_tar_archive():
backup_name = f"Disk('backups', 'archive.tar')"
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
assert os.path.isfile(get_path_to_backup(backup_name))
instance.query("DROP TABLE test.table")
assert instance.query("EXISTS test.table") == "0\n"
instance.query(f"RESTORE TABLE test.table FROM {backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
def test_tar_bz2_archive():
backup_name = f"Disk('backups', 'archive.tar.bz2')"
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
assert os.path.isfile(get_path_to_backup(backup_name))
instance.query("DROP TABLE test.table")
assert instance.query("EXISTS test.table") == "0\n"
instance.query(f"RESTORE TABLE test.table FROM {backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
def test_tar_gz_archive():
backup_name = f"Disk('backups', 'archive.tar.gz')"
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
assert os.path.isfile(get_path_to_backup(backup_name))
instance.query("DROP TABLE test.table")
assert instance.query("EXISTS test.table") == "0\n"
instance.query(f"RESTORE TABLE test.table FROM {backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
def test_tar_lzma_archive():
backup_name = f"Disk('backups', 'archive.tar.lzma')"
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
assert os.path.isfile(get_path_to_backup(backup_name))
instance.query("DROP TABLE test.table")
assert instance.query("EXISTS test.table") == "0\n"
instance.query(f"RESTORE TABLE test.table FROM {backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
def test_tar_zst_archive():
backup_name = f"Disk('backups', 'archive.tar.zst')"
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
assert os.path.isfile(get_path_to_backup(backup_name))
instance.query("DROP TABLE test.table")
assert instance.query("EXISTS test.table") == "0\n"
instance.query(f"RESTORE TABLE test.table FROM {backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
def test_tar_xz_archive():
backup_name = f"Disk('backups', 'archive.tar.xz')"
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
assert os.path.isfile(get_path_to_backup(backup_name))
instance.query("DROP TABLE test.table")
assert instance.query("EXISTS test.table") == "0\n"
instance.query(f"RESTORE TABLE test.table FROM {backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
def test_tar_archive_with_password():
backup_name = f"Disk('backups', 'archive_with_password.tar')"
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
expected_error = "Setting a password is not currently supported for libarchive"
assert expected_error in instance.query_and_get_error(
f"BACKUP TABLE test.table TO {backup_name} SETTINGS id='tar_archive_with_password', password='password123'"
)
assert (
instance.query(
"SELECT status FROM system.backups WHERE id='tar_archive_with_password'"
)
== "BACKUP_FAILED\n"
)
def test_tar_archive_with_bad_compression_method():
backup_name = f"Disk('backups', 'archive_with_bad_compression_method.tar')"
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
expected_error = "Using compression_method and compression_level options are not supported for tar archives"
assert expected_error in instance.query_and_get_error(
f"BACKUP TABLE test.table TO {backup_name} SETTINGS id='tar_archive_with_bad_compression_method', compression_method='foobar'"
)
assert (
instance.query(
"SELECT status FROM system.backups WHERE id='tar_archive_with_bad_compression_method'"
)
== "BACKUP_FAILED\n"
)
def test_async():
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"

View File

@ -454,6 +454,48 @@ def test_backup_to_zip():
check_backup_and_restore(storage_policy, backup_destination)
def test_backup_to_tar():
storage_policy = "default"
backup_name = new_backup_name()
backup_destination = f"S3('http://minio1:9001/root/data/backups/{backup_name}.tar', 'minio', 'minio123')"
check_backup_and_restore(storage_policy, backup_destination)
def test_backup_to_tar_gz():
storage_policy = "default"
backup_name = new_backup_name()
backup_destination = f"S3('http://minio1:9001/root/data/backups/{backup_name}.tar.gz', 'minio', 'minio123')"
check_backup_and_restore(storage_policy, backup_destination)
def test_backup_to_tar_bz2():
storage_policy = "default"
backup_name = new_backup_name()
backup_destination = f"S3('http://minio1:9001/root/data/backups/{backup_name}.tar.bz2', 'minio', 'minio123')"
check_backup_and_restore(storage_policy, backup_destination)
def test_backup_to_tar_lzma():
storage_policy = "default"
backup_name = new_backup_name()
backup_destination = f"S3('http://minio1:9001/root/data/backups/{backup_name}.tar.lzma', 'minio', 'minio123')"
check_backup_and_restore(storage_policy, backup_destination)
def test_backup_to_tar_zst():
storage_policy = "default"
backup_name = new_backup_name()
backup_destination = f"S3('http://minio1:9001/root/data/backups/{backup_name}.tar.zst', 'minio', 'minio123')"
check_backup_and_restore(storage_policy, backup_destination)
def test_backup_to_tar_xz():
storage_policy = "default"
backup_name = new_backup_name()
backup_destination = f"S3('http://minio1:9001/root/data/backups/{backup_name}.tar.xz', 'minio', 'minio123')"
check_backup_and_restore(storage_policy, backup_destination)
def test_user_specific_auth(start_cluster):
def create_user(user):
node.query(f"CREATE USER {user}")