Fix tests

This commit is contained in:
Antonio Andelic 2023-08-04 08:59:55 +00:00
parent 5c7788b154
commit 8e7eb7f3fa
4 changed files with 110 additions and 88 deletions

View File

@ -1,9 +1,11 @@
#include <IO/Archives/LibArchiveReader.h> #include <IO/Archives/LibArchiveReader.h>
#include <IO/ReadBufferFromFileBase.h> #include <IO/ReadBufferFromFileBase.h>
#include <Common/quoteString.h> #include <Common/quoteString.h>
#include <Common/scope_guard_safe.h>
#include <IO/Archives/ArchiveUtils.h> #include <IO/Archives/ArchiveUtils.h>
#include <mutex>
namespace DB namespace DB
{ {
@ -18,12 +20,11 @@ namespace ErrorCodes
extern const int UNSUPPORTED_METHOD; extern const int UNSUPPORTED_METHOD;
} }
class LibArchiveReader::Handle
template <typename ArchiveInfo>
class LibArchiveReader<ArchiveInfo>::Handle
{ {
public: public:
explicit Handle(const String & path_to_archive_) : path_to_archive(path_to_archive_) explicit Handle(std::string path_to_archive_, bool lock_on_reading_)
: path_to_archive(path_to_archive_), lock_on_reading(lock_on_reading_)
{ {
current_archive = open(path_to_archive); current_archive = open(path_to_archive);
current_entry = archive_entry_new(); current_entry = archive_entry_new();
@ -40,11 +41,7 @@ public:
~Handle() ~Handle()
{ {
if (current_archive) close(current_archive);
{
archive_read_close(current_archive);
archive_read_free(current_archive);
}
} }
bool locateFile(const std::string & filename) bool locateFile(const std::string & filename)
@ -58,7 +55,7 @@ public:
int err = ARCHIVE_OK; int err = ARCHIVE_OK;
while (true) while (true)
{ {
err = archive_read_next_header(current_archive, &current_entry); err = readNextHeader(current_archive, &current_entry);
if (err == ARCHIVE_RETRY) if (err == ARCHIVE_RETRY)
continue; continue;
@ -80,59 +77,37 @@ public:
int err = ARCHIVE_OK; int err = ARCHIVE_OK;
do do
{ {
err = archive_read_next_header(current_archive, &current_entry); err = readNextHeader(current_archive, &current_entry);
} while (err == ARCHIVE_RETRY); } while (err == ARCHIVE_RETRY);
checkError(err); checkError(err);
return err == ARCHIVE_OK; return err == ARCHIVE_OK;
} }
static struct archive * open(const String & path_to_archive)
{
auto * archive = archive_read_new();
archive_read_support_filter_all(archive);
archive_read_support_format_all(archive);
if (archive_read_open_filename(archive, path_to_archive.c_str(), 10240) != ARCHIVE_OK)
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't open {} archive: {}", ArchiveInfo::name, quoteString(path_to_archive));
return archive;
}
std::vector<std::string> getAllFiles(NameFilter filter) std::vector<std::string> getAllFiles(NameFilter filter)
{ {
auto * archive = open(path_to_archive); auto * archive = open(path_to_archive);
SCOPE_EXIT(
close(archive);
);
auto * entry = archive_entry_new(); auto * entry = archive_entry_new();
std::vector<std::string> files; std::vector<std::string> files;
int error = archive_read_next_header(archive, &entry); int error = readNextHeader(archive, &entry);
while (error == ARCHIVE_OK || error == ARCHIVE_RETRY) while (error == ARCHIVE_OK || error == ARCHIVE_RETRY)
{ {
std::string name = archive_entry_pathname(entry); std::string name = archive_entry_pathname(entry);
if (!filter || filter(name)) if (!filter || filter(name))
files.push_back(std::move(name)); files.push_back(std::move(name));
error = archive_read_next_header(archive, &entry); error = readNextHeader(archive, &entry);
} }
archive_read_close(archive);
archive_read_free(archive);
checkError(error); checkError(error);
return files; return files;
} }
void checkError(int error)
{
if (error == ARCHIVE_FATAL)
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Failed to read archive while fetching all files: {}", archive_error_string(current_archive));
}
void resetFileInfo()
{
file_name.reset();
file_info.reset();
}
const String & getFileName() const const String & getFileName() const
{ {
if (!file_name) if (!file_name)
@ -157,13 +132,67 @@ public:
struct archive * current_archive; struct archive * current_archive;
struct archive_entry * current_entry; struct archive_entry * current_entry;
private: private:
void checkError(int error) const
{
if (error == ARCHIVE_FATAL)
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Failed to read archive while fetching all files: {}", archive_error_string(current_archive));
}
/// Empties both per-entry optionals (`file_info` and `file_name`).
void resetFileInfo()
{
    file_info.reset();
    file_name.reset();
}
/// Creates a libarchive read handle with all filters and formats enabled and opens
/// `path_to_archive` with it (10240 is the block size handed to libarchive).
/// Returns the opened handle; the caller must release it with close().
/// Throws CANNOT_UNPACK_ARCHIVE if the file cannot be opened; the handle is closed
/// before any exception leaves this function.
static struct archive * open(const String & path_to_archive)
{
    struct archive * opened = archive_read_new();
    try
    {
        archive_read_support_filter_all(opened);
        archive_read_support_format_all(opened);

        const int status = archive_read_open_filename(opened, path_to_archive.c_str(), 10240);
        if (status != ARCHIVE_OK)
            throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't open archive: {}", quoteString(path_to_archive));

        return opened;
    }
    catch (...)
    {
        /// Do not leak the half-initialized handle.
        close(opened);
        throw;
    }
}
/// Closes and frees a libarchive read handle. A null pointer is accepted and ignored.
static void close(struct archive * archive)
{
    if (!archive)
        return;

    archive_read_close(archive);
    archive_read_free(archive);
}
/// Wrapper around archive_read_next_header().
/// When `lock_on_reading` is set, the call is made while holding the class-wide `read_lock`;
/// otherwise it is made without any locking.
/// Returns the libarchive status code unchanged.
int readNextHeader(struct archive * archive, struct archive_entry ** entry) const
{
    if (!lock_on_reading)
        return archive_read_next_header(archive, entry);

    std::lock_guard guard(Handle::read_lock);
    return archive_read_next_header(archive, entry);
}
const String path_to_archive; const String path_to_archive;
/// For some archive types, reading headers uses static variables, which are not thread-safe.
/// When this flag is set, header reads are done under the shared `read_lock` mutex.
const bool lock_on_reading = false;
static inline std::mutex read_lock;
mutable std::optional<String> file_name; mutable std::optional<String> file_name;
mutable std::optional<FileInfo> file_info; mutable std::optional<FileInfo> file_info;
}; };
template <typename ArchiveInfo> class LibArchiveReader::FileEnumeratorImpl : public FileEnumerator
class LibArchiveReader<ArchiveInfo>::FileEnumeratorImpl : public FileEnumerator
{ {
public: public:
explicit FileEnumeratorImpl(Handle handle_) : handle(std::move(handle_)) {} explicit FileEnumeratorImpl(Handle handle_) : handle(std::move(handle_)) {}
@ -178,8 +207,7 @@ private:
Handle handle; Handle handle;
}; };
template <typename ArchiveInfo> class LibArchiveReader::ReadBufferFromLibArchive : public ReadBufferFromFileBase
class LibArchiveReader<ArchiveInfo>::ReadBufferFromLibArchive : public ReadBufferFromFileBase
{ {
public: public:
explicit ReadBufferFromLibArchive(Handle handle_, std::string path_to_archive_) explicit ReadBufferFromLibArchive(Handle handle_, std::string path_to_archive_)
@ -228,63 +256,55 @@ private:
size_t total_bytes_read = 0; size_t total_bytes_read = 0;
}; };
template <typename ArchiveInfo> LibArchiveReader::LibArchiveReader(std::string archive_name_, bool lock_on_reading_, std::string path_to_archive_)
LibArchiveReader<ArchiveInfo>::LibArchiveReader(const String & path_to_archive_) : path_to_archive(path_to_archive_) : archive_name(std::move(archive_name_)), lock_on_reading(lock_on_reading_), path_to_archive(std::move(path_to_archive_))
{} {}
template <typename ArchiveInfo> LibArchiveReader::~LibArchiveReader() = default;
LibArchiveReader<ArchiveInfo>::~LibArchiveReader() = default;
template <typename ArchiveInfo> const std::string & LibArchiveReader::getPath() const
const std::string & LibArchiveReader<ArchiveInfo>::getPath() const
{ {
return path_to_archive; return path_to_archive;
} }
template <typename ArchiveInfo> bool LibArchiveReader::fileExists(const String & filename)
bool LibArchiveReader<ArchiveInfo>::fileExists(const String & filename)
{ {
Handle handle(path_to_archive); Handle handle(path_to_archive, lock_on_reading);
return handle.locateFile(filename); return handle.locateFile(filename);
} }
template <typename ArchiveInfo> LibArchiveReader::FileInfo LibArchiveReader::getFileInfo(const String & filename)
LibArchiveReader<ArchiveInfo>::FileInfo LibArchiveReader<ArchiveInfo>::getFileInfo(const String & filename)
{ {
Handle handle(path_to_archive); Handle handle(path_to_archive, lock_on_reading);
if (!handle.locateFile(filename)) if (!handle.locateFile(filename))
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: file not found", path_to_archive); throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: file not found", path_to_archive);
return handle.getFileInfo(); return handle.getFileInfo();
} }
template <typename ArchiveInfo> std::unique_ptr<LibArchiveReader::FileEnumerator> LibArchiveReader::firstFile()
std::unique_ptr<typename LibArchiveReader<ArchiveInfo>::FileEnumerator> LibArchiveReader<ArchiveInfo>::firstFile()
{ {
Handle handle(path_to_archive); Handle handle(path_to_archive, lock_on_reading);
if (!handle.nextFile()) if (!handle.nextFile())
return nullptr; return nullptr;
return std::make_unique<FileEnumeratorImpl>(std::move(handle)); return std::make_unique<FileEnumeratorImpl>(std::move(handle));
} }
template <typename ArchiveInfo> std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader::readFile(const String & filename)
std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader<ArchiveInfo>::readFile(const String & filename)
{ {
return readFile([&](const std::string & file) { return file == filename; }); return readFile([&](const std::string & file) { return file == filename; });
} }
template <typename ArchiveInfo> std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader::readFile(NameFilter filter)
std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader<ArchiveInfo>::readFile(NameFilter filter)
{ {
Handle handle(path_to_archive); Handle handle(path_to_archive, lock_on_reading);
if (!handle.locateFile(filter)) if (!handle.locateFile(filter))
throw Exception( throw Exception(
ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: no file found satisfying the filter", path_to_archive); ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: no file found satisfying the filter", path_to_archive);
return std::make_unique<ReadBufferFromLibArchive>(std::move(handle), path_to_archive); return std::make_unique<ReadBufferFromLibArchive>(std::move(handle), path_to_archive);
} }
template <typename ArchiveInfo> std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader::readFile(std::unique_ptr<FileEnumerator> enumerator)
std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader<ArchiveInfo>::readFile(std::unique_ptr<FileEnumerator> enumerator)
{ {
if (!dynamic_cast<FileEnumeratorImpl *>(enumerator.get())) if (!dynamic_cast<FileEnumeratorImpl *>(enumerator.get()))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong enumerator passed to readFile()"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong enumerator passed to readFile()");
@ -293,8 +313,7 @@ std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader<ArchiveInfo>::readFile(
return std::make_unique<ReadBufferFromLibArchive>(std::move(handle), path_to_archive); return std::make_unique<ReadBufferFromLibArchive>(std::move(handle), path_to_archive);
} }
template <typename ArchiveInfo> std::unique_ptr<typename LibArchiveReader<ArchiveInfo>::FileEnumerator> std::unique_ptr<LibArchiveReader::FileEnumerator> LibArchiveReader::nextFile(std::unique_ptr<ReadBuffer> read_buffer)
LibArchiveReader<ArchiveInfo>::nextFile(std::unique_ptr<ReadBuffer> read_buffer)
{ {
if (!dynamic_cast<ReadBufferFromLibArchive *>(read_buffer.get())) if (!dynamic_cast<ReadBufferFromLibArchive *>(read_buffer.get()))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong ReadBuffer passed to nextFile()"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong ReadBuffer passed to nextFile()");
@ -305,28 +324,22 @@ LibArchiveReader<ArchiveInfo>::nextFile(std::unique_ptr<ReadBuffer> read_buffer)
return std::make_unique<FileEnumeratorImpl>(std::move(handle)); return std::make_unique<FileEnumeratorImpl>(std::move(handle));
} }
template <typename ArchiveInfo> std::vector<std::string> LibArchiveReader::getAllFiles()
std::vector<std::string> LibArchiveReader<ArchiveInfo>::getAllFiles()
{ {
return getAllFiles({}); return getAllFiles({});
} }
template <typename ArchiveInfo> std::vector<std::string> LibArchiveReader::getAllFiles(NameFilter filter)
std::vector<std::string> LibArchiveReader<ArchiveInfo>::getAllFiles(NameFilter filter)
{ {
Handle handle(path_to_archive); Handle handle(path_to_archive, lock_on_reading);
return handle.getAllFiles(filter); return handle.getAllFiles(filter);
} }
template <typename ArchiveInfo> void LibArchiveReader::setPassword(const String & /*password_*/)
void LibArchiveReader<ArchiveInfo>::setPassword(const String & /*password_*/)
{ {
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not set password to {} archive", ArchiveInfo::name); throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not set password to {} archive", archive_name);
} }
template class LibArchiveReader<TarArchiveInfo>;
template class LibArchiveReader<SevenZipArchiveInfo>;
#endif #endif
} }

