From 189cbe25fe4321f020a9f7b72c901d7358c1f1af Mon Sep 17 00:00:00 2001 From: Yarik Briukhovetskyi <114298166+yariks5s@users.noreply.github.com> Date: Tue, 27 Aug 2024 16:28:18 +0200 Subject: [PATCH] init --- .../StorageObjectStorageSource.cpp | 12 +++--- .../StorageObjectStorageSource.h | 1 + src/Storages/StorageFile.cpp | 4 +- src/Storages/StorageURL.cpp | 7 +++- src/Storages/VirtualColumnUtils.cpp | 37 +++++++++++++++--- src/Storages/VirtualColumnUtils.h | 8 ++-- ...3231_hive_partitioning_filtering.reference | 6 +++ .../03231_hive_partitioning_filtering.sh | 29 ++++++++++++++ .../partitioning/array=[1,2,3]/sample.parquet | Bin 0 -> 1308 bytes .../partitioning/array=[1,2,4]/sample.parquet | Bin 0 -> 1308 bytes .../partitioning/column0=Stacy/sample.parquet | Bin 0 -> 1308 bytes .../partitioning/identifier=2071/email.csv | 5 +++ 12 files changed, 91 insertions(+), 18 deletions(-) create mode 100644 tests/queries/0_stateless/03231_hive_partitioning_filtering.reference create mode 100644 tests/queries/0_stateless/03231_hive_partitioning_filtering.sh create mode 100644 tests/queries/0_stateless/data_hive/partitioning/array=[1,2,3]/sample.parquet create mode 100644 tests/queries/0_stateless/data_hive/partitioning/array=[1,2,4]/sample.parquet create mode 100644 tests/queries/0_stateless/data_hive/partitioning/column0=Stacy/sample.parquet create mode 100644 tests/queries/0_stateless/data_hive/partitioning/identifier=2071/email.csv diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 04e319cd0b8..0d4471e3bda 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -131,10 +131,11 @@ std::shared_ptr StorageObjectStorageSourc else { ConfigurationPtr copy_configuration = configuration->clone(); - auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns); + auto keys = configuration->getPaths(); + String partitioning_path = fs::path(configuration->getNamespace()) / keys[0]; + auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, partitioning_path, local_context); if (filter_dag) { - auto keys = configuration->getPaths(); std::vector paths; paths.reserve(keys.size()); for (const auto & key : keys) @@ -142,7 +143,7 @@ std::shared_ptr StorageObjectStorageSourc VirtualColumnUtils::buildSetsForDAG(*filter_dag, local_context); auto actions = std::make_shared(std::move(*filter_dag)); - VirtualColumnUtils::filterByPathOrFile(keys, paths, actions, virtual_columns); + VirtualColumnUtils::filterByPathOrFile(keys, paths, actions, virtual_columns, local_context); copy_configuration->setPaths(keys); } @@ -492,6 +493,7 @@ StorageObjectStorageSource::GlobIterator::GlobIterator( , virtual_columns(virtual_columns_) , throw_on_zero_files_match(throw_on_zero_files_match_) , read_keys(read_keys_) + , local_context(context_) , file_progress_callback(file_progress_callback_) { if (configuration->isNamespaceWithGlobs()) @@ -513,7 +515,7 @@ StorageObjectStorageSource::GlobIterator::GlobIterator( } recursive = key_with_globs == "/**"; - if (auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns)) + if (auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, key_with_globs, local_context)) { VirtualColumnUtils::buildSetsForDAG(*filter_dag, getContext()); filter_expr = std::make_shared(std::move(*filter_dag)); @@ -588,7 +590,7 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::GlobIterator::ne for (const auto & object_info : new_batch) paths.push_back(getUniqueStoragePathIdentifier(*configuration, *object_info, false)); - VirtualColumnUtils::filterByPathOrFile(new_batch, paths, filter_expr, virtual_columns); + VirtualColumnUtils::filterByPathOrFile(new_batch, paths, filter_expr, virtual_columns, local_context); LOG_TEST(logger, "Filtered files: {} -> {}", paths.size(), new_batch.size()); } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.h b/src/Storages/ObjectStorage/StorageObjectStorageSource.h index 7ae7a2358e9..8ee3b023638 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.h @@ -220,6 +220,7 @@ private: bool is_finished = false; bool first_iteration = true; std::mutex next_mutex; + const ContextPtr local_context; std::function file_progress_callback; }; diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index 50294df32a4..639af41c1cd 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -1140,13 +1140,13 @@ StorageFileSource::FilesIterator::FilesIterator( { std::optional filter_dag; if (!distributed_processing && !archive_info && !files.empty()) - filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns); + filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, files[0], context_); if (filter_dag) { VirtualColumnUtils::buildSetsForDAG(*filter_dag, context_); auto actions = std::make_shared(std::move(*filter_dag)); - VirtualColumnUtils::filterByPathOrFile(files, files, actions, virtual_columns); + VirtualColumnUtils::filterByPathOrFile(files, files, actions, virtual_columns, context_); } } diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index fc1354b780a..572c4f20fa3 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -214,7 +214,10 @@ public: std::optional filter_dag; if (!uris.empty()) - filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns); + { + String partitioning_path = Poco::URI(uris[0]).getPath(); + filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, partitioning_path, context); + } if (filter_dag) { @@ -225,7 +228,7 @@ public: VirtualColumnUtils::buildSetsForDAG(*filter_dag, context); auto actions = std::make_shared(std::move(*filter_dag)); - VirtualColumnUtils::filterByPathOrFile(uris, paths, actions, virtual_columns); + VirtualColumnUtils::filterByPathOrFile(uris, paths, actions, virtual_columns, context); } } diff --git a/src/Storages/VirtualColumnUtils.cpp b/src/Storages/VirtualColumnUtils.cpp index f0d276e4e56..788aeb66657 100644 --- a/src/Storages/VirtualColumnUtils.cpp +++ b/src/Storages/VirtualColumnUtils.cpp @@ -46,6 +46,7 @@ #include "Functions/IFunction.h" #include "Functions/IFunctionAdaptors.h" #include "Functions/indexHint.h" +#include #include #include #include @@ -197,7 +198,7 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto return desc; } -static void addPathAndFileToVirtualColumns(Block & block, const String & path, size_t idx) +static void addFilterDataToVirtualColumns(Block & block, const String & path, size_t idx, ColumnsWithTypeAndName partitioning_keys, const ContextPtr & context) { if (block.has("_path")) block.getByName("_path").column->assumeMutableRef().insert(path); @@ -214,18 +215,31 @@ static void addPathAndFileToVirtualColumns(Block & block, const String & path, s block.getByName("_file").column->assumeMutableRef().insert(file); } + for (const auto & item : partitioning_keys) + { + if (block.has(item.name)) + { + auto column = block.getByName(item.name).column; + ReadBufferFromString buf(item.column->getDataAt(0).toView()); + item.type->getDefaultSerialization()->deserializeWholeText(column->assumeMutableRef(), buf, getFormatSettings(context)); + } + } + block.getByName("_idx").column->assumeMutableRef().insert(idx); } -std::optional createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns) +std::optional createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const String & path, const ContextPtr & context) { if (!predicate || virtual_columns.empty()) return {}; Block block; + std::unordered_map keys; + if (context->getSettingsRef().use_hive_partitioning) + keys = parseHivePartitioningKeysAndValues(path); for (const auto & column : virtual_columns) { - if (column.name == "_file" || column.name == "_path") + if (column.name == "_file" || column.name == "_path" || keys.contains(column.name)) block.insert({column.type->createColumn(), column.type, column.name}); } @@ -233,18 +247,31 @@ std::optional createPathAndFileFilterDAG(const ActionsDAG::Node * pr return splitFilterDagForAllowedInputs(predicate, &block); } -ColumnPtr getFilterByPathAndFileIndexes(const std::vector & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns) +ColumnPtr getFilterByPathAndFileIndexes(const std::vector & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context) { Block block; + std::unordered_map keys; + ColumnsWithTypeAndName partitioning_columns; + if (context->getSettingsRef().use_hive_partitioning) + keys = parseHivePartitioningKeysAndValues(paths[0]); for (const auto & column : virtual_columns) { if (column.name == "_file" || column.name == "_path") block.insert({column.type->createColumn(), column.type, column.name}); + + auto it = keys.find(column.name); + if (it != keys.end()) + { + auto c = std::make_shared()->createColumn(); + c->insert(it->second); + block.insert({column.type->createColumn(), column.type, column.name}); + partitioning_columns.push_back({c->getPtr(), column.type, column.name}); + } } block.insert({ColumnUInt64::create(), std::make_shared(), "_idx"}); for (size_t i = 0; i != paths.size(); ++i) - addPathAndFileToVirtualColumns(block, paths[i], i); + addFilterDataToVirtualColumns(block, paths[i], i, partitioning_columns, context); filterBlockWithExpression(actions, block); diff --git a/src/Storages/VirtualColumnUtils.h b/src/Storages/VirtualColumnUtils.h index 6aa08b2aef2..ecfe44a1956 100644 --- a/src/Storages/VirtualColumnUtils.h +++ b/src/Storages/VirtualColumnUtils.h @@ -75,14 +75,14 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage( const std::string & sample_path = "", std::optional format_settings_ = std::nullopt); -std::optional createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns); +std::optional createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const String & path, const ContextPtr & context); -ColumnPtr getFilterByPathAndFileIndexes(const std::vector & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns); +ColumnPtr getFilterByPathAndFileIndexes(const std::vector & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context); template -void filterByPathOrFile(std::vector & sources, const std::vector & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns) +void filterByPathOrFile(std::vector & sources, const std::vector & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context) { - auto indexes_column = getFilterByPathAndFileIndexes(paths, actions, virtual_columns); + auto indexes_column = getFilterByPathAndFileIndexes(paths, actions, virtual_columns, context); const auto & indexes = typeid_cast(*indexes_column).getData(); if (indexes.size() == sources.size()) return; diff --git a/tests/queries/0_stateless/03231_hive_partitioning_filtering.reference b/tests/queries/0_stateless/03231_hive_partitioning_filtering.reference new file mode 100644 index 00000000000..a9e2f17562a --- /dev/null +++ b/tests/queries/0_stateless/03231_hive_partitioning_filtering.reference @@ -0,0 +1,6 @@ +1 +1 +1 +1 +1 +1 diff --git a/tests/queries/0_stateless/03231_hive_partitioning_filtering.sh b/tests/queries/0_stateless/03231_hive_partitioning_filtering.sh new file mode 100644 index 00000000000..719fed5bdaa --- /dev/null +++ b/tests/queries/0_stateless/03231_hive_partitioning_filtering.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +${CLICKHOUSE_CLIENT} --query_id="test_03231_1" --query " + SELECT countDistinct(_path) FROM file('$CURDIR/data_hive/partitioning/column0=*/sample.parquet') WHERE column0 = 'Elizabeth'; +" + +${CLICKHOUSE_CLIENT} --query " + SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log WHERE query_id='test_03231_1'; +" + +${CLICKHOUSE_CLIENT} --query_id="test_03231_2" --query " + SELECT countDistinct(_path) FROM file('$CURDIR/data_hive/partitioning/identifier=*/email.csv') WHERE identifier = 2070; +" + +${CLICKHOUSE_CLIENT} --query " + SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log WHERE query_id='test_03231_2'; +" + +${CLICKHOUSE_CLIENT} --query_id="test_03231_3" --query " + SELECT countDistinct(_path) FROM file('$CURDIR/data_hive/partitioning/array=*/sample.parquet') WHERE array = [1,2,3]; +" + +${CLICKHOUSE_CLIENT} --query " + SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log WHERE query_id='test_03231_3'; +" diff --git a/tests/queries/0_stateless/data_hive/partitioning/array=[1,2,3]/sample.parquet b/tests/queries/0_stateless/data_hive/partitioning/array=[1,2,3]/sample.parquet new file mode 100644 index 0000000000000000000000000000000000000000..9b6a78cf8cc7cd3ece15e13c9b2f222c8f09b81e GIT binary patch literal 1308 zcmWG=3^EjD5Z%Hr`iosh^b{kI%_hpmz#!kv!2kyTLxU6Z9~tnZHa*@opEc(Au?K1g zOD4aYo#~scS*oJ`_R8gD$~^!1^Jl8v?6d#uZT@&1Z&h*OEsK+iS@vM( z^NvMD|`UJH2qG^xTWJ-dT6$7G6DVTky7Woy5#*nvWVEJpR{CJ{Fy0- zE8ux@_5^8x!?dEIRau&2MyW=j!5h*xtj<|H$T%nI_ zrsjz?W}YW@dt8{DRBI|`*(jU(m2ZmM@u#NQ!s{)z%{yLgtZF$)cAddC?xOT5D^_mz z-x7J9sr1v$v$K{(^`5h;Sz-1gc2*AGUh7}8F0R?}-B&E(IrH;G`GUhY z?q@1K*wQW0otd;iYI&}N?~AIE{%tkCroWN7t$#4bGw~KP0|PJ-eBc+|z=1tM#0JF{ zU3TEfTh6OHr)jl`=8?k_CV5&tiR=x1?{{sI`|Af*?oUEIqS_tiuleY8e||}EY3bMB zzp9qaKhIf|e>9xYs^&t{(WWC|y8X+=Uc{}=?T>Xh_5JxVk(1Vsywf&)T&i$tu2}yJ zsTDW>>9!Q_yZT7oEaCof4t43QdkFv1JFG`q9?h6g zxTpBgk6%&qwlli6{)!hkc#l_C=)}P;-Ys+NvjP>bYG~cCGCw}YQ1x-0z@w1)u@}^n zTV#|>Z7-{GtbTT=rr=<)~?``+iTxh4l+3|MS-tdVRHm+9w`h0!z=3knV zrSnX_{WmK}KJ?@4(a#30zmF(AmC{eNN7s8Lx}H>x1pMHFk2oys;%$ zvXN_R)m$dd8M|y^7q?Bh-x;&%icdYm3!CL}KR{`PNz%rYL4r4>G&wsZDZV&4BQ-Zs zl!ZZ*N0mu}Jvl$8G&j!xn4o|vkwidc4g-VODMm>dNgXu?8BrcdQ3gqbdKRFR7=zd% z4mA!N3D&gCqT&(>R>!2I%v3Q34HQ1GkiyV!C<@hogF|f<&;XY3{QMLNR)w6z;u4^K eWG+xU(4JF_Y8(t2Y%V}QxHvIf1_}lM%S8a*|2_@? literal 0 HcmV?d00001 diff --git a/tests/queries/0_stateless/data_hive/partitioning/array=[1,2,4]/sample.parquet b/tests/queries/0_stateless/data_hive/partitioning/array=[1,2,4]/sample.parquet new file mode 100644 index 0000000000000000000000000000000000000000..9b6a78cf8cc7cd3ece15e13c9b2f222c8f09b81e GIT binary patch literal 1308 zcmWG=3^EjD5Z%Hr`iosh^b{kI%_hpmz#!kv!2kyTLxU6Z9~tnZHa*@opEc(Au?K1g zOD4aYo#~scS*oJ`_R8gD$~^!1^Jl8v?6d#uZT@&1Z&h*OEsK+iS@vM( z^NvMD|`UJH2qG^xTWJ-dT6$7G6DVTky7Woy5#*nvWVEJpR{CJ{Fy0- zE8ux@_5^8x!?dEIRau&2MyW=j!5h*xtj<|H$T%nI_ zrsjz?W}YW@dt8{DRBI|`*(jU(m2ZmM@u#NQ!s{)z%{yLgtZF$)cAddC?xOT5D^_mz z-x7J9sr1v$v$K{(^`5h;Sz-1gc2*AGUh7}8F0R?}-B&E(IrH;G`GUhY z?q@1K*wQW0otd;iYI&}N?~AIE{%tkCroWN7t$#4bGw~KP0|PJ-eBc+|z=1tM#0JF{ zU3TEfTh6OHr)jl`=8?k_CV5&tiR=x1?{{sI`|Af*?oUEIqS_tiuleY8e||}EY3bMB zzp9qaKhIf|e>9xYs^&t{(WWC|y8X+=Uc{}=?T>Xh_5JxVk(1Vsywf&)T&i$tu2}yJ zsTDW>>9!Q_yZT7oEaCof4t43QdkFv1JFG`q9?h6g zxTpBgk6%&qwlli6{)!hkc#l_C=)}P;-Ys+NvjP>bYG~cCGCw}YQ1x-0z@w1)u@}^n zTV#|>Z7-{GtbTT=rr=<)~?``+iTxh4l+3|MS-tdVRHm+9w`h0!z=3knV zrSnX_{WmK}KJ?@4(a#30zmF(AmC{eNN7s8Lx}H>x1pMHFk2oys;%$ zvXN_R)m$dd8M|y^7q?Bh-x;&%icdYm3!CL}KR{`PNz%rYL4r4>G&wsZDZV&4BQ-Zs zl!ZZ*N0mu}Jvl$8G&j!xn4o|vkwidc4g-VODMm>dNgXu?8BrcdQ3gqbdKRFR7=zd% z4mA!N3D&gCqT&(>R>!2I%v3Q34HQ1GkiyV!C<@hogF|f<&;XY3{QMLNR)w6z;u4^K eWG+xU(4JF_Y8(t2Y%V}QxHvIf1_}lM%S8a*|2_@? literal 0 HcmV?d00001 diff --git a/tests/queries/0_stateless/data_hive/partitioning/column0=Stacy/sample.parquet b/tests/queries/0_stateless/data_hive/partitioning/column0=Stacy/sample.parquet new file mode 100644 index 0000000000000000000000000000000000000000..9b6a78cf8cc7cd3ece15e13c9b2f222c8f09b81e GIT binary patch literal 1308 zcmWG=3^EjD5Z%Hr`iosh^b{kI%_hpmz#!kv!2kyTLxU6Z9~tnZHa*@opEc(Au?K1g zOD4aYo#~scS*oJ`_R8gD$~^!1^Jl8v?6d#uZT@&1Z&h*OEsK+iS@vM( z^NvMD|`UJH2qG^xTWJ-dT6$7G6DVTky7Woy5#*nvWVEJpR{CJ{Fy0- zE8ux@_5^8x!?dEIRau&2MyW=j!5h*xtj<|H$T%nI_ zrsjz?W}YW@dt8{DRBI|`*(jU(m2ZmM@u#NQ!s{)z%{yLgtZF$)cAddC?xOT5D^_mz z-x7J9sr1v$v$K{(^`5h;Sz-1gc2*AGUh7}8F0R?}-B&E(IrH;G`GUhY z?q@1K*wQW0otd;iYI&}N?~AIE{%tkCroWN7t$#4bGw~KP0|PJ-eBc+|z=1tM#0JF{ zU3TEfTh6OHr)jl`=8?k_CV5&tiR=x1?{{sI`|Af*?oUEIqS_tiuleY8e||}EY3bMB zzp9qaKhIf|e>9xYs^&t{(WWC|y8X+=Uc{}=?T>Xh_5JxVk(1Vsywf&)T&i$tu2}yJ zsTDW>>9!Q_yZT7oEaCof4t43QdkFv1JFG`q9?h6g zxTpBgk6%&qwlli6{)!hkc#l_C=)}P;-Ys+NvjP>bYG~cCGCw}YQ1x-0z@w1)u@}^n zTV#|>Z7-{GtbTT=rr=<)~?``+iTxh4l+3|MS-tdVRHm+9w`h0!z=3knV zrSnX_{WmK}KJ?@4(a#30zmF(AmC{eNN7s8Lx}H>x1pMHFk2oys;%$ zvXN_R)m$dd8M|y^7q?Bh-x;&%icdYm3!CL}KR{`PNz%rYL4r4>G&wsZDZV&4BQ-Zs zl!ZZ*N0mu}Jvl$8G&j!xn4o|vkwidc4g-VODMm>dNgXu?8BrcdQ3gqbdKRFR7=zd% z4mA!N3D&gCqT&(>R>!2I%v3Q34HQ1GkiyV!C<@hogF|f<&;XY3{QMLNR)w6z;u4^K eWG+xU(4JF_Y8(t2Y%V}QxHvIf1_}lM%S8a*|2_@? literal 0 HcmV?d00001 diff --git a/tests/queries/0_stateless/data_hive/partitioning/identifier=2071/email.csv b/tests/queries/0_stateless/data_hive/partitioning/identifier=2071/email.csv new file mode 100644 index 00000000000..936d995cc64 --- /dev/null +++ b/tests/queries/0_stateless/data_hive/partitioning/identifier=2071/email.csv @@ -0,0 +1,5 @@ +_login_email,_identifier,_first_name,_last_name +laura@example.com,2070,Laura,Grey +craig@example.com,4081,Craig,Johnson +mary@example.com,9346,Mary,Jenkins +jamie@example.com,5079,Jamie,Smith