Support for archives (unfinished)

This commit is contained in:
kssenii 2024-05-16 18:17:46 +02:00
parent af6f124df0
commit b53e9eec7b
8 changed files with 255 additions and 21 deletions

View File

@ -29,6 +29,7 @@ struct URI
std::string key; std::string key;
std::string version_id; std::string version_id;
std::string storage_name; std::string storage_name;
/// Path (or path pattern) in archive if uri is an archive.
std::optional<std::string> archive_pattern; std::optional<std::string> archive_pattern;
std::string uri_str; std::string uri_str;

View File

@ -1,5 +1,6 @@
#include <Storages/ObjectStorage/ReadBufferIterator.h> #include <Storages/ObjectStorage/ReadBufferIterator.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h> #include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <IO/ReadBufferFromFileBase.h> #include <IO/ReadBufferFromFileBase.h>
@ -244,22 +245,35 @@ ReadBufferIterator::Data ReadBufferIterator::next()
} }
} }
std::unique_ptr<ReadBuffer> read_buffer = object_storage->readObject( std::unique_ptr<ReadBuffer> read_buf;
StoredObject(current_object_info->relative_path), CompressionMethod compression_method;
getContext()->getReadSettings(), using ObjectInfoInArchive = StorageObjectStorageSource::ArchiveIterator::ObjectInfoInArchive;
{}, if (auto object_info_in_archive = dynamic_cast<const ObjectInfoInArchive *>(current_object_info.get()))
current_object_info->metadata->size_bytes); {
compression_method = chooseCompressionMethod(configuration->getPathInArchive(), configuration->compression_method);
auto & archive_reader = object_info_in_archive->archive_reader;
read_buf = archive_reader->readFile(object_info_in_archive->path_in_archive, /*throw_on_not_found=*/true);
}
else
{
compression_method = chooseCompressionMethod(current_object_info->relative_path, configuration->compression_method);
read_buf = object_storage->readObject(
StoredObject(current_object_info->relative_path),
getContext()->getReadSettings(),
{},
current_object_info->metadata->size_bytes);
}
if (!query_settings.skip_empty_files || !read_buffer->eof()) if (!query_settings.skip_empty_files || !read_buf->eof())
{ {
first = false; first = false;
read_buffer = wrapReadBufferWithCompressionMethod( read_buf = wrapReadBufferWithCompressionMethod(
std::move(read_buffer), std::move(read_buf),
chooseCompressionMethod(current_object_info->relative_path, configuration->compression_method), compression_method,
static_cast<int>(getContext()->getSettingsRef().zstd_window_log_max)); static_cast<int>(getContext()->getSettingsRef().zstd_window_log_max));
return {std::move(read_buffer), std::nullopt, format}; return {std::move(read_buf), std::nullopt, format};
} }
} }
} }

View File

