From 332893344d3cbca205b0d99671cd4c8ba26ec2da Mon Sep 17 00:00:00 2001 From: Smita Kulkarni Date: Tue, 13 Jun 2023 16:50:10 +0200 Subject: [PATCH 01/12] Updated lock for accessing azure blob storage iterator --- src/Storages/StorageAzureBlob.cpp | 6 +----- tests/integration/test_storage_azure_blob_storage/test.py | 1 - 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/src/Storages/StorageAzureBlob.cpp b/src/Storages/StorageAzureBlob.cpp index 3ee176a68b7..b9d59f04001 100644 --- a/src/Storages/StorageAzureBlob.cpp +++ b/src/Storages/StorageAzureBlob.cpp @@ -882,6 +882,7 @@ StorageAzureBlobSource::Iterator::Iterator( RelativePathWithMetadata StorageAzureBlobSource::Iterator::next() { + std::lock_guard lock(next_mutex); if (is_finished) return {}; @@ -900,7 +901,6 @@ RelativePathWithMetadata StorageAzureBlobSource::Iterator::next() { bool need_new_batch = false; { - std::lock_guard lock(next_mutex); need_new_batch = !blobs_with_metadata || index >= blobs_with_metadata->size(); } @@ -945,7 +945,6 @@ RelativePathWithMetadata StorageAzureBlobSource::Iterator::next() VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast); const auto & idxs = typeid_cast(*block.getByName("_idx").column); - std::lock_guard lock(next_mutex); blob_path_with_globs.reset(); blob_path_with_globs.emplace(); for (UInt64 idx : idxs.getData()) @@ -961,7 +960,6 @@ RelativePathWithMetadata StorageAzureBlobSource::Iterator::next() if (outer_blobs) outer_blobs->insert(outer_blobs->end(), new_batch.begin(), new_batch.end()); - std::lock_guard lock(next_mutex); blobs_with_metadata = std::move(new_batch); for (const auto & [_, info] : *blobs_with_metadata) total_size.fetch_add(info.size_bytes, std::memory_order_relaxed); @@ -969,8 +967,6 @@ RelativePathWithMetadata StorageAzureBlobSource::Iterator::next() } size_t current_index = index.fetch_add(1, std::memory_order_relaxed); - - std::lock_guard lock(next_mutex); return (*blobs_with_metadata)[current_index]; } } diff --git a/tests/integration/test_storage_azure_blob_storage/test.py b/tests/integration/test_storage_azure_blob_storage/test.py index f9d337b6d86..bb25ac4b029 100644 --- a/tests/integration/test_storage_azure_blob_storage/test.py +++ b/tests/integration/test_storage_azure_blob_storage/test.py @@ -551,7 +551,6 @@ def test_schema_inference_no_globs_tf(cluster): "499500\t2890\t332833500\ttest_schema_inference_no_globs_tf.csv\tcont/test_schema_inference_no_globs_tf.csv" ] - def test_schema_inference_from_globs_tf(cluster): node = cluster.instances["node"] unique_prefix = random.randint(1, 10000) From 478bad32376ff2787e83107c8274f4b743c569ac Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Tue, 13 Jun 2023 15:04:11 +0000 Subject: [PATCH 02/12] Automatic style fix --- tests/integration/test_storage_azure_blob_storage/test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/test_storage_azure_blob_storage/test.py b/tests/integration/test_storage_azure_blob_storage/test.py index bb25ac4b029..f9d337b6d86 100644 --- a/tests/integration/test_storage_azure_blob_storage/test.py +++ b/tests/integration/test_storage_azure_blob_storage/test.py @@ -551,6 +551,7 @@ def test_schema_inference_no_globs_tf(cluster): "499500\t2890\t332833500\ttest_schema_inference_no_globs_tf.csv\tcont/test_schema_inference_no_globs_tf.csv" ] + def test_schema_inference_from_globs_tf(cluster): node = cluster.instances["node"] unique_prefix = random.randint(1, 10000) From bc4724490239ea34b4924da17ddabbb1f90e2bee Mon Sep 17 00:00:00 2001 From: Smita Kulkarni Date: Tue, 13 Jun 2023 17:06:40 +0200 Subject: [PATCH 03/12] Updated tests for CI checks --- tests/integration/test_storage_azure_blob_storage/test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/test_storage_azure_blob_storage/test.py b/tests/integration/test_storage_azure_blob_storage/test.py index f9d337b6d86..8ab5d416b03 100644 --- a/tests/integration/test_storage_azure_blob_storage/test.py +++ b/tests/integration/test_storage_azure_blob_storage/test.py @@ -558,6 +558,7 @@ def test_schema_inference_from_globs_tf(cluster): node = cluster.instances["node"] # type: ClickHouseInstance table_format = "column1 UInt32, column2 UInt32, column3 UInt32" max_path = "" + for i in range(10): for j in range(10): path = "{}/{}_{}/{}.csv".format( From 8dde50eb3aeef2bd83a1c7c0ec244dfe1c66bf32 Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Tue, 13 Jun 2023 15:19:16 +0000 Subject: [PATCH 04/12] Automatic style fix --- tests/integration/test_storage_azure_blob_storage/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_storage_azure_blob_storage/test.py b/tests/integration/test_storage_azure_blob_storage/test.py index 8ab5d416b03..3d9c751be3c 100644 --- a/tests/integration/test_storage_azure_blob_storage/test.py +++ b/tests/integration/test_storage_azure_blob_storage/test.py @@ -558,7 +558,7 @@ def test_schema_inference_from_globs_tf(cluster): node = cluster.instances["node"] # type: ClickHouseInstance table_format = "column1 UInt32, column2 UInt32, column3 UInt32" max_path = "" - + for i in range(10): for j in range(10): path = "{}/{}_{}/{}.csv".format( From 918b8c4585025e8a357115945cce501c05d31be1 Mon Sep 17 00:00:00 2001 From: Smita Kulkarni Date: Wed, 14 Jun 2023 10:51:59 +0200 Subject: [PATCH 05/12] Updated filename in test --- tests/integration/test_storage_azure_blob_storage/test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_storage_azure_blob_storage/test.py b/tests/integration/test_storage_azure_blob_storage/test.py index 3d9c751be3c..e99ae72eb8b 100644 --- a/tests/integration/test_storage_azure_blob_storage/test.py +++ b/tests/integration/test_storage_azure_blob_storage/test.py @@ -583,13 +583,13 @@ def test_partition_by_tf(cluster): table_format = "column1 UInt32, column2 UInt32, column3 UInt32" partition_by = "column3" values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)" - filename = "test_tf_{_partition_id}.csv" + filename = "test_partition_tf_{_partition_id}.csv" azure_query( node, f"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') PARTITION BY {partition_by} VALUES {values}", ) - assert "1,2,3\n" == get_azure_file_content("test_tf_3.csv") - assert "3,2,1\n" == get_azure_file_content("test_tf_1.csv") - assert "78,43,45\n" == get_azure_file_content("test_tf_45.csv") + assert "1,2,3\n" == get_azure_file_content("test_partition_tf_3.csv") + assert "3,2,1\n" == get_azure_file_content("test_partition_tf_1.csv") + assert "78,43,45\n" == get_azure_file_content("test_partition_tfs_45.csv") From c98a194b571e8c39504afc829fa91492f4dcbe2d Mon Sep 17 00:00:00 2001 From: Smita Kulkarni Date: Wed, 14 Jun 2023 11:00:11 +0200 Subject: [PATCH 06/12] Updated unique names for test to avoid same names by random numbers --- tests/integration/test_storage_azure_blob_storage/test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_storage_azure_blob_storage/test.py b/tests/integration/test_storage_azure_blob_storage/test.py index e99ae72eb8b..e2077f8face 100644 --- a/tests/integration/test_storage_azure_blob_storage/test.py +++ b/tests/integration/test_storage_azure_blob_storage/test.py @@ -300,10 +300,10 @@ def test_put_get_with_globs(cluster): azure_query( node, - f"CREATE TABLE test_{i}_{j} ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSV')", + f"CREATE TABLE test_put_{i}_{j} ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSV')", ) - query = f"insert into test_{i}_{j} VALUES {values}" + query = f"insert into test_put_{i}_{j} VALUES {values}" azure_query(node, query) azure_query( @@ -332,9 +332,9 @@ def test_azure_glob_scheherazade(cluster): unique_num = random.randint(1, 10000) azure_query( node, - f"CREATE TABLE test_{i}_{unique_num} ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSV')", + f"CREATE TABLE test_scheherazade_{i}_{unique_num} ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSV')", ) - query = f"insert into test_{i}_{unique_num} VALUES {values}" + query = f"insert into test_scheherazade_{i}_{unique_num} VALUES {values}" azure_query(node, query) jobs.append( From f6bad2c064efeb997755be8b9f313a3859a6f81f Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Wed, 14 Jun 2023 09:13:38 +0000 Subject: [PATCH 07/12] Automatic style fix --- tests/integration/test_storage_azure_blob_storage/test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_storage_azure_blob_storage/test.py b/tests/integration/test_storage_azure_blob_storage/test.py index e2077f8face..0002ccbf483 100644 --- a/tests/integration/test_storage_azure_blob_storage/test.py +++ b/tests/integration/test_storage_azure_blob_storage/test.py @@ -334,7 +334,9 @@ def test_azure_glob_scheherazade(cluster): node, f"CREATE TABLE test_scheherazade_{i}_{unique_num} ({table_format}) Engine = AzureBlobStorage(azure_conf2, container='cont', blob_path='{path}', format='CSV')", ) - query = f"insert into test_scheherazade_{i}_{unique_num} VALUES {values}" + query = ( + f"insert into test_scheherazade_{i}_{unique_num} VALUES {values}" + ) azure_query(node, query) jobs.append( From 011d666073968b1a8cbbd867513e4e8adec1362b Mon Sep 17 00:00:00 2001 From: Smita Kulkarni Date: Wed, 14 Jun 2023 14:55:34 +0200 Subject: [PATCH 08/12] Fixed typo in tests --- tests/integration/test_storage_azure_blob_storage/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_storage_azure_blob_storage/test.py b/tests/integration/test_storage_azure_blob_storage/test.py index 0002ccbf483..0de325ccd14 100644 --- a/tests/integration/test_storage_azure_blob_storage/test.py +++ b/tests/integration/test_storage_azure_blob_storage/test.py @@ -594,4 +594,4 @@ def test_partition_by_tf(cluster): assert "1,2,3\n" == get_azure_file_content("test_partition_tf_3.csv") assert "3,2,1\n" == get_azure_file_content("test_partition_tf_1.csv") - assert "78,43,45\n" == get_azure_file_content("test_partition_tfs_45.csv") + assert "78,43,45\n" == get_azure_file_content("test_partition_tf_45.csv") From 5229544b559366a9a10df0ecb17485a54eee51fb Mon Sep 17 00:00:00 2001 From: Smita Kulkarni Date: Wed, 21 Jun 2023 22:17:39 +0200 Subject: [PATCH 09/12] Added function getCurrrentBatchAndScheduleNext to IObjectStorageIteratorAsync --- src/Disks/ObjectStorages/ObjectStorageIterator.h | 6 ++++++ .../ObjectStorageIteratorAsync.cpp | 16 ++++++++++++++++ .../ObjectStorages/ObjectStorageIteratorAsync.h | 3 ++- src/Storages/StorageAzureBlob.cpp | 11 +++++++---- 4 files changed, 31 insertions(+), 5 deletions(-) diff --git a/src/Disks/ObjectStorages/ObjectStorageIterator.h b/src/Disks/ObjectStorages/ObjectStorageIterator.h index 2ff5ce60acc..841b0ea6664 100644 --- a/src/Disks/ObjectStorages/ObjectStorageIterator.h +++ b/src/Disks/ObjectStorages/ObjectStorageIterator.h @@ -14,6 +14,7 @@ public: virtual bool isValid() = 0; virtual RelativePathWithMetadata current() = 0; virtual RelativePathsWithMetadata currentBatch() = 0; + virtual std::optional getCurrrentBatchAndScheduleNext() = 0; virtual size_t getAccumulatedSize() const = 0; virtual ~IObjectStorageIterator() = default; @@ -53,6 +54,11 @@ public: return batch; } + virtual std::optional getCurrrentBatchAndScheduleNext() override + { + return std::nullopt; + } + size_t getAccumulatedSize() const override { return batch.size(); diff --git a/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.cpp b/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.cpp index f91c19f2fb9..7425f629a5a 100644 --- a/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.cpp +++ b/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.cpp @@ -100,6 +100,22 @@ RelativePathsWithMetadata IObjectStorageIteratorAsync::currentBatch() return current_batch; } +std::optional IObjectStorageIteratorAsync::getCurrrentBatchAndScheduleNext() +{ + std::lock_guard lock(mutex); + if (!is_initialized) + nextBatch(); + + if (current_batch_iterator != current_batch.end()) + { + auto temp_current_batch = current_batch; + nextBatch(); + return temp_current_batch; + } + + return std::nullopt; +} + size_t IObjectStorageIteratorAsync::getAccumulatedSize() const { return accumulated_size.load(std::memory_order_relaxed); diff --git a/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.h b/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.h index a2b06da9a91..b0dd3cef39c 100644 --- a/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.h +++ b/src/Disks/ObjectStorages/ObjectStorageIteratorAsync.h @@ -27,6 +27,7 @@ public: RelativePathWithMetadata current() override; RelativePathsWithMetadata currentBatch() override; size_t getAccumulatedSize() const override; + std::optional getCurrrentBatchAndScheduleNext() override; ~IObjectStorageIteratorAsync() override { @@ -48,7 +49,7 @@ protected: bool is_initialized{false}; bool is_finished{false}; - mutable std::mutex mutex; + mutable std::recursive_mutex mutex; ThreadPool list_objects_pool; ThreadPoolCallbackRunner list_objects_scheduler; std::future outcome_future; diff --git a/src/Storages/StorageAzureBlob.cpp b/src/Storages/StorageAzureBlob.cpp index b9d59f04001..91dc92f09e8 100644 --- a/src/Storages/StorageAzureBlob.cpp +++ b/src/Storages/StorageAzureBlob.cpp @@ -882,7 +882,6 @@ StorageAzureBlobSource::Iterator::Iterator( RelativePathWithMetadata StorageAzureBlobSource::Iterator::next() { - std::lock_guard lock(next_mutex); if (is_finished) return {}; @@ -901,6 +900,7 @@ RelativePathWithMetadata StorageAzureBlobSource::Iterator::next() { bool need_new_batch = false; { + std::lock_guard lock(next_mutex); need_new_batch = !blobs_with_metadata || index >= blobs_with_metadata->size(); } @@ -909,10 +909,10 @@ RelativePathWithMetadata StorageAzureBlobSource::Iterator::next() RelativePathsWithMetadata new_batch; while (new_batch.empty()) { - if (object_storage_iterator->isValid()) + auto result = object_storage_iterator->getCurrrentBatchAndScheduleNext(); + if (result.has_value()) { - new_batch = object_storage_iterator->currentBatch(); - object_storage_iterator->nextBatch(); + new_batch = result.value(); } else { @@ -945,6 +945,7 @@ RelativePathWithMetadata StorageAzureBlobSource::Iterator::next() VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast); const auto & idxs = typeid_cast(*block.getByName("_idx").column); + std::lock_guard lock(next_mutex); blob_path_with_globs.reset(); blob_path_with_globs.emplace(); for (UInt64 idx : idxs.getData()) @@ -960,6 +961,7 @@ RelativePathWithMetadata StorageAzureBlobSource::Iterator::next() if (outer_blobs) outer_blobs->insert(outer_blobs->end(), new_batch.begin(), new_batch.end()); + std::lock_guard lock(next_mutex); blobs_with_metadata = std::move(new_batch); for (const auto & [_, info] : *blobs_with_metadata) total_size.fetch_add(info.size_bytes, std::memory_order_relaxed); @@ -967,6 +969,7 @@ RelativePathWithMetadata StorageAzureBlobSource::Iterator::next() } size_t current_index = index.fetch_add(1, std::memory_order_relaxed); + std::lock_guard lock(next_mutex); return (*blobs_with_metadata)[current_index]; } } From db8120722f5281996e29972ba2b9f3eab965e6d1 Mon Sep 17 00:00:00 2001 From: Smita Kulkarni Date: Fri, 23 Jun 2023 12:00:20 +0200 Subject: [PATCH 10/12] Separated functionality of Iterator into GlobIterator and KeysIterator, added test for filter by file --- src/Storages/StorageAzureBlob.cpp | 332 +++++++++--------- src/Storages/StorageAzureBlob.h | 64 +++- .../test_storage_azure_blob_storage/test.py | 16 + 3 files changed, 233 insertions(+), 179 deletions(-) diff --git a/src/Storages/StorageAzureBlob.cpp b/src/Storages/StorageAzureBlob.cpp index 91dc92f09e8..8a1c81e808e 100644 --- a/src/Storages/StorageAzureBlob.cpp +++ b/src/Storages/StorageAzureBlob.cpp @@ -611,19 +611,19 @@ Pipe StorageAzureBlob::read( requested_virtual_columns.push_back(virtual_column); } - std::shared_ptr iterator_wrapper; + std::shared_ptr iterator_wrapper; if (configuration.withGlobs()) { /// Iterate through disclosed globs and make a source for each file - iterator_wrapper = std::make_shared( - object_storage.get(), configuration.container, std::nullopt, - configuration.blob_path, query_info.query, virtual_block, local_context, nullptr); + iterator_wrapper = std::make_shared( + object_storage.get(), configuration.container, configuration.blob_path, + query_info.query, virtual_block, local_context, nullptr); } else { - iterator_wrapper = std::make_shared( + iterator_wrapper = std::make_shared( object_storage.get(), configuration.container, configuration.blobs_paths, - std::nullopt, query_info.query, virtual_block, local_context, nullptr); + query_info.query, virtual_block, local_context, nullptr); } ColumnsDescription columns_description; @@ -786,201 +786,129 @@ static void addPathToVirtualColumns(Block & block, const String & path, size_t i block.getByName("_idx").column->assumeMutableRef().insert(idx); } -StorageAzureBlobSource::Iterator::Iterator( +StorageAzureBlobSource::GlobIterator::GlobIterator( AzureObjectStorage * object_storage_, const std::string & container_, - std::optional keys_, - std::optional blob_path_with_globs_, + String blob_path_with_globs_, ASTPtr query_, const Block & virtual_header_, ContextPtr context_, RelativePathsWithMetadata * outer_blobs_) - : WithContext(context_) + : IIterator(context_) , object_storage(object_storage_) , container(container_) - , keys(keys_) , blob_path_with_globs(blob_path_with_globs_) , query(query_) , virtual_header(virtual_header_) , outer_blobs(outer_blobs_) { - if (keys.has_value() && blob_path_with_globs.has_value()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot specify keys and glob simultaneously it's a bug"); - if (!keys.has_value() && !blob_path_with_globs.has_value()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Both keys and glob mask are not specified"); + const String key_prefix = blob_path_with_globs.substr(0, blob_path_with_globs.find_first_of("*?{")); - if (keys) + /// We don't have to list bucket, because there is no asterisks. + if (key_prefix.size() == blob_path_with_globs.size()) { - Strings all_keys = *keys; - - blobs_with_metadata.emplace(); - /// Create a virtual block with one row to construct filter - if (query && virtual_header && !all_keys.empty()) - { - /// Append "idx" column as the filter result - virtual_header.insert({ColumnUInt64::create(), std::make_shared(), "_idx"}); - - auto block = virtual_header.cloneEmpty(); - addPathToVirtualColumns(block, fs::path(container) / all_keys.front(), 0); - - VirtualColumnUtils::prepareFilterBlockWithQuery(query, getContext(), block, filter_ast); - - if (filter_ast) - { - block = virtual_header.cloneEmpty(); - for (size_t i = 0; i < all_keys.size(); ++i) - addPathToVirtualColumns(block, fs::path(container) / all_keys[i], i); - - VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast); - const auto & idxs = typeid_cast(*block.getByName("_idx").column); - - Strings filtered_keys; - filtered_keys.reserve(block.rows()); - for (UInt64 idx : idxs.getData()) - filtered_keys.emplace_back(std::move(all_keys[idx])); - - all_keys = std::move(filtered_keys); - } - } - - for (auto && key : all_keys) - { - ObjectMetadata object_metadata = object_storage->getObjectMetadata(key); - total_size += object_metadata.size_bytes; - blobs_with_metadata->emplace_back(RelativePathWithMetadata{key, object_metadata}); - if (outer_blobs) - outer_blobs->emplace_back(blobs_with_metadata->back()); - } - } - else - { - const String key_prefix = blob_path_with_globs->substr(0, blob_path_with_globs->find_first_of("*?{")); - - /// We don't have to list bucket, because there is no asterisks. - if (key_prefix.size() == blob_path_with_globs->size()) - { - ObjectMetadata object_metadata = object_storage->getObjectMetadata(*blob_path_with_globs); - blobs_with_metadata->emplace_back(*blob_path_with_globs, object_metadata); - if (outer_blobs) - outer_blobs->emplace_back(blobs_with_metadata->back()); - return; - } - - object_storage_iterator = object_storage->iterate(key_prefix); - - matcher = std::make_unique(makeRegexpPatternFromGlobs(*blob_path_with_globs)); - - if (!matcher->ok()) - throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP, - "Cannot compile regex from glob ({}): {}", *blob_path_with_globs, matcher->error()); - - recursive = *blob_path_with_globs == "/**" ? true : false; + ObjectMetadata object_metadata = object_storage->getObjectMetadata(blob_path_with_globs); + blobs_with_metadata.emplace_back(blob_path_with_globs, object_metadata); + if (outer_blobs) + outer_blobs->emplace_back(blobs_with_metadata.back()); + return; } + object_storage_iterator = object_storage->iterate(key_prefix); + + matcher = std::make_unique(makeRegexpPatternFromGlobs(blob_path_with_globs)); + + if (!matcher->ok()) + throw Exception( + ErrorCodes::CANNOT_COMPILE_REGEXP, "Cannot compile regex from glob ({}): {}", blob_path_with_globs, matcher->error()); + + recursive = blob_path_with_globs == "/**" ? true : false; } -RelativePathWithMetadata StorageAzureBlobSource::Iterator::next() +RelativePathWithMetadata StorageAzureBlobSource::GlobIterator::next() { + std::lock_guard lock(next_mutex); + if (is_finished) return {}; - if (keys) + bool need_new_batch = blobs_with_metadata.empty() || index >= blobs_with_metadata.size(); + + if (need_new_batch) { - size_t current_index = index.fetch_add(1, std::memory_order_relaxed); - if (current_index >= blobs_with_metadata->size()) + RelativePathsWithMetadata new_batch; + while (new_batch.empty()) { - is_finished = true; - return {}; - } - - return (*blobs_with_metadata)[current_index]; - } - else - { - bool need_new_batch = false; - { - std::lock_guard lock(next_mutex); - need_new_batch = !blobs_with_metadata || index >= blobs_with_metadata->size(); - } - - if (need_new_batch) - { - RelativePathsWithMetadata new_batch; - while (new_batch.empty()) + auto result = object_storage_iterator->getCurrrentBatchAndScheduleNext(); + if (result.has_value()) { - auto result = object_storage_iterator->getCurrrentBatchAndScheduleNext(); - if (result.has_value()) - { - new_batch = result.value(); - } - else - { - is_finished = true; - return {}; - } - - for (auto it = new_batch.begin(); it != new_batch.end();) - { - if (!recursive && !re2::RE2::FullMatch(it->relative_path, *matcher)) - it = new_batch.erase(it); - else - ++it; - } - } - - index.store(0, std::memory_order_relaxed); - if (!is_initialized) - { - createFilterAST(new_batch.front().relative_path); - is_initialized = true; - } - - if (filter_ast) - { - auto block = virtual_header.cloneEmpty(); - for (size_t i = 0; i < new_batch.size(); ++i) - addPathToVirtualColumns(block, fs::path(container) / new_batch[i].relative_path, i); - - VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast); - const auto & idxs = typeid_cast(*block.getByName("_idx").column); - - std::lock_guard lock(next_mutex); - blob_path_with_globs.reset(); - blob_path_with_globs.emplace(); - for (UInt64 idx : idxs.getData()) - { - total_size.fetch_add(new_batch[idx].metadata.size_bytes, std::memory_order_relaxed); - blobs_with_metadata->emplace_back(std::move(new_batch[idx])); - if (outer_blobs) - outer_blobs->emplace_back(blobs_with_metadata->back()); - } + new_batch = result.value(); } else { - if (outer_blobs) - outer_blobs->insert(outer_blobs->end(), new_batch.begin(), new_batch.end()); + is_finished = true; + return {}; + } - std::lock_guard lock(next_mutex); - blobs_with_metadata = std::move(new_batch); - for (const auto & [_, info] : *blobs_with_metadata) - total_size.fetch_add(info.size_bytes, std::memory_order_relaxed); + for (auto it = new_batch.begin(); it != new_batch.end();) + { + if (!recursive && !re2::RE2::FullMatch(it->relative_path, *matcher)) + it = new_batch.erase(it); + else + ++it; } } - size_t current_index = index.fetch_add(1, std::memory_order_relaxed); - std::lock_guard lock(next_mutex); - return (*blobs_with_metadata)[current_index]; + index.store(0, std::memory_order_relaxed); + if (!is_initialized) + { + createFilterAST(new_batch.front().relative_path); + is_initialized = true; + } + + if (filter_ast) + { + auto block = virtual_header.cloneEmpty(); + for (size_t i = 0; i < new_batch.size(); ++i) + addPathToVirtualColumns(block, fs::path(container) / new_batch[i].relative_path, i); + + VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast); + const auto & idxs = typeid_cast(*block.getByName("_idx").column); + + blobs_with_metadata.clear(); + for (UInt64 idx : idxs.getData()) + { + total_size.fetch_add(new_batch[idx].metadata.size_bytes, std::memory_order_relaxed); + blobs_with_metadata.emplace_back(std::move(new_batch[idx])); + if (outer_blobs) + outer_blobs->emplace_back(blobs_with_metadata.back()); + } + } + else + { + if (outer_blobs) + outer_blobs->insert(outer_blobs->end(), new_batch.begin(), new_batch.end()); + + blobs_with_metadata = std::move(new_batch); + for (const auto & [_, info] : blobs_with_metadata) + total_size.fetch_add(info.size_bytes, std::memory_order_relaxed); + } } + + size_t current_index = index.fetch_add(1, std::memory_order_relaxed); + if (current_index >= blobs_with_metadata.size()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Index out of bound for blob metadata"); + return blobs_with_metadata[current_index]; } -size_t StorageAzureBlobSource::Iterator::getTotalSize() const +size_t StorageAzureBlobSource::GlobIterator::getTotalSize() const { return total_size.load(std::memory_order_relaxed); } -void StorageAzureBlobSource::Iterator::createFilterAST(const String & any_key) +void StorageAzureBlobSource::GlobIterator::createFilterAST(const String & any_key) { if (!query || !virtual_header) return; @@ -995,6 +923,78 @@ void StorageAzureBlobSource::Iterator::createFilterAST(const String & any_key) } +StorageAzureBlobSource::KeysIterator::KeysIterator( + AzureObjectStorage * object_storage_, + const std::string & container_, + Strings keys_, + ASTPtr query_, + const Block & virtual_header_, + ContextPtr context_, + RelativePathsWithMetadata * outer_blobs_) + : IIterator(context_) + , object_storage(object_storage_) + , container(container_) + , query(query_) + , virtual_header(virtual_header_) + , outer_blobs(outer_blobs_) +{ + Strings all_keys = keys_; + + /// Create a virtual block with one row to construct filter + if (query && virtual_header && !all_keys.empty()) + { + /// Append "idx" column as the filter result + virtual_header.insert({ColumnUInt64::create(), std::make_shared(), "_idx"}); + + auto block = virtual_header.cloneEmpty(); + addPathToVirtualColumns(block, fs::path(container) / all_keys.front(), 0); + + VirtualColumnUtils::prepareFilterBlockWithQuery(query, getContext(), block, filter_ast); + + if (filter_ast) + { + block = virtual_header.cloneEmpty(); + for (size_t i = 0; i < all_keys.size(); ++i) + addPathToVirtualColumns(block, fs::path(container) / all_keys[i], i); + + VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast); + const auto & idxs = typeid_cast(*block.getByName("_idx").column); + + Strings filtered_keys; + filtered_keys.reserve(block.rows()); + for (UInt64 idx : idxs.getData()) + filtered_keys.emplace_back(std::move(all_keys[idx])); + + all_keys = std::move(filtered_keys); + } + } + + for (auto && key : all_keys) + { + ObjectMetadata object_metadata = object_storage->getObjectMetadata(key); + total_size += object_metadata.size_bytes; + keys.emplace_back(RelativePathWithMetadata{key, object_metadata}); + } + + if (outer_blobs) + *outer_blobs = keys; +} + +RelativePathWithMetadata StorageAzureBlobSource::KeysIterator::next() +{ + size_t current_index = index.fetch_add(1, std::memory_order_relaxed); + if (current_index >= keys.size()) + return {}; + + return keys[current_index]; +} + +size_t StorageAzureBlobSource::KeysIterator::getTotalSize() const +{ + return total_size.load(std::memory_order_relaxed); +} + + Chunk StorageAzureBlobSource::generate() { while (true) @@ -1072,7 +1072,7 @@ StorageAzureBlobSource::StorageAzureBlobSource( String compression_hint_, AzureObjectStorage * object_storage_, const String & container_, - std::shared_ptr file_iterator_) + std::shared_ptr file_iterator_) :ISource(getHeader(sample_block_, requested_virtual_columns_)) , WithContext(context_) , requested_virtual_columns(requested_virtual_columns_) @@ -1167,18 +1167,16 @@ ColumnsDescription StorageAzureBlob::getTableStructureFromData( ContextPtr ctx) { RelativePathsWithMetadata read_keys; - std::shared_ptr file_iterator; + std::shared_ptr file_iterator; if (configuration.withGlobs()) { - file_iterator = std::make_shared( - object_storage, configuration.container, std::nullopt, - configuration.blob_path, nullptr, Block{}, ctx, &read_keys); + file_iterator = std::make_shared( + object_storage, configuration.container, configuration.blob_path, nullptr, Block{}, ctx, &read_keys); } else { - file_iterator = std::make_shared( - object_storage, configuration.container, configuration.blobs_paths, - std::nullopt, nullptr, Block{}, ctx, &read_keys); + file_iterator = std::make_shared( + object_storage, configuration.container, configuration.blobs_paths, nullptr, Block{}, ctx, &read_keys); } std::optional columns_from_cache; diff --git a/src/Storages/StorageAzureBlob.h b/src/Storages/StorageAzureBlob.h index e2001fa24ae..31b2beb05aa 100644 --- a/src/Storages/StorageAzureBlob.h +++ b/src/Storages/StorageAzureBlob.h @@ -142,28 +142,37 @@ private: class StorageAzureBlobSource : public ISource, WithContext { public: - class Iterator : WithContext + class IIterator : public WithContext { public: - Iterator( + IIterator(ContextPtr context_):WithContext(context_) {} + virtual ~IIterator() = default; + virtual RelativePathWithMetadata next() = 0; + virtual size_t getTotalSize() const = 0; + + RelativePathWithMetadata operator ()() { return next(); } + }; + + class GlobIterator : public IIterator + { + public: + GlobIterator( AzureObjectStorage * object_storage_, const std::string & container_, - std::optional keys_, - std::optional blob_path_with_globs_, + String blob_path_with_globs_, ASTPtr query_, const Block & virtual_header_, ContextPtr context_, RelativePathsWithMetadata * outer_blobs_); - RelativePathWithMetadata next(); - size_t getTotalSize() const; - ~Iterator() = default; + RelativePathWithMetadata next() override; + size_t getTotalSize() const override; + ~GlobIterator() override = default; private: AzureObjectStorage * object_storage; std::string container; - std::optional keys; - std::optional blob_path_with_globs; + String blob_path_with_globs; ASTPtr query; ASTPtr filter_ast; Block virtual_header; @@ -171,7 +180,7 @@ public: std::atomic index = 0; std::atomic total_size = 0; - std::optional blobs_with_metadata; + RelativePathsWithMetadata blobs_with_metadata; RelativePathsWithMetadata * outer_blobs; ObjectStorageIteratorPtr object_storage_iterator; bool recursive{false}; @@ -184,6 +193,37 @@ public: std::mutex next_mutex; }; + class KeysIterator : public IIterator + { + public: + KeysIterator( + AzureObjectStorage * object_storage_, + const std::string & container_, + Strings keys_, + ASTPtr query_, + const Block & virtual_header_, + ContextPtr context_, + RelativePathsWithMetadata * outer_blobs_); + + RelativePathWithMetadata next() override; + size_t getTotalSize() const override; + ~KeysIterator() override = default; + + private: + AzureObjectStorage * object_storage; + std::string container; + RelativePathsWithMetadata keys; + + ASTPtr query; + ASTPtr filter_ast; + Block virtual_header; + + std::atomic index = 0; + std::atomic total_size = 0; + + RelativePathsWithMetadata * outer_blobs; + }; + StorageAzureBlobSource( const std::vector & requested_virtual_columns_, const String & format_, @@ -196,7 +236,7 @@ public: String compression_hint_, AzureObjectStorage * object_storage_, const String & container_, - std::shared_ptr file_iterator_); + std::shared_ptr file_iterator_); ~StorageAzureBlobSource() override; @@ -217,7 +257,7 @@ private: String compression_hint; AzureObjectStorage * object_storage; String container; - std::shared_ptr file_iterator; + std::shared_ptr file_iterator; struct ReaderHolder { diff --git a/tests/integration/test_storage_azure_blob_storage/test.py b/tests/integration/test_storage_azure_blob_storage/test.py index 0de325ccd14..5f812cbe4fc 100644 --- a/tests/integration/test_storage_azure_blob_storage/test.py +++ b/tests/integration/test_storage_azure_blob_storage/test.py @@ -595,3 +595,19 @@ def test_partition_by_tf(cluster): assert "1,2,3\n" == get_azure_file_content("test_partition_tf_3.csv") assert "3,2,1\n" == get_azure_file_content("test_partition_tf_1.csv") assert "78,43,45\n" == get_azure_file_content("test_partition_tf_45.csv") + +def test_filter_using_file(cluster): + node = cluster.instances["node"] + table_format = "column1 UInt32, column2 UInt32, column3 UInt32" + partition_by = "column3" + values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)" + filename = "test_partition_tf_{_partition_id}.csv" + + azure_query( + node, + f"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') PARTITION BY {partition_by} VALUES {values}", + ) + + query = f"select count(*) from azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_partition_tf_*.csv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') WHERE _file='test_partition_tf_3.csv'" + assert azure_query(node, query) == "1\n" + From b6517217d8fe5c56bf5086a8965d626674cbfd7a Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Fri, 23 Jun 2023 10:17:20 +0000 Subject: [PATCH 11/12] Automatic style fix --- tests/integration/test_storage_azure_blob_storage/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_storage_azure_blob_storage/test.py b/tests/integration/test_storage_azure_blob_storage/test.py index 5f812cbe4fc..6089466ff5d 100644 --- a/tests/integration/test_storage_azure_blob_storage/test.py +++ b/tests/integration/test_storage_azure_blob_storage/test.py @@ -596,6 +596,7 @@ def test_partition_by_tf(cluster): assert "3,2,1\n" == get_azure_file_content("test_partition_tf_1.csv") assert "78,43,45\n" == get_azure_file_content("test_partition_tf_45.csv") + def test_filter_using_file(cluster): node = cluster.instances["node"] table_format = "column1 UInt32, column2 UInt32, column3 UInt32" @@ -610,4 +611,3 @@ def test_filter_using_file(cluster): query = f"select count(*) from azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', 'test_partition_tf_*.csv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') WHERE _file='test_partition_tf_3.csv'" assert azure_query(node, query) == "1\n" - From b5ad349c4f4a877b1cc2386a17a77ff65724394c Mon Sep 17 00:00:00 2001 From: Smita Kulkarni Date: Fri, 23 Jun 2023 14:34:24 +0200 Subject: [PATCH 12/12] Removed unwanted atomic variables to non atomic --- src/Storages/StorageAzureBlob.cpp | 4 ++-- src/Storages/StorageAzureBlob.h | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Storages/StorageAzureBlob.cpp b/src/Storages/StorageAzureBlob.cpp index 8a1c81e808e..d582cfe6f4c 100644 --- a/src/Storages/StorageAzureBlob.cpp +++ b/src/Storages/StorageAzureBlob.cpp @@ -860,7 +860,7 @@ RelativePathWithMetadata StorageAzureBlobSource::GlobIterator::next() } } - index.store(0, std::memory_order_relaxed); + index = 0; if (!is_initialized) { createFilterAST(new_batch.front().relative_path); @@ -896,7 +896,7 @@ RelativePathWithMetadata StorageAzureBlobSource::GlobIterator::next() } } - size_t current_index = index.fetch_add(1, std::memory_order_relaxed); + size_t current_index = index++; if (current_index >= blobs_with_metadata.size()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Index out of bound for blob metadata"); return blobs_with_metadata[current_index]; diff --git a/src/Storages/StorageAzureBlob.h b/src/Storages/StorageAzureBlob.h index 31b2beb05aa..746c84841ca 100644 --- a/src/Storages/StorageAzureBlob.h +++ b/src/Storages/StorageAzureBlob.h @@ -177,7 +177,7 @@ public: ASTPtr filter_ast; Block virtual_header; - std::atomic index = 0; + size_t index = 0; std::atomic total_size = 0; RelativePathsWithMetadata blobs_with_metadata; @@ -188,8 +188,8 @@ public: std::unique_ptr matcher; void createFilterAST(const String & any_key); - std::atomic is_finished = false; - std::atomic is_initialized = false; + bool is_finished = false; + bool is_initialized = false; std::mutex next_mutex; };