From d468a0a57794d457dc29e5c92d1733b2df0237dc Mon Sep 17 00:00:00 2001
From: divanik
Date: Mon, 13 May 2024 12:00:15 +0000
Subject: [PATCH 1/6] Add archives reading support to s3

---
 docker/test/stateless/setup_minio.sh          |   2 +-
 docs/en/sql-reference/table-functions/s3.md   |  19 +
 src/IO/S3/URI.cpp                             |  50 +-
 src/IO/S3/URI.h                               |   7 +
 .../DataLakes/DeltaLakeMetadataParser.cpp     |   1 +
 src/Storages/DataLakes/IStorageDataLake.h     |   4 +-
 .../DataLakes/Iceberg/IcebergMetadata.cpp     |   9 +-
 .../DataLakes/Iceberg/StorageIceberg.h        |   2 +-
 src/Storages/DataLakes/S3MetadataReader.cpp   |   1 +
 src/Storages/S3Queue/StorageS3Queue.cpp       |  20 +-
 src/Storages/StorageS3.cpp                    | 977 +++++++++++-------
 src/Storages/StorageS3.h                      | 158 ++-
 src/Storages/StorageS3Cluster.cpp             |   9 +-
 src/TableFunctions/TableFunctionS3.cpp        |  14 +-
 .../03036_reading_s3_archives.reference       |  52 +
 .../0_stateless/03036_reading_s3_archives.sql |  22 +
 .../0_stateless/data_minio/03036_archive1.tar | Bin 0 -> 10240 bytes
 .../0_stateless/data_minio/03036_archive1.zip | Bin 0 -> 372 bytes
 .../0_stateless/data_minio/03036_archive2.tar | Bin 0 -> 10240 bytes
 .../0_stateless/data_minio/03036_archive2.zip | Bin 0 -> 372 bytes
 .../data_minio/03036_archive3.tar.gz          | Bin 0 -> 185 bytes
 .../03036_compressed_file_archive.zip         | Bin 0 -> 231 bytes
 22 files changed, 920 insertions(+), 427 deletions(-)
 create mode 100644 tests/queries/0_stateless/03036_reading_s3_archives.reference
 create mode 100644 tests/queries/0_stateless/03036_reading_s3_archives.sql
 create mode 100644 tests/queries/0_stateless/data_minio/03036_archive1.tar
 create mode 100644 tests/queries/0_stateless/data_minio/03036_archive1.zip
 create mode 100644 tests/queries/0_stateless/data_minio/03036_archive2.tar
 create mode 100644 tests/queries/0_stateless/data_minio/03036_archive2.zip
 create mode 100644 tests/queries/0_stateless/data_minio/03036_archive3.tar.gz
 create mode 100644 tests/queries/0_stateless/data_minio/03036_compressed_file_archive.zip

diff --git a/docker/test/stateless/setup_minio.sh b/docker/test/stateless/setup_minio.sh
index c756ce4669d..2b9433edd20 100755
--- a/docker/test/stateless/setup_minio.sh
+++ b/docker/test/stateless/setup_minio.sh
@@ -83,7 +83,7 @@ setup_minio() {
   ./mc alias set clickminio http://localhost:11111 clickhouse clickhouse
   ./mc admin user add clickminio test testtest
   ./mc admin policy set clickminio readwrite user=test
-  ./mc mb clickminio/test
+  ./mc mb --ignore-existing clickminio/test
   if [ "$test_type" = "stateless" ]; then
     ./mc policy set public clickminio/test
   fi
diff --git a/docs/en/sql-reference/table-functions/s3.md b/docs/en/sql-reference/table-functions/s3.md
index 970b3e52882..38d77a98749 100644
--- a/docs/en/sql-reference/table-functions/s3.md
+++ b/docs/en/sql-reference/table-functions/s3.md
@@ -248,6 +248,25 @@ FROM s3(
 LIMIT 5;
 ```
 
+
+## Working with archives
+
+Suppose that we have several archive files with the following URIs on S3:
+
+- 'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-10.csv.zip'
+- 'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-11.csv.zip'
+- 'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-12.csv.zip'
+
+Data can be extracted from these archives using `::`. Globs can be used both in the URL part and in the part after `::` (which names the file inside the archive).
+
+``` sql
+SELECT *
+FROM s3(
+   'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-1{0..2}.csv.zip :: *.csv'
+);
+```
+
+
 ## Virtual Columns {#virtual-columns}
 
 - `_path` — Path to the file. Type: `LowCardinality(String)`.
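Editor's note: the documentation hunk above shows globs on both sides of `::`. As a complementary illustration (not part of the patch), a single named file inside one archive can be addressed with the same syntax — assuming here that the archive contains a file named `top-1m.csv`:

``` sql
SELECT *
FROM s3(
   'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-10.csv.zip :: top-1m.csv'
);
```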
diff --git a/src/IO/S3/URI.cpp b/src/IO/S3/URI.cpp
index 4e679e6c477..4bf7a3ddf86 100644
--- a/src/IO/S3/URI.cpp
+++ b/src/IO/S3/URI.cpp
@@ -1,8 +1,7 @@
 #include
-#include
-#include "Common/Macros.h"
 #include
 #include
+#include "Common/Macros.h"
 #if USE_AWS_S3
 #include
 #include
@@ -55,7 +54,11 @@ URI::URI(const std::string & uri_)
     static constexpr auto OSS = "OSS";
     static constexpr auto EOS = "EOS";
 
-    uri = Poco::URI(uri_);
+    if (containsArchive(uri_))
+        std::tie(uri_str, archive_pattern) = getPathToArchiveAndArchivePattern(uri_);
+    else
+        uri_str = uri_;
+    uri = Poco::URI(uri_str);
 
     std::unordered_map mapper;
     auto context = Context::getGlobalContextInstance();
@@ -126,9 +129,10 @@ URI::URI(const std::string & uri_)
         boost::to_upper(name);
         /// For S3Express it will look like s3express-eun1-az1, i.e. contain region and AZ info
         if (name != S3 && !name.starts_with(S3EXPRESS) && name != COS && name != OBS && name != OSS && name != EOS)
-            throw Exception(ErrorCodes::BAD_ARGUMENTS,
-                            "Object storage system name is unrecognized in virtual hosted style S3 URI: {}",
-                            quoteString(name));
+            throw Exception(
+                ErrorCodes::BAD_ARGUMENTS,
+                "Object storage system name is unrecognized in virtual hosted style S3 URI: {}",
+                quoteString(name));
 
         if (name == COS)
             storage_name = COSN;
@@ -156,10 +160,40 @@ void URI::validateBucket(const String & bucket, const Poco::URI & uri)
     /// S3 specification requires at least 3 and at most 63 characters in bucket name.
     /// https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-s3-bucket-naming-requirements.html
     if (bucket.length() < 3 || bucket.length() > 63)
-        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bucket name length is out of bounds in virtual hosted style S3 URI: {}{}",
-                        quoteString(bucket), !uri.empty() ? " (" + uri.toString() + ")" : "");
+        throw Exception(
+            ErrorCodes::BAD_ARGUMENTS,
+            "Bucket name length is out of bounds in virtual hosted style S3 URI: {}{}",
+            quoteString(bucket),
+            !uri.empty() ?
" (" + uri.toString() + ")" : ""); } +bool URI::containsArchive(const std::string & source) +{ + size_t pos = source.find("::"); + return (pos != std::string::npos); +} + +std::pair URI::getPathToArchiveAndArchivePattern(const std::string & source) +{ + size_t pos = source.find("::"); + assert(pos != std::string::npos); + + std::string path_to_archive = source.substr(0, pos); + while ((!path_to_archive.empty()) && path_to_archive.ends_with(' ')) + path_to_archive.pop_back(); + + if (path_to_archive.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Path to archive is empty"); + + std::string_view path_in_archive_view = std::string_view{source}.substr(pos + 2); + while (path_in_archive_view.front() == ' ') + path_in_archive_view.remove_prefix(1); + + if (path_in_archive_view.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Filename is empty"); + + return {path_to_archive, std::string{path_in_archive_view}}; +} } } diff --git a/src/IO/S3/URI.h b/src/IO/S3/URI.h index 06b7d03aa8c..c52e6bc1441 100644 --- a/src/IO/S3/URI.h +++ b/src/IO/S3/URI.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include "config.h" @@ -28,6 +29,8 @@ struct URI std::string key; std::string version_id; std::string storage_name; + std::optional archive_pattern; + std::string uri_str; bool is_virtual_hosted_style; @@ -36,6 +39,10 @@ struct URI void addRegionToURI(const std::string & region); static void validateBucket(const std::string & bucket, const Poco::URI & uri); + +private: + bool containsArchive(const std::string & source); + std::pair getPathToArchiveAndArchivePattern(const std::string & source); }; } diff --git a/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp b/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp index 14a912a180d..1687a4754f5 100644 --- a/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp +++ b/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp @@ -17,6 +17,7 @@ #include #include #include +#include namespace fs = std::filesystem; diff --git a/src/Storages/DataLakes/IStorageDataLake.h b/src/Storages/DataLakes/IStorageDataLake.h index 711abbde38c..2147f2c9e6b 100644 --- a/src/Storages/DataLakes/IStorageDataLake.h +++ b/src/Storages/DataLakes/IStorageDataLake.h @@ -54,7 +54,7 @@ public: { std::lock_guard lock(configuration_update_mutex); updateConfigurationImpl(local_context); - return Storage::getConfiguration(); + return Storage::getConfigurationCopy(); } void updateConfiguration(const ContextPtr & local_context) override @@ -106,7 +106,7 @@ private: const bool updated = base_configuration.update(local_context); auto new_keys = getDataFiles(base_configuration, local_context); - if (!updated && new_keys == Storage::getConfiguration().keys) + if (!updated && new_keys == Storage::getConfigurationCopy().keys) return; Storage::useConfiguration(getConfigurationForDataRead(base_configuration, local_context, new_keys)); diff --git a/src/Storages/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/DataLakes/Iceberg/IcebergMetadata.cpp index df1536f53fc..883b2a01dc5 100644 --- a/src/Storages/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/DataLakes/Iceberg/IcebergMetadata.cpp @@ -31,16 +31,17 @@ #include #include +#include namespace DB { namespace ErrorCodes { - extern const int FILE_DOESNT_EXIST; - extern const int ILLEGAL_COLUMN; - extern const int BAD_ARGUMENTS; - extern const int UNSUPPORTED_METHOD; +extern const int FILE_DOESNT_EXIST; +extern const int ILLEGAL_COLUMN; +extern const int BAD_ARGUMENTS; +extern const int UNSUPPORTED_METHOD; } IcebergMetadata::IcebergMetadata( 
diff --git a/src/Storages/DataLakes/Iceberg/StorageIceberg.h b/src/Storages/DataLakes/Iceberg/StorageIceberg.h index 45cbef0b41b..9e3885124d6 100644 --- a/src/Storages/DataLakes/Iceberg/StorageIceberg.h +++ b/src/Storages/DataLakes/Iceberg/StorageIceberg.h @@ -63,7 +63,7 @@ public: { std::lock_guard lock(configuration_update_mutex); updateConfigurationImpl(local_context); - return StorageS3::getConfiguration(); + return StorageS3::getConfigurationCopy(); } void updateConfiguration(const ContextPtr & local_context) override diff --git a/src/Storages/DataLakes/S3MetadataReader.cpp b/src/Storages/DataLakes/S3MetadataReader.cpp index d66e21550a3..62a486951fe 100644 --- a/src/Storages/DataLakes/S3MetadataReader.cpp +++ b/src/Storages/DataLakes/S3MetadataReader.cpp @@ -10,6 +10,7 @@ #include #include +#include namespace DB { diff --git a/src/Storages/S3Queue/StorageS3Queue.cpp b/src/Storages/S3Queue/StorageS3Queue.cpp index b6daadf8bc4..d3449884b3c 100644 --- a/src/Storages/S3Queue/StorageS3Queue.cpp +++ b/src/Storages/S3Queue/StorageS3Queue.cpp @@ -1,3 +1,4 @@ +#include #include "config.h" #if USE_AWS_S3 @@ -372,7 +373,11 @@ std::shared_ptr StorageS3Queue::createSource( auto configuration_snapshot = updateConfigurationAndGetCopy(local_context); auto internal_source = std::make_unique( - info, configuration.format, getName(), local_context, format_settings, + info, + configuration.format, + getName(), + local_context, + format_settings, max_block_size, configuration_snapshot.request_settings, configuration_snapshot.compression_method, @@ -380,7 +385,9 @@ std::shared_ptr StorageS3Queue::createSource( configuration_snapshot.url.bucket, configuration_snapshot.url.version_id, configuration_snapshot.url.uri.getHost() + std::to_string(configuration_snapshot.url.uri.getPort()), - file_iterator, local_context->getSettingsRef().max_download_threads, false); + file_iterator, + local_context->getSettingsRef().max_download_threads, + false); auto file_deleter = [this, bucket = configuration_snapshot.url.bucket, client = configuration_snapshot.client, blob_storage_log = BlobStorageLogWriter::create()](const std::string & path) mutable { @@ -611,8 +618,13 @@ void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const std::shared_ptr StorageS3Queue::createFileIterator(ContextPtr local_context, const ActionsDAG::Node * predicate) { auto glob_iterator = std::make_unique( - *configuration.client, configuration.url, predicate, getVirtualsList(), local_context, - /* read_keys */nullptr, configuration.request_settings); + *configuration.client, + configuration.url, + predicate, + getVirtualsList(), + local_context, + /* read_keys */ nullptr, + configuration.request_settings); return std::make_shared(files_metadata, std::move(glob_iterator), s3queue_settings->s3queue_current_shard_num, shutdown_called); } diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index 56417369869..ea4afb52572 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -1,4 +1,14 @@ -#include "config.h" +#include +#include +#include +#include +#include +#include +#include "Common/logger_useful.h" +#include "IO/CompressionMethod.h" +#include "IO/ReadBuffer.h" +#include "Interpreters/Context_fwd.h" +#include "Storages/MergeTree/ReplicatedMergeTreePartHeader.h" #if USE_AWS_S3 @@ -158,10 +168,11 @@ public: , storage(storage_) , read_from_format_info(std::move(read_from_format_info_)) , need_only_count(need_only_count_) + , query_configuration(storage.getConfigurationCopy()) , 
max_block_size(max_block_size_) , num_streams(num_streams_) { - query_configuration = storage.updateConfigurationAndGetCopy(context); + query_configuration.update(context); virtual_columns = storage.getVirtualsList(); } @@ -204,7 +215,8 @@ public: , virtual_columns(virtual_columns_) , read_keys(read_keys_) , request_settings(request_settings_) - , list_objects_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, CurrentMetrics::StorageS3ThreadsScheduled, 1) + , list_objects_pool( + CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, CurrentMetrics::StorageS3ThreadsScheduled, 1) , list_objects_scheduler(threadPoolCallbackRunnerUnsafe(list_objects_pool, "ListObjects")) , file_progress_callback(file_progress_callback_) { @@ -474,7 +486,8 @@ StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator( KeysWithInfo * read_keys_, const S3Settings::RequestSettings & request_settings_, std::function file_progress_callback_) - : pimpl(std::make_shared(client_, globbed_uri_, predicate, virtual_columns_, context, read_keys_, request_settings_, file_progress_callback_)) + : pimpl(std::make_shared( + client_, globbed_uri_, predicate, virtual_columns_, context, read_keys_, request_settings_, file_progress_callback_)) { } @@ -562,8 +575,7 @@ StorageS3Source::KeysIterator::KeysIterator( KeysWithInfo * read_keys, std::function file_progress_callback_) : pimpl(std::make_shared( - client_, version_id_, keys_, bucket_, request_settings_, - read_keys, file_progress_callback_)) + client_, version_id_, keys_, bucket_, request_settings_, read_keys, file_progress_callback_)) { } @@ -593,7 +605,7 @@ StorageS3Source::ReadTaskIterator::ReadTaskIterator( pool.wait(); buffer.reserve(max_threads_count); for (auto & key_future : keys) - buffer.emplace_back(std::make_shared(key_future.get(), std::nullopt)); + buffer.emplace_back(std::make_shared(key_future.get())); } StorageS3Source::KeyWithInfoPtr StorageS3Source::ReadTaskIterator::next(size_t) /// NOLINT @@ -618,6 +630,124 @@ size_t StorageS3Source::ReadTaskIterator::estimatedKeysCount() return buffer.size(); } + +StorageS3Source::ArchiveIterator::ArchiveIterator( + std::unique_ptr basic_iterator_, + const std::string & archive_pattern_, + std::shared_ptr client_, + const String & bucket_, + const String & version_id_, + const S3Settings::RequestSettings & request_settings_, + ContextPtr context_, + KeysWithInfo * read_keys_) + : WithContext(context_) + , basic_iterator(std::move(basic_iterator_)) + , basic_key_with_info_ptr(nullptr) + , client(client_) + , bucket(bucket_) + , version_id(version_id_) + , request_settings(request_settings_) + , read_keys(read_keys_) +{ + if (archive_pattern_.find_first_of("*?{") != std::string::npos) + { + auto matcher = std::make_shared(makeRegexpPatternFromGlobs(archive_pattern_)); + if (!matcher->ok()) + throw Exception( + ErrorCodes::CANNOT_COMPILE_REGEXP, "Cannot compile regex from glob ({}): {}", archive_pattern_, matcher->error()); + filter = IArchiveReader::NameFilter{[matcher](const std::string & p) mutable { return re2::RE2::FullMatch(p, *matcher); }}; + } + else + { + path_in_archive = archive_pattern_; + } +} + +StorageS3Source::KeyWithInfoPtr StorageS3Source::ArchiveIterator::next(size_t) +{ + if (!path_in_archive.empty()) + { + std::unique_lock lock{take_next_mutex}; + while (true) + { + basic_key_with_info_ptr = basic_iterator->next(); + if (!basic_key_with_info_ptr) + return {}; + refreshArchiveReader(); + bool file_exists = archive_reader->fileExists(path_in_archive); + 
if (file_exists) + { + KeyWithInfoPtr archive_key_with_info + = std::make_shared(basic_key_with_info_ptr->key, std::nullopt, path_in_archive, archive_reader); + if (read_keys != nullptr) + read_keys->push_back(archive_key_with_info); + return archive_key_with_info; + } + } + } + else + { + std::unique_lock lock{take_next_mutex}; + while (true) + { + if (!file_enumerator) + { + basic_key_with_info_ptr = basic_iterator->next(); + if (!basic_key_with_info_ptr) + return {}; + refreshArchiveReader(); + file_enumerator = archive_reader->firstFile(); + if (!file_enumerator) + { + file_enumerator.reset(); + continue; + } + } + else if (!file_enumerator->nextFile()) + { + file_enumerator.reset(); + continue; + } + + String current_filename = file_enumerator->getFileName(); + bool satisfies = filter(current_filename); + if (satisfies) + { + KeyWithInfoPtr archive_key_with_info + = std::make_shared(basic_key_with_info_ptr->key, std::nullopt, current_filename, archive_reader); + if (read_keys != nullptr) + read_keys->push_back(archive_key_with_info); + return archive_key_with_info; + } + } + } +} + +size_t StorageS3Source::ArchiveIterator::estimatedKeysCount() +{ + return basic_iterator->estimatedKeysCount(); +} + +void StorageS3Source::ArchiveIterator::refreshArchiveReader() +{ + if (basic_key_with_info_ptr) + { + if (!basic_key_with_info_ptr->info) + { + basic_key_with_info_ptr->info = S3::getObjectInfo(*client, bucket, basic_key_with_info_ptr->key, version_id, request_settings); + } + archive_reader = createArchiveReader( + basic_key_with_info_ptr->key, + [key = basic_key_with_info_ptr->key, archive_size = basic_key_with_info_ptr->info.value().size, context = getContext(), this]() + { return createS3ReadBuffer(key, archive_size, context, client, bucket, version_id, request_settings); }, + basic_key_with_info_ptr->info.value().size); + } + else + { + archive_reader = nullptr; + } +} + StorageS3Source::StorageS3Source( const ReadFromFormatInfo & info, const String & format_, @@ -653,7 +783,8 @@ StorageS3Source::StorageS3Source( , file_iterator(file_iterator_) , max_parsing_threads(max_parsing_threads_) , need_only_count(need_only_count_) - , create_reader_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, CurrentMetrics::StorageS3ThreadsScheduled, 1) + , create_reader_pool( + CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, CurrentMetrics::StorageS3ThreadsScheduled, 1) , create_reader_scheduler(threadPoolCallbackRunnerUnsafe(create_reader_pool, "CreateS3Reader")) { } @@ -699,9 +830,18 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader(size_t idx) } else { - auto compression_method = chooseCompressionMethod(key_with_info->key, compression_hint); - read_buf = createS3ReadBuffer(key_with_info->key, key_with_info->info->size); - + auto compression_method = CompressionMethod::None; + if (!key_with_info->path_in_archive.has_value()) + { + compression_method = chooseCompressionMethod(key_with_info->key, compression_hint); + read_buf = createS3ReadBuffer( + key_with_info->key, key_with_info->info->size, getContext(), client, bucket, version_id, request_settings); + } + else + { + compression_method = chooseCompressionMethod(key_with_info->path_in_archive.value(), compression_hint); + read_buf = key_with_info->archive_reader->readFile(key_with_info->path_in_archive.value(), /*throw_on_not_found=*/true); + } auto input_format = FormatFactory::instance().getInput( format, *read_buf, @@ -753,12 +893,20 @@ std::future 
StorageS3Source::createReaderAsync(si return create_reader_scheduler([=, this] { return createReader(idx); }, Priority{}); } -std::unique_ptr StorageS3Source::createS3ReadBuffer(const String & key, size_t object_size) +std::unique_ptr createS3ReadBuffer( + const String & key, + size_t object_size, + std::shared_ptr context, + std::shared_ptr client_ptr, + const String & bucket, + const String & version_id, + const S3Settings::RequestSettings & request_settings) { - auto read_settings = getContext()->getReadSettings().adjustBufferSize(object_size); + auto read_settings = context->getReadSettings().adjustBufferSize(object_size); read_settings.enable_filesystem_cache = false; - auto download_buffer_size = getContext()->getSettings().max_download_buffer_size; + auto download_buffer_size = context->getSettings().max_download_buffer_size; const bool object_too_small = object_size <= 2 * download_buffer_size; + static LoggerPtr log = getLogger("StorageS3Source"); // Create a read buffer that will prefetch the first ~1 MB of the file. // When reading lots of tiny files, this prefetching almost doubles the throughput. @@ -766,33 +914,46 @@ std::unique_ptr StorageS3Source::createS3ReadBuffer(const String & k if (object_too_small && read_settings.remote_fs_method == RemoteFSReadMethod::threadpool) { LOG_TRACE(log, "Downloading object of size {} from S3 with initial prefetch", object_size); - return createAsyncS3ReadBuffer(key, read_settings, object_size); + return createAsyncS3ReadBuffer(key, read_settings, object_size, context, client_ptr, bucket, version_id, request_settings); } + return std::make_unique( - client, bucket, key, version_id, request_settings, read_settings, - /*use_external_buffer*/ false, /*offset_*/ 0, /*read_until_position_*/ 0, - /*restricted_seek_*/ false, object_size); + client_ptr, + bucket, + key, + version_id, + request_settings, + read_settings, + /*use_external_buffer*/ false, + /*offset_*/ 0, + /*read_until_position_*/ 0, + /*restricted_seek_*/ false, + object_size); } -std::unique_ptr StorageS3Source::createAsyncS3ReadBuffer( - const String & key, const ReadSettings & read_settings, size_t object_size) +std::unique_ptr createAsyncS3ReadBuffer( + const String & key, + const ReadSettings & read_settings, + size_t object_size, + std::shared_ptr context, + std::shared_ptr client_ptr, + const String & bucket, + const String & version_id, + const S3Settings::RequestSettings & request_settings) { - auto context = getContext(); - auto read_buffer_creator = - [this, read_settings, object_size] - (bool restricted_seek, const StoredObject & object) -> std::unique_ptr + auto read_buffer_creator = [=](bool restricted_seek, const StoredObject & object) -> std::unique_ptr { return std::make_unique( - client, + client_ptr, bucket, object.remote_path, version_id, request_settings, read_settings, - /* use_external_buffer */true, - /* offset */0, - /* read_until_position */0, + /* use_external_buffer */ true, + /* offset */ 0, + /* read_until_position */ 0, restricted_seek, object_size); }; @@ -809,12 +970,12 @@ std::unique_ptr StorageS3Source::createAsyncS3ReadBuffer( StoredObjects{StoredObject{key, /* local_path */ "", object_size}}, "", read_settings, - /* cache_log */nullptr, /* use_external_buffer */true); + /* cache_log */ nullptr, + /* use_external_buffer */ true); auto & pool_reader = context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); auto async_reader = std::make_unique( - std::move(s3_impl), pool_reader, modified_settings, - 
context->getAsyncReadCounters(), context->getFilesystemReadPrefetchesLog()); + std::move(s3_impl), pool_reader, modified_settings, context->getAsyncReadCounters(), context->getFilesystemReadPrefetchesLog()); async_reader->setReadUntilEnd(); if (read_settings.remote_fs_prefetch) @@ -855,12 +1016,17 @@ Chunk StorageS3Source::generate() if (const auto * input_format = reader.getInputFormat()) chunk_size = reader.getInputFormat()->getApproxBytesReadForChunk(); progress(num_rows, chunk_size ? chunk_size : chunk.bytes()); - VirtualColumnUtils::addRequestedPathFileAndSizeVirtualsToChunk(chunk, requested_virtual_columns, reader.getPath(), reader.getFileSize()); + VirtualColumnUtils::addRequestedPathFileAndSizeVirtualsToChunk( + chunk, + requested_virtual_columns, + reader.getPath(), + reader.getFileSize(), + reader.isArchive() ? (&reader.getFile()) : nullptr); return chunk; } if (reader.getInputFormat() && getContext()->getSettingsRef().use_cache_for_count_from_files) - addNumRowsToCache(reader.getFile(), total_rows_in_file); + addNumRowsToCache(reader.getFileExtended(), total_rows_in_file); total_rows_in_file = 0; @@ -890,10 +1056,7 @@ std::optional StorageS3Source::tryGetNumRowsFromCache(const KeyWithInfo { String source = fs::path(url_host_and_port) / bucket / key_with_info.key; auto cache_key = getKeyForSchemaCache(source, format, format_settings, getContext()); - auto get_last_mod_time = [&]() -> std::optional - { - return key_with_info.info->last_modification_time; - }; + auto get_last_mod_time = [&]() -> std::optional { return key_with_info.info->last_modification_time; }; return StorageS3::getSchemaCache(getContext()).tryGetNumRows(cache_key, get_last_mod_time); } @@ -910,9 +1073,7 @@ public: const StorageS3::Configuration & configuration_, const String & bucket, const String & key) - : SinkToStorage(sample_block_) - , sample_block(sample_block_) - , format_settings(format_settings_) + : SinkToStorage(sample_block_), sample_block(sample_block_), format_settings(format_settings_) { BlobStorageLogWriterPtr blob_log = nullptr; if (auto blob_storage_log = context->getBlobStorageLog()) @@ -1013,32 +1174,36 @@ private: namespace { - std::optional checkAndGetNewFileOnInsertIfNeeded(const ContextPtr & context, const StorageS3::Configuration & configuration, const String & key, size_t sequence_number) + +std::optional checkAndGetNewFileOnInsertIfNeeded( + const ContextPtr & context, const StorageS3::Configuration & configuration, const String & key, size_t sequence_number) +{ + if (context->getSettingsRef().s3_truncate_on_insert + || !S3::objectExists( + *configuration.client, configuration.url.bucket, key, configuration.url.version_id, configuration.request_settings)) + return std::nullopt; + + if (context->getSettingsRef().s3_create_new_file_on_insert) { - if (context->getSettingsRef().s3_truncate_on_insert || !S3::objectExists(*configuration.client, configuration.url.bucket, key, configuration.url.version_id, configuration.request_settings)) - return std::nullopt; - - if (context->getSettingsRef().s3_create_new_file_on_insert) + auto pos = key.find_first_of('.'); + String new_key; + do { - auto pos = key.find_first_of('.'); - String new_key; - do - { - new_key = key.substr(0, pos) + "." + std::to_string(sequence_number) + (pos == std::string::npos ? "" : key.substr(pos)); - ++sequence_number; - } - while (S3::objectExists(*configuration.client, configuration.url.bucket, new_key, configuration.url.version_id, configuration.request_settings)); + new_key = key.substr(0, pos) + "." 
+ std::to_string(sequence_number) + (pos == std::string::npos ? "" : key.substr(pos)); + ++sequence_number; + } while (S3::objectExists( + *configuration.client, configuration.url.bucket, new_key, configuration.url.version_id, configuration.request_settings)); - return new_key; - } - - throw Exception( - ErrorCodes::BAD_ARGUMENTS, - "Object in bucket {} with key {} already exists. " - "If you want to overwrite it, enable setting s3_truncate_on_insert, if you " - "want to create a new file on each insert, enable setting s3_create_new_file_on_insert", - configuration.url.bucket, key); + return new_key; } + + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Object in bucket {} with key {} already exists. " + "If you want to overwrite it, enable setting s3_truncate_on_insert, if you " + "want to create a new file on each insert, enable setting s3_create_new_file_on_insert", + configuration.url.bucket, key); +} } @@ -1055,7 +1220,8 @@ public: const StorageS3::Configuration & configuration_, const String & bucket_, const String & key_) - : PartitionedSink(partition_by, context_, sample_block_), WithContext(context_) + : PartitionedSink(partition_by, context_, sample_block_) + , WithContext(context_) , format(format_) , sample_block(sample_block_) , compression_method(compression_method_) @@ -1074,19 +1240,11 @@ public: auto partition_key = replaceWildcards(key, partition_id); validateKey(partition_key); - if (auto new_key = checkAndGetNewFileOnInsertIfNeeded(getContext(), configuration, partition_key, /* sequence_number */1)) + if (auto new_key = checkAndGetNewFileOnInsertIfNeeded(getContext(), configuration, partition_key, /* sequence_number */ 1)) partition_key = *new_key; return std::make_shared( - format, - sample_block, - getContext(), - format_settings, - compression_method, - configuration, - partition_bucket, - partition_key - ); + format, sample_block, getContext(), format_settings, compression_method, configuration, partition_bucket, partition_key); } private: @@ -1167,7 +1325,8 @@ StorageS3::StorageS3( /// We don't allow special columns in S3 storage. 
if (!columns_.hasOnlyOrdinary()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table engine S3 doesn't support special columns like MATERIALIZED, ALIAS or EPHEMERAL"); + throw Exception( + ErrorCodes::BAD_ARGUMENTS, "Table engine S3 doesn't support special columns like MATERIALIZED, ALIAS or EPHEMERAL"); storage_metadata.setColumns(columns_); } @@ -1178,57 +1337,91 @@ StorageS3::StorageS3( } static std::shared_ptr createFileIterator( - const StorageS3::Configuration & configuration, + StorageS3::Configuration configuration, bool distributed_processing, ContextPtr local_context, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, - StorageS3::KeysWithInfo * read_keys = nullptr, + StorageS3Source::KeysWithInfo * read_keys = nullptr, std::function file_progress_callback = {}) { if (distributed_processing) { - return std::make_shared(local_context->getReadTaskCallback(), local_context->getSettingsRef().max_threads); - } - else if (configuration.withGlobs()) - { - /// Iterate through disclosed globs and make a source for each file - return std::make_shared( - *configuration.client, configuration.url, predicate, virtual_columns, - local_context, read_keys, configuration.request_settings, file_progress_callback); + return std::make_shared( + local_context->getReadTaskCallback(), local_context->getSettingsRef().max_threads); } else { - Strings keys = configuration.keys; - auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns); - if (filter_dag) + auto basic_iterator = [&]() -> std::unique_ptr { - std::vector paths; - paths.reserve(keys.size()); - for (const auto & key : keys) - paths.push_back(fs::path(configuration.url.bucket) / key); - VirtualColumnUtils::filterByPathOrFile(keys, paths, filter_dag, virtual_columns, local_context); + StorageS3Source::KeysWithInfo * local_read_keys = configuration.url.archive_pattern.has_value() ? 
nullptr : read_keys; + if (configuration.withGlobs()) + { + /// Iterate through disclosed globs and make a source for each file + return std::make_unique( + *configuration.client, + configuration.url, + predicate, + virtual_columns, + local_context, + local_read_keys, + configuration.request_settings, + file_progress_callback); + } + else + { + Strings keys = configuration.keys; + auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns); + if (filter_dag) + { + std::vector paths; + paths.reserve(keys.size()); + for (const auto & key : keys) + paths.push_back(fs::path(configuration.url.bucket) / key); + VirtualColumnUtils::filterByPathOrFile(keys, paths, filter_dag, virtual_columns, local_context); + } + return std::make_unique( + *configuration.client, + configuration.url.version_id, + keys, + configuration.url.bucket, + configuration.request_settings, + local_read_keys, + file_progress_callback); + } + }(); + if (configuration.url.archive_pattern.has_value()) + { + return std::make_shared( + std::move(basic_iterator), + configuration.url.archive_pattern.value(), + configuration.client, + configuration.url.bucket, + configuration.url.version_id, + configuration.request_settings, + local_context, + read_keys); + } + else + { + return basic_iterator; } - - return std::make_shared( - *configuration.client, configuration.url.version_id, keys, - configuration.url.bucket, configuration.request_settings, read_keys, file_progress_callback); } } bool StorageS3::supportsSubsetOfColumns(const ContextPtr & context) const { - return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format, context, format_settings); + return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(getFormatCopy(), context, format_settings); } bool StorageS3::prefersLargeBlocks() const { - return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(configuration.format); + return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(getFormatCopy()); } bool StorageS3::parallelizeOutputAfterReading(ContextPtr context) const { - return FormatFactory::instance().checkParallelizeOutputAfterReading(configuration.format, context); + return FormatFactory::instance().checkParallelizeOutputAfterReading(getFormatCopy(), context); } void StorageS3::read( @@ -1241,6 +1434,7 @@ void StorageS3::read( size_t max_block_size, size_t num_streams) { + updateConfiguration(local_context); auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context)); bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty()) @@ -1267,7 +1461,6 @@ void ReadFromStorageS3Step::applyFilters(ActionDAGNodes added_filter_nodes) const ActionsDAG::Node * predicate = nullptr; if (filter_actions_dag) predicate = filter_actions_dag->getOutputs().at(0); - createIterator(predicate); } @@ -1277,17 +1470,21 @@ void ReadFromStorageS3Step::createIterator(const ActionsDAG::Node * predicate) return; iterator_wrapper = createFileIterator( - query_configuration, storage.distributed_processing, context, predicate, - virtual_columns, nullptr, context->getFileProgressCallback()); + storage.getConfigurationCopy(), + storage.distributed_processing, + context, + predicate, + virtual_columns, + nullptr, + context->getFileProgressCallback()); } void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) { - if 
(storage.partition_by && query_configuration.withWildcard()) + if (storage.partition_by && query_configuration.withPartitionWildcard()) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Reading from a partitioned S3 storage is not implemented yet"); createIterator(nullptr); - size_t estimated_keys_count = iterator_wrapper->estimatedKeysCount(); if (estimated_keys_count > 1) num_streams = std::min(num_streams, estimated_keys_count); @@ -1297,9 +1494,8 @@ void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline, num_streams = 1; } - const auto & settings = context->getSettingsRef(); - const size_t max_parsing_threads = num_streams >= settings.max_parsing_threads ? 1 : (settings.max_parsing_threads / std::max(num_streams, 1ul)); - LOG_DEBUG(getLogger("StorageS3"), "Reading in {} streams, {} threads per stream", num_streams, max_parsing_threads); + const size_t max_threads = context->getSettingsRef().max_threads; + const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / std::max(num_streams, 1ul)); Pipes pipes; pipes.reserve(num_streams); @@ -1336,17 +1532,22 @@ void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline, pipeline.init(std::move(pipe)); } -SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/) +SinkToStoragePtr StorageS3::write( + const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/) { auto query_configuration = updateConfigurationAndGetCopy(local_context); auto key = query_configuration.keys.front(); + if (query_configuration.withGlobsIgnorePartitionWildcard()) + throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, + "S3 key '{}' contains globs, so the table is in readonly mode", query_configuration.url.key); + auto sample_block = metadata_snapshot->getSampleBlock(); auto chosen_compression_method = chooseCompressionMethod(query_configuration.keys.back(), query_configuration.compression_method); auto insert_query = std::dynamic_pointer_cast(query); auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr; - bool is_partitioned_implementation = partition_by_ast && query_configuration.withWildcard(); + bool is_partitioned_implementation = partition_by_ast && query_configuration.withPartitionWildcard(); if (is_partitioned_implementation) { @@ -1363,12 +1564,9 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr } else { - if (query_configuration.withGlobs()) - throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, - "S3 key '{}' contains globs, so the table is in readonly mode", query_configuration.url.key); - - if (auto new_key = checkAndGetNewFileOnInsertIfNeeded(local_context, configuration, query_configuration.keys.front(), query_configuration.keys.size())) + if (auto new_key = checkAndGetNewFileOnInsertIfNeeded(local_context, query_configuration, query_configuration.keys.front(), query_configuration.keys.size())) { + std::lock_guard lock{configuration_update_mutex}; query_configuration.keys.push_back(*new_key); configuration.keys.push_back(*new_key); key = *new_key; @@ -1417,10 +1615,9 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &, const auto * response_error = response.IsSuccess() ? 
nullptr : &response.GetError(); auto time_now = std::chrono::system_clock::now(); if (auto blob_storage_log = BlobStorageLogWriter::create()) - { for (const auto & key : query_configuration.keys) - blob_storage_log->addEvent(BlobStorageLogElement::EventType::Delete, query_configuration.url.bucket, key, {}, 0, response_error, time_now); - } + blob_storage_log->addEvent( + BlobStorageLogElement::EventType::Delete, query_configuration.url.bucket, key, {}, 0, response_error, time_now); if (!response.IsSuccess()) { @@ -1445,18 +1642,24 @@ void StorageS3::updateConfiguration(const ContextPtr & local_context) configuration.update(local_context); } -void StorageS3::useConfiguration(const Configuration & new_configuration) +void StorageS3::useConfiguration(const StorageS3::Configuration & new_configuration) { std::lock_guard lock(configuration_update_mutex); configuration = new_configuration; } -const StorageS3::Configuration & StorageS3::getConfiguration() +StorageS3::Configuration StorageS3::getConfigurationCopy() const { std::lock_guard lock(configuration_update_mutex); return configuration; } +String StorageS3::getFormatCopy() const +{ + std::lock_guard lock(configuration_update_mutex); + return configuration.format; +} + bool StorageS3::Configuration::update(const ContextPtr & context) { auto s3_settings = context->getStorageS3Settings().getSettings(url.uri.toString(), context->getUserName()); @@ -1511,7 +1714,8 @@ void StorageS3::Configuration::connect(const ContextPtr & context) .is_s3express_bucket = S3::isS3ExpressEndpoint(url.endpoint), }; - auto credentials = Aws::Auth::AWSCredentials(auth_settings.access_key_id, auth_settings.secret_access_key, auth_settings.session_token); + auto credentials + = Aws::Auth::AWSCredentials(auth_settings.access_key_id, auth_settings.secret_access_key, auth_settings.session_token); client = S3::ClientFactory::instance().create( client_configuration, client_settings, @@ -1530,6 +1734,14 @@ void StorageS3::Configuration::connect(const ContextPtr & context) credentials.GetSessionToken()); } +bool StorageS3::Configuration::withGlobsIgnorePartitionWildcard() const +{ + if (!withPartitionWildcard()) + return withGlobs(); + + return PartitionedSink::replaceWildcards(getPath(), "").find_first_of("*?{") != std::string::npos; +} + void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection) { validateNamedCollection(collection, required_configuration_keys, optional_configuration_keys); @@ -1544,10 +1756,12 @@ void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configur configuration.auth_settings.secret_access_key = collection.getOrDefault("secret_access_key", ""); configuration.auth_settings.use_environment_credentials = collection.getOrDefault("use_environment_credentials", 1); configuration.auth_settings.no_sign_request = collection.getOrDefault("no_sign_request", false); - configuration.auth_settings.expiration_window_seconds = collection.getOrDefault("expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS); + configuration.auth_settings.expiration_window_seconds + = collection.getOrDefault("expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS); configuration.format = collection.getOrDefault("format", configuration.format); - configuration.compression_method = collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto")); + configuration.compression_method + = collection.getOrDefault("compression_method", 
collection.getOrDefault("compression", "auto")); configuration.structure = collection.getOrDefault("structure", "auto"); configuration.request_settings = S3Settings::RequestSettings(collection); @@ -1583,8 +1797,8 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, const C if (count == 0 || count > 6) throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, - "Storage S3 requires 1 to 5 arguments: " - "url, [NOSIGN | access_key_id, secret_access_key], name of used format and [compression_method]"); + "Storage S3 requires 1 to 6 positional arguments: " + "url, [NOSIGN | access_key_id, secret_access_key], [session_token], [name of used format], [compression_method], [headers], [extra_credentials]"); std::unordered_map engine_args_to_idx; bool no_sign_request = false; @@ -1636,13 +1850,9 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, const C { auto fourth_arg = checkAndGetLiteralArgument(engine_args[3], "session_token/format"); if (fourth_arg == "auto" || FormatFactory::instance().exists(fourth_arg)) - { engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}}; - } else - { engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}}; - } } } /// For 5 arguments we support 2 possible variants: @@ -1652,17 +1862,14 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, const C { auto fourth_arg = checkAndGetLiteralArgument(engine_args[3], "session_token/format"); if (fourth_arg == "auto" || FormatFactory::instance().exists(fourth_arg)) - { engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"compression", 4}}; - } else - { engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}}; - } } else if (count == 6) { - engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}, {"compression_method", 5}}; + engine_args_to_idx + = {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}, {"compression_method", 5}}; } /// This argument is always the first @@ -1672,80 +1879,140 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, const C configuration.format = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["format"]], "format"); if (engine_args_to_idx.contains("compression_method")) - configuration.compression_method = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["compression_method"]], "compression_method"); + configuration.compression_method + = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["compression_method"]], "compression_method"); if (engine_args_to_idx.contains("access_key_id")) - configuration.auth_settings.access_key_id = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["access_key_id"]], "access_key_id"); + configuration.auth_settings.access_key_id + = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["access_key_id"]], "access_key_id"); if (engine_args_to_idx.contains("secret_access_key")) - configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["secret_access_key"]], "secret_access_key"); + configuration.auth_settings.secret_access_key + = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["secret_access_key"]], "secret_access_key"); if (engine_args_to_idx.contains("session_token")) - configuration.auth_settings.session_token = 
checkAndGetLiteralArgument(engine_args[engine_args_to_idx["session_token"]], "session_token"); + configuration.auth_settings.session_token + = checkAndGetLiteralArgument(engine_args[engine_args_to_idx["session_token"]], "session_token"); if (no_sign_request) configuration.auth_settings.no_sign_request = no_sign_request; } - configuration.static_configuration = !configuration.auth_settings.access_key_id.empty() || configuration.auth_settings.no_sign_request.has_value(); + configuration.static_configuration + = !configuration.auth_settings.access_key_id.empty() || configuration.auth_settings.no_sign_request.has_value(); configuration.keys = {configuration.url.key}; if (configuration.format == "auto" && get_format_from_file) - configuration.format = FormatFactory::instance().tryGetFormatFromFileName(configuration.url.key).value_or("auto"); + { + if (configuration.url.archive_pattern.has_value()) + { + configuration.format = FormatFactory::instance() + .tryGetFormatFromFileName(Poco::URI(configuration.url.archive_pattern.value()).getPath()) + .value_or("auto"); + } + else + { + configuration.format + = FormatFactory::instance().tryGetFormatFromFileName(Poco::URI(configuration.url.uri_str).getPath()).value_or("auto"); + } + } return configuration; } ColumnsDescription StorageS3::getTableStructureFromData( - const StorageS3::Configuration & configuration, - const std::optional & format_settings, - const ContextPtr & ctx) + const StorageS3::Configuration & configuration_, const std::optional & format_settings_, const ContextPtr & ctx) { - return getTableStructureAndFormatFromDataImpl(configuration.format, configuration, format_settings, ctx).first; + return getTableStructureAndFormatFromDataImpl(configuration_.format, configuration_, format_settings_, ctx).first; } std::pair StorageS3::getTableStructureAndFormatFromData( - const StorageS3::Configuration & configuration, - const std::optional & format_settings, - const ContextPtr & ctx) + const StorageS3::Configuration & configuration, const std::optional & format_settings, const ContextPtr & ctx) { return getTableStructureAndFormatFromDataImpl(std::nullopt, configuration, format_settings, ctx); } -namespace +class ReadBufferIterator : public IReadBufferIterator, WithContext { - class ReadBufferIterator : public IReadBufferIterator, WithContext +public: + ReadBufferIterator( + std::shared_ptr file_iterator_, + const StorageS3Source::KeysWithInfo & read_keys_, + const StorageS3::Configuration & configuration_, + std::optional format_, + const std::optional & format_settings_, + ContextPtr context_) + : WithContext(context_) + , file_iterator(file_iterator_) + , read_keys(read_keys_) + , configuration(configuration_) + , format(std::move(format_)) + , format_settings(format_settings_) + , prev_read_keys_size(read_keys_.size()) { - public: - ReadBufferIterator( - std::shared_ptr file_iterator_, - const StorageS3Source::KeysWithInfo & read_keys_, - const StorageS3::Configuration & configuration_, - std::optional format_, - const std::optional & format_settings_, - const ContextPtr & context_) - : WithContext(context_) - , file_iterator(file_iterator_) - , read_keys(read_keys_) - , configuration(configuration_) - , format(std::move(format_)) - , format_settings(format_settings_) - , prev_read_keys_size(read_keys_.size()) + } + + Data next() override + { + if (first) { + /// If format is unknown we iterate through all currently read keys on first iteration and + /// try to determine format by file name. 
+ if (!format) + { + for (const auto & key_with_info : read_keys) + { + if (auto format_from_file_name + = FormatFactory::instance().tryGetFormatFromFileName(key_with_info->formatInferenceName())) + { + format = format_from_file_name; + break; + } + } + } + + /// For default mode check cached columns for currently read keys on first iteration. + if (first && getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT) + { + if (auto cached_columns = tryGetColumnsFromCache(read_keys.begin(), read_keys.end())) + return {nullptr, cached_columns, format}; + } } - Data next() override + while (true) { - if (first) + current_key_with_info = (*file_iterator)(); + + if (!current_key_with_info || current_key_with_info->key.empty()) { - /// If format is unknown we iterate through all currently read keys on first iteration and - /// try to determine format by file name. + if (first) + { + if (format) + throw Exception( + ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, + "The table structure cannot be extracted from a {} format file, because there are no files with provided path " + "in S3 or all files are empty. You can specify table structure manually", + *format); + + throw Exception( + ErrorCodes::CANNOT_DETECT_FORMAT, + "The data format cannot be detected by the contents of the files, because there are no files with provided path " + "in S3 or all files are empty. You can specify the format manually"); + } + + return {nullptr, std::nullopt, format}; + } + + if (read_keys.size() > prev_read_keys_size) + { + /// If format is unknown we can try to determine it by new file names. if (!format) { - for (const auto & key_with_info : read_keys) + for (auto it = read_keys.begin() + prev_read_keys_size; it != read_keys.end(); ++it) { - if (auto format_from_file_name = FormatFactory::instance().tryGetFormatFromFileName(key_with_info->key)) + if (auto format_from_file_name + = FormatFactory::instance().tryGetFormatFromFileName((*it)->formatInferenceName())) { format = format_from_file_name; break; @@ -1753,228 +2020,209 @@ namespace } } - /// For default mode check cached columns for currently read keys on first iteration. - if (first && getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT) + /// Check new files in schema cache if schema inference mode is default. + if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT) { - if (auto cached_columns = tryGetColumnsFromCache(read_keys.begin(), read_keys.end())) - return {nullptr, cached_columns, format}; + auto columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end()); + if (columns_from_cache) + return {nullptr, columns_from_cache, format}; } + + prev_read_keys_size = read_keys.size(); } - while (true) + if (getContext()->getSettingsRef().s3_skip_empty_files && current_key_with_info->info && current_key_with_info->info->size == 0) + continue; + + /// In union mode, check cached columns only for current key. + if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::UNION) { - current_key_with_info = (*file_iterator)(); - - if (!current_key_with_info || current_key_with_info->key.empty()) - { - if (first) - { - if (format) - throw Exception( - ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, - "The table structure cannot be extracted from a {} format file, because there are no files with provided path " - "in S3 or all files are empty. 
You can specify table structure manually", - *format); - - throw Exception( - ErrorCodes::CANNOT_DETECT_FORMAT, - "The data format cannot be detected by the contents of the files, because there are no files with provided path " - "in S3 or all files are empty. You can specify the format manually"); - } - - return {nullptr, std::nullopt, format}; - } - - /// S3 file iterator could get new keys after new iteration - if (read_keys.size() > prev_read_keys_size) - { - /// If format is unknown we can try to determine it by new file names. - if (!format) - { - for (auto it = read_keys.begin() + prev_read_keys_size; it != read_keys.end(); ++it) - { - if (auto format_from_file_name = FormatFactory::instance().tryGetFormatFromFileName((*it)->key)) - { - format = format_from_file_name; - break; - } - } - } - - /// Check new files in schema cache if schema inference mode is default. - if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT) - { - auto columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end()); - if (columns_from_cache) - return {nullptr, columns_from_cache, format}; - } - - prev_read_keys_size = read_keys.size(); - } - - if (getContext()->getSettingsRef().s3_skip_empty_files && current_key_with_info->info && current_key_with_info->info->size == 0) - continue; - - /// In union mode, check cached columns only for current key. - if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::UNION) - { - StorageS3::KeysWithInfo keys = {current_key_with_info}; - if (auto columns_from_cache = tryGetColumnsFromCache(keys.begin(), keys.end())) - { - first = false; - return {nullptr, columns_from_cache, format}; - } - } - - int zstd_window_log_max = static_cast(getContext()->getSettingsRef().zstd_window_log_max); - auto impl = std::make_unique(configuration.client, configuration.url.bucket, current_key_with_info->key, configuration.url.version_id, configuration.request_settings, getContext()->getReadSettings()); - if (!getContext()->getSettingsRef().s3_skip_empty_files || !impl->eof()) + StorageS3Source::KeysWithInfo keys = {current_key_with_info}; + if (auto columns_from_cache = tryGetColumnsFromCache(keys.begin(), keys.end())) { first = false; - return {wrapReadBufferWithCompressionMethod(std::move(impl), chooseCompressionMethod(current_key_with_info->key, configuration.compression_method), zstd_window_log_max), std::nullopt, format}; + return {nullptr, columns_from_cache, format}; } } - } - void setNumRowsToLastFile(size_t num_rows) override - { - if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3) - return; - - String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket / current_key_with_info->key; - auto key = getKeyForSchemaCache(source, *format, format_settings, getContext()); - StorageS3::getSchemaCache(getContext()).addNumRows(key, num_rows); - } - - void setSchemaToLastFile(const ColumnsDescription & columns) override - { - if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3 - || getContext()->getSettingsRef().schema_inference_mode != SchemaInferenceMode::UNION) - return; - - String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket / current_key_with_info->key; - auto cache_key = getKeyForSchemaCache(source, *format, format_settings, getContext()); - 
StorageS3::getSchemaCache(getContext()).addColumns(cache_key, columns); - } - - void setResultingSchema(const ColumnsDescription & columns) override - { - if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3 - || getContext()->getSettingsRef().schema_inference_mode != SchemaInferenceMode::DEFAULT) - return; - - auto host_and_bucket = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket; - Strings sources; - sources.reserve(read_keys.size()); - std::transform(read_keys.begin(), read_keys.end(), std::back_inserter(sources), [&](const auto & elem){ return host_and_bucket / elem->key; }); - auto cache_keys = getKeysForSchemaCache(sources, *format, format_settings, getContext()); - StorageS3::getSchemaCache(getContext()).addManyColumns(cache_keys, columns); - } - - void setFormatName(const String & format_name) override - { - format = format_name; - } - - String getLastFileName() const override - { - if (current_key_with_info) - return current_key_with_info->key; - return ""; - } - - bool supportsLastReadBufferRecreation() const override { return true; } - - std::unique_ptr recreateLastReadBuffer() override - { - chassert(current_key_with_info); int zstd_window_log_max = static_cast(getContext()->getSettingsRef().zstd_window_log_max); - auto impl = std::make_unique(configuration.client, configuration.url.bucket, current_key_with_info->key, configuration.url.version_id, configuration.request_settings, getContext()->getReadSettings()); - return wrapReadBufferWithCompressionMethod(std::move(impl), chooseCompressionMethod(current_key_with_info->key, configuration.compression_method), zstd_window_log_max); - } + std::unique_ptr impl; - private: - std::optional tryGetColumnsFromCache( - const StorageS3::KeysWithInfo::const_iterator & begin, - const StorageS3::KeysWithInfo::const_iterator & end) - { - auto context = getContext(); - if (!context->getSettingsRef().schema_inference_use_cache_for_s3) - return std::nullopt; - - auto & schema_cache = StorageS3::getSchemaCache(context); - for (auto it = begin; it < end; ++it) + if (!current_key_with_info->path_in_archive.has_value()) { - auto get_last_mod_time = [&] + impl = std::make_unique( + configuration.client, + configuration.url.bucket, + current_key_with_info->key, + configuration.url.version_id, + configuration.request_settings, + getContext()->getReadSettings()); + } + else + { + assert(current_key_with_info->archive_reader); + impl = current_key_with_info->archive_reader->readFile( + current_key_with_info->path_in_archive.value(), /*throw_on_not_found=*/true); + } + if (!getContext()->getSettingsRef().s3_skip_empty_files || !impl->eof()) + { + first = false; + // We do not need to use any data decompression algorithm if we take data from an archive because it will be decompressed automatically. + return { + wrapReadBufferWithCompressionMethod( + std::move(impl), + current_key_with_info->path_in_archive.has_value() + ? 
chooseCompressionMethod(current_key_with_info->path_in_archive.value(), configuration.compression_method) + : chooseCompressionMethod(current_key_with_info->key, configuration.compression_method), + zstd_window_log_max), + std::nullopt, + format}; + } + } + } + + void setNumRowsToLastFile(size_t num_rows) override + { + if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3) + return; + + String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) + / configuration.url.bucket / current_key_with_info->getPath(); + auto key = getKeyForSchemaCache(source, *format, format_settings, getContext()); + StorageS3::getSchemaCache(getContext()).addNumRows(key, num_rows); + } + + void setSchemaToLastFile(const ColumnsDescription & columns) override + { + if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3 + || getContext()->getSettingsRef().schema_inference_mode != SchemaInferenceMode::UNION) + return; + + String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) + / configuration.url.bucket / current_key_with_info->getPath(); + auto cache_key = getKeyForSchemaCache(source, *format, format_settings, getContext()); + StorageS3::getSchemaCache(getContext()).addColumns(cache_key, columns); + } + + void setResultingSchema(const ColumnsDescription & columns) override + { + if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3 + || getContext()->getSettingsRef().schema_inference_mode != SchemaInferenceMode::DEFAULT) + return; + + auto host_and_bucket = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket; + Strings sources; + sources.reserve(read_keys.size()); + std::transform( + read_keys.begin(), + read_keys.end(), + std::back_inserter(sources), + [&](const auto & elem) { return host_and_bucket / elem->getPath(); }); + auto cache_keys = getKeysForSchemaCache(sources, *format, format_settings, getContext()); + StorageS3::getSchemaCache(getContext()).addManyColumns(cache_keys, columns); + } + + void setFormatName(const String & format_name) override + { + format = format_name; + } + + String getLastFileName() const override + { + if (current_key_with_info) + return current_key_with_info->getPath(); + return ""; + } + + bool supportsLastReadBufferRecreation() const override { return true; } + + std::unique_ptr recreateLastReadBuffer() override + { + chassert(current_key_with_info); + int zstd_window_log_max = static_cast(getContext()->getSettingsRef().zstd_window_log_max); + auto impl = std::make_unique(configuration.client, configuration.url.bucket, current_key_with_info->key, configuration.url.version_id, configuration.request_settings, getContext()->getReadSettings()); + return wrapReadBufferWithCompressionMethod(std::move(impl), chooseCompressionMethod(current_key_with_info->key, configuration.compression_method), zstd_window_log_max); + } + +private: + std::optional tryGetColumnsFromCache( + const StorageS3Source::KeysWithInfo::const_iterator & begin, const StorageS3Source::KeysWithInfo::const_iterator & end) + { + auto context = getContext(); + if (!context->getSettingsRef().schema_inference_use_cache_for_s3) + return std::nullopt; + + auto & schema_cache = StorageS3::getSchemaCache(context); + for (auto it = begin; it < end; ++it) + { + auto get_last_mod_time = [&] + { + time_t last_modification_time = 0; + if ((*it)->info) { - time_t last_modification_time = 0; - if ((*it)->info) - { - 
last_modification_time = (*it)->info->last_modification_time; - } - else - { - /// Note that in case of exception in getObjectInfo returned info will be empty, - /// but schema cache will handle this case and won't return columns from cache - /// because we can't say that it's valid without last modification time. - last_modification_time = S3::getObjectInfo( - *configuration.client, - configuration.url.bucket, - (*it)->key, - configuration.url.version_id, - configuration.request_settings, - /*with_metadata=*/ false, - /*throw_on_error= */ false).last_modification_time; - } - - return last_modification_time ? std::make_optional(last_modification_time) : std::nullopt; - }; - - String path = fs::path(configuration.url.bucket) / (*it)->key; - String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / path; - - if (format) - { - auto cache_key = getKeyForSchemaCache(source, *format, format_settings, context); - if (auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time)) - return columns; + last_modification_time = (*it)->info->last_modification_time; } else { - /// If format is unknown, we can iterate through all possible input formats - /// and check if we have an entry with this format and this file in schema cache. - /// If we have such entry for some format, we can use this format to read the file. - for (const auto & format_name : FormatFactory::instance().getAllInputFormats()) + /// Note that in case of exception in getObjectInfo returned info will be empty, + /// but schema cache will handle this case and won't return columns from cache + /// because we can't say that it's valid without last modification time. + last_modification_time = S3::getObjectInfo( + *configuration.client, + configuration.url.bucket, + (*it)->key, + configuration.url.version_id, + configuration.request_settings, + /*with_metadata=*/ false, + /*throw_on_error= */ false).last_modification_time; + } + + return last_modification_time ? std::make_optional(last_modification_time) : std::nullopt; + }; + String path = fs::path(configuration.url.bucket) / (*it)->getPath(); + + String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / path; + + if (format) + { + auto cache_key = getKeyForSchemaCache(source, *format, format_settings, context); + if (auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time)) + return columns; + } + else + { + /// If format is unknown, we can iterate through all possible input formats + /// and check if we have an entry with this format and this file in schema cache. + /// If we have such entry for some format, we can use this format to read the file. + for (const auto & format_name : FormatFactory::instance().getAllInputFormats()) + { + auto cache_key = getKeyForSchemaCache(source, format_name, format_settings, context); + if (auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time)) { - auto cache_key = getKeyForSchemaCache(source, format_name, format_settings, context); - if (auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time)) - { - /// Now format is known. It should be the same for all files. 
+ format = format_name; + return columns; } } } - - return std::nullopt; } - std::shared_ptr file_iterator; - const StorageS3Source::KeysWithInfo & read_keys; - const StorageS3::Configuration & configuration; - std::optional format; - const std::optional & format_settings; - StorageS3Source::KeyWithInfoPtr current_key_with_info; - size_t prev_read_keys_size; - bool first = true; - }; + return std::nullopt; + } -} + std::shared_ptr file_iterator; + const StorageS3Source::KeysWithInfo & read_keys; + const StorageS3::Configuration & configuration; + std::optional format; + const std::optional & format_settings; + StorageS3Source::KeyWithInfoPtr current_key_with_info; + size_t prev_read_keys_size; + bool first = true; +}; std::pair StorageS3::getTableStructureAndFormatFromDataImpl( std::optional format, - const Configuration & configuration, + const StorageS3::Configuration & configuration, const std::optional & format_settings, const ContextPtr & ctx) { @@ -2071,7 +2319,6 @@ SchemaCache & StorageS3::getSchemaCache(const ContextPtr & ctx) static SchemaCache schema_cache(ctx->getConfigRef().getUInt("schema_inference_cache_max_elements_for_s3", DEFAULT_SCHEMA_CACHE_ELEMENTS)); return schema_cache; } - } #endif diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h index c8ab28fb20e..7711483b186 100644 --- a/src/Storages/StorageS3.h +++ b/src/Storages/StorageS3.h @@ -1,7 +1,10 @@ #pragma once -#include "config.h" - +#include +#include +#include "IO/Archives/IArchiveReader.h" +#include "IO/Archives/createArchiveReader.h" +#include "IO/ReadBuffer.h" #if USE_AWS_S3 #include @@ -23,36 +26,52 @@ #include #include -#include - -namespace fs = std::filesystem; - namespace DB { +namespace ErrorCodes +{ +extern const int LOGICAL_ERROR; +} + class PullingPipelineExecutor; class NamedCollection; class StorageS3Source : public SourceWithKeyCondition, WithContext { public: - struct KeyWithInfo { KeyWithInfo() = default; - explicit KeyWithInfo(String key_, std::optional info_ = std::nullopt) - : key(std::move(key_)), info(std::move(info_)) {} + explicit KeyWithInfo( + String key_, + std::optional info_ = std::nullopt, + std::optional path_in_archive_ = std::nullopt, + std::shared_ptr archive_reader_ = nullptr) + : key(std::move(key_)) + , info(std::move(info_)) + , path_in_archive(std::move(path_in_archive_)) + , archive_reader(std::move(archive_reader_)) + { + if (path_in_archive.has_value() != (archive_reader != nullptr)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Archive reader and path in archive must exist simultaneously"); + } virtual ~KeyWithInfo() = default; String key; std::optional info; + std::optional path_in_archive; + std::shared_ptr archive_reader; + + String getPath() { return path_in_archive.has_value() ? (key + "::" + path_in_archive.value()) : key; } + String formatInferenceName() { return path_in_archive.has_value() ? path_in_archive.value() : key; } }; + using KeyWithInfoPtr = std::shared_ptr; using KeysWithInfo = std::vector; - class IIterator { public: @@ -65,7 +84,7 @@ public: /// fixme: May underestimate if the glob has a strong filter, so there are few matches among the first 1000 ListObjects results. 
virtual size_t estimatedKeysCount() = 0; - KeyWithInfoPtr operator ()() { return next(); } + KeyWithInfoPtr operator()() { return next(); } }; class DisclosedGlobIterator : public IIterator @@ -126,6 +145,41 @@ public: ReadTaskCallback callback; }; + class ArchiveIterator : public IIterator, public WithContext + { + public: + explicit ArchiveIterator( + std::unique_ptr basic_iterator_, + const std::string & archive_pattern_, + std::shared_ptr client_, + const String & bucket_, + const String & version_id_, + const S3Settings::RequestSettings & request_settings, + ContextPtr context_, + KeysWithInfo * read_keys_); + + KeyWithInfoPtr next(size_t) override; /// NOLINT + size_t estimatedKeysCount() override; + void refreshArchiveReader(); + + private: + std::unique_ptr basic_iterator; + KeyWithInfoPtr basic_key_with_info_ptr; + std::unique_ptr basic_read_buffer; + std::shared_ptr archive_reader{nullptr}; + std::unique_ptr file_enumerator = nullptr; + std::string path_in_archive = {}; // used when reading a single file from archive + IArchiveReader::NameFilter filter = {}; // used when files inside archive are defined with a glob + std::shared_ptr client; + const String bucket; + const String version_id; + S3Settings::RequestSettings request_settings; + std::mutex take_next_mutex; + KeysWithInfo * read_keys; + }; + + friend StorageS3Source::ArchiveIterator; + StorageS3Source( const ReadFromFormatInfo & info, const String & format, @@ -194,10 +248,7 @@ private: ReaderHolder(const ReaderHolder & other) = delete; ReaderHolder & operator=(const ReaderHolder & other) = delete; - ReaderHolder(ReaderHolder && other) noexcept - { - *this = std::move(other); - } + ReaderHolder(ReaderHolder && other) noexcept { *this = std::move(other); } ReaderHolder & operator=(ReaderHolder && other) noexcept { @@ -215,8 +266,22 @@ private: explicit operator bool() const { return reader != nullptr; } PullingPipelineExecutor * operator->() { return reader.get(); } const PullingPipelineExecutor * operator->() const { return reader.get(); } - String getPath() const { return fs::path(bucket) / key_with_info->key; } - const String & getFile() const { return key_with_info->key; } + String getPath() const + { + return key_with_info->path_in_archive.has_value() + ? (bucket + "/" + key_with_info->key + "::" + key_with_info->path_in_archive.value()) + : (bucket + "/" + key_with_info->key); + } + const String & getFile() const + { + return key_with_info->path_in_archive.has_value() ? key_with_info->path_in_archive.value() : key_with_info->key; + } + String getFileExtended() const + { + return key_with_info->path_in_archive.has_value() ? (String{key_with_info->key} + "::" + key_with_info->path_in_archive.value()) + : key_with_info->key; + } + bool isArchive() { return key_with_info->path_in_archive.has_value(); } const KeyWithInfo & getKeyWithInfo() const { return *key_with_info; } std::optional getFileSize() const { return key_with_info->info ? 
std::optional(key_with_info->info->size) : std::nullopt; } @@ -255,9 +320,6 @@ private: ReaderHolder createReader(size_t idx = 0); std::future createReaderAsync(size_t idx = 0); - std::unique_ptr createS3ReadBuffer(const String & key, size_t object_size); - std::unique_ptr createAsyncS3ReadBuffer(const String & key, const ReadSettings & read_settings, size_t object_size); - void addNumRowsToCache(const String & key, size_t num_rows); std::optional tryGetNumRowsFromCache(const KeyWithInfo & key_with_info); }; @@ -274,7 +336,7 @@ public: { Configuration() = default; - String getPath() const { return url.key; } + const String & getPath() const { return url.key; } bool update(const ContextPtr & context); @@ -282,13 +344,14 @@ public: bool withGlobs() const { return url.key.find_first_of("*?{") != std::string::npos; } - bool withWildcard() const + bool withPartitionWildcard() const { static const String PARTITION_ID_WILDCARD = "{_partition_id}"; - return url.bucket.find(PARTITION_ID_WILDCARD) != String::npos - || keys.back().find(PARTITION_ID_WILDCARD) != String::npos; + return url.bucket.find(PARTITION_ID_WILDCARD) != String::npos || keys.back().find(PARTITION_ID_WILDCARD) != String::npos; } + bool withGlobsIgnorePartitionWildcard() const; + S3::URI url; S3::AuthSettings auth_settings; S3Settings::RequestSettings request_settings; @@ -313,10 +376,7 @@ public: bool distributed_processing_ = false, ASTPtr partition_by_ = nullptr); - String getName() const override - { - return name; - } + String getName() const override { return name; } void read( QueryPlan & query_plan, @@ -328,27 +388,25 @@ public: size_t max_block_size, size_t num_streams) override; - SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context, bool async_insert) override; + SinkToStoragePtr + write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context, bool async_insert) override; - void truncate(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, TableExclusiveLockHolder &) override; + void truncate( + const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, TableExclusiveLockHolder &) override; bool supportsPartitionBy() const override; - static void processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection); + static void processNamedCollectionResult(Configuration & configuration, const NamedCollection & collection); static SchemaCache & getSchemaCache(const ContextPtr & ctx); - static StorageS3::Configuration getConfiguration(ASTs & engine_args, const ContextPtr & local_context, bool get_format_from_file = true); + static Configuration getConfiguration(ASTs & engine_args, const ContextPtr & local_context, bool get_format_from_file = true); static ColumnsDescription getTableStructureFromData( - const StorageS3::Configuration & configuration, - const std::optional & format_settings, - const ContextPtr & ctx); + const Configuration & configuration_, const std::optional & format_settings_, const ContextPtr & ctx); static std::pair getTableStructureAndFormatFromData( - const StorageS3::Configuration & configuration, - const std::optional & format_settings, - const ContextPtr & ctx); + const Configuration & configuration, const std::optional & format_settings, const ContextPtr & ctx); using KeysWithInfo = StorageS3Source::KeysWithInfo; @@ -361,7 +419,9 @@ protected: void useConfiguration(const Configuration & 
new_configuration); - const Configuration & getConfiguration(); + Configuration getConfigurationCopy() const; + + String getFormatCopy() const; private: friend class StorageS3Cluster; @@ -370,7 +430,7 @@ private: friend class ReadFromStorageS3Step; Configuration configuration; - std::mutex configuration_update_mutex; + mutable std::mutex configuration_update_mutex; String name; const bool distributed_processing; @@ -392,6 +452,24 @@ private: bool parallelizeOutputAfterReading(ContextPtr context) const override; }; +std::unique_ptr createS3ReadBuffer( + const String & key, + size_t object_size, + std::shared_ptr context, + std::shared_ptr client_ptr, + const String & bucket, + const String & version_id, + const S3Settings::RequestSettings & request_settings); + +std::unique_ptr createAsyncS3ReadBuffer( + const String & key, + const ReadSettings & read_settings, + size_t object_size, + std::shared_ptr context, + std::shared_ptr client_ptr, + const String & bucket, + const String & version_id, + const S3Settings::RequestSettings & request_settings); } #endif diff --git a/src/Storages/StorageS3Cluster.cpp b/src/Storages/StorageS3Cluster.cpp index 6b22771b38f..0060450eea7 100644 --- a/src/Storages/StorageS3Cluster.cpp +++ b/src/Storages/StorageS3Cluster.cpp @@ -91,7 +91,14 @@ void StorageS3Cluster::updateConfigurationIfChanged(ContextPtr local_context) RemoteQueryExecutor::Extension StorageS3Cluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const { auto iterator = std::make_shared( - *s3_configuration.client, s3_configuration.url, predicate, getVirtualsList(), context, nullptr, s3_configuration.request_settings, context->getFileProgressCallback()); + *s3_configuration.client, + s3_configuration.url, + predicate, + getVirtualsList(), + context, + nullptr, + s3_configuration.request_settings, + context->getFileProgressCallback()); auto callback = std::make_shared>([iterator]() mutable -> String { diff --git a/src/TableFunctions/TableFunctionS3.cpp b/src/TableFunctions/TableFunctionS3.cpp index a8c100ebd44..dfb427a3bba 100644 --- a/src/TableFunctions/TableFunctionS3.cpp +++ b/src/TableFunctions/TableFunctionS3.cpp @@ -216,7 +216,19 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context configuration.auth_settings.no_sign_request = no_sign_request; if (configuration.format == "auto") - configuration.format = FormatFactory::instance().tryGetFormatFromFileName(Poco::URI(url).getPath()).value_or("auto"); + { + if (configuration.url.archive_pattern.has_value()) + { + configuration.format = FormatFactory::instance() + .tryGetFormatFromFileName(Poco::URI(configuration.url.archive_pattern.value()).getPath()) + .value_or("auto"); + } + else + { + configuration.format + = FormatFactory::instance().tryGetFormatFromFileName(Poco::URI(configuration.url.uri_str).getPath()).value_or("auto"); + } + } } configuration.keys = {configuration.url.key}; diff --git a/tests/queries/0_stateless/03036_reading_s3_archives.reference b/tests/queries/0_stateless/03036_reading_s3_archives.reference new file mode 100644 index 00000000000..36ced212a1b --- /dev/null +++ b/tests/queries/0_stateless/03036_reading_s3_archives.reference @@ -0,0 +1,52 @@ +1 Str1 example1.csv test/03036_archive1.zip::example1.csv +2 Str2 example1.csv test/03036_archive1.zip::example1.csv +3 Str3 example2.csv test/03036_archive2.zip::example2.csv +4 Str4 example2.csv test/03036_archive2.zip::example2.csv +5 Str5 example3.csv test/03036_archive2.zip::example3.csv +6 Str6 
example3.csv test/03036_archive2.zip::example3.csv +3 Str3 example2.csv test/03036_archive1.zip::example2.csv +3 Str3 example2.csv test/03036_archive2.zip::example2.csv +4 Str4 example2.csv test/03036_archive1.zip::example2.csv +4 Str4 example2.csv test/03036_archive2.zip::example2.csv +1 Str1 example1.csv test/03036_archive1.zip::example1.csv +2 Str2 example1.csv test/03036_archive1.zip::example1.csv +3 Str3 example2.csv test/03036_archive1.zip::example2.csv +3 Str3 example2.csv test/03036_archive2.zip::example2.csv +4 Str4 example2.csv test/03036_archive1.zip::example2.csv +4 Str4 example2.csv test/03036_archive2.zip::example2.csv +5 Str5 example3.csv test/03036_archive2.zip::example3.csv +6 Str6 example3.csv test/03036_archive2.zip::example3.csv +1 Str1 example1.csv test/03036_archive1.tar::example1.csv +2 Str2 example1.csv test/03036_archive1.tar::example1.csv +7 Str7 example4.csv test/03036_archive1.tar::example4.csv +7 Str7 example4.csv test/03036_archive2.tar::example4.csv +8 Str8 example4.csv test/03036_archive1.tar::example4.csv +8 Str8 example4.csv test/03036_archive2.tar::example4.csv +5 Str5 example3.csv test/03036_archive2.tar::example3.csv +6 Str6 example3.csv test/03036_archive2.tar::example3.csv +7 Str7 example4.csv test/03036_archive2.tar::example4.csv +8 Str8 example4.csv test/03036_archive2.tar::example4.csv +9 Str9 example5.csv test/03036_archive2.tar::example5.csv +10 Str10 example5.csv test/03036_archive2.tar::example5.csv +3 Str3 example2.csv test/03036_archive3.tar.gz::example2.csv +4 Str4 example2.csv test/03036_archive3.tar.gz::example2.csv +11 Str11 example6.csv test/03036_archive3.tar.gz::example6.csv +12 Str12 example6.csv test/03036_archive3.tar.gz::example6.csv +3 Str3 example2.csv test/03036_archive3.tar.gz::example2.csv +4 Str4 example2.csv test/03036_archive3.tar.gz::example2.csv +5 Str5 example3.csv test/03036_archive2.tar::example3.csv +6 Str6 example3.csv test/03036_archive2.tar::example3.csv +3 Str3 example2.csv test/03036_archive2.zip::example2.csv +4 Str4 example2.csv test/03036_archive2.zip::example2.csv +5 Str5 example3.csv test/03036_archive2.tar::example3.csv +6 Str6 example3.csv test/03036_archive2.tar::example3.csv +7 Str7 example4.csv test/03036_archive2.tar::example4.csv +8 Str8 example4.csv test/03036_archive2.tar::example4.csv +9 Str9 example5.csv test/03036_archive2.tar::example5.csv +10 Str10 example5.csv test/03036_archive2.tar::example5.csv +3 Str3 example2.csv test/03036_archive3.tar.gz::example2.csv +4 Str4 example2.csv test/03036_archive3.tar.gz::example2.csv +5 Str5 example3.csv test/03036_archive2.tar::example3.csv +6 Str6 example3.csv test/03036_archive2.tar::example3.csv +13 Str13 example7.csv test/03036_compressed_file_archive.zip::example7.csv +14 Str14 example7.csv test/03036_compressed_file_archive.zip::example7.csv diff --git a/tests/queries/0_stateless/03036_reading_s3_archives.sql b/tests/queries/0_stateless/03036_reading_s3_archives.sql new file mode 100644 index 00000000000..98ca0425174 --- /dev/null +++ b/tests/queries/0_stateless/03036_reading_s3_archives.sql @@ -0,0 +1,22 @@ +-- Tags: no-fasttest +-- Tag no-fasttest: Depends on AWS + +SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive1.zip :: example1.csv') ORDER BY (id, _file, _path); +SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive2.zip :: example*.csv') ORDER BY (id, _file, _path); +SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive*.zip :: example2.csv') ORDER BY (id, _file, _path); +SELECT id, data, 
_file, _path FROM s3(s3_conn, filename='03036_archive*.zip :: example*') ORDER BY (id, _file, _path); +SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive1.tar :: example1.csv') ORDER BY (id, _file, _path); +SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive*.tar :: example4.csv') ORDER BY (id, _file, _path); +SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive2.tar :: example*.csv') ORDER BY (id, _file, _path); +SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive*.tar.gz :: example*.csv') ORDER BY (id, _file, _path); +SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_archive*.tar* :: example{2..3}.csv') ORDER BY (id, _file, _path); +select id, data, _file, _path from s3(s3_conn, filename='03036_archive2.zip :: nonexistent.csv'); -- { serverError CANNOT_EXTRACT_TABLE_STRUCTURE } +select id, data, _file, _path from s3(s3_conn, filename='03036_archive2.zip :: nonexistent{2..3}.csv'); -- { serverError CANNOT_EXTRACT_TABLE_STRUCTURE } +CREATE TABLE table_zip22 Engine S3(s3_conn, filename='03036_archive2.zip :: example2.csv'); +select id, data, _file, _path from table_zip22 ORDER BY (id, _file, _path); +CREATE table table_tar2star Engine S3(s3_conn, filename='03036_archive2.tar :: example*.csv'); +SELECT id, data, _file, _path FROM table_tar2star ORDER BY (id, _file, _path); +CREATE table table_tarstarglobs Engine S3(s3_conn, filename='03036_archive*.tar* :: example{2..3}.csv'); +SELECT id, data, _file, _path FROM table_tarstarglobs ORDER BY (id, _file, _path); +CREATE table table_noexist Engine s3(s3_conn, filename='03036_archive2.zip :: nonexistent.csv'); -- { serverError INCORRECT_QUERY } +SELECT id, data, _file, _path FROM s3(s3_conn, filename='03036_compressed_file_archive.zip :: example7.csv', format='CSV', structure='auto', compression_method='gz') ORDER BY (id, _file, _path) \ No newline at end of file diff --git a/tests/queries/0_stateless/data_minio/03036_archive1.tar b/tests/queries/0_stateless/data_minio/03036_archive1.tar new file mode 100644 index 0000000000000000000000000000000000000000..55b3ddc745a2af5a59229a0ecabaee0e52d6e7dd GIT binary patch literal 10240 zcmeIx(F(#K7=~ewcNJsDBYrnGyN|A5wH*W%v1xSqEhEUI24mFV^+e@Y(MP`S-kh(e zPHzswO)b=9Y;|qa*lAnIXa6jTlv-(1ixH_zIo5Syi%O-k(0!s~{otBDO_ksKi}O(_ zntZEAi`O$bwt3sQCeeAebG^=zA`FUbJx}1KGX%O<{>{JSzp^&u-x%^g<69?hedphX z*xM}GhQXHnPbOp`0tg_000IagfB*srAbt30;YM)7 z)qw_U;4%Cf*zmANMvTTjAj27%|I>1_nkT-qHwSq56as;uADqAe)HkU}O_x d5hg;N4>S?vd<+v=*+8~40pSuLy$i%)000M4P!|9I literal 0 HcmV?d00001 diff --git a/tests/queries/0_stateless/data_minio/03036_archive2.tar b/tests/queries/0_stateless/data_minio/03036_archive2.tar new file mode 100644 index 0000000000000000000000000000000000000000..4cc3f6830a57979e88d1ea5eeeb16e43a470f925 GIT binary patch literal 10240 zcmeIzOAdlC7)H^KSq16Xn7`16uEQ0eGB8nN6pYJPD;gO|f-jQWN!zqQIpw3}so3n+ zWj?q zHPq_78gES=PG_|oEsLs1+(^%=z0--04X=mu3aqq;KzYl*{hR!U5c6*>`R}o+a~{<|6lp8?)jg_*iUt0B({w32l&bc@DKzL mKmY**5I_I{1Q0*~0R#|0009ILKmY**5I_I{1Q0;rO#)|UT~~1c literal 0 HcmV?d00001 diff --git a/tests/queries/0_stateless/data_minio/03036_archive2.zip b/tests/queries/0_stateless/data_minio/03036_archive2.zip new file mode 100644 index 0000000000000000000000000000000000000000..8b49dc8d9f4d529f79eb0faf9d99acf401a6d4d1 GIT binary patch literal 372 zcmWIWW@Zs#U|`^2;4Ep4NGZ6oT^z`h1Y#Zr8HUt~#N2|MRAas5;D`hbCf5s0@mf>@|NVTJev%@@cfVmcVv e#8`xhQ0D_p1UVnWL{>JC?My(p1W4}!aTow}oKKYi literal 0 HcmV?d00001 diff --git 
a/tests/queries/0_stateless/data_minio/03036_archive3.tar.gz b/tests/queries/0_stateless/data_minio/03036_archive3.tar.gz new file mode 100644 index 0000000000000000000000000000000000000000..888717640710248dc8668d69a15ae0c6864832c7 GIT binary patch literal 185 zcmb2|=3oE==C@aT`I-y_93HN-WKHxv;}d<#(Z*3!Of8GY{@CMg0pYM+OeU50mrdT% zeRI#X{)(M*vv*r>E8i?2+43NM`LxzE=R1QJ+j%6$O6c{h(A{giQfKRhnEPcLJ%caH z|H{5zU-A9vKmWO__H@Pi#LRoU`&eqHJp26Iy%J%8{(swZf9oInx7Q=EtLN`-hyU3X h-`87}t)Hcuwfy2Ic8K#Jx)000@hQpo@S literal 0 HcmV?d00001 diff --git a/tests/queries/0_stateless/data_minio/03036_compressed_file_archive.zip b/tests/queries/0_stateless/data_minio/03036_compressed_file_archive.zip new file mode 100644 index 0000000000000000000000000000000000000000..619f81327a8b321b4366cb5906ba871d94271136 GIT binary patch literal 231 zcmWIWW@h1H0D&-{g%L_Ntq(1MY!K#QkYPxzNX#wBNj29?E-niV;bdUm!ef*s#%q*T zTEWf0$nt`jfdNd&cXM!nq!^fybugUvx#;Ps Date: Mon, 13 May 2024 12:37:03 +0000 Subject: [PATCH 2/6] Resolve several issues --- src/Storages/StorageS3.cpp | 20 +++++++------------- src/Storages/StorageS3.h | 23 +++++------------------ 2 files changed, 12 insertions(+), 31 deletions(-) diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index 5d5f6ee56d2..9768653f3fe 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -1016,17 +1016,14 @@ Chunk StorageS3Source::generate() if (const auto * input_format = reader.getInputFormat()) chunk_size = reader.getInputFormat()->getApproxBytesReadForChunk(); progress(num_rows, chunk_size ? chunk_size : chunk.bytes()); + String file_name = reader.getFile(); VirtualColumnUtils::addRequestedPathFileAndSizeVirtualsToChunk( - chunk, - requested_virtual_columns, - reader.getPath(), - reader.getFileSize(), - reader.isArchive() ? (&reader.getFile()) : nullptr); + chunk, requested_virtual_columns, reader.getPath(), reader.getFileSize(), reader.isArchive() ? 
(&file_name) : nullptr); return chunk; } if (reader.getInputFormat() && getContext()->getSettingsRef().use_cache_for_count_from_files) - addNumRowsToCache(reader.getFileExtended(), total_rows_in_file); + addNumRowsToCache(reader.getPath(), total_rows_in_file); total_rows_in_file = 0; @@ -1045,9 +1042,9 @@ Chunk StorageS3Source::generate() return {}; } -void StorageS3Source::addNumRowsToCache(const String & key, size_t num_rows) +void StorageS3Source::addNumRowsToCache(const String & bucket_with_key, size_t num_rows) { - String source = fs::path(url_host_and_port) / bucket / key; + String source = fs::path(url_host_and_port) / bucket_with_key; auto cache_key = getKeyForSchemaCache(source, format, format_settings, getContext()); StorageS3::getSchemaCache(getContext()).addNumRows(cache_key, num_rows); } @@ -1963,8 +1960,7 @@ public: { for (const auto & key_with_info : read_keys) { - if (auto format_from_file_name - = FormatFactory::instance().tryGetFormatFromFileName(key_with_info->formatInferenceName())) + if (auto format_from_file_name = FormatFactory::instance().tryGetFormatFromFileName(key_with_info->getFileName())) { format = format_from_file_name; break; @@ -2011,8 +2007,7 @@ public: { for (auto it = read_keys.begin() + prev_read_keys_size; it != read_keys.end(); ++it) { - if (auto format_from_file_name - = FormatFactory::instance().tryGetFormatFromFileName((*it)->formatInferenceName())) + if (auto format_from_file_name = FormatFactory::instance().tryGetFormatFromFileName((*it)->getFileName())) { format = format_from_file_name; break; @@ -2067,7 +2062,6 @@ public: if (!getContext()->getSettingsRef().s3_skip_empty_files || !impl->eof()) { first = false; - // We do not need to use any data decompression algorithm if we take data from an archive because it will be decompressed automatically. return { wrapReadBufferWithCompressionMethod( std::move(impl), diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h index 7711483b186..606c677f915 100644 --- a/src/Storages/StorageS3.h +++ b/src/Storages/StorageS3.h @@ -65,8 +65,8 @@ public: std::optional path_in_archive; std::shared_ptr archive_reader; - String getPath() { return path_in_archive.has_value() ? (key + "::" + path_in_archive.value()) : key; } - String formatInferenceName() { return path_in_archive.has_value() ? path_in_archive.value() : key; } + String getPath() const { return path_in_archive.has_value() ? (key + "::" + path_in_archive.value()) : key; } + String getFileName() const { return path_in_archive.has_value() ? path_in_archive.value() : key; } }; using KeyWithInfoPtr = std::shared_ptr; @@ -266,21 +266,8 @@ private: explicit operator bool() const { return reader != nullptr; } PullingPipelineExecutor * operator->() { return reader.get(); } const PullingPipelineExecutor * operator->() const { return reader.get(); } - String getPath() const - { - return key_with_info->path_in_archive.has_value() - ? (bucket + "/" + key_with_info->key + "::" + key_with_info->path_in_archive.value()) - : (bucket + "/" + key_with_info->key); - } - const String & getFile() const - { - return key_with_info->path_in_archive.has_value() ? key_with_info->path_in_archive.value() : key_with_info->key; - } - String getFileExtended() const - { - return key_with_info->path_in_archive.has_value() ? 
(String{key_with_info->key} + "::" + key_with_info->path_in_archive.value()) - : key_with_info->key; - } + String getPath() const { return bucket + "/" + key_with_info->getPath(); } + String getFile() const { return key_with_info->getFileName(); } bool isArchive() { return key_with_info->path_in_archive.has_value(); } const KeyWithInfo & getKeyWithInfo() const { return *key_with_info; } std::optional getFileSize() const { return key_with_info->info ? std::optional(key_with_info->info->size) : std::nullopt; } @@ -320,7 +307,7 @@ private: ReaderHolder createReader(size_t idx = 0); std::future createReaderAsync(size_t idx = 0); - void addNumRowsToCache(const String & key, size_t num_rows); + void addNumRowsToCache(const String & bucket_with_key, size_t num_rows); std::optional tryGetNumRowsFromCache(const KeyWithInfo & key_with_info); }; From 6ec868a3df1dac8f905a9df00e609743eb0c3346 Mon Sep 17 00:00:00 2001 From: divanik Date: Mon, 13 May 2024 20:04:21 +0000 Subject: [PATCH 3/6] Add schema inference test --- ...6_schema_inference_cache_s3_archives.reference | 14 ++++++++++++++ .../03036_schema_inference_cache_s3_archives.sql | 6 ++++++ .../0_stateless/data_minio/03036_json_archive.zip | Bin 0 -> 418 bytes 3 files changed, 20 insertions(+) create mode 100644 tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.reference create mode 100644 tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.sql create mode 100644 tests/queries/0_stateless/data_minio/03036_json_archive.zip diff --git a/tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.reference b/tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.reference new file mode 100644 index 00000000000..8bee9a685e3 --- /dev/null +++ b/tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.reference @@ -0,0 +1,14 @@ +1 Str1 +2 Str2 +3 Str3 +4 Str4 +DEFAULT 03036_archive1.zip::example1.csv id Nullable(Int64), data Nullable(String) +DEFAULT 03036_archive1.zip::example2.csv \N +21 Str21 +22 Str22 +23 Str23 +24 Str24 +DEFAULT 03036_archive1.zip::example1.csv id Nullable(Int64), data Nullable(String) +DEFAULT 03036_archive1.zip::example2.csv \N +UNION 03036_json_archive.zip::example11.jsonl id Nullable(Int64), data Nullable(String) +UNION 03036_json_archive.zip::example12.jsonl id Nullable(Int64), data Nullable(String) diff --git a/tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.sql b/tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.sql new file mode 100644 index 00000000000..bfa50f1ebe1 --- /dev/null +++ b/tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.sql @@ -0,0 +1,6 @@ +select * from s3(s3_conn, filename='03036_archive1.zip :: example{1,2}.csv') order by tuple(*); +select schema_inference_mode, splitByChar('/', source)[-1] as file, schema from system.schema_inference_cache order by file; + +set schema_inference_mode = 'union'; +select * from s3(s3_conn, filename='03036_json_archive.zip :: example{11,12}.jsonl') order by tuple(*); +select schema_inference_mode, splitByChar('/', source)[-1] as file, schema from system.schema_inference_cache order by file; \ No newline at end of file diff --git a/tests/queries/0_stateless/data_minio/03036_json_archive.zip b/tests/queries/0_stateless/data_minio/03036_json_archive.zip new file mode 100644 index 0000000000000000000000000000000000000000..31aa2c168b2f79142f92f178e09bd2124e5c5fe7 GIT binary patch literal 418 
zcmWIWW@Zs#U|`^2aG1X~!ctkuSOdtj17dy#8HUt~#N2|MR6|3(tm6ErLy&>dmB0WmPaRK|!YNJOT|GQgvr0uCTylGH z$<1r)Muq^mg&sf)U#wVAj?F?NBnz(tExU%-!fPlN_CPFDdvQq(Vj&}w95XIIN&tPz zz`zK^TN*(u3?H*Xe2nJX0B?k$nEpUEv=GTqxUYbQf_#NxC@UMt4NO3|3P?W!aToyi C?rYEh literal 0 HcmV?d00001 From 1eb72e7af3f7f4d9aed43e535712a4d41dd60b71 Mon Sep 17 00:00:00 2001 From: divanik Date: Tue, 14 May 2024 09:25:23 +0000 Subject: [PATCH 4/6] Add no fasttest tag --- .../0_stateless/03036_schema_inference_cache_s3_archives.sql | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.sql b/tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.sql index bfa50f1ebe1..1a73eadbb53 100644 --- a/tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.sql +++ b/tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.sql @@ -1,3 +1,6 @@ +-- Tags: no-fasttest +-- Tag no-fasttest: Depends on AWS + select * from s3(s3_conn, filename='03036_archive1.zip :: example{1,2}.csv') order by tuple(*); select schema_inference_mode, splitByChar('/', source)[-1] as file, schema from system.schema_inference_cache order by file; From 3737e6fdea48d5aea308f08bf9b7684345b84165 Mon Sep 17 00:00:00 2001 From: divanik Date: Tue, 14 May 2024 13:05:41 +0000 Subject: [PATCH 5/6] Possible fix of stability of schema_infernce_cache test --- .../0_stateless/03036_schema_inference_cache_s3_archives.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.sql b/tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.sql index 1a73eadbb53..6f7134cfa38 100644 --- a/tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.sql +++ b/tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.sql @@ -1,5 +1,6 @@ -- Tags: no-fasttest -- Tag no-fasttest: Depends on AWS +SYSTEM DROP SCHEMA CACHE FOR S3; select * from s3(s3_conn, filename='03036_archive1.zip :: example{1,2}.csv') order by tuple(*); select schema_inference_mode, splitByChar('/', source)[-1] as file, schema from system.schema_inference_cache order by file; From 851a8495208cc77e56788bf52f86ce89f17cb741 Mon Sep 17 00:00:00 2001 From: divanik Date: Tue, 14 May 2024 15:05:33 +0000 Subject: [PATCH 6/6] Fix test --- ...03036_schema_inference_cache_s3_archives.reference | 3 --- .../03036_schema_inference_cache_s3_archives.sql | 11 +++++------ 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.reference b/tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.reference index 8bee9a685e3..34f4287f360 100644 --- a/tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.reference +++ b/tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.reference @@ -3,12 +3,9 @@ 3 Str3 4 Str4 DEFAULT 03036_archive1.zip::example1.csv id Nullable(Int64), data Nullable(String) -DEFAULT 03036_archive1.zip::example2.csv \N 21 Str21 22 Str22 23 Str23 24 Str24 -DEFAULT 03036_archive1.zip::example1.csv id Nullable(Int64), data Nullable(String) -DEFAULT 03036_archive1.zip::example2.csv \N UNION 03036_json_archive.zip::example11.jsonl id Nullable(Int64), data Nullable(String) UNION 03036_json_archive.zip::example12.jsonl id Nullable(Int64), data Nullable(String) diff --git a/tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.sql b/tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.sql index 6f7134cfa38..61b3e1d6f43 100644 --- 
a/tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.sql +++ b/tests/queries/0_stateless/03036_schema_inference_cache_s3_archives.sql @@ -1,10 +1,9 @@ -- Tags: no-fasttest -- Tag no-fasttest: Depends on AWS -SYSTEM DROP SCHEMA CACHE FOR S3; -select * from s3(s3_conn, filename='03036_archive1.zip :: example{1,2}.csv') order by tuple(*); -select schema_inference_mode, splitByChar('/', source)[-1] as file, schema from system.schema_inference_cache order by file; +SELECT * FROM s3(s3_conn, filename='03036_archive1.zip :: example{1,2}.csv') ORDER BY tuple(*); +SELECT schema_inference_mode, splitByChar('/', source)[-1] as file, schema FROM system.schema_inference_cache WHERE file = '03036_archive1.zip::example1.csv' ORDER BY file; -set schema_inference_mode = 'union'; -select * from s3(s3_conn, filename='03036_json_archive.zip :: example{11,12}.jsonl') order by tuple(*); -select schema_inference_mode, splitByChar('/', source)[-1] as file, schema from system.schema_inference_cache order by file; \ No newline at end of file +SET schema_inference_mode = 'union'; +SELECT * FROM s3(s3_conn, filename='03036_json_archive.zip :: example{11,12}.jsonl') ORDER BY tuple(*); +SELECT schema_inference_mode, splitByChar('/', source)[-1] as file, schema FROM system.schema_inference_cache WHERE startsWith(file, '03036_json_archive.zip') ORDER BY file; \ No newline at end of file