@ -55,6 +55,14 @@ String StorageS3Configuration::getDataSourceDescription()
return std::filesystem::path(url.uri.getHost() + std::to_string(url.uri.getPort())) / url.bucket; return std::filesystem::path(url.uri.getHost() + std::to_string(url.uri.getPort())) / url.bucket;
} }
std::string StorageS3Configuration::getPathInArchive() const
{
if (url.archive_pattern.has_value())
return url.archive_pattern.value();
throw Exception(ErrorCodes::LOGICAL_ERROR, "Path {} is not an archive", getPath());
}
void StorageS3Configuration::check(ContextPtr context) const void StorageS3Configuration::check(ContextPtr context) const
{ {
validateNamespace(url.bucket); validateNamespace(url.bucket);

View File

@ -34,6 +34,9 @@ public:
String getDataSourceDescription() override; String getDataSourceDescription() override;
StorageObjectStorage::QuerySettings getQuerySettings(const ContextPtr &) const override; StorageObjectStorage::QuerySettings getQuerySettings(const ContextPtr &) const override;
bool isArchive() const override { return url.archive_pattern.has_value(); }
std::string getPathInArchive() const override;
void check(ContextPtr context) const override; void check(ContextPtr context) const override;
void validateNamespace(const String & name) const override; void validateNamespace(const String & name) const override;
ConfigurationPtr clone() override { return std::make_shared<StorageS3Configuration>(*this); } ConfigurationPtr clone() override { return std::make_shared<StorageS3Configuration>(*this); }

View File

@ -452,6 +452,16 @@ std::string StorageObjectStorage::Configuration::getPathWithoutGlobs() const
return getPath().substr(0, getPath().find_first_of("*?{")); return getPath().substr(0, getPath().find_first_of("*?{"));
} }
bool StorageObjectStorage::Configuration::isPathInArchiveWithGlobs() const
{
return getPathInArchive().find_first_of("*?{") != std::string::npos;
}
std::string StorageObjectStorage::Configuration::getPathInArchive() const
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Path {} is not archive", getPath());
}
void StorageObjectStorage::Configuration::assertInitialized() const void StorageObjectStorage::Configuration::assertInitialized() const
{ {
if (!initialized) if (!initialized)

View File

@ -175,6 +175,10 @@ public:
bool isNamespaceWithGlobs() const; bool isNamespaceWithGlobs() const;
virtual std::string getPathWithoutGlobs() const; virtual std::string getPathWithoutGlobs() const;
virtual bool isArchive() const { return false; }
bool isPathInArchiveWithGlobs() const;
virtual std::string getPathInArchive() const;
virtual void check(ContextPtr context) const; virtual void check(ContextPtr context) const;
virtual void validateNamespace(const String & /* name */) const {} virtual void validateNamespace(const String & /* name */) const {}

View File

@ -7,6 +7,7 @@
#include <Processors/Executors/PullingPipelineExecutor.h> #include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Transforms/ExtractColumnsTransform.h> #include <Processors/Transforms/ExtractColumnsTransform.h>
#include <IO/ReadBufferFromFileBase.h> #include <IO/ReadBufferFromFileBase.h>
#include <IO/Archives/createArchiveReader.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <Formats/ReadSchemaUtils.h> #include <Formats/ReadSchemaUtils.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h> #include <Storages/ObjectStorage/StorageObjectStorage.h>
@ -100,10 +101,11 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
auto settings = configuration->getQuerySettings(local_context); auto settings = configuration->getQuerySettings(local_context);
std::unique_ptr<IIterator> iterator;
if (configuration->isPathWithGlobs()) if (configuration->isPathWithGlobs())
{ {
/// Iterate through disclosed globs and make a source for each file /// Iterate through disclosed globs and make a source for each file
return std::make_shared<GlobIterator>( iterator = std::make_unique<GlobIterator>(
object_storage, configuration, predicate, virtual_columns, object_storage, configuration, predicate, virtual_columns,
local_context, read_keys, settings.list_object_keys_size, local_context, read_keys, settings.list_object_keys_size,
settings.throw_on_zero_files_match, file_progress_callback); settings.throw_on_zero_files_match, file_progress_callback);
@ -123,10 +125,17 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
copy_configuration->setPaths(keys); copy_configuration->setPaths(keys);
} }
return std::make_shared<KeysIterator>( iterator = std::make_unique<KeysIterator>(
object_storage, copy_configuration, virtual_columns, read_keys, object_storage, copy_configuration, virtual_columns, read_keys,
settings.ignore_non_existent_file, file_progress_callback); settings.ignore_non_existent_file, file_progress_callback);
} }
if (configuration->isArchive())
{
return std::make_shared<ArchiveIterator>(object_storage, configuration, std::move(iterator), local_context, read_keys);
}
return iterator;
} }
void StorageObjectStorageSource::lazyInitialize(size_t processor) void StorageObjectStorageSource::lazyInitialize(size_t processor)
@ -262,9 +271,20 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
} }
else else
{ {
const auto compression_method = chooseCompressionMethod(object_info->relative_path, configuration->compression_method); CompressionMethod compression_method;
const auto max_parsing_threads = need_only_count ? std::optional<size_t>(1) : std::nullopt; const auto max_parsing_threads = need_only_count ? std::optional<size_t>(1) : std::nullopt;
read_buf = createReadBuffer(object_info->relative_path, object_info->metadata->size_bytes);
if (auto object_info_in_archive = dynamic_cast<const ArchiveIterator::ObjectInfoInArchive *>(object_info.get()))
{
compression_method = chooseCompressionMethod(configuration->getPathInArchive(), configuration->compression_method);
auto & archive_reader = object_info_in_archive->archive_reader;
read_buf = archive_reader->readFile(object_info_in_archive->path_in_archive, /*throw_on_not_found=*/true);
}
else
{
compression_method = chooseCompressionMethod(object_info->relative_path, configuration->compression_method);
read_buf = createReadBuffer(*object_info);
}
auto input_format = FormatFactory::instance().getInput( auto input_format = FormatFactory::instance().getInput(
configuration->format, *read_buf, read_from_format_info.format_header, configuration->format, *read_buf, read_from_format_info.format_header,
@ -312,8 +332,10 @@ std::future<StorageObjectStorageSource::ReaderHolder> StorageObjectStorageSource
return create_reader_scheduler([=, this] { return createReader(processor); }, Priority{}); return create_reader_scheduler([=, this] { return createReader(processor); }, Priority{});
} }
std::unique_ptr<ReadBuffer> StorageObjectStorageSource::createReadBuffer(const String & key, size_t object_size) std::unique_ptr<ReadBuffer> StorageObjectStorageSource::createReadBuffer(const ObjectInfo & object_info)
{ {
const auto & object_size = object_info.metadata->size_bytes;
auto read_settings = getContext()->getReadSettings().adjustBufferSize(object_size); auto read_settings = getContext()->getReadSettings().adjustBufferSize(object_size);
read_settings.enable_filesystem_cache = false; read_settings.enable_filesystem_cache = false;
/// FIXME: Changing this setting to default value breaks something around parquet reading /// FIXME: Changing this setting to default value breaks something around parquet reading
@ -333,7 +355,7 @@ std::unique_ptr<ReadBuffer> StorageObjectStorageSource::createReadBuffer(const S
LOG_TRACE(log, "Downloading object of size {} with initial prefetch", object_size); LOG_TRACE(log, "Downloading object of size {} with initial prefetch", object_size);
auto async_reader = object_storage->readObjects( auto async_reader = object_storage->readObjects(
StoredObjects{StoredObject{key, /* local_path */ "", object_size}}, read_settings); StoredObjects{StoredObject{object_info.relative_path, /* local_path */ "", object_size}}, read_settings);
async_reader->setReadUntilEnd(); async_reader->setReadUntilEnd();
if (read_settings.remote_fs_prefetch) if (read_settings.remote_fs_prefetch)
@ -344,7 +366,7 @@ std::unique_ptr<ReadBuffer> StorageObjectStorageSource::createReadBuffer(const S
else else
{ {
/// FIXME: this is inconsistent that readObject always reads synchronously ignoring read_method setting. /// FIXME: this is inconsistent that readObject always reads synchronously ignoring read_method setting.
return object_storage->readObject(StoredObject(key), read_settings); return object_storage->readObject(StoredObject(object_info.relative_path, "", object_size), read_settings);
} }
} }
@ -609,4 +631,114 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::ReadTaskIterator
return buffer[current_index]; return buffer[current_index];
} }
static IArchiveReader::NameFilter createArchivePathFilter(const std::string & archive_pattern)
{
auto matcher = std::make_shared<re2::RE2>(makeRegexpPatternFromGlobs(archive_pattern));
if (!matcher->ok())
{
throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
"Cannot compile regex from glob ({}): {}",
archive_pattern, matcher->error());
}
return [matcher](const std::string & p) mutable { return re2::RE2::FullMatch(p, *matcher); };
}
StorageObjectStorageSource::ArchiveIterator::ObjectInfoInArchive::ObjectInfoInArchive(
ObjectInfoPtr archive_object_,
const std::string & path_in_archive_,
std::shared_ptr<IArchiveReader> archive_reader_)
: archive_object(archive_object_)
, path_in_archive(path_in_archive_)
, archive_reader(archive_reader_)
{
}
StorageObjectStorageSource::ArchiveIterator::ArchiveIterator(
ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_,
std::unique_ptr<IIterator> archives_iterator_,
ContextPtr context_,
ObjectInfos * read_keys_)
: IIterator("ArchiveIterator")
, WithContext(context_)
, object_storage(object_storage_)
, is_path_in_archive_with_globs(configuration_->isPathInArchiveWithGlobs())
, archives_iterator(std::move(archives_iterator_))
, filter(is_path_in_archive_with_globs ? createArchivePathFilter(configuration_->getPathInArchive()) : IArchiveReader::NameFilter{})
, path_in_archive(is_path_in_archive_with_globs ? "" : configuration_->getPathInArchive())
, read_keys(read_keys_)
{
}
std::shared_ptr<IArchiveReader>
StorageObjectStorageSource::ArchiveIterator::createArchiveReader(ObjectInfoPtr object_info) const
{
const auto size = object_info->metadata->size_bytes;
return DB::createArchiveReader(
/* path_to_archive */object_info->relative_path,
/* archive_read_function */[=, this]()
{
StoredObject stored_object(object_info->relative_path, "", size);
return object_storage->readObject(stored_object, getContext()->getReadSettings());
},
/* archive_size */size);
}
StorageObjectStorageSource::ObjectInfoPtr
StorageObjectStorageSource::ArchiveIterator::nextImpl(size_t processor)
{
std::unique_lock lock{next_mutex};
while (true)
{
if (filter)
{
if (!file_enumerator)
{
archive_object = archives_iterator->next(processor);
if (!archive_object)
return {};
archive_reader = createArchiveReader(archive_object);
file_enumerator = archive_reader->firstFile();
if (!file_enumerator)
continue;
}
else if (!file_enumerator->nextFile())
{
file_enumerator.reset();
continue;
}
path_in_archive = file_enumerator->getFileName();
if (!filter(path_in_archive))
continue;
}
else
{
archive_object = archives_iterator->next(processor);
if (!archive_object)
return {};
if (!archive_object->metadata)
archive_object->metadata = object_storage->getObjectMetadata(archive_object->relative_path);
archive_reader = createArchiveReader(archive_object);
if (!archive_reader->fileExists(path_in_archive))
continue;
}
auto object_in_archive = std::make_shared<ObjectInfoInArchive>(archive_object, path_in_archive, archive_reader);
if (read_keys != nullptr)
read_keys->push_back(object_in_archive);
return object_in_archive;
}
}
size_t StorageObjectStorageSource::ArchiveIterator::estimatedKeysCount()
{
return archives_iterator->estimatedKeysCount();
}
} }