View File

@ -16,13 +16,9 @@ class ReadBufferFromFileBase;
class SeekableReadBuffer; class SeekableReadBuffer;
/// Implementation of IArchiveReader for reading archives using libarchive. /// Implementation of IArchiveReader for reading archives using libarchive.
template <typename ArchiveInfo>
class LibArchiveReader : public IArchiveReader class LibArchiveReader : public IArchiveReader
{ {
public: public:
/// Constructs an archive's reader that will read from a file in the local filesystem.
explicit LibArchiveReader(const String & path_to_archive_);
~LibArchiveReader() override; ~LibArchiveReader() override;
const std::string & getPath() const override; const std::string & getPath() const override;
@ -52,18 +48,31 @@ public:
/// Sets password used to decrypt the contents of the files in the archive. /// Sets password used to decrypt the contents of the files in the archive.
void setPassword(const String & password_) override; void setPassword(const String & password_) override;
protected:
/// Constructs an archive's reader that will read from a file in the local filesystem.
LibArchiveReader(std::string archive_name_, bool lock_on_reading_, std::string path_to_archive_);
private: private:
class ReadBufferFromLibArchive; class ReadBufferFromLibArchive;
class Handle; class Handle;
class FileEnumeratorImpl; class FileEnumeratorImpl;
const std::string archive_name;
const bool lock_on_reading;
const String path_to_archive; const String path_to_archive;
}; };
struct TarArchiveInfo { static constexpr std::string_view name = "tar"; }; class TarArchiveReader : public LibArchiveReader
using TarArchiveReader = LibArchiveReader<TarArchiveInfo>; {
struct SevenZipArchiveInfo { static constexpr std::string_view name = "7z"; }; public:
using SevenZipArchiveReader = LibArchiveReader<SevenZipArchiveInfo>; explicit TarArchiveReader(std::string path_to_archive) : LibArchiveReader("tar", /*lock_on_reading_=*/ true, std::move(path_to_archive)) { }
};
class SevenZipArchiveReader : public LibArchiveReader
{
public:
explicit SevenZipArchiveReader(std::string path_to_archive) : LibArchiveReader("7z", /*lock_on_reading_=*/ false, std::move(path_to_archive)) { }
};
#endif #endif

View File

@ -24,8 +24,7 @@ namespace ErrorCodes
void ITableFunctionFileLike::parseFirstArguments(const ASTPtr & arg, const ContextPtr &) void ITableFunctionFileLike::parseFirstArguments(const ASTPtr & arg, const ContextPtr &)
{ {
String path = checkAndGetLiteralArgument<String>(arg, "source"); filename = checkAndGetLiteralArgument<String>(arg, "source");
StorageFile::parseFileSource(std::move(path), filename, path_to_archive);
} }
String ITableFunctionFileLike::getFormatFromFirstArgument() String ITableFunctionFileLike::getFormatFromFirstArgument()

View File

@ -25,6 +25,7 @@ void TableFunctionFile::parseFirstArguments(const ASTPtr & arg, const ContextPtr
if (context->getApplicationType() != Context::ApplicationType::LOCAL) if (context->getApplicationType() != Context::ApplicationType::LOCAL)
{ {
ITableFunctionFileLike::parseFirstArguments(arg, context); ITableFunctionFileLike::parseFirstArguments(arg, context);
StorageFile::parseFileSource(std::move(filename), filename, path_to_archive);
return; return;
} }