Merge branch 'master' into ADQM-1011

This commit is contained in:
Alexey Gerasimchuck 2023-08-07 16:29:21 +10:00 committed by GitHub
commit a5a7a121c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 3115 additions and 44 deletions

4
.gitmodules vendored
View File

@ -331,6 +331,10 @@
[submodule "contrib/liburing"]
path = contrib/liburing
url = https://github.com/axboe/liburing
[submodule "contrib/libarchive"]
path = contrib/libarchive
url = https://github.com/libarchive/libarchive.git
ignore = dirty
[submodule "contrib/libfiu"]
path = contrib/libfiu
url = https://github.com/ClickHouse/libfiu.git

View File

@ -92,6 +92,7 @@ add_contrib (google-protobuf-cmake google-protobuf)
add_contrib (openldap-cmake openldap)
add_contrib (grpc-cmake grpc)
add_contrib (msgpack-c-cmake msgpack-c)
add_contrib (libarchive-cmake libarchive)
add_contrib (corrosion-cmake corrosion)

1
contrib/libarchive vendored Submodule

@ -0,0 +1 @@
Subproject commit ee45796171324519f0c0bfd012018dd099296336

View File

@ -0,0 +1,172 @@
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/libarchive")
set(SRCS
"${LIBRARY_DIR}/libarchive/archive_acl.c"
"${LIBRARY_DIR}/libarchive/archive_blake2sp_ref.c"
"${LIBRARY_DIR}/libarchive/archive_blake2s_ref.c"
"${LIBRARY_DIR}/libarchive/archive_check_magic.c"
"${LIBRARY_DIR}/libarchive/archive_cmdline.c"
"${LIBRARY_DIR}/libarchive/archive_cryptor.c"
"${LIBRARY_DIR}/libarchive/archive_digest.c"
"${LIBRARY_DIR}/libarchive/archive_disk_acl_darwin.c"
"${LIBRARY_DIR}/libarchive/archive_disk_acl_freebsd.c"
"${LIBRARY_DIR}/libarchive/archive_disk_acl_linux.c"
"${LIBRARY_DIR}/libarchive/archive_disk_acl_sunos.c"
"${LIBRARY_DIR}/libarchive/archive_entry.c"
"${LIBRARY_DIR}/libarchive/archive_entry_copy_bhfi.c"
"${LIBRARY_DIR}/libarchive/archive_entry_copy_stat.c"
"${LIBRARY_DIR}/libarchive/archive_entry_link_resolver.c"
"${LIBRARY_DIR}/libarchive/archive_entry_sparse.c"
"${LIBRARY_DIR}/libarchive/archive_entry_stat.c"
"${LIBRARY_DIR}/libarchive/archive_entry_strmode.c"
"${LIBRARY_DIR}/libarchive/archive_entry_xattr.c"
"${LIBRARY_DIR}/libarchive/archive_getdate.c"
"${LIBRARY_DIR}/libarchive/archive_hmac.c"
"${LIBRARY_DIR}/libarchive/archive_match.c"
"${LIBRARY_DIR}/libarchive/archive_options.c"
"${LIBRARY_DIR}/libarchive/archive_pack_dev.c"
"${LIBRARY_DIR}/libarchive/archive_pathmatch.c"
"${LIBRARY_DIR}/libarchive/archive_ppmd7.c"
"${LIBRARY_DIR}/libarchive/archive_ppmd8.c"
"${LIBRARY_DIR}/libarchive/archive_random.c"
"${LIBRARY_DIR}/libarchive/archive_rb.c"
"${LIBRARY_DIR}/libarchive/archive_read_add_passphrase.c"
"${LIBRARY_DIR}/libarchive/archive_read_append_filter.c"
"${LIBRARY_DIR}/libarchive/archive_read.c"
"${LIBRARY_DIR}/libarchive/archive_read_data_into_fd.c"
"${LIBRARY_DIR}/libarchive/archive_read_disk_entry_from_file.c"
"${LIBRARY_DIR}/libarchive/archive_read_disk_posix.c"
"${LIBRARY_DIR}/libarchive/archive_read_disk_set_standard_lookup.c"
"${LIBRARY_DIR}/libarchive/archive_read_disk_windows.c"
"${LIBRARY_DIR}/libarchive/archive_read_extract2.c"
"${LIBRARY_DIR}/libarchive/archive_read_extract.c"
"${LIBRARY_DIR}/libarchive/archive_read_open_fd.c"
"${LIBRARY_DIR}/libarchive/archive_read_open_file.c"
"${LIBRARY_DIR}/libarchive/archive_read_open_filename.c"
"${LIBRARY_DIR}/libarchive/archive_read_open_memory.c"
"${LIBRARY_DIR}/libarchive/archive_read_set_format.c"
"${LIBRARY_DIR}/libarchive/archive_read_set_options.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_filter_all.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_filter_by_code.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_filter_bzip2.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_filter_compress.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_filter_grzip.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_filter_gzip.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_filter_lrzip.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_filter_lz4.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_filter_lzop.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_filter_none.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_filter_program.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_filter_rpm.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_filter_uu.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_filter_xz.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_filter_zstd.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_format_7zip.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_format_all.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_format_ar.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_format_by_code.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_format_cab.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_format_cpio.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_format_empty.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_format_iso9660.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_format_lha.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_format_mtree.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_format_rar5.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_format_rar.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_format_raw.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_format_tar.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_format_warc.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_format_xar.c"
"${LIBRARY_DIR}/libarchive/archive_read_support_format_zip.c"
"${LIBRARY_DIR}/libarchive/archive_string.c"
"${LIBRARY_DIR}/libarchive/archive_string_sprintf.c"
"${LIBRARY_DIR}/libarchive/archive_util.c"
"${LIBRARY_DIR}/libarchive/archive_version_details.c"
"${LIBRARY_DIR}/libarchive/archive_virtual.c"
"${LIBRARY_DIR}/libarchive/archive_windows.c"
"${LIBRARY_DIR}/libarchive/archive_write_add_filter_b64encode.c"
"${LIBRARY_DIR}/libarchive/archive_write_add_filter_by_name.c"
"${LIBRARY_DIR}/libarchive/archive_write_add_filter_bzip2.c"
"${LIBRARY_DIR}/libarchive/archive_write_add_filter.c"
"${LIBRARY_DIR}/libarchive/archive_write_add_filter_compress.c"
"${LIBRARY_DIR}/libarchive/archive_write_add_filter_grzip.c"
"${LIBRARY_DIR}/libarchive/archive_write_add_filter_gzip.c"
"${LIBRARY_DIR}/libarchive/archive_write_add_filter_lrzip.c"
"${LIBRARY_DIR}/libarchive/archive_write_add_filter_lz4.c"
"${LIBRARY_DIR}/libarchive/archive_write_add_filter_lzop.c"
"${LIBRARY_DIR}/libarchive/archive_write_add_filter_none.c"
"${LIBRARY_DIR}/libarchive/archive_write_add_filter_program.c"
"${LIBRARY_DIR}/libarchive/archive_write_add_filter_uuencode.c"
"${LIBRARY_DIR}/libarchive/archive_write_add_filter_xz.c"
"${LIBRARY_DIR}/libarchive/archive_write_add_filter_zstd.c"
"${LIBRARY_DIR}/libarchive/archive_write.c"
"${LIBRARY_DIR}/libarchive/archive_write_disk_posix.c"
"${LIBRARY_DIR}/libarchive/archive_write_disk_set_standard_lookup.c"
"${LIBRARY_DIR}/libarchive/archive_write_disk_windows.c"
"${LIBRARY_DIR}/libarchive/archive_write_open_fd.c"
"${LIBRARY_DIR}/libarchive/archive_write_open_file.c"
"${LIBRARY_DIR}/libarchive/archive_write_open_filename.c"
"${LIBRARY_DIR}/libarchive/archive_write_open_memory.c"
"${LIBRARY_DIR}/libarchive/archive_write_set_format_7zip.c"
"${LIBRARY_DIR}/libarchive/archive_write_set_format_ar.c"
"${LIBRARY_DIR}/libarchive/archive_write_set_format_by_name.c"
"${LIBRARY_DIR}/libarchive/archive_write_set_format.c"
"${LIBRARY_DIR}/libarchive/archive_write_set_format_cpio_binary.c"
"${LIBRARY_DIR}/libarchive/archive_write_set_format_cpio.c"
"${LIBRARY_DIR}/libarchive/archive_write_set_format_cpio_newc.c"
"${LIBRARY_DIR}/libarchive/archive_write_set_format_cpio_odc.c"
"${LIBRARY_DIR}/libarchive/archive_write_set_format_filter_by_ext.c"
"${LIBRARY_DIR}/libarchive/archive_write_set_format_gnutar.c"
"${LIBRARY_DIR}/libarchive/archive_write_set_format_iso9660.c"
"${LIBRARY_DIR}/libarchive/archive_write_set_format_mtree.c"
"${LIBRARY_DIR}/libarchive/archive_write_set_format_pax.c"
"${LIBRARY_DIR}/libarchive/archive_write_set_format_raw.c"
"${LIBRARY_DIR}/libarchive/archive_write_set_format_shar.c"
"${LIBRARY_DIR}/libarchive/archive_write_set_format_ustar.c"
"${LIBRARY_DIR}/libarchive/archive_write_set_format_v7tar.c"
"${LIBRARY_DIR}/libarchive/archive_write_set_format_warc.c"
"${LIBRARY_DIR}/libarchive/archive_write_set_format_xar.c"
"${LIBRARY_DIR}/libarchive/archive_write_set_format_zip.c"
"${LIBRARY_DIR}/libarchive/archive_write_set_options.c"
"${LIBRARY_DIR}/libarchive/archive_write_set_passphrase.c"
"${LIBRARY_DIR}/libarchive/filter_fork_posix.c"
"${LIBRARY_DIR}/libarchive/filter_fork_windows.c"
"${LIBRARY_DIR}/libarchive/xxhash.c"
)
add_library(_libarchive ${SRCS})
target_include_directories(_libarchive PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}
"${LIBRARY_DIR}/libarchive"
)
target_compile_definitions(_libarchive PUBLIC
HAVE_CONFIG_H
)
target_compile_options(_libarchive PRIVATE "-Wno-reserved-macro-identifier")
if (TARGET ch_contrib::xz)
target_compile_definitions(_libarchive PUBLIC HAVE_LZMA_H=1)
target_link_libraries(_libarchive PRIVATE ch_contrib::xz)
endif()
if (TARGET ch_contrib::zlib)
target_compile_definitions(_libarchive PUBLIC HAVE_ZLIB_H=1)
target_link_libraries(_libarchive PRIVATE ch_contrib::zlib)
endif()
if (OS_LINUX)
target_compile_definitions(
_libarchive PUBLIC
MAJOR_IN_SYSMACROS=1
HAVE_LINUX_FS_H=1
HAVE_STRUCT_STAT_ST_MTIM_TV_NSEC=1
HAVE_LINUX_TYPES_H=1
HAVE_SYS_STATFS_H=1
HAVE_FUTIMESAT=1
HAVE_ICONV=1
)
endif()
add_library(ch_contrib::libarchive ALIAS _libarchive)

File diff suppressed because it is too large.

View File

@ -41,6 +41,8 @@ RUN apt-get update -y \
zstd \
file \
pv \
zip \
p7zip-full \
&& apt-get clean
RUN pip3 install numpy scipy pandas Jinja2

View File

@ -13,16 +13,18 @@ The `file` function can be used in `SELECT` and `INSERT` queries to read from or
**Syntax**
``` sql
file(path [,format] [,structure] [,compression])
file([path_to_archive ::] path [,format] [,structure] [,compression])
```
**Parameters**
- `path` — The relative path to the file from [user_files_path](/docs/en/operations/server-configuration-parameters/settings.md#server_configuration_parameters-user_files_path). Path to file supports the following globs in read-only mode: `*`, `?`, `{abc,def}` and `{N..M}`, where `N`, `M` are numbers and `'abc'`, `'def'` are strings.
- `path_to_archive` - The relative path to a zip/tar/7z archive. Path to archive supports the same globs as `path`.
- `format` — The [format](/docs/en/interfaces/formats.md#formats) of the file.
- `structure` — Structure of the table. Format: `'column1_name column1_type, column2_name column2_type, ...'`.
- `compression` — The existing compression type when used in a `SELECT` query, or the desired compression type when used in an `INSERT` query. The supported compression types are `gz`, `br`, `xz`, `zst`, `lz4`, and `bz2`.
**Returned value**
A table with the specified structure for reading or writing data in the specified file.
@ -128,6 +130,11 @@ file('test.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32');
└─────────┴─────────┴─────────┘
```
Getting data from the table stored in `table.csv`, located in `archive1.zip` and/or `archive2.zip`:
``` sql
SELECT * FROM file('user_files/archives/archive{1..2}.zip :: table.csv');
```
## Globs in Path
Multiple path components can have globs. To be processed, a file must exist and match the whole path pattern (not only its suffix or prefix).

View File

@ -576,6 +576,10 @@ if (TARGET ch_contrib::bzip2)
target_link_libraries (clickhouse_common_io PRIVATE ch_contrib::bzip2)
endif()
if (TARGET ch_contrib::libarchive)
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::libarchive)
endif()
if (TARGET ch_contrib::minizip)
target_link_libraries (clickhouse_common_io PRIVATE ch_contrib::minizip)
endif ()

View File

@ -1436,6 +1436,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
ConstraintsDescription{},
String{},
{},
String{},
};
StoragePtr storage = std::make_shared<StorageFile>(in_file, global_context->getUserFilesPath(), args);
storage->startup();

View File

@ -59,6 +59,7 @@
#cmakedefine01 USE_ULID
#cmakedefine01 FIU_ENABLE
#cmakedefine01 USE_BCRYPT
#cmakedefine01 USE_LIBARCHIVE
/// This is needed for .incbin in assembly. For some reason, include paths don't work there in presence of LTO.
/// That's why we use absolute paths.

View File

@ -0,0 +1,14 @@
#pragma once
#include "config.h"
#if USE_LIBARCHIVE
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wreserved-macro-identifier"
#include <archive.h>
#include <archive_entry.h>
#endif
#endif

View File

@ -40,18 +40,26 @@ public:
virtual bool nextFile() = 0;
};
virtual const std::string & getPath() const = 0;
/// Starts enumerating files in the archive.
virtual std::unique_ptr<FileEnumerator> firstFile() = 0;
using NameFilter = std::function<bool(const std::string &)>;
/// Starts reading a file from the archive. The function returns a read buffer;
/// you can read that buffer to extract uncompressed data from the archive.
/// Several read buffers can be used at the same time in parallel.
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(const String & filename) = 0;
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(NameFilter filter) = 0;
/// It's possible to convert a file enumerator to a read buffer and vice versa.
virtual std::unique_ptr<ReadBufferFromFileBase> readFile(std::unique_ptr<FileEnumerator> enumerator) = 0;
virtual std::unique_ptr<FileEnumerator> nextFile(std::unique_ptr<ReadBuffer> read_buffer) = 0;
virtual std::vector<std::string> getAllFiles() = 0;
virtual std::vector<std::string> getAllFiles(NameFilter filter) = 0;
/// Sets password used to decrypt files in the archive.
virtual void setPassword(const String & /* password */) {}
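For orientation (not part of this diff): a minimal sketch of how a caller might use the new filter-based overloads of the interface above. `reader` is assumed to be any concrete `IArchiveReader` obtained elsewhere, and the `.csv` suffix check is just an illustrative filter.

```cpp
#include <IO/Archives/IArchiveReader.h>

#include <iostream>
#include <memory>
#include <string>

/// Sketch only: list every ".csv" entry of an already-opened archive reader
/// using the getAllFiles(NameFilter) overload declared above.
void listCsvEntries(const std::shared_ptr<DB::IArchiveReader> & reader)
{
    auto csv_files = reader->getAllFiles([](const std::string & name)
    {
        return name.ends_with(".csv");   /// hypothetical filter
    });

    for (const auto & name : csv_files)
        std::cout << reader->getPath() << " :: " << name << '\n';
}
```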

View File

@ -0,0 +1,348 @@
#include <IO/Archives/LibArchiveReader.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Common/quoteString.h>
#include <Common/scope_guard_safe.h>
#include <IO/Archives/ArchiveUtils.h>
#include <mutex>
namespace DB
{
#if USE_LIBARCHIVE
namespace ErrorCodes
{
extern const int CANNOT_UNPACK_ARCHIVE;
extern const int LOGICAL_ERROR;
extern const int CANNOT_READ_ALL_DATA;
extern const int UNSUPPORTED_METHOD;
}
class LibArchiveReader::Handle
{
public:
explicit Handle(std::string path_to_archive_, bool lock_on_reading_)
: path_to_archive(path_to_archive_), lock_on_reading(lock_on_reading_)
{
current_archive = open(path_to_archive);
}
Handle(const Handle &) = delete;
Handle(Handle && other) noexcept
: current_archive(other.current_archive)
, current_entry(other.current_entry)
, lock_on_reading(other.lock_on_reading)
{
other.current_archive = nullptr;
other.current_entry = nullptr;
}
~Handle()
{
close(current_archive);
}
bool locateFile(const std::string & filename)
{
return locateFile([&](const std::string & file) { return file == filename; });
}
bool locateFile(NameFilter filter)
{
resetFileInfo();
int err = ARCHIVE_OK;
while (true)
{
err = readNextHeader(current_archive, &current_entry);
if (err == ARCHIVE_RETRY)
continue;
if (err != ARCHIVE_OK)
break;
if (filter(archive_entry_pathname(current_entry)))
return true;
}
checkError(err);
return false;
}
bool nextFile()
{
resetFileInfo();
int err = ARCHIVE_OK;
do
{
err = readNextHeader(current_archive, &current_entry);
} while (err == ARCHIVE_RETRY);
checkError(err);
return err == ARCHIVE_OK;
}
std::vector<std::string> getAllFiles(NameFilter filter)
{
auto * archive = open(path_to_archive);
SCOPE_EXIT(
close(archive);
);
struct archive_entry * entry = nullptr;
std::vector<std::string> files;
int error = readNextHeader(archive, &entry);
while (error == ARCHIVE_OK || error == ARCHIVE_RETRY)
{
chassert(entry != nullptr);
std::string name = archive_entry_pathname(entry);
if (!filter || filter(name))
files.push_back(std::move(name));
error = readNextHeader(archive, &entry);
}
checkError(error);
return files;
}
const String & getFileName() const
{
chassert(current_entry);
if (!file_name)
file_name.emplace(archive_entry_pathname(current_entry));
return *file_name;
}
const FileInfo & getFileInfo() const
{
chassert(current_entry);
if (!file_info)
{
file_info.emplace();
file_info->uncompressed_size = archive_entry_size(current_entry);
file_info->compressed_size = archive_entry_size(current_entry);
file_info->is_encrypted = false;
}
return *file_info;
}
struct archive * current_archive;
struct archive_entry * current_entry = nullptr;
private:
void checkError(int error) const
{
if (error == ARCHIVE_FATAL)
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Failed to read archive while fetching all files: {}", archive_error_string(current_archive));
}
void resetFileInfo()
{
file_name.reset();
file_info.reset();
}
static struct archive * open(const String & path_to_archive)
{
auto * archive = archive_read_new();
try
{
archive_read_support_filter_all(archive);
archive_read_support_format_all(archive);
if (archive_read_open_filename(archive, path_to_archive.c_str(), 10240) != ARCHIVE_OK)
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't open archive: {}", quoteString(path_to_archive));
}
catch (...)
{
close(archive);
throw;
}
return archive;
}
static void close(struct archive * archive)
{
if (archive)
{
archive_read_close(archive);
archive_read_free(archive);
}
}
int readNextHeader(struct archive * archive, struct archive_entry ** entry) const
{
std::unique_lock lock(Handle::read_lock, std::defer_lock);
if (lock_on_reading)
lock.lock();
return archive_read_next_header(archive, entry);
}
const String path_to_archive;
/// For some archive types, reading headers relies on static variables,
/// which are not thread-safe.
const bool lock_on_reading;
static inline std::mutex read_lock;
mutable std::optional<String> file_name;
mutable std::optional<FileInfo> file_info;
};
class LibArchiveReader::FileEnumeratorImpl : public FileEnumerator
{
public:
explicit FileEnumeratorImpl(Handle handle_) : handle(std::move(handle_)) {}
const String & getFileName() const override { return handle.getFileName(); }
const FileInfo & getFileInfo() const override { return handle.getFileInfo(); }
bool nextFile() override { return handle.nextFile(); }
/// Releases owned handle to pass it to a read buffer.
Handle releaseHandle() && { return std::move(handle); }
private:
Handle handle;
};
class LibArchiveReader::ReadBufferFromLibArchive : public ReadBufferFromFileBase
{
public:
explicit ReadBufferFromLibArchive(Handle handle_, std::string path_to_archive_)
: ReadBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0)
, handle(std::move(handle_))
, path_to_archive(std::move(path_to_archive_))
{}
off_t seek(off_t /* off */, int /* whence */) override
{
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Seek is not supported when reading from archive");
}
off_t getPosition() override
{
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "getPosition not supported when reading from archive");
}
String getFileName() const override { return handle.getFileName(); }
Handle releaseHandle() &&
{
return std::move(handle);
}
private:
bool nextImpl() override
{
auto bytes_read = archive_read_data(handle.current_archive, internal_buffer.begin(), static_cast<int>(internal_buffer.size()));
if (bytes_read < 0)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Failed to read file {} from {}: {}", handle.getFileName(), path_to_archive, archive_error_string(handle.current_archive));
if (!bytes_read)
return false;
total_bytes_read += bytes_read;
working_buffer = internal_buffer;
working_buffer.resize(bytes_read);
return true;
}
Handle handle;
const String path_to_archive;
size_t total_bytes_read = 0;
};
LibArchiveReader::LibArchiveReader(std::string archive_name_, bool lock_on_reading_, std::string path_to_archive_)
: archive_name(std::move(archive_name_)), lock_on_reading(lock_on_reading_), path_to_archive(std::move(path_to_archive_))
{}
LibArchiveReader::~LibArchiveReader() = default;
const std::string & LibArchiveReader::getPath() const
{
return path_to_archive;
}
bool LibArchiveReader::fileExists(const String & filename)
{
Handle handle(path_to_archive, lock_on_reading);
return handle.locateFile(filename);
}
LibArchiveReader::FileInfo LibArchiveReader::getFileInfo(const String & filename)
{
Handle handle(path_to_archive, lock_on_reading);
if (!handle.locateFile(filename))
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: file not found", path_to_archive);
return handle.getFileInfo();
}
std::unique_ptr<LibArchiveReader::FileEnumerator> LibArchiveReader::firstFile()
{
Handle handle(path_to_archive, lock_on_reading);
if (!handle.nextFile())
return nullptr;
return std::make_unique<FileEnumeratorImpl>(std::move(handle));
}
std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader::readFile(const String & filename)
{
return readFile([&](const std::string & file) { return file == filename; });
}
std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader::readFile(NameFilter filter)
{
Handle handle(path_to_archive, lock_on_reading);
if (!handle.locateFile(filter))
throw Exception(
ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Couldn't unpack archive {}: no file found satisfying the filter", path_to_archive);
return std::make_unique<ReadBufferFromLibArchive>(std::move(handle), path_to_archive);
}
std::unique_ptr<ReadBufferFromFileBase> LibArchiveReader::readFile(std::unique_ptr<FileEnumerator> enumerator)
{
if (!dynamic_cast<FileEnumeratorImpl *>(enumerator.get()))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong enumerator passed to readFile()");
auto enumerator_impl = std::unique_ptr<FileEnumeratorImpl>(static_cast<FileEnumeratorImpl *>(enumerator.release()));
auto handle = std::move(*enumerator_impl).releaseHandle();
return std::make_unique<ReadBufferFromLibArchive>(std::move(handle), path_to_archive);
}
std::unique_ptr<LibArchiveReader::FileEnumerator> LibArchiveReader::nextFile(std::unique_ptr<ReadBuffer> read_buffer)
{
if (!dynamic_cast<ReadBufferFromLibArchive *>(read_buffer.get()))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong ReadBuffer passed to nextFile()");
auto read_buffer_from_libarchive = std::unique_ptr<ReadBufferFromLibArchive>(static_cast<ReadBufferFromLibArchive *>(read_buffer.release()));
auto handle = std::move(*read_buffer_from_libarchive).releaseHandle();
if (!handle.nextFile())
return nullptr;
return std::make_unique<FileEnumeratorImpl>(std::move(handle));
}
std::vector<std::string> LibArchiveReader::getAllFiles()
{
return getAllFiles({});
}
std::vector<std::string> LibArchiveReader::getAllFiles(NameFilter filter)
{
Handle handle(path_to_archive, lock_on_reading);
return handle.getAllFiles(filter);
}
void LibArchiveReader::setPassword(const String & /*password_*/)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not set password to {} archive", archive_name);
}
#endif
}
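A short usage sketch of the reader implemented above (assumptions: the archive exists on disk and the build has `USE_LIBARCHIVE` enabled), showing the enumerator/read-buffer round trip that `readFile(std::unique_ptr<FileEnumerator>)` and `nextFile(std::unique_ptr<ReadBuffer>)` provide:

```cpp
#include <IO/Archives/LibArchiveReader.h>
#include <IO/ReadHelpers.h>

#include <iostream>
#include <memory>
#include <string>

/// Sketch only: walk a tar archive entry by entry, converting the enumerator to a
/// read buffer for each file and back to an enumerator for the next one.
void dumpTarArchive(const std::string & path_to_archive)
{
    auto reader = std::make_shared<DB::TarArchiveReader>(path_to_archive);

    auto enumerator = reader->firstFile();
    while (enumerator)
    {
        std::string name = enumerator->getFileName();

        auto in = reader->readFile(std::move(enumerator));   /// enumerator -> read buffer
        std::string contents;
        DB::readStringUntilEOF(contents, *in);

        std::cout << name << ": " << contents.size() << " bytes\n";

        enumerator = reader->nextFile(std::move(in));         /// read buffer -> next enumerator
    }
}
```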

View File

@ -0,0 +1,79 @@
#pragma once
#include "config.h"
#include <IO/Archives/IArchiveReader.h>
#include <iostream>
namespace DB
{
#if USE_LIBARCHIVE
class ReadBuffer;
class ReadBufferFromFileBase;
class SeekableReadBuffer;
/// Implementation of IArchiveReader for reading archives using libarchive.
class LibArchiveReader : public IArchiveReader
{
public:
~LibArchiveReader() override;
const std::string & getPath() const override;
/// Returns true if there is a specified file in the archive.
bool fileExists(const String & filename) override;
/// Returns the information about a file stored in the archive.
FileInfo getFileInfo(const String & filename) override;
/// Starts enumerating files in the archive.
std::unique_ptr<FileEnumerator> firstFile() override;
/// Starts reading a file from the archive. The function returns a read buffer;
/// you can read that buffer to extract uncompressed data from the archive.
/// Several read buffers can be used at the same time in parallel.
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & filename) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(NameFilter filter) override;
/// It's possible to convert a file enumerator to a read buffer and vice versa.
std::unique_ptr<ReadBufferFromFileBase> readFile(std::unique_ptr<FileEnumerator> enumerator) override;
std::unique_ptr<FileEnumerator> nextFile(std::unique_ptr<ReadBuffer> read_buffer) override;
std::vector<std::string> getAllFiles() override;
std::vector<std::string> getAllFiles(NameFilter filter) override;
/// Sets password used to decrypt the contents of the files in the archive.
void setPassword(const String & password_) override;
protected:
/// Constructs an archive's reader that will read from a file in the local filesystem.
LibArchiveReader(std::string archive_name_, bool lock_on_reading_, std::string path_to_archive_);
private:
class ReadBufferFromLibArchive;
class Handle;
class FileEnumeratorImpl;
const std::string archive_name;
const bool lock_on_reading;
const String path_to_archive;
};
class TarArchiveReader : public LibArchiveReader
{
public:
explicit TarArchiveReader(std::string path_to_archive) : LibArchiveReader("tar", /*lock_on_reading_=*/ true, std::move(path_to_archive)) { }
};
class SevenZipArchiveReader : public LibArchiveReader
{
public:
explicit SevenZipArchiveReader(std::string path_to_archive) : LibArchiveReader("7z", /*lock_on_reading_=*/ false, std::move(path_to_archive)) { }
};
#endif
}
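The two subclasses above differ only in the display name and the `lock_on_reading_` flag (per the comment in the implementation, some formats read headers through non-thread-safe static state, hence the lock for tar). Purely as an illustration of the pattern, and not part of this commit, another libarchive-backed format would be wired up the same way; the chosen flag value here is an unverified assumption:

```cpp
/// Hypothetical, illustration only: a cpio reader following the pattern above.
/// Whether cpio would need lock_on_reading_ = true has not been verified here.
class CpioArchiveReader : public LibArchiveReader
{
public:
    explicit CpioArchiveReader(std::string path_to_archive)
        : LibArchiveReader("cpio", /*lock_on_reading_=*/ false, std::move(path_to_archive))
    {
    }
};
```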

View File

@ -85,6 +85,26 @@ public:
file_name = file_name_;
}
void locateFile(NameFilter filter)
{
int err = unzGoToFirstFile(raw_handle);
if (err == UNZ_END_OF_LIST_OF_FILE)
showError("No file was found satisfying the filter");
do
{
checkResult(err);
resetFileInfo();
retrieveFileInfo();
if (filter(getFileName()))
return;
err = unzGoToNextFile(raw_handle);
} while (err != UNZ_END_OF_LIST_OF_FILE);
showError("No file was found satisfying the filter");
}
bool tryLocateFile(const String & file_name_)
{
resetFileInfo();
@ -131,6 +151,27 @@ public:
return *file_info;
}
std::vector<std::string> getAllFiles(NameFilter filter)
{
std::vector<std::string> files;
resetFileInfo();
int err = unzGoToFirstFile(raw_handle);
if (err == UNZ_END_OF_LIST_OF_FILE)
return files;
do
{
checkResult(err);
resetFileInfo();
retrieveFileInfo();
if (!filter || filter(getFileName()))
files.push_back(*file_name);
err = unzGoToNextFile(raw_handle);
} while (err != UNZ_END_OF_LIST_OF_FILE);
return files;
}
void closeFile()
{
int err = unzCloseCurrentFile(raw_handle);
@ -459,6 +500,11 @@ ZipArchiveReader::~ZipArchiveReader()
}
}
const std::string & ZipArchiveReader::getPath() const
{
return path_to_archive;
}
bool ZipArchiveReader::fileExists(const String & filename)
{
return acquireHandle().tryLocateFile(filename);
@ -486,6 +532,13 @@ std::unique_ptr<ReadBufferFromFileBase> ZipArchiveReader::readFile(const String
return std::make_unique<ReadBufferFromZipArchive>(std::move(handle));
}
std::unique_ptr<ReadBufferFromFileBase> ZipArchiveReader::readFile(NameFilter filter)
{
auto handle = acquireHandle();
handle.locateFile(filter);
return std::make_unique<ReadBufferFromZipArchive>(std::move(handle));
}
std::unique_ptr<ReadBufferFromFileBase> ZipArchiveReader::readFile(std::unique_ptr<FileEnumerator> enumerator)
{
if (!dynamic_cast<FileEnumeratorImpl *>(enumerator.get()))
@ -506,6 +559,17 @@ std::unique_ptr<ZipArchiveReader::FileEnumerator> ZipArchiveReader::nextFile(std
return std::make_unique<FileEnumeratorImpl>(std::move(handle));
}
std::vector<std::string> ZipArchiveReader::getAllFiles()
{
return getAllFiles({});
}
std::vector<std::string> ZipArchiveReader::getAllFiles(NameFilter filter)
{
auto handle = acquireHandle();
return handle.getAllFiles(filter);
}
void ZipArchiveReader::setPassword(const String & password_)
{
std::lock_guard lock{mutex};
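A minimal sketch of the filter-based `readFile` overload added to `ZipArchiveReader` above (assumptions: `USE_MINIZIP` is enabled and the archive exists; the `.csv` suffix test is only an example filter):

```cpp
#include <IO/Archives/createArchiveReader.h>
#include <IO/ReadHelpers.h>

#include <string>

/// Sketch only: read the first zip entry whose name ends in ".csv".
/// The reader throws if no entry satisfies the filter.
std::string readFirstCsvFromZip(const std::string & path_to_zip)
{
    auto reader = DB::createArchiveReader(path_to_zip);

    auto in = reader->readFile([](const std::string & name)
    {
        return name.ends_with(".csv");
    });

    std::string contents;
    DB::readStringUntilEOF(contents, *in);
    return contents;
}
```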

View File

@ -27,6 +27,8 @@ public:
~ZipArchiveReader() override;
const std::string & getPath() const override;
/// Returns true if there is a specified file in the archive.
bool fileExists(const String & filename) override;
@ -40,11 +42,15 @@ public:
/// you can read that buffer to extract uncompressed data from the archive.
/// Several read buffers can be used at the same time in parallel.
std::unique_ptr<ReadBufferFromFileBase> readFile(const String & filename) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(NameFilter filter) override;
/// It's possible to convert a file enumerator to a read buffer and vice versa.
std::unique_ptr<ReadBufferFromFileBase> readFile(std::unique_ptr<FileEnumerator> enumerator) override;
std::unique_ptr<FileEnumerator> nextFile(std::unique_ptr<ReadBuffer> read_buffer) override;
std::vector<std::string> getAllFiles() override;
std::vector<std::string> getAllFiles(NameFilter filter) override;
/// Sets password used to decrypt the contents of the files in the archive.
void setPassword(const String & password_) override;

View File

@ -1,5 +1,6 @@
#include <IO/Archives/createArchiveReader.h>
#include <IO/Archives/ZipArchiveReader.h>
#include <IO/Archives/LibArchiveReader.h>
#include <Common/Exception.h>
@ -29,10 +30,28 @@ std::shared_ptr<IArchiveReader> createArchiveReader(
return std::make_shared<ZipArchiveReader>(path_to_archive, archive_read_function, archive_size);
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "minizip library is disabled");
#endif
}
else if (path_to_archive.ends_with(".tar") || path_to_archive.ends_with("tar.gz"))
{
#if USE_LIBARCHIVE
return std::make_shared<TarArchiveReader>(path_to_archive);
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "libarchive library is disabled");
#endif
}
else if (path_to_archive.ends_with(".7z"))
{
#if USE_LIBARCHIVE
return std::make_shared<SevenZipArchiveReader>(path_to_archive);
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "libarchive library is disabled");
#endif
}
else
{
throw Exception(ErrorCodes::CANNOT_UNPACK_ARCHIVE, "Cannot determine the type of archive {}", path_to_archive);
}
}
}
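The dispatch added above is purely extension-based. A sketch of which concrete reader the factory returns (hypothetical paths; assumes both `USE_MINIZIP` and `USE_LIBARCHIVE` are enabled in the build):

```cpp
#include <IO/Archives/createArchiveReader.h>

/// Sketch only: extension-based dispatch of createArchiveReader.
void createReadersByExtension()
{
    auto zip = DB::createArchiveReader("/tmp/example.zip");     /// ZipArchiveReader (minizip)
    auto tar = DB::createArchiveReader("/tmp/example.tar");     /// TarArchiveReader (libarchive)
    auto tgz = DB::createArchiveReader("/tmp/example.tar.gz");  /// TarArchiveReader as well
    auto s7z = DB::createArchiveReader("/tmp/example.7z");      /// SevenZipArchiveReader (libarchive)

    /// Any other extension throws CANNOT_UNPACK_ARCHIVE ("Cannot determine the type of archive ...").
}
```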

View File

@ -1,6 +1,7 @@
#include <gtest/gtest.h>
#include "config.h"
#include <IO/Archives/ArchiveUtils.h>
#include <IO/Archives/IArchiveReader.h>
#include <IO/Archives/IArchiveWriter.h>
#include <IO/Archives/createArchiveReader.h>
@ -19,11 +20,52 @@
namespace DB::ErrorCodes
{
extern const int CANNOT_UNPACK_ARCHIVE;
extern const int LOGICAL_ERROR;
}
namespace fs = std::filesystem;
using namespace DB;
enum class ArchiveType : uint8_t
{
Tar,
SevenZip
};
template <ArchiveType archive_type>
bool createArchiveWithFiles(const std::string & archivename, const std::map<std::string, std::string> & files)
{
struct archive * a;
struct archive_entry * entry;
a = archive_write_new();
if constexpr (archive_type == ArchiveType::Tar)
archive_write_set_format_pax_restricted(a);
else if constexpr (archive_type == ArchiveType::SevenZip)
archive_write_set_format_7zip(a);
else
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Invalid archive type requested: {}", static_cast<size_t>(archive_type));
archive_write_open_filename(a, archivename.c_str());
for (const auto & [filename, content] : files) {
entry = archive_entry_new();
archive_entry_set_pathname(entry, filename.c_str());
archive_entry_set_size(entry, content.size());
archive_entry_set_mode(entry, S_IFREG | 0644); // regular file with rw-r--r-- permissions
archive_entry_set_mtime(entry, time(nullptr), 0);
archive_write_header(a, entry);
archive_write_data(a, content.c_str(), content.size());
archive_entry_free(entry);
}
archive_write_close(a);
archive_write_free(a);
return true;
}
class ArchiveReaderAndWriterTest : public ::testing::TestWithParam<const char *>
{
@ -327,6 +369,127 @@ TEST_P(ArchiveReaderAndWriterTest, ArchiveNotExist)
[&]{ createArchiveReader(getPathToArchive()); });
}
TEST(TarArchiveReaderTest, FileExists) {
String archive_path = "archive.tar";
String filename = "file.txt";
String contents = "test";
bool created = createArchiveWithFiles<ArchiveType::Tar>(archive_path, {{filename, contents}});
EXPECT_EQ(created, true);
auto reader = createArchiveReader(archive_path);
EXPECT_EQ(reader->fileExists(filename), true);
fs::remove(archive_path);
}
TEST(TarArchiveReaderTest, ReadFile) {
String archive_path = "archive.tar";
String filename = "file.txt";
String contents = "test";
bool created = createArchiveWithFiles<ArchiveType::Tar>(archive_path, {{filename, contents}});
EXPECT_EQ(created, true);
auto reader = createArchiveReader(archive_path);
auto in = reader->readFile(filename);
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents);
fs::remove(archive_path);
}
TEST(TarArchiveReaderTest, ReadTwoFiles) {
String archive_path = "archive.tar";
String file1 = "file1.txt";
String contents1 = "test1";
String file2 = "file2.txt";
String contents2 = "test2";
bool created = createArchiveWithFiles<ArchiveType::Tar>(archive_path, {{file1, contents1}, {file2, contents2}});
EXPECT_EQ(created, true);
auto reader = createArchiveReader(archive_path);
EXPECT_EQ(reader->fileExists(file1), true);
EXPECT_EQ(reader->fileExists(file2), true);
auto in = reader->readFile(file1);
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents1);
in = reader->readFile(file2);
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents2);
fs::remove(archive_path);
}
TEST(TarArchiveReaderTest, CheckFileInfo) {
String archive_path = "archive.tar";
String filename = "file.txt";
String contents = "test";
bool created = createArchiveWithFiles<ArchiveType::Tar>(archive_path, {{filename, contents}});
EXPECT_EQ(created, true);
auto reader = createArchiveReader(archive_path);
auto info = reader->getFileInfo(filename);
EXPECT_EQ(info.uncompressed_size, contents.size());
EXPECT_GT(info.compressed_size, 0);
fs::remove(archive_path);
}
TEST(SevenZipArchiveReaderTest, FileExists) {
String archive_path = "archive.7z";
String filename = "file.txt";
String contents = "test";
bool created = createArchiveWithFiles<ArchiveType::SevenZip>(archive_path, {{filename, contents}});
EXPECT_EQ(created, true);
auto reader = createArchiveReader(archive_path);
EXPECT_EQ(reader->fileExists(filename), true);
fs::remove(archive_path);
}
TEST(SevenZipArchiveReaderTest, ReadFile) {
String archive_path = "archive.7z";
String filename = "file.txt";
String contents = "test";
bool created = createArchiveWithFiles<ArchiveType::SevenZip>(archive_path, {{filename, contents}});
EXPECT_EQ(created, true);
auto reader = createArchiveReader(archive_path);
auto in = reader->readFile(filename);
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents);
fs::remove(archive_path);
}
TEST(SevenZipArchiveReaderTest, CheckFileInfo) {
String archive_path = "archive.7z";
String filename = "file.txt";
String contents = "test";
bool created = createArchiveWithFiles<ArchiveType::SevenZip>(archive_path, {{filename, contents}});
EXPECT_EQ(created, true);
auto reader = createArchiveReader(archive_path);
auto info = reader->getFileInfo(filename);
EXPECT_EQ(info.uncompressed_size, contents.size());
EXPECT_GT(info.compressed_size, 0);
fs::remove(archive_path);
}
TEST(SevenZipArchiveReaderTest, ReadTwoFiles) {
String archive_path = "archive.7z";
String file1 = "file1.txt";
String contents1 = "test1";
String file2 = "file2.txt";
String contents2 = "test2";
bool created = createArchiveWithFiles<ArchiveType::SevenZip>(archive_path, {{file1, contents1}, {file2, contents2}});
EXPECT_EQ(created, true);
auto reader = createArchiveReader(archive_path);
EXPECT_EQ(reader->fileExists(file1), true);
EXPECT_EQ(reader->fileExists(file2), true);
auto in = reader->readFile(file1);
String str;
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents1);
in = reader->readFile(file2);
readStringUntilEOF(str, *in);
EXPECT_EQ(str, contents2);
fs::remove(archive_path);
}
#if USE_MINIZIP
@ -334,7 +497,7 @@ namespace
{
const char * supported_archive_file_exts[] =
{
".zip",
".zip"
};
}

View File

@ -22,6 +22,8 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <IO/Archives/createArchiveReader.h>
#include <IO/Archives/IArchiveReader.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeString.h>
@ -57,7 +59,6 @@
#include <cmath>
#include <algorithm>
namespace ProfileEvents
{
extern const Event CreatedReadBufferOrdinary;
@ -379,10 +380,33 @@ std::unique_ptr<ReadBuffer> createReadBuffer(
bool use_table_fd,
int table_fd,
const String & compression_method,
ContextPtr context)
ContextPtr context,
const String & path_to_archive = "")
{
CompressionMethod method;
if (!path_to_archive.empty())
{
auto reader = createArchiveReader(path_to_archive);
if (current_path.find_first_of("*?{") != std::string::npos)
{
auto matcher = std::make_shared<re2::RE2>(makeRegexpPatternFromGlobs(current_path));
if (!matcher->ok())
throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
"Cannot compile regex from glob ({}): {}", current_path, matcher->error());
return reader->readFile([matcher = std::move(matcher)](const std::string & path)
{
return re2::RE2::FullMatch(path, *matcher);
});
}
else
{
return reader->readFile(current_path);
}
}
if (use_table_fd)
method = chooseCompressionMethod("", compression_method);
else
@ -471,7 +495,8 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
const std::vector<String> & paths,
const String & compression_method,
const std::optional<FormatSettings> & format_settings,
ContextPtr context)
ContextPtr context,
const std::vector<String> & paths_to_archive)
{
if (format == "Distributed")
{
@ -491,30 +516,62 @@ ColumnsDescription StorageFile::getTableStructureFromFile(
if (context->getSettingsRef().schema_inference_use_cache_for_file)
columns_from_cache = tryGetColumnsFromCache(paths, format, format_settings, context);
ReadBufferIterator read_buffer_iterator = [&, it = paths.begin(), first = true](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
ReadBufferIterator read_buffer_iterator;
if (paths_to_archive.empty())
{
String path;
struct stat file_stat;
do
read_buffer_iterator = [&, it = paths.begin(), first = true](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
{
if (it == paths.end())
String path;
struct stat file_stat;
do
{
if (first)
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because all files are empty. You must specify table structure manually",
format);
return nullptr;
if (it == paths.end())
{
if (first)
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because all files are empty. You must specify table structure manually",
format);
return nullptr;
}
path = *it++;
file_stat = getFileStat(path, false, -1, "File");
}
while (context->getSettingsRef().engine_file_skip_empty_files && file_stat.st_size == 0);
path = *it++;
file_stat = getFileStat(path, false, -1, "File");
}
while (context->getSettingsRef().engine_file_skip_empty_files && file_stat.st_size == 0);
first = false;
return createReadBuffer(path, file_stat, false, -1, compression_method, context);
};
}
else
{
read_buffer_iterator = [&, path_it = paths.begin(), archive_it = paths_to_archive.begin(), first = true](ColumnsDescription &) mutable -> std::unique_ptr<ReadBuffer>
{
String path;
struct stat file_stat;
do
{
if (archive_it == paths_to_archive.end())
{
if (first)
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, because all files are empty. You must specify table structure manually",
format);
return nullptr;
}
first = false;
return createReadBuffer(path, file_stat, false, -1, compression_method, context);
};
path = *archive_it++;
file_stat = getFileStat(path, false, -1, "File");
}
while (context->getSettingsRef().engine_file_skip_empty_files && file_stat.st_size == 0);
first = false;
return createReadBuffer(*path_it, file_stat, false, -1, compression_method, context, path);
};
}
ColumnsDescription columns;
if (columns_from_cache)
@ -566,8 +623,17 @@ StorageFile::StorageFile(int table_fd_, CommonArguments args)
StorageFile::StorageFile(const std::string & table_path_, const std::string & user_files_path, CommonArguments args)
: StorageFile(args)
{
if (!args.path_to_archive.empty())
{
paths_to_archive = getPathsList(args.path_to_archive, user_files_path, args.getContext(), total_bytes_to_read);
paths = {table_path_};
}
else
{
paths = getPathsList(table_path_, user_files_path, args.getContext(), total_bytes_to_read);
}
is_db_table = false;
paths = getPathsList(table_path_, user_files_path, args.getContext(), total_bytes_to_read);
is_path_with_globs = paths.size() > 1;
if (!paths.empty())
path_for_partitioned_write = paths.front();
@ -621,7 +687,7 @@ void StorageFile::setStorageMetadata(CommonArguments args)
columns = getTableStructureFromFileDescriptor(args.getContext());
else
{
columns = getTableStructureFromFile(format_name, paths, compression_method, format_settings, args.getContext());
columns = getTableStructureFromFile(format_name, paths, compression_method, format_settings, args.getContext(), paths_to_archive);
if (!args.columns.empty() && args.columns != columns)
throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Table structure and file structure are different");
}
@ -654,7 +720,9 @@ public:
class FilesIterator
{
public:
explicit FilesIterator(const Strings & files_) : files(files_)
explicit FilesIterator(
const Strings & files_, std::vector<std::string> archives_, std::vector<std::pair<uint64_t, std::string>> files_in_archive_)
: files(files_), archives(std::move(archives_)), files_in_archive(std::move(files_in_archive_))
{
}
@ -667,8 +735,25 @@ public:
return files[current_index];
}
std::pair<String, String> nextFileFromArchive()
{
auto current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= files_in_archive.size())
return {"", ""};
const auto & [archive_index, filename] = files_in_archive[current_index];
return {archives[archive_index], filename};
}
bool fromArchive() const
{
return !archives.empty();
}
private:
std::vector<std::string> files;
std::vector<std::string> archives;
std::vector<std::pair<uint64_t, std::string>> files_in_archive;
std::atomic<size_t> index = 0;
};
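To make the pairing concrete, here is a small illustration (hypothetical values, not from the diff) of what the new `FilesIterator` receives when reading from archives: each element of `files_in_archive` is a pair of (index into `archives`, file name inside that archive).

```cpp
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

/// Illustration only, hypothetical values.
std::vector<std::string> archives = {"archive1.zip", "archive2.zip"};
std::vector<std::pair<uint64_t, std::string>> files_in_archive =
{
    {0, "data1.csv"},   /// archive1.zip :: data1.csv
    {0, "data3.csv"},   /// archive1.zip :: data3.csv
    {1, "data1.csv"},   /// archive2.zip :: data1.csv
};
/// Successive nextFileFromArchive() calls yield {"archive1.zip", "data1.csv"},
/// {"archive1.zip", "data3.csv"}, {"archive2.zip", "data1.csv"} and then {"", ""}.
```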
@ -776,9 +861,35 @@ public:
{
if (!storage->use_table_fd)
{
current_path = files_iterator->next();
if (current_path.empty())
return {};
if (files_iterator->fromArchive())
{
auto [archive, filename] = files_iterator->nextFileFromArchive();
if (archive.empty())
return {};
current_path = std::move(filename);
if (!archive_reader || archive_reader->getPath() != archive)
{
archive_reader = createArchiveReader(archive);
file_enumerator = archive_reader->firstFile();
}
if (file_enumerator == nullptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to find a file in archive {}", archive);
while (file_enumerator->getFileName() != current_path)
{
if (!file_enumerator->nextFile())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected file {} is missing from archive {}", current_path, archive);
}
}
else
{
current_path = files_iterator->next();
if (current_path.empty())
return {};
}
/// Special case for distributed format. Defaults are not needed here.
if (storage->format_name == "Distributed")
@ -791,10 +902,24 @@ public:
if (!read_buf)
{
auto file_stat = getFileStat(current_path, storage->use_table_fd, storage->table_fd, storage->getName());
if (context->getSettingsRef().engine_file_skip_empty_files && file_stat.st_size == 0)
continue;
read_buf = createReadBuffer(current_path, file_stat, storage->use_table_fd, storage->table_fd, storage->compression_method, context);
struct stat file_stat;
if (archive_reader == nullptr)
{
file_stat = getFileStat(current_path, storage->use_table_fd, storage->table_fd, storage->getName());
if (context->getSettingsRef().engine_file_skip_empty_files && file_stat.st_size == 0)
continue;
}
if (archive_reader == nullptr)
{
read_buf = createReadBuffer(current_path, file_stat, storage->use_table_fd, storage->table_fd, storage->compression_method, context);
}
else
{
chassert(file_enumerator);
read_buf = archive_reader->readFile(std::move(file_enumerator));
}
}
const Settings & settings = context->getSettingsRef();
@ -861,7 +986,11 @@ public:
reader.reset();
pipeline.reset();
input_format.reset();
read_buf.reset();
if (archive_reader != nullptr)
file_enumerator = archive_reader->nextFile(std::move(read_buf));
else
read_buf.reset();
}
return {};
@ -879,6 +1008,9 @@ private:
std::unique_ptr<QueryPipeline> pipeline;
std::unique_ptr<PullingPipelineExecutor> reader;
std::shared_ptr<IArchiveReader> archive_reader;
std::unique_ptr<IArchiveReader::FileEnumerator> file_enumerator = nullptr;
ColumnsDescription columns_description;
NamesAndTypesList requested_columns;
NamesAndTypesList requested_virtual_columns;
@ -908,21 +1040,67 @@ Pipe StorageFile::read(
}
else
{
if (paths.size() == 1 && !fs::exists(paths[0]))
const auto & p = paths_to_archive.empty() ? paths : paths_to_archive;
if (p.size() == 1 && !fs::exists(p[0]))
{
if (context->getSettingsRef().engine_file_empty_if_not_exists)
return Pipe(std::make_shared<NullSource>(storage_snapshot->getSampleBlockForColumns(column_names)));
else
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File {} doesn't exist", paths[0]);
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File {} doesn't exist", p[0]);
}
}
auto files_iterator = std::make_shared<StorageFileSource::FilesIterator>(paths);
std::vector<std::pair<uint64_t, std::string>> files_in_archive;
size_t files_in_archive_num = 0;
if (!paths_to_archive.empty())
{
if (paths.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Multiple paths defined for reading from archive");
const auto & path = paths[0];
IArchiveReader::NameFilter filter;
if (path.find_first_of("*?{") != std::string::npos)
{
auto matcher = std::make_shared<re2::RE2>(makeRegexpPatternFromGlobs(path));
if (!matcher->ok())
throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
"Cannot compile regex from glob ({}): {}", path, matcher->error());
filter = [matcher](const std::string & p)
{
return re2::RE2::FullMatch(p, *matcher);
};
}
for (size_t i = 0; i < paths_to_archive.size(); ++i)
{
if (filter)
{
const auto & path_to_archive = paths_to_archive[i];
auto archive_reader = createArchiveReader(path_to_archive);
auto files = archive_reader->getAllFiles(filter);
for (auto & file : files)
files_in_archive.push_back({i, std::move(file)});
}
else
{
files_in_archive.push_back({i, path});
}
}
files_in_archive_num = files_in_archive.size();
}
auto files_iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, paths_to_archive, std::move(files_in_archive));
auto this_ptr = std::static_pointer_cast<StorageFile>(shared_from_this());
size_t num_streams = max_num_streams;
if (max_num_streams > paths.size())
num_streams = paths.size();
auto files_to_read = std::max(files_in_archive_num, paths.size());
if (max_num_streams > files_to_read)
num_streams = files_to_read;
Pipes pipes;
pipes.reserve(num_streams);
@ -1202,6 +1380,9 @@ SinkToStoragePtr StorageFile::write(
ContextPtr context,
bool /*async_insert*/)
{
if (!use_table_fd && !paths_to_archive.empty())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Writing to archives is not supported");
if (format_name == "Distributed")
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method write is not implemented for Distributed format");
@ -1375,6 +1556,7 @@ void registerStorageFile(StorageFactory & factory)
factory_args.constraints,
factory_args.comment,
{},
{},
};
ASTs & engine_args_ast = factory_args.engine_args;
@ -1445,7 +1627,7 @@ void registerStorageFile(StorageFactory & factory)
else if (type == Field::Types::UInt64)
source_fd = static_cast<int>(literal->value.get<UInt64>());
else if (type == Field::Types::String)
source_path = literal->value.get<String>();
StorageFile::parseFileSource(literal->value.get<String>(), source_path, storage_args.path_to_archive);
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Second argument must be path or file descriptor");
}
@ -1517,4 +1699,32 @@ void StorageFile::addColumnsToCache(
schema_cache.addMany(cache_keys, columns);
}
void StorageFile::parseFileSource(String source, String & filename, String & path_to_archive)
{
size_t pos = source.find("::");
if (pos == String::npos)
{
filename = std::move(source);
return;
}
std::string_view path_to_archive_view = std::string_view{source}.substr(0, pos);
while (!path_to_archive_view.empty() && path_to_archive_view.back() == ' ')
path_to_archive_view.remove_suffix(1);
if (path_to_archive_view.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Path to archive is empty");
path_to_archive = path_to_archive_view;
std::string_view filename_view = std::string_view{source}.substr(pos + 2);
while (!filename_view.empty() && filename_view.front() == ' ')
filename_view.remove_prefix(1);
if (filename_view.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Filename is empty");
filename = filename_view;
}
}
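A short sketch of what the new `parseFileSource` does with the `path_to_archive :: filename` syntax (hypothetical inputs; note that when no `::` is present the function returns early and leaves `path_to_archive` untouched):

```cpp
#include <Storages/StorageFile.h>

#include <string>

/// Sketch only: splitting the "path_to_archive :: filename" source string.
void parseFileSourceExamples()
{
    std::string filename;
    std::string path_to_archive;
    DB::StorageFile::parseFileSource("archive1.zip :: table.csv", filename, path_to_archive);
    /// filename == "table.csv", path_to_archive == "archive1.zip" (spaces around "::" are trimmed)

    std::string plain_filename;
    std::string plain_archive;
    DB::StorageFile::parseFileSource("table.csv", plain_filename, plain_archive);
    /// plain_filename == "table.csv", plain_archive stays empty

    /// An empty archive or file part is rejected with BAD_ARGUMENTS
    /// ("Path to archive is empty" / "Filename is empty").
}
```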

View File

@ -22,8 +22,8 @@ public:
const ColumnsDescription & columns;
const ConstraintsDescription & constraints;
const String & comment;
const std::string rename_after_processing;
std::string path_to_archive;
};
/// From file descriptor
@ -90,10 +90,13 @@ public:
const std::vector<String> & paths,
const String & compression_method,
const std::optional<FormatSettings> & format_settings,
ContextPtr context);
ContextPtr context,
const std::vector<String> & paths_to_archive = {"auto"});
static SchemaCache & getSchemaCache(const ContextPtr & context);
static void parseFileSource(String source, String & filename, String & path_to_archive);
protected:
friend class StorageFileSource;
friend class StorageFileSink;
@ -123,6 +126,7 @@ private:
std::string base_path;
std::vector<std::string> paths;
std::vector<std::string> paths_to_archive;
bool is_db_table = true; /// Table is stored in real database, not user's file
bool use_table_fd = false; /// Use table_fd instead of path

View File

@ -42,6 +42,7 @@ protected:
virtual String getFormatFromFirstArgument();
String filename;
String path_to_archive;
String format = "auto";
String structure = "auto";
String compression_method = "auto";

View File

@ -25,6 +25,7 @@ void TableFunctionFile::parseFirstArguments(const ASTPtr & arg, const ContextPtr
if (context->getApplicationType() != Context::ApplicationType::LOCAL)
{
ITableFunctionFileLike::parseFirstArguments(arg, context);
StorageFile::parseFileSource(std::move(filename), filename, path_to_archive);
return;
}
@ -39,6 +40,8 @@ void TableFunctionFile::parseFirstArguments(const ASTPtr & arg, const ContextPtr
fd = STDOUT_FILENO;
else if (filename == "stderr")
fd = STDERR_FILENO;
else
StorageFile::parseFileSource(std::move(filename), filename, path_to_archive);
}
else if (type == Field::Types::Int64 || type == Field::Types::UInt64)
{
@ -76,7 +79,9 @@ StoragePtr TableFunctionFile::getStorage(const String & source,
ConstraintsDescription{},
String{},
global_context->getSettingsRef().rename_files_after_processing,
path_to_archive,
};
if (fd >= 0)
return std::make_shared<StorageFile>(fd, args);
@ -90,8 +95,15 @@ ColumnsDescription TableFunctionFile::getActualTableStructure(ContextPtr context
if (fd >= 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema inference is not supported for table function '{}' with file descriptor", getName());
size_t total_bytes_to_read = 0;
Strings paths = StorageFile::getPathsList(filename, context->getUserFilesPath(), context, total_bytes_to_read);
return StorageFile::getTableStructureFromFile(format, paths, compression_method, std::nullopt, context);
Strings paths;
Strings paths_to_archives;
if (path_to_archive.empty())
paths = StorageFile::getPathsList(filename, context->getUserFilesPath(), context, total_bytes_to_read);
else
paths_to_archives = StorageFile::getPathsList(path_to_archive, context->getUserFilesPath(), context, total_bytes_to_read);
return StorageFile::getTableStructureFromFile(format, paths, compression_method, std::nullopt, context, paths_to_archives);
}

View File

@ -162,5 +162,8 @@ endif ()
if (TARGET ch_contrib::fiu)
set(FIU_ENABLE 1)
endif()
if (TARGET ch_contrib::libarchive)
set(USE_LIBARCHIVE 1)
endif()
set(SOURCE_DIR ${CMAKE_SOURCE_DIR})

View File

@ -0,0 +1,48 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
function read_archive_file() {
$CLICKHOUSE_LOCAL --query "SELECT * FROM file('${user_files_path}/$1') ORDER BY 1, 2"
$CLICKHOUSE_CLIENT --query "SELECT * FROM file('${user_files_path}/$1') ORDER BY 1, 2"
$CLICKHOUSE_CLIENT --query "CREATE TABLE 02661_archive_table Engine=File('CSV', '${user_files_path}/$1')"
$CLICKHOUSE_CLIENT --query "SELECT * FROM 02661_archive_table ORDER BY 1, 2"
$CLICKHOUSE_CLIENT --query "DROP TABLE 02661_archive_table"
}
function run_archive_test() {
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS 02661_archive_table"
user_files_path=$(clickhouse-client --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
echo -e "1,2\n3,4" > ${CLICKHOUSE_TEST_UNIQUE_NAME}_data1.csv
echo -e "5,6\n7,8" > ${CLICKHOUSE_TEST_UNIQUE_NAME}_data2.csv
echo -e "9,10\n11,12" > ${CLICKHOUSE_TEST_UNIQUE_NAME}_data3.csv
eval "$2 ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}_archive1.$1 ${CLICKHOUSE_TEST_UNIQUE_NAME}_data1.csv ${CLICKHOUSE_TEST_UNIQUE_NAME}_data2.csv > /dev/null"
eval "$2 ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}_archive2.$1 ${CLICKHOUSE_TEST_UNIQUE_NAME}_data1.csv ${CLICKHOUSE_TEST_UNIQUE_NAME}_data3.csv > /dev/null"
eval "$2 ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}_archive3.$1 ${CLICKHOUSE_TEST_UNIQUE_NAME}_data2.csv ${CLICKHOUSE_TEST_UNIQUE_NAME}_data3.csv > /dev/null"
echo "archive1 data1.csv"
read_archive_file "${CLICKHOUSE_TEST_UNIQUE_NAME}_archive1.$1 :: ${CLICKHOUSE_TEST_UNIQUE_NAME}_data1.csv"
echo "archive{1..2} data1.csv"
read_archive_file "${CLICKHOUSE_TEST_UNIQUE_NAME}_archive{1..2}.$1 :: ${CLICKHOUSE_TEST_UNIQUE_NAME}_data1.csv"
echo "archive{1,2} data{1,3}.csv"
read_archive_file "${CLICKHOUSE_TEST_UNIQUE_NAME}_archive{1,2}.$1 :: ${CLICKHOUSE_TEST_UNIQUE_NAME}_data{1,3}.csv"
echo "archive3 data*.csv"
read_archive_file "${CLICKHOUSE_TEST_UNIQUE_NAME}_archive3.$1 :: ${CLICKHOUSE_TEST_UNIQUE_NAME}_data*.csv"
echo "archive* *.csv"
read_archive_file "${CLICKHOUSE_TEST_UNIQUE_NAME}_archive*.$1 :: *.csv"
echo "archive* {2..3}.csv"
read_archive_file "${CLICKHOUSE_TEST_UNIQUE_NAME}_archive*.$1 :: ${CLICKHOUSE_TEST_UNIQUE_NAME}_data{2..3}.csv"
$CLICKHOUSE_LOCAL --query "SELECT * FROM file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}_archive1.$1::nonexistent.csv')" 2>&1 | grep -q "CANNOT_UNPACK_ARCHIVE" && echo "OK" || echo "FAIL"
$CLICKHOUSE_LOCAL --query "SELECT * FROM file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}_archive3.$1::{2..3}.csv')" 2>&1 | grep -q "CANNOT_UNPACK_ARCHIVE" && echo "OK" || echo "FAIL"
rm ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}_archive{1..3}.$1
rm ${CLICKHOUSE_TEST_UNIQUE_NAME}_data{1..3}.csv
}

View File

@ -0,0 +1,116 @@
archive1 data1.csv
1 2
3 4
1 2
3 4
1 2
3 4
archive{1..2} data1.csv
1 2
1 2
3 4
3 4
1 2
1 2
3 4
3 4
1 2
1 2
3 4
3 4
archive{1,2} data{1,3}.csv
1 2
1 2
3 4
3 4
9 10
11 12
1 2
1 2
3 4
3 4
9 10
11 12
1 2
1 2
3 4
3 4
9 10
11 12
archive3 data*.csv
5 6
7 8
9 10
11 12
5 6
7 8
9 10
11 12
5 6
7 8
9 10
11 12
archive* *.csv
1 2
1 2
3 4
3 4
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
1 2
1 2
3 4
3 4
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
1 2
1 2
3 4
3 4
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
archive* {2..3}.csv
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
OK
OK

View File

@ -0,0 +1,11 @@
#!/usr/bin/env bash
# Tags: no-fasttest, long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# shellcheck source=./02661_read_from_archive.lib
. "$CUR_DIR"/02661_read_from_archive.lib
run_archive_test "7z" "7z a"

View File

@ -0,0 +1,116 @@
archive1 data1.csv
1 2
3 4
1 2
3 4
1 2
3 4
archive{1..2} data1.csv
1 2
1 2
3 4
3 4
1 2
1 2
3 4
3 4
1 2
1 2
3 4
3 4
archive{1,2} data{1,3}.csv
1 2
1 2
3 4
3 4
9 10
11 12
1 2
1 2
3 4
3 4
9 10
11 12
1 2
1 2
3 4
3 4
9 10
11 12
archive3 data*.csv
5 6
7 8
9 10
11 12
5 6
7 8
9 10
11 12
5 6
7 8
9 10
11 12
archive* *.csv
1 2
1 2
3 4
3 4
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
1 2
1 2
3 4
3 4
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
1 2
1 2
3 4
3 4
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
archive* {2..3}.csv
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
OK
OK

View File

@ -0,0 +1,11 @@
#!/usr/bin/env bash
# Tags: no-fasttest, long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# shellcheck source=./02661_read_from_archive.lib
. "$CUR_DIR"/02661_read_from_archive.lib
run_archive_test "tar" "tar -cvf"

View File

@ -0,0 +1,116 @@
archive1 data1.csv
1 2
3 4
1 2
3 4
1 2
3 4
archive{1..2} data1.csv
1 2
1 2
3 4
3 4
1 2
1 2
3 4
3 4
1 2
1 2
3 4
3 4
archive{1,2} data{1,3}.csv
1 2
1 2
3 4
3 4
9 10
11 12
1 2
1 2
3 4
3 4
9 10
11 12
1 2
1 2
3 4
3 4
9 10
11 12
archive3 data*.csv
5 6
7 8
9 10
11 12
5 6
7 8
9 10
11 12
5 6
7 8
9 10
11 12
archive* *.csv
1 2
1 2
3 4
3 4
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
1 2
1 2
3 4
3 4
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
1 2
1 2
3 4
3 4
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
archive* {2..3}.csv
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
OK
OK

View File

@ -0,0 +1,11 @@
#!/usr/bin/env bash
# Tags: no-fasttest, long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# shellcheck source=./02661_read_from_archive.lib
. "$CUR_DIR"/02661_read_from_archive.lib
run_archive_test "tar.gz" "tar -cvzf"

View File

@ -0,0 +1,116 @@
archive1 data1.csv
1 2
3 4
1 2
3 4
1 2
3 4
archive{1..2} data1.csv
1 2
1 2
3 4
3 4
1 2
1 2
3 4
3 4
1 2
1 2
3 4
3 4
archive{1,2} data{1,3}.csv
1 2
1 2
3 4
3 4
9 10
11 12
1 2
1 2
3 4
3 4
9 10
11 12
1 2
1 2
3 4
3 4
9 10
11 12
archive3 data*.csv
5 6
7 8
9 10
11 12
5 6
7 8
9 10
11 12
5 6
7 8
9 10
11 12
archive* *.csv
1 2
1 2
3 4
3 4
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
1 2
1 2
3 4
3 4
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
1 2
1 2
3 4
3 4
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
archive* {2..3}.csv
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
5 6
5 6
7 8
7 8
9 10
9 10
11 12
11 12
OK
OK

View File

@ -0,0 +1,11 @@
#!/usr/bin/env bash
# Tags: no-fasttest, long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# shellcheck source=./02661_read_from_archive.lib
. "$CUR_DIR"/02661_read_from_archive.lib
run_archive_test "zip" "zip"