View File

@ -1,10 +1,11 @@
#pragma once #pragma once
#include <Common/re2.h>
#include <Interpreters/Context_fwd.h>
#include <IO/Archives/IArchiveReader.h>
#include <Processors/SourceWithKeyCondition.h> #include <Processors/SourceWithKeyCondition.h>
#include <Processors/Executors/PullingPipelineExecutor.h> #include <Processors/Executors/PullingPipelineExecutor.h>
#include <Interpreters/Context_fwd.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Processors/Formats/IInputFormat.h> #include <Processors/Formats/IInputFormat.h>
#include <Common/re2.h> #include <Storages/ObjectStorage/StorageObjectStorage.h>
namespace DB namespace DB
@ -25,6 +26,7 @@ public:
class ReadTaskIterator; class ReadTaskIterator;
class GlobIterator; class GlobIterator;
class KeysIterator; class KeysIterator;
class ArchiveIterator;
StorageObjectStorageSource( StorageObjectStorageSource(
String name_, String name_,
@ -109,7 +111,7 @@ protected:
/// Recreate ReadBuffer and Pipeline for each file. /// Recreate ReadBuffer and Pipeline for each file.
ReaderHolder createReader(size_t processor = 0); ReaderHolder createReader(size_t processor = 0);
std::future<ReaderHolder> createReaderAsync(size_t processor = 0); std::future<ReaderHolder> createReaderAsync(size_t processor = 0);
std::unique_ptr<ReadBuffer> createReadBuffer(const String & key, size_t object_size); std::unique_ptr<ReadBuffer> createReadBuffer(const ObjectInfo & object_info);
void addNumRowsToCache(const String & path, size_t num_rows); void addNumRowsToCache(const String & path, size_t num_rows);
std::optional<size_t> tryGetNumRowsFromCache(const ObjectInfoPtr & object_info); std::optional<size_t> tryGetNumRowsFromCache(const ObjectInfoPtr & object_info);
@ -218,4 +220,64 @@ private:
std::atomic<size_t> index = 0; std::atomic<size_t> index = 0;
bool ignore_non_existent_files; bool ignore_non_existent_files;
}; };
/*
* An archives iterator.
* Allows to iterate files inside one or many archives.
* `archives_iterator` is an iterator which iterates over different archives.
* There are two ways to read files in archives:
* 1. When we want to read one concete file in each archive.
* In this case we go through all archives, check if this certain file
* exists within this archive and read it if it exists.
* 2. When we have a certain pattern of files we want to read in each archive.
* For this purpose we create a filter defined as IArchiveReader::NameFilter.
*/
class StorageObjectStorageSource::ArchiveIterator : public IIterator, private WithContext
{
public:
explicit ArchiveIterator(
ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_,
std::unique_ptr<IIterator> archives_iterator_,
ContextPtr context_,
ObjectInfos * read_keys_);
size_t estimatedKeysCount() override;
struct ObjectInfoInArchive : public ObjectInfo
{
ObjectInfoInArchive(
ObjectInfoPtr archive_object_,
const std::string & path_in_archive_,
std::shared_ptr<IArchiveReader> archive_reader_);
const ObjectInfoPtr archive_object;
const std::string path_in_archive;
const std::shared_ptr<IArchiveReader> archive_reader;
};
private:
ObjectInfoPtr nextImpl(size_t processor) override;
std::shared_ptr<IArchiveReader> createArchiveReader(ObjectInfoPtr object_info) const;
const ObjectStoragePtr object_storage;
const bool is_path_in_archive_with_globs;
/// Iterator which iterates through different archives.
const std::unique_ptr<IIterator> archives_iterator;
/// Used when files inside archive are defined with a glob
const IArchiveReader::NameFilter filter = {};
/// Current file inside the archive.
std::string path_in_archive = {};
/// Read keys of files inside archives.
ObjectInfos * read_keys;
/// Object pointing to archive (NOT path within archive).
ObjectInfoPtr archive_object;
/// Reader of the archive.
std::shared_ptr<IArchiveReader> archive_reader;
/// File enumerator inside the archive.
std::unique_ptr<IArchiveReader::FileEnumerator> file_enumerator;
std::mutex next_mutex;
};
} }