diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
index d1a285b8818..0bc02ab7395 100644
--- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
@@ -778,23 +778,11 @@ std::optional<std::unordered_set<String>> MergeTreeDataSelectExecutor::filterPartsByVirtualColumns(
 {
     if (!filter_dag)
         return {};
 
     auto sample = data.getSampleBlockWithVirtualColumns();
-    std::unordered_set<const ActionsDAG::Node *> allowed_inputs;
-    for (const auto * input : filter_dag->getInputs())
-        if (sample.has(input->result_name))
-            allowed_inputs.insert(input);
-
-    if (allowed_inputs.empty())
+    auto dag = VirtualColumnUtils::splitFilterDagForAllowedInputs(sample, filter_dag, context);
+    if (!dag)
         return {};
 
-    auto atoms = filter_dag->extractConjunctionAtoms(filter_dag->getOutputs().at(0));
-    atoms = ActionsDAG::filterNodesByAllowedInputs(std::move(atoms), allowed_inputs);
-    if (atoms.empty())
-        return {};
-
-    auto dag = ActionsDAG::buildFilterActionsDAG(atoms, {}, context);
-
     auto virtual_columns_block = data.getBlockWithVirtualPartColumns(parts, false /* one_part */);
     VirtualColumnUtils::filterBlockWithQuery(dag, virtual_columns_block, context);
     return VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp
index 61c5fcdbfa8..194f9cb647f 100644
--- a/src/Storages/StorageS3.cpp
+++ b/src/Storages/StorageS3.cpp
@@ -42,8 +42,12 @@
 #include 
 #include 
 #include 
+#include 
+
 #include 
+#include 
+#include 
 #include 
@@ -126,6 +130,129 @@ namespace ErrorCodes
     extern const int FILE_DOESNT_EXIST;
 }
 
+
+class ReadFromStorageS3Step : public SourceStepWithFilter
+{
+public:
+    std::string getName() const override { return "ReadFromStorageS3Step"; }
+
+    void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
+
+    void applyFilters() override;
+
+    ReadFromStorageS3Step(
+        Block sample_block,
+        const Names & column_names_,
+        StorageSnapshotPtr storage_snapshot_,
+        StorageS3 & storage_,
+        SelectQueryInfo query_info_,
+        ContextPtr context_,
+        size_t max_block_size_,
+        size_t num_streams_)
+        : SourceStepWithFilter(DataStream{.header = std::move(sample_block)})
+        , column_names(column_names_)
+        , storage_snapshot(std::move(storage_snapshot_))
+        , storage(storage_)
+        , query_info(std::move(query_info_))
+        , local_context(std::move(context_))
+        , max_block_size(max_block_size_)
+        , num_streams(num_streams_)
+    {
+    }
+
+private:
+    Names column_names;
+    StorageSnapshotPtr storage_snapshot;
+    StorageS3 & storage;
+    SelectQueryInfo query_info;
+    ContextPtr local_context;
+
+    size_t max_block_size;
+    size_t num_streams;
+};
+
+
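+/// Build a block of virtual column values (_path, _file and _key) for the given keys,
+/// so that a filter over those columns can be evaluated against it.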
+static Block getBlockWithVirtuals(const NamesAndTypesList & virtual_columns, const String & bucket, const std::unordered_set<String> & keys)
+{
+    Block virtual_columns_block;
+    fs::path bucket_path(bucket);
+
+    for (const auto & [column_name, column_type] : virtual_columns)
+    {
+        if (column_name == "_path")
+        {
+            auto column = column_type->createColumn();
+            for (const auto & key : keys)
+                column->insert((bucket_path / key).string());
+            virtual_columns_block.insert({std::move(column), column_type, column_name});
+        }
+        else if (column_name == "_file")
+        {
+            auto column = column_type->createColumn();
+            for (const auto & key : keys)
+            {
+                auto pos = key.find_last_of('/');
+                if (pos != std::string::npos)
+                    column->insert(key.substr(pos + 1));
+                else
+                    column->insert(key);
+            }
+            virtual_columns_block.insert({std::move(column), column_type, column_name});
+        }
+        else if (column_name == "_key")
+        {
+            auto column = column_type->createColumn();
+            for (const auto & key : keys)
+                column->insert(key);
+            virtual_columns_block.insert({std::move(column), column_type, column_name});
+        }
+        else
+        {
+            auto column = column_type->createColumn();
+            column->insertManyDefaults(keys.size());
+            virtual_columns_block.insert({std::move(column), column_type, column_name});
+        }
+    }
+
+    /// Column _key is mandatory and may not be in virtual_columns list
+    if (!virtual_columns_block.has("_key"))
+    {
+        auto column_type = std::make_shared<DataTypeString>();
+        auto column = column_type->createColumn();
+        for (const auto & key : keys)
+            column->insert(key);
+        virtual_columns_block.insert({std::move(column), column_type, "_key"});
+    }
+
+    return virtual_columns_block;
+}
+
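+/// Keep only the keys that can satisfy the query filters: each filter DAG is reduced to the
+/// part that depends on virtual columns only and is then applied to the block of their values.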
+static std::vector<String> filterKeysForPartitionPruning(
+    const std::vector<String> & keys,
+    const String & bucket,
+    const NamesAndTypesList & virtual_columns,
+    const std::vector<ActionsDAGPtr> & filter_dags,
+    ContextPtr context)
+{
+    std::unordered_set<String> result_keys(keys.begin(), keys.end());
+    for (const auto & filter_dag : filter_dags)
+    {
+        if (result_keys.empty())
+            break;
+
+        auto block = getBlockWithVirtuals(virtual_columns, bucket, result_keys);
+
+        auto filter_actions = VirtualColumnUtils::splitFilterDagForAllowedInputs(block, filter_dag, context);
+        if (!filter_actions)
+            continue;
+        VirtualColumnUtils::filterBlockWithQuery(filter_actions, block, context);
+
+        result_keys = VirtualColumnUtils::extractSingleValueFromBlock<String>(block, "_key");
+    }
+
+    LOG_DEBUG(&Poco::Logger::get("StorageS3"), "Applied partition pruning, {} out of {} keys left", result_keys.size(), keys.size());
+    return std::vector<String>(result_keys.begin(), result_keys.end());
+}
+
 class IOutputFormat;
 
 using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
@@ -390,7 +517,7 @@ size_t StorageS3Source::DisclosedGlobIterator::estimatedKeysCount()
     return pimpl->objectsCount();
 }
 
-class StorageS3Source::KeysIterator::Impl : WithContext
+class StorageS3Source::KeysIterator::Impl
 {
 public:
     explicit Impl(
@@ -399,35 +526,15 @@ public:
         const std::vector<String> & keys_,
         const String & bucket_,
         const S3Settings::RequestSettings & request_settings_,
-        ASTPtr query_,
-        const NamesAndTypesList & virtual_columns_,
-        ContextPtr context_,
         KeysWithInfo * read_keys_,
         std::function<void(FileProgress)> file_progress_callback_)
-        : WithContext(context_)
-        , keys(keys_)
+        : keys(keys_)
         , client(client_.clone())
         , version_id(version_id_)
         , bucket(bucket_)
         , request_settings(request_settings_)
-        , query(query_)
-        , virtual_columns(virtual_columns_)
         , file_progress_callback(file_progress_callback_)
     {
-        ASTPtr filter_ast;
-        if (!keys.empty())
-            filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, fs::path(bucket) / keys[0], getContext());
-
-        if (filter_ast)
-        {
-            std::vector<String> paths;
-            paths.reserve(keys.size());
-            for (const auto & key : keys)
-                paths.push_back(fs::path(bucket) / key);
-
-            VirtualColumnUtils::filterByPathOrFile(keys, paths, query, virtual_columns, getContext(), filter_ast);
-        }
-
         if (read_keys_)
         {
             for (const auto & key : keys)
@@ -463,8 +570,6 @@ private:
     String version_id;
     String bucket;
     S3Settings::RequestSettings request_settings;
-    ASTPtr query;
-    NamesAndTypesList virtual_columns;
     std::function<void(FileProgress)> file_progress_callback;
 };
 
@@ -474,14 +579,11 @@ StorageS3Source::KeysIterator::KeysIterator(
     const std::vector<String> & keys_,
     const String & bucket_,
     const S3Settings::RequestSettings & request_settings_,
-    ASTPtr query,
-    const NamesAndTypesList & virtual_columns_,
-    ContextPtr context,
     KeysWithInfo * read_keys,
     std::function<void(FileProgress)> file_progress_callback_)
     : pimpl(std::make_shared<StorageS3Source::KeysIterator::Impl>(
         client_, version_id_, keys_, bucket_, request_settings_,
-        query, virtual_columns_, context, read_keys, file_progress_callback_))
+        read_keys, file_progress_callback_))
 {
 }
 
@@ -973,8 +1075,6 @@ private:
     const String key;
     const std::optional<FormatSettings> format_settings;
 
-    ExpressionActionsPtr partition_by_expr;
-
     static void validateBucket(const String & str)
     {
         S3::URI::validateBucket(str, {});
@@ -1046,14 +1146,15 @@ StorageS3::StorageS3(
     virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());
 }
 
-std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
-    const Configuration & configuration,
+static std::shared_ptr<StorageS3Source::IIterator> createFileIterator(
+    const StorageS3::Configuration & configuration,
     bool distributed_processing,
     ContextPtr local_context,
     ASTPtr query,
+    const std::vector<ActionsDAGPtr> & filter_dags,
     const NamesAndTypesList & virtual_columns,
-    KeysWithInfo * read_keys,
-    std::function<void(FileProgress)> file_progress_callback)
+    StorageS3::KeysWithInfo * read_keys = nullptr,
+    std::function<void(FileProgress)> file_progress_callback = {})
 {
     if (distributed_processing)
     {
@@ -1068,10 +1169,10 @@ std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
     }
     else
     {
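+        /// For an explicit list of keys, prune the list by virtual columns before creating the iterator.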
+        Strings keys = filterKeysForPartitionPruning(configuration.keys, configuration.url.bucket, virtual_columns, filter_dags, local_context);
         return std::make_shared<StorageS3Source::KeysIterator>(
-            *configuration.client, configuration.url.version_id, configuration.keys,
-            configuration.url.bucket, configuration.request_settings, query,
-            virtual_columns, local_context, read_keys, file_progress_callback);
+            *configuration.client, configuration.url.version_id, keys,
+            configuration.url.bucket, configuration.request_settings, read_keys, file_progress_callback);
     }
 }
 
@@ -1090,7 +1191,8 @@ bool StorageS3::parallelizeOutputAfterReading(ContextPtr context) const
     return FormatFactory::instance().checkParallelizeOutputAfterReading(configuration.format, context);
 }
 
-Pipe StorageS3::read(
+void StorageS3::read(
+    QueryPlan & query_plan,
     const Names & column_names,
     const StorageSnapshotPtr & storage_snapshot,
     SelectQueryInfo & query_info,
@@ -1099,15 +1201,34 @@
     size_t max_block_size,
     size_t num_streams)
 {
-    auto query_configuration = updateConfigurationAndGetCopy(local_context);
+    auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), virtual_columns);
 
-    if (partition_by && query_configuration.withWildcard())
+    auto reading = std::make_unique<ReadFromStorageS3Step>(
+        read_from_format_info.source_header,
+        column_names,
+        storage_snapshot,
+        *this,
+        query_info,
+        local_context,
+        max_block_size,
+        num_streams);
+
+    query_plan.addStep(std::move(reading));
+}
+
+void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
+{
+    auto query_configuration = storage.updateConfigurationAndGetCopy(local_context);
+
+    if (storage.partition_by && query_configuration.withWildcard())
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Reading from a partitioned S3 storage is not implemented yet");
 
-    Pipes pipes;
+    auto virtual_columns = storage.getVirtuals();
+    auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, storage.supportsSubsetOfColumns(local_context), virtual_columns);
 
     std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(
-        query_configuration, distributed_processing, local_context, query_info.query, virtual_columns, nullptr, local_context->getFileProgressCallback());
+        query_configuration, storage.distributed_processing, local_context, query_info.query, filter_dags,
+        virtual_columns, nullptr, local_context->getFileProgressCallback());
 
     size_t estimated_keys_count = iterator_wrapper->estimatedKeysCount();
     if (estimated_keys_count > 1)
@@ -1116,7 +1237,6 @@
         /// Disclosed glob iterator can underestimate the amount of keys in some cases. We will keep one stream for this particular case.
         num_streams = 1;
 
-    auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
     bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
         && local_context->getSettingsRef().optimize_count_from_files;
 
@@ -1124,15 +1244,16 @@
     const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / std::max(num_streams, 1ul));
     LOG_DEBUG(&Poco::Logger::get("StorageS3"), "Reading in {} streams, {} threads per stream", num_streams, max_parsing_threads);
 
+    Pipes pipes;
     pipes.reserve(num_streams);
     for (size_t i = 0; i < num_streams; ++i)
     {
         pipes.emplace_back(std::make_shared<StorageS3Source>(
             read_from_format_info,
             query_configuration.format,
-            getName(),
+            storage.getName(),
             local_context,
-            format_settings,
+            storage.format_settings,
             max_block_size,
             query_configuration.request_settings,
             query_configuration.compression_method,
@@ -1146,7 +1267,13 @@
             query_info));
     }
 
-    return Pipe::unitePipes(std::move(pipes));
+    pipeline.init(Pipe::unitePipes(std::move(pipes)));
+}
+
+
+void ReadFromStorageS3Step::applyFilters()
+{
+    /// We use filter_dags in filterKeysForPartitionPruning, which is called from initializePipeline; nothing to do here.
 }
 
 SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)
@@ -1597,7 +1724,7 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
 {
     KeysWithInfo read_keys;
 
-    auto file_iterator = createFileIterator(configuration, false, ctx, nullptr, {}, &read_keys);
+    auto file_iterator = createFileIterator(configuration, false, ctx, {}, {}, {}, &read_keys);
 
     std::optional<ColumnsDescription> columns_from_cache;
     if (ctx->getSettingsRef().schema_inference_use_cache_for_s3)
diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h
index 469e5725715..d83022a9223 100644
--- a/src/Storages/StorageS3.h
+++ b/src/Storages/StorageS3.h
@@ -103,9 +103,6 @@ public:
             const std::vector<String> & keys_,
             const String & bucket_,
             const S3Settings::RequestSettings & request_settings_,
-            ASTPtr query,
-            const NamesAndTypesList & virtual_columns,
-            ContextPtr context,
             KeysWithInfo * read_keys = nullptr,
             std::function<void(FileProgress)> progress_callback_ = {});
 
@@ -331,7 +328,8 @@ public:
         return name;
     }
 
-    Pipe read(
+    void read(
+        QueryPlan & query_plan,
         const Names & column_names,
         const StorageSnapshotPtr & storage_snapshot,
         SelectQueryInfo & query_info,
@@ -391,6 +389,7 @@ private:
     friend class StorageS3Cluster;
     friend class TableFunctionS3Cluster;
     friend class StorageS3Queue;
+    friend class ReadFromStorageS3Step;
 
     Configuration configuration;
     std::mutex configuration_update_mutex;
@@ -401,15 +400,6 @@ private:
     std::optional<FormatSettings> format_settings;
     ASTPtr partition_by;
 
-    static std::shared_ptr<StorageS3Source::IIterator> createFileIterator(
-        const Configuration & configuration,
-        bool distributed_processing,
-        ContextPtr local_context,
-        ASTPtr query,
-        const NamesAndTypesList & virtual_columns,
-        KeysWithInfo * read_keys = nullptr,
-        std::function<void(FileProgress)> progress_callback = {});
-
     static ColumnsDescription getTableStructureFromDataImpl(
         const Configuration & configuration,
         const std::optional<FormatSettings> & format_settings,
diff --git a/src/Storages/VirtualColumnUtils.cpp b/src/Storages/VirtualColumnUtils.cpp
index 219043f25c6..7aec5ce0d78 100644
--- a/src/Storages/VirtualColumnUtils.cpp
+++ b/src/Storages/VirtualColumnUtils.cpp
@@ -445,6 +445,24 @@ void addRequestedPathAndFileVirtualsToChunk(
     }
 }
 
+ActionsDAGPtr splitFilterDagForAllowedInputs(const Block & header, const ActionsDAGPtr & filter_dag, ContextPtr context)
+{
+    std::unordered_set<const ActionsDAG::Node *> allowed_inputs;
+    for (const auto * input : filter_dag->getInputs())
+        if (header.has(input->result_name))
+            allowed_inputs.insert(input);
+
+    if (allowed_inputs.empty())
+        return {};
+
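+    /// Keep only the conjuncts whose inputs are all allowed: dropping atoms from a conjunction
+    /// can only make the filter weaker, so the reduced filter never discards rows that match the full filter.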
+    auto atoms = filter_dag->extractConjunctionAtoms(filter_dag->getOutputs().at(0));
+    atoms = ActionsDAG::filterNodesByAllowedInputs(std::move(atoms), allowed_inputs);
+    if (atoms.empty())
+        return {};
+
+    return ActionsDAG::buildFilterActionsDAG(atoms, {}, context);
+}
+
 }
 
 }
diff --git a/src/Storages/VirtualColumnUtils.h b/src/Storages/VirtualColumnUtils.h
index a21f2b05552..d8d4b44b4ff 100644
--- a/src/Storages/VirtualColumnUtils.h
+++ b/src/Storages/VirtualColumnUtils.h
@@ -35,6 +35,9 @@ bool prepareFilterBlockWithQuery(const ASTPtr & query, ContextPtr context, Block
 void filterBlockWithQuery(const ASTPtr & query, Block & block, ContextPtr context, ASTPtr expression_ast = {});
 void filterBlockWithQuery(ActionsDAGPtr dag, Block & block, ContextPtr context);
 
+/// Extract a subset of filter_dag that can be evaluated using only the columns from header.
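+/// Returns an empty pointer if no part of the filter can be evaluated this way.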
+ActionsDAGPtr splitFilterDagForAllowedInputs(const Block & header, const ActionsDAGPtr & filter_dag, ContextPtr context);
+
 /// Extract from the input stream a set of `name` column values
 template <typename T>
 auto extractSingleValueFromBlock(const Block & block, const String & name)
diff --git a/tests/analyzer_tech_debt.txt b/tests/analyzer_tech_debt.txt
index 392c4e2e313..e0055a9d617 100644
--- a/tests/analyzer_tech_debt.txt
+++ b/tests/analyzer_tech_debt.txt
@@ -24,7 +24,6 @@
 01952_optimize_distributed_group_by_sharding_key
 02139_MV_with_scalar_subquery
 02174_cte_scalar_cache_mv
-02302_s3_file_pruning
 02352_grouby_shadows_arg
 02354_annoy
 02428_parameterized_view
diff --git a/tests/queries/0_stateless/02302_s3_file_pruning.reference b/tests/queries/0_stateless/02302_s3_file_pruning.reference
index f8d2bdd0612..7e69bdd55db 100644
--- a/tests/queries/0_stateless/02302_s3_file_pruning.reference
+++ b/tests/queries/0_stateless/02302_s3_file_pruning.reference
@@ -24,4 +24,14 @@ insert into test_02302 select 1 settings s3_create_new_file_on_insert = true;
 insert into test_02302 select 2 settings s3_create_new_file_on_insert = true;
 select * from test_02302 where _file like '%1';
 1
+select _file, * from test_02302 where _file like '%1';
+test_02302.1	1
+set max_rows_to_read = 2;
+select * from test_02302 where (_file like '%.1' OR _file like '%.2') AND a > 1;
+2
+set max_rows_to_read = 999;
+select 'a1' as _file, * from test_02302 where _file like '%1' ORDER BY a;
+a1	0
+a1	1
+a1	2
 drop table test_02302;
diff --git a/tests/queries/0_stateless/02302_s3_file_pruning.sql b/tests/queries/0_stateless/02302_s3_file_pruning.sql
index 624a87506d1..93fc8a1bc25 100644
--- a/tests/queries/0_stateless/02302_s3_file_pruning.sql
+++ b/tests/queries/0_stateless/02302_s3_file_pruning.sql
@@ -1,5 +1,5 @@
 -- Tags: no-parallel, no-fasttest
--- Tag no-fasttest: Depends on AWS
+-- Tag no-fasttest: Depends on S3
 
 -- { echo }
 drop table if exists test_02302;
@@ -32,4 +32,14 @@ insert into test_02302 select 1 settings s3_create_new_file_on_insert = true;
 insert into test_02302 select 2 settings s3_create_new_file_on_insert = true;
 
 select * from test_02302 where _file like '%1';
+
+select _file, * from test_02302 where _file like '%1';
+
+set max_rows_to_read = 2;
+select * from test_02302 where (_file like '%.1' OR _file like '%.2') AND a > 1;
+
+set max_rows_to_read = 999;
+
+select 'a1' as _file, * from test_02302 where _file like '%1' ORDER BY a;
+
 drop table test_02302;