From b53e9eec7b6560ebb67a5d868689494a7f0ab008 Mon Sep 17 00:00:00 2001 From: kssenii Date: Thu, 16 May 2024 18:17:46 +0200 Subject: [PATCH] Support for archives (unfinished) --- src/IO/S3/URI.h | 1 + .../ObjectStorage/ReadBufferIterator.cpp | 34 ++-- .../ObjectStorage/S3/Configuration.cpp | 8 + src/Storages/ObjectStorage/S3/Configuration.h | 3 + .../ObjectStorage/StorageObjectStorage.cpp | 10 ++ .../ObjectStorage/StorageObjectStorage.h | 4 + .../StorageObjectStorageSource.cpp | 146 +++++++++++++++++- .../StorageObjectStorageSource.h | 70 ++++++++- 8 files changed, 255 insertions(+), 21 deletions(-) diff --git a/src/IO/S3/URI.h b/src/IO/S3/URI.h index c52e6bc1441..363f98c46f5 100644 --- a/src/IO/S3/URI.h +++ b/src/IO/S3/URI.h @@ -29,6 +29,7 @@ struct URI std::string key; std::string version_id; std::string storage_name; + /// Path (or path pattern) in archive if uri is an archive. std::optional archive_pattern; std::string uri_str; diff --git a/src/Storages/ObjectStorage/ReadBufferIterator.cpp b/src/Storages/ObjectStorage/ReadBufferIterator.cpp index 3705725ffe1..61575b0115a 100644 --- a/src/Storages/ObjectStorage/ReadBufferIterator.cpp +++ b/src/Storages/ObjectStorage/ReadBufferIterator.cpp @@ -1,5 +1,6 @@ #include #include +#include #include @@ -244,22 +245,35 @@ ReadBufferIterator::Data ReadBufferIterator::next() } } - std::unique_ptr read_buffer = object_storage->readObject( - StoredObject(current_object_info->relative_path), - getContext()->getReadSettings(), - {}, - current_object_info->metadata->size_bytes); + std::unique_ptr read_buf; + CompressionMethod compression_method; + using ObjectInfoInArchive = StorageObjectStorageSource::ArchiveIterator::ObjectInfoInArchive; + if (auto object_info_in_archive = dynamic_cast(current_object_info.get())) + { + compression_method = chooseCompressionMethod(configuration->getPathInArchive(), configuration->compression_method); + auto & archive_reader = object_info_in_archive->archive_reader; + read_buf = archive_reader->readFile(object_info_in_archive->path_in_archive, /*throw_on_not_found=*/true); + } + else + { + compression_method = chooseCompressionMethod(current_object_info->relative_path, configuration->compression_method); + read_buf = object_storage->readObject( + StoredObject(current_object_info->relative_path), + getContext()->getReadSettings(), + {}, + current_object_info->metadata->size_bytes); + } - if (!query_settings.skip_empty_files || !read_buffer->eof()) + if (!query_settings.skip_empty_files || !read_buf->eof()) { first = false; - read_buffer = wrapReadBufferWithCompressionMethod( - std::move(read_buffer), - chooseCompressionMethod(current_object_info->relative_path, configuration->compression_method), + read_buf = wrapReadBufferWithCompressionMethod( + std::move(read_buf), + compression_method, static_cast(getContext()->getSettingsRef().zstd_window_log_max)); - return {std::move(read_buffer), std::nullopt, format}; + return {std::move(read_buf), std::nullopt, format}; } } } diff --git a/src/Storages/ObjectStorage/S3/Configuration.cpp b/src/Storages/ObjectStorage/S3/Configuration.cpp index 9fcbc6a6816..00d569fea9f 100644 --- a/src/Storages/ObjectStorage/S3/Configuration.cpp +++ b/src/Storages/ObjectStorage/S3/Configuration.cpp @@ -55,6 +55,14 @@ String StorageS3Configuration::getDataSourceDescription() return std::filesystem::path(url.uri.getHost() + std::to_string(url.uri.getPort())) / url.bucket; } +std::string StorageS3Configuration::getPathInArchive() const +{ + if (url.archive_pattern.has_value()) + return url.archive_pattern.value(); + + throw Exception(ErrorCodes::LOGICAL_ERROR, "Path {} is not an archive", getPath()); +} + void StorageS3Configuration::check(ContextPtr context) const { validateNamespace(url.bucket); diff --git a/src/Storages/ObjectStorage/S3/Configuration.h b/src/Storages/ObjectStorage/S3/Configuration.h index 0bd7f1ab108..de6c02d5020 100644 --- a/src/Storages/ObjectStorage/S3/Configuration.h +++ b/src/Storages/ObjectStorage/S3/Configuration.h @@ -34,6 +34,9 @@ public: String getDataSourceDescription() override; StorageObjectStorage::QuerySettings getQuerySettings(const ContextPtr &) const override; + bool isArchive() const override { return url.archive_pattern.has_value(); } + std::string getPathInArchive() const override; + void check(ContextPtr context) const override; void validateNamespace(const String & name) const override; ConfigurationPtr clone() override { return std::make_shared(*this); } diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index bc5b347d1e0..73e3d861cff 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -452,6 +452,16 @@ std::string StorageObjectStorage::Configuration::getPathWithoutGlobs() const return getPath().substr(0, getPath().find_first_of("*?{")); } +bool StorageObjectStorage::Configuration::isPathInArchiveWithGlobs() const +{ + return getPathInArchive().find_first_of("*?{") != std::string::npos; +} + +std::string StorageObjectStorage::Configuration::getPathInArchive() const +{ + throw Exception(ErrorCodes::LOGICAL_ERROR, "Path {} is not archive", getPath()); +} + void StorageObjectStorage::Configuration::assertInitialized() const { if (!initialized) diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 26b153ca0db..7b118cb7e6b 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -175,6 +175,10 @@ public: bool isNamespaceWithGlobs() const; virtual std::string getPathWithoutGlobs() const; + virtual bool isArchive() const { return false; } + bool isPathInArchiveWithGlobs() const; + virtual std::string getPathInArchive() const; + virtual void check(ContextPtr context) const; virtual void validateNamespace(const String & /* name */) const {} diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 8d5df96ca6e..56905e6c29b 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -100,10 +101,11 @@ std::shared_ptr StorageObjectStorageSourc auto settings = configuration->getQuerySettings(local_context); + std::unique_ptr iterator; if (configuration->isPathWithGlobs()) { /// Iterate through disclosed globs and make a source for each file - return std::make_shared( + iterator = std::make_unique( object_storage, configuration, predicate, virtual_columns, local_context, read_keys, settings.list_object_keys_size, settings.throw_on_zero_files_match, file_progress_callback); @@ -123,10 +125,17 @@ std::shared_ptr StorageObjectStorageSourc copy_configuration->setPaths(keys); } - return std::make_shared( + iterator = std::make_unique( object_storage, copy_configuration, virtual_columns, read_keys, settings.ignore_non_existent_file, file_progress_callback); } + + if (configuration->isArchive()) + { + return std::make_shared(object_storage, configuration, std::move(iterator), local_context, read_keys); + } + + return iterator; } void StorageObjectStorageSource::lazyInitialize(size_t processor) @@ -262,9 +271,20 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade } else { - const auto compression_method = chooseCompressionMethod(object_info->relative_path, configuration->compression_method); + CompressionMethod compression_method; const auto max_parsing_threads = need_only_count ? std::optional(1) : std::nullopt; - read_buf = createReadBuffer(object_info->relative_path, object_info->metadata->size_bytes); + + if (auto object_info_in_archive = dynamic_cast(object_info.get())) + { + compression_method = chooseCompressionMethod(configuration->getPathInArchive(), configuration->compression_method); + auto & archive_reader = object_info_in_archive->archive_reader; + read_buf = archive_reader->readFile(object_info_in_archive->path_in_archive, /*throw_on_not_found=*/true); + } + else + { + compression_method = chooseCompressionMethod(object_info->relative_path, configuration->compression_method); + read_buf = createReadBuffer(*object_info); + } auto input_format = FormatFactory::instance().getInput( configuration->format, *read_buf, read_from_format_info.format_header, @@ -312,8 +332,10 @@ std::future StorageObjectStorageSource return create_reader_scheduler([=, this] { return createReader(processor); }, Priority{}); } -std::unique_ptr StorageObjectStorageSource::createReadBuffer(const String & key, size_t object_size) +std::unique_ptr StorageObjectStorageSource::createReadBuffer(const ObjectInfo & object_info) { + const auto & object_size = object_info.metadata->size_bytes; + auto read_settings = getContext()->getReadSettings().adjustBufferSize(object_size); read_settings.enable_filesystem_cache = false; /// FIXME: Changing this setting to default value breaks something around parquet reading @@ -333,7 +355,7 @@ std::unique_ptr StorageObjectStorageSource::createReadBuffer(const S LOG_TRACE(log, "Downloading object of size {} with initial prefetch", object_size); auto async_reader = object_storage->readObjects( - StoredObjects{StoredObject{key, /* local_path */ "", object_size}}, read_settings); + StoredObjects{StoredObject{object_info.relative_path, /* local_path */ "", object_size}}, read_settings); async_reader->setReadUntilEnd(); if (read_settings.remote_fs_prefetch) @@ -344,7 +366,7 @@ std::unique_ptr StorageObjectStorageSource::createReadBuffer(const S else { /// FIXME: this is inconsistent that readObject always reads synchronously ignoring read_method setting. - return object_storage->readObject(StoredObject(key), read_settings); + return object_storage->readObject(StoredObject(object_info.relative_path, "", object_size), read_settings); } } @@ -609,4 +631,114 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::ReadTaskIterator return buffer[current_index]; } +static IArchiveReader::NameFilter createArchivePathFilter(const std::string & archive_pattern) +{ + auto matcher = std::make_shared(makeRegexpPatternFromGlobs(archive_pattern)); + if (!matcher->ok()) + { + throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP, + "Cannot compile regex from glob ({}): {}", + archive_pattern, matcher->error()); + } + return [matcher](const std::string & p) mutable { return re2::RE2::FullMatch(p, *matcher); }; +} + +StorageObjectStorageSource::ArchiveIterator::ObjectInfoInArchive::ObjectInfoInArchive( + ObjectInfoPtr archive_object_, + const std::string & path_in_archive_, + std::shared_ptr archive_reader_) + : archive_object(archive_object_) + , path_in_archive(path_in_archive_) + , archive_reader(archive_reader_) +{ +} + +StorageObjectStorageSource::ArchiveIterator::ArchiveIterator( + ObjectStoragePtr object_storage_, + ConfigurationPtr configuration_, + std::unique_ptr archives_iterator_, + ContextPtr context_, + ObjectInfos * read_keys_) + : IIterator("ArchiveIterator") + , WithContext(context_) + , object_storage(object_storage_) + , is_path_in_archive_with_globs(configuration_->isPathInArchiveWithGlobs()) + , archives_iterator(std::move(archives_iterator_)) + , filter(is_path_in_archive_with_globs ? createArchivePathFilter(configuration_->getPathInArchive()) : IArchiveReader::NameFilter{}) + , path_in_archive(is_path_in_archive_with_globs ? "" : configuration_->getPathInArchive()) + , read_keys(read_keys_) +{ +} + +std::shared_ptr +StorageObjectStorageSource::ArchiveIterator::createArchiveReader(ObjectInfoPtr object_info) const +{ + const auto size = object_info->metadata->size_bytes; + return DB::createArchiveReader( + /* path_to_archive */object_info->relative_path, + /* archive_read_function */[=, this]() + { + StoredObject stored_object(object_info->relative_path, "", size); + return object_storage->readObject(stored_object, getContext()->getReadSettings()); + }, + /* archive_size */size); +} + +StorageObjectStorageSource::ObjectInfoPtr +StorageObjectStorageSource::ArchiveIterator::nextImpl(size_t processor) +{ + std::unique_lock lock{next_mutex}; + while (true) + { + if (filter) + { + if (!file_enumerator) + { + archive_object = archives_iterator->next(processor); + if (!archive_object) + return {}; + + archive_reader = createArchiveReader(archive_object); + file_enumerator = archive_reader->firstFile(); + if (!file_enumerator) + continue; + } + else if (!file_enumerator->nextFile()) + { + file_enumerator.reset(); + continue; + } + + path_in_archive = file_enumerator->getFileName(); + if (!filter(path_in_archive)) + continue; + } + else + { + archive_object = archives_iterator->next(processor); + if (!archive_object) + return {}; + + if (!archive_object->metadata) + archive_object->metadata = object_storage->getObjectMetadata(archive_object->relative_path); + + archive_reader = createArchiveReader(archive_object); + if (!archive_reader->fileExists(path_in_archive)) + continue; + } + + auto object_in_archive = std::make_shared(archive_object, path_in_archive, archive_reader); + + if (read_keys != nullptr) + read_keys->push_back(object_in_archive); + + return object_in_archive; + } +} + +size_t StorageObjectStorageSource::ArchiveIterator::estimatedKeysCount() +{ + return archives_iterator->estimatedKeysCount(); +} + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.h b/src/Storages/ObjectStorage/StorageObjectStorageSource.h index abaf51edc4e..664aad56928 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.h @@ -1,10 +1,11 @@ #pragma once +#include +#include +#include #include #include -#include -#include #include -#include +#include namespace DB @@ -25,6 +26,7 @@ public: class ReadTaskIterator; class GlobIterator; class KeysIterator; + class ArchiveIterator; StorageObjectStorageSource( String name_, @@ -109,7 +111,7 @@ protected: /// Recreate ReadBuffer and Pipeline for each file. ReaderHolder createReader(size_t processor = 0); std::future createReaderAsync(size_t processor = 0); - std::unique_ptr createReadBuffer(const String & key, size_t object_size); + std::unique_ptr createReadBuffer(const ObjectInfo & object_info); void addNumRowsToCache(const String & path, size_t num_rows); std::optional tryGetNumRowsFromCache(const ObjectInfoPtr & object_info); @@ -218,4 +220,64 @@ private: std::atomic index = 0; bool ignore_non_existent_files; }; + +/* + * An archives iterator. + * Allows to iterate files inside one or many archives. + * `archives_iterator` is an iterator which iterates over different archives. + * There are two ways to read files in archives: + * 1. When we want to read one concete file in each archive. + * In this case we go through all archives, check if this certain file + * exists within this archive and read it if it exists. + * 2. When we have a certain pattern of files we want to read in each archive. + * For this purpose we create a filter defined as IArchiveReader::NameFilter. + */ +class StorageObjectStorageSource::ArchiveIterator : public IIterator, private WithContext +{ +public: + explicit ArchiveIterator( + ObjectStoragePtr object_storage_, + ConfigurationPtr configuration_, + std::unique_ptr archives_iterator_, + ContextPtr context_, + ObjectInfos * read_keys_); + + size_t estimatedKeysCount() override; + + struct ObjectInfoInArchive : public ObjectInfo + { + ObjectInfoInArchive( + ObjectInfoPtr archive_object_, + const std::string & path_in_archive_, + std::shared_ptr archive_reader_); + + const ObjectInfoPtr archive_object; + const std::string path_in_archive; + const std::shared_ptr archive_reader; + }; + +private: + ObjectInfoPtr nextImpl(size_t processor) override; + std::shared_ptr createArchiveReader(ObjectInfoPtr object_info) const; + + const ObjectStoragePtr object_storage; + const bool is_path_in_archive_with_globs; + /// Iterator which iterates through different archives. + const std::unique_ptr archives_iterator; + /// Used when files inside archive are defined with a glob + const IArchiveReader::NameFilter filter = {}; + /// Current file inside the archive. + std::string path_in_archive = {}; + /// Read keys of files inside archives. + ObjectInfos * read_keys; + /// Object pointing to archive (NOT path within archive). + ObjectInfoPtr archive_object; + /// Reader of the archive. + std::shared_ptr archive_reader; + /// File enumerator inside the archive. + std::unique_ptr file_enumerator; + + std::mutex next_mutex; +}; + }