diff --git a/src/Storages/StorageAzureBlob.cpp b/src/Storages/StorageAzureBlob.cpp index 91dc92f09e8..8a1c81e808e 100644 --- a/src/Storages/StorageAzureBlob.cpp +++ b/src/Storages/StorageAzureBlob.cpp @@ -611,19 +611,19 @@ Pipe StorageAzureBlob::read( requested_virtual_columns.push_back(virtual_column); } - std::shared_ptr iterator_wrapper; + std::shared_ptr iterator_wrapper; if (configuration.withGlobs()) { /// Iterate through disclosed globs and make a source for each file - iterator_wrapper = std::make_shared( - object_storage.get(), configuration.container, std::nullopt, - configuration.blob_path, query_info.query, virtual_block, local_context, nullptr); + iterator_wrapper = std::make_shared( + object_storage.get(), configuration.container, configuration.blob_path, + query_info.query, virtual_block, local_context, nullptr); } else { - iterator_wrapper = std::make_shared( + iterator_wrapper = std::make_shared( object_storage.get(), configuration.container, configuration.blobs_paths, - std::nullopt, query_info.query, virtual_block, local_context, nullptr); + query_info.query, virtual_block, local_context, nullptr); } ColumnsDescription columns_description; @@ -786,201 +786,129 @@ static void addPathToVirtualColumns(Block & block, const String & path, size_t i block.getByName("_idx").column->assumeMutableRef().insert(idx); } -StorageAzureBlobSource::Iterator::Iterator( +StorageAzureBlobSource::GlobIterator::GlobIterator( AzureObjectStorage * object_storage_, const std::string & container_, - std::optional keys_, - std::optional blob_path_with_globs_, + String blob_path_with_globs_, ASTPtr query_, const Block & virtual_header_, ContextPtr context_, RelativePathsWithMetadata * outer_blobs_) - : WithContext(context_) + : IIterator(context_) , object_storage(object_storage_) , container(container_) - , keys(keys_) , blob_path_with_globs(blob_path_with_globs_) , query(query_) , virtual_header(virtual_header_) , outer_blobs(outer_blobs_) { - if (keys.has_value() && blob_path_with_globs.has_value()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot specify keys and glob simultaneously it's a bug"); - if (!keys.has_value() && !blob_path_with_globs.has_value()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Both keys and glob mask are not specified"); + const String key_prefix = blob_path_with_globs.substr(0, blob_path_with_globs.find_first_of("*?{")); - if (keys) + /// We don't have to list bucket, because there is no asterisks. + if (key_prefix.size() == blob_path_with_globs.size()) { - Strings all_keys = *keys; - - blobs_with_metadata.emplace(); - /// Create a virtual block with one row to construct filter - if (query && virtual_header && !all_keys.empty()) - { - /// Append "idx" column as the filter result - virtual_header.insert({ColumnUInt64::create(), std::make_shared(), "_idx"}); - - auto block = virtual_header.cloneEmpty(); - addPathToVirtualColumns(block, fs::path(container) / all_keys.front(), 0); - - VirtualColumnUtils::prepareFilterBlockWithQuery(query, getContext(), block, filter_ast); - - if (filter_ast) - { - block = virtual_header.cloneEmpty(); - for (size_t i = 0; i < all_keys.size(); ++i) - addPathToVirtualColumns(block, fs::path(container) / all_keys[i], i); - - VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast); - const auto & idxs = typeid_cast(*block.getByName("_idx").column); - - Strings filtered_keys; - filtered_keys.reserve(block.rows()); - for (UInt64 idx : idxs.getData()) - filtered_keys.emplace_back(std::move(all_keys[idx])); - - all_keys = std::move(filtered_keys); - } - } - - for (auto && key : all_keys) - { - ObjectMetadata object_metadata = object_storage->getObjectMetadata(key); - total_size += object_metadata.size_bytes; - blobs_with_metadata->emplace_back(RelativePathWithMetadata{key, object_metadata}); - if (outer_blobs) - outer_blobs->emplace_back(blobs_with_metadata->back()); - } - } - else - { - const String key_prefix = blob_path_with_globs->substr(0, blob_path_with_globs->find_first_of("*?{")); - - /// We don't have to list bucket, because there is no asterisks. - if (key_prefix.size() == blob_path_with_globs->size()) - { - ObjectMetadata object_metadata = object_storage->getObjectMetadata(*blob_path_with_globs); - blobs_with_metadata->emplace_back(*blob_path_with_globs, object_metadata); - if (outer_blobs) - outer_blobs->emplace_back(blobs_with_metadata->back()); - return; - } - - object_storage_iterator = object_storage->iterate(key_prefix); - - matcher = std::make_unique(makeRegexpPatternFromGlobs(*blob_path_with_globs)); - - if (!matcher->ok()) - throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP, - "Cannot compile regex from glob ({}): {}", *blob_path_with_globs, matcher->error()); - - recursive = *blob_path_with_globs == "/**" ? true : false; + ObjectMetadata object_metadata = object_storage->getObjectMetadata(blob_path_with_globs); + blobs_with_metadata.emplace_back(blob_path_with_globs, object_metadata); + if (outer_blobs) + outer_blobs->emplace_back(blobs_with_metadata.back()); + return; } + object_storage_iterator = object_storage->iterate(key_prefix); + + matcher = std::make_unique(makeRegexpPatternFromGlobs(blob_path_with_globs)); + + if (!matcher->ok()) + throw Exception( + ErrorCodes::CANNOT_COMPILE_REGEXP, "Cannot compile regex from glob ({}): {}", blob_path_with_globs, matcher->error()); + + recursive = blob_path_with_globs == "/**" ? true : false; } -RelativePathWithMetadata StorageAzureBlobSource::Iterator::next() +RelativePathWithMetadata StorageAzureBlobSource::GlobIterator::next() { + std::lock_guard lock(next_mutex); + if (is_finished) return {}; - if (keys) + bool need_new_batch = blobs_with_metadata.empty() || index >= blobs_with_metadata.size(); + + if (need_new_batch) { - size_t current_index = index.fetch_add(1, std::memory_order_relaxed); - if (current_index >= blobs_with_metadata->size()) + RelativePathsWithMetadata new_batch; + while (new_batch.empty()) { - is_finished = true; - return {}; - } - - return (*blobs_with_metadata)[current_index]; - } - else - { - bool need_new_batch = false; - { - std::lock_guard lock(next_mutex); - need_new_batch = !blobs_with_metadata || index >= blobs_with_metadata->size(); - } - - if (need_new_batch) - { - RelativePathsWithMetadata new_batch; - while (new_batch.empty()) + auto result = object_storage_iterator->getCurrrentBatchAndScheduleNext(); + if (result.has_value()) { - auto result = object_storage_iterator->getCurrrentBatchAndScheduleNext(); - if (result.has_value()) - { - new_batch = result.value(); - } - else - { - is_finished = true; - return {}; - } - - for (auto it = new_batch.begin(); it != new_batch.end();) - { - if (!recursive && !re2::RE2::FullMatch(it->relative_path, *matcher)) - it = new_batch.erase(it); - else - ++it; - } - } - - index.store(0, std::memory_order_relaxed); - if (!is_initialized) - { - createFilterAST(new_batch.front().relative_path); - is_initialized = true; - } - - if (filter_ast) - { - auto block = virtual_header.cloneEmpty(); - for (size_t i = 0; i < new_batch.size(); ++i) - addPathToVirtualColumns(block, fs::path(container) / new_batch[i].relative_path, i); - - VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast); - const auto & idxs = typeid_cast(*block.getByName("_idx").column); - - std::lock_guard lock(next_mutex); - blob_path_with_globs.reset(); - blob_path_with_globs.emplace(); - for (UInt64 idx : idxs.getData()) - { - total_size.fetch_add(new_batch[idx].metadata.size_bytes, std::memory_order_relaxed); - blobs_with_metadata->emplace_back(std::move(new_batch[idx])); - if (outer_blobs) - outer_blobs->emplace_back(blobs_with_metadata->back()); - } + new_batch = result.value(); } else { - if (outer_blobs) - outer_blobs->insert(outer_blobs->end(), new_batch.begin(), new_batch.end()); + is_finished = true; + return {}; + } - std::lock_guard lock(next_mutex); - blobs_with_metadata = std::move(new_batch); - for (const auto & [_, info] : *blobs_with_metadata) - total_size.fetch_add(info.size_bytes, std::memory_order_relaxed); + for (auto it = new_batch.begin(); it != new_batch.end();) + { + if (!recursive && !re2::RE2::FullMatch(it->relative_path, *matcher)) + it = new_batch.erase(it); + else + ++it; } } - size_t current_index = index.fetch_add(1, std::memory_order_relaxed); - std::lock_guard lock(next_mutex); - return (*blobs_with_metadata)[current_index]; + index.store(0, std::memory_order_relaxed); + if (!is_initialized) + { + createFilterAST(new_batch.front().relative_path); + is_initialized = true; + } + + if (filter_ast) + { + auto block = virtual_header.cloneEmpty(); + for (size_t i = 0; i < new_batch.size(); ++i) + addPathToVirtualColumns(block, fs::path(container) / new_batch[i].relative_path, i); + + VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast); + const auto & idxs = typeid_cast(*block.getByName("_idx").column); + + blobs_with_metadata.clear(); + for (UInt64 idx : idxs.getData()) + { + total_size.fetch_add(new_batch[idx].metadata.size_bytes, std::memory_order_relaxed); + blobs_with_metadata.emplace_back(std::move(new_batch[idx])); + if (outer_blobs) + outer_blobs->emplace_back(blobs_with_metadata.back()); + } + } + else + { + if (outer_blobs) + outer_blobs->insert(outer_blobs->end(), new_batch.begin(), new_batch.end()); + + blobs_with_metadata = std::move(new_batch); + for (const auto & [_, info] : blobs_with_metadata) + total_size.fetch_add(info.size_bytes, std::memory_order_relaxed); + } } + + size_t current_index = index.fetch_add(1, std::memory_order_relaxed); + if (current_index >= blobs_with_metadata.size()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Index out of bound for blob metadata"); + return blobs_with_metadata[current_index]; } -size_t StorageAzureBlobSource::Iterator::getTotalSize() const +size_t StorageAzureBlobSource::GlobIterator::getTotalSize() const { return total_size.load(std::memory_order_relaxed); } -void StorageAzureBlobSource::Iterator::createFilterAST(const String & any_key) +void StorageAzureBlobSource::GlobIterator::createFilterAST(const String & any_key) { if (!query || !virtual_header) return; @@ -995,6 +923,78 @@ void StorageAzureBlobSource::Iterator::createFilterAST(const String & any_key) } +StorageAzureBlobSource::KeysIterator::KeysIterator( + AzureObjectStorage * object_storage_, + const std::string & container_, + Strings keys_, + ASTPtr query_, + const Block & virtual_header_, + ContextPtr context_, + RelativePathsWithMetadata * outer_blobs_) + : IIterator(context_) + , object_storage(object_storage_) + , container(container_) + , query(query_) + , virtual_header(virtual_header_) + , outer_blobs(outer_blobs_) +{ + Strings all_keys = keys_; + + /// Create a virtual block with one row to construct filter + if (query && virtual_header && !all_keys.empty()) + { + /// Append "idx" column as the filter result + virtual_header.insert({ColumnUInt64::create(), std::make_shared(), "_idx"}); + + auto block = virtual_header.cloneEmpty(); + addPathToVirtualColumns(block, fs::path(container) / all_keys.front(), 0); + + VirtualColumnUtils::prepareFilterBlockWithQuery(query, getContext(), block, filter_ast); + + if (filter_ast) + { + block = virtual_header.cloneEmpty(); + for (size_t i = 0; i < all_keys.size(); ++i) + addPathToVirtualColumns(block, fs::path(container) / all_keys[i], i); + + VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast); + const auto & idxs = typeid_cast(*block.getByName("_idx").column); + + Strings filtered_keys; + filtered_keys.reserve(block.rows()); + for (UInt64 idx : idxs.getData()) + filtered_keys.emplace_back(std::move(all_keys[idx])); + + all_keys = std::move(filtered_keys); + } + } + + for (auto && key : all_keys) + { + ObjectMetadata object_metadata = object_storage->getObjectMetadata(key); + total_size += object_metadata.size_bytes; + keys.emplace_back(RelativePathWithMetadata{key, object_metadata}); + } + + if (outer_blobs) + *outer_blobs = keys; +} + +RelativePathWithMetadata StorageAzureBlobSource::KeysIterator::next() +{ + size_t current_index = index.fetch_add(1, std::memory_order_relaxed); + if (current_index >= keys.size()) + return {}; + + return keys[current_index]; +} + +size_t StorageAzureBlobSource::KeysIterator::getTotalSize() const +{ + return total_size.load(std::memory_order_relaxed); +} + + Chunk StorageAzureBlobSource::generate() { while (true) @@ -1072,7 +1072,7 @@ StorageAzureBlobSource::StorageAzureBlobSource( String compression_hint_, AzureObjectStorage * object_storage_, const String & container_, - std::shared_ptr file_iterator_) + std::shared_ptr file_iterator_) :ISource(getHeader(sample_block_, requested_virtual_columns_)) , WithContext(context_) , requested_virtual_columns(requested_virtual_columns_) @@ -1167,18 +1167,16 @@ ColumnsDescription StorageAzureBlob::getTableStructureFromData( ContextPtr ctx) { RelativePathsWithMetadata read_keys; - std::shared_ptr file_iterator; + std::shared_ptr file_iterator; if (configuration.withGlobs()) { - file_iterator = std::make_shared( - object_storage, configuration.container, std::nullopt, - configuration.blob_path, nullptr, Block{}, ctx, &read_keys); + file_iterator = std::make_shared( + object_storage, configuration.container, configuration.blob_path, nullptr, Block{}, ctx, &read_keys); } else { - file_iterator = std::make_shared( - object_storage, configuration.container, configuration.blobs_paths, - std::nullopt, nullptr, Block{}, ctx, &read_keys); + file_iterator = std::make_shared( + object_storage, configuration.container, configuration.blobs_paths, nullptr, Block{}, ctx, &read_keys); } std::optional columns_from_cache; diff --git a/src/Storages/StorageAzureBlob.h b/src/Storages/StorageAzureBlob.h index e2001fa24ae..31b2beb05aa 100644 --- a/src/Storages/StorageAzureBlob.h +++ b/src/Storages/StorageAzureBlob.h @@ -142,28 +142,37 @@ private: class StorageAzureBlobSource : public ISource, WithContext { public: - class Iterator : WithContext + class IIterator : public WithContext { public: - Iterator( + IIterator(ContextPtr context_):WithContext(context_) {} + virtual ~IIterator() = default; + virtual RelativePathWithMetadata next() = 0; + virtual size_t getTotalSize() const = 0; + + RelativePathWithMetadata operator ()() { return next(); } + }; + + class GlobIterator : public IIterator + { + public: + GlobIterator( AzureObjectStorage * object_storage_, const std::string & container_, - std::optional keys_, - std::optional blob_path_with_globs_, + String blob_path_with_globs_, ASTPtr query_, const Block & virtual_header_, ContextPtr context_, RelativePathsWithMetadata * outer_blobs_); - RelativePathWithMetadata next(); - size_t getTotalSize() const; - ~Iterator() = default; + RelativePathWithMetadata next() override; + size_t getTotalSize() const override; + ~GlobIterator() override = default; private: AzureObjectStorage * object_storage; std::string container; - std::optional keys; - std::optional blob_path_with_globs; + String blob_path_with_globs; ASTPtr query; ASTPtr filter_ast; Block virtual_header; @@ -171,7 +180,7 @@ public: std::atomic index = 0; std::atomic total_size = 0; - std::optional blobs_with_metadata; + RelativePathsWithMetadata blobs_with_metadata; RelativePathsWithMetadata * outer_blobs; ObjectStorageIteratorPtr object_storage_iterator; bool recursive{false}; @@ -184,6 +193,37 @@ public: std::mutex next_mutex; }; + class KeysIterator : public IIterator + { + public: + KeysIterator( + AzureObjectStorage * object_storage_, + const std::string & container_, + Strings keys_, + ASTPtr query_, + const Block & virtual_header_, + ContextPtr context_, + RelativePathsWithMetadata * outer_blobs_); + + RelativePathWithMetadata next() override; + size_t getTotalSize() const override; + ~KeysIterator() override = default; + + private: + AzureObjectStorage * object_storage; + std::string container; + RelativePathsWithMetadata keys; + + ASTPtr query; + ASTPtr filter_ast; + Block virtual_header; + + std::atomic index = 0; + std::atomic total_size = 0; + + RelativePathsWithMetadata * outer_blobs; + }; + StorageAzureBlobSource( const std::vector & requested_virtual_columns_, const String & format_, @@ -196,7 +236,7 @@ public: String compression_hint_, AzureObjectStorage * object_storage_, const String & container_, - std::shared_ptr file_iterator_); + std::shared_ptr file_iterator_); ~StorageAzureBlobSource() override; @@ -217,7 +257,7 @@ private: String compression_hint; AzureObjectStorage * object_storage; String container; - std::shared_ptr file_iterator; + std::shared_ptr file_iterator; struct ReaderHolder { diff --git a/tests/integration/test_storage_azure_blob_storage/test.py b/tests/integration/test_storage_azure_blob_storage/test.py index 0de325ccd14..5f812cbe4fc 100644 --- a/tests/integration/test_storage_azure_blob_storage/test.py +++ b/tests/integration/test_storage_azure_blob_storage/test.py @@ -595,3 +595,19 @@ def test_partition_by_tf(cluster): assert "1,2,3\n" == get_azure_file_content("test_partition_tf_3.csv") assert "3,2,1\n" == get_azure_file_content("test_partition_tf_1.csv") assert "78,43,45\n" == get_azure_file_content("test_partition_tf_45.csv") + +def test_filter_using_file(cluster): + node = cluster.instances["node"] + table_format = "column1 UInt32, column2 UInt32, column3 UInt32" + partition_by = "column3" + values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)" + filename = "test_partition_tf_{_partition_id}.csv" + + azure_query( + node, + f"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') PARTITION BY {partition_by} VALUES {values}", + ) + + query = f"select count(*) from azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_partition_tf_*.csv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') WHERE _file='test_partition_tf_3.csv'" + assert azure_query(node, query) == "1\n" +