diff --git a/src/Storages/HDFS/StorageHDFS.cpp b/src/Storages/HDFS/StorageHDFS.cpp
index fdbb5e9f171..9d719413c8d 100644
--- a/src/Storages/HDFS/StorageHDFS.cpp
+++ b/src/Storages/HDFS/StorageHDFS.cpp
@@ -15,6 +15,8 @@
 #include <...>
 #include <...>
 #include <...>
+#include <Processors/QueryPlan/QueryPlan.h>
+#include <Processors/QueryPlan/SourceStepWithFilter.h>
 #include <...>
 #include <...>
 
@@ -408,6 +410,35 @@ ColumnsDescription StorageHDFS::getTableStructureFromData(
 class HDFSSource::DisclosedGlobIterator::Impl
 {
 public:
+
+    Impl(const String & uri, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
+    {
+        const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(uri);
+        uris = getPathsList(path_from_uri, uri_without_path, context);
+        ActionsDAGPtr filter_dag;
+        if (!uris.empty())
+            filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, uris[0].path);
+
+        if (filter_dag)
+        {
+            std::vector<String> paths;
+            paths.reserve(uris.size());
+            for (const auto & path_with_info : uris)
+                paths.push_back(path_with_info.path);
+
+            VirtualColumnUtils::filterByPathOrFile(uris, paths, filter_dag, virtual_columns, context);
+        }
+        auto file_progress_callback = context->getFileProgressCallback();
+
+        for (auto & elem : uris)
+        {
+            elem.path = uri_without_path + elem.path;
+            if (file_progress_callback && elem.info)
+                file_progress_callback(FileProgress(0, elem.info->size));
+        }
+        uris_iter = uris.begin();
+    }
+
     Impl(const String & uri, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
     {
         const auto [path_from_uri, uri_without_path] = getPathFromUriAndUriWithoutPath(uri);
@@ -456,21 +487,21 @@ private:
 class HDFSSource::URISIterator::Impl : WithContext
 {
 public:
-    explicit Impl(const std::vector<String> & uris_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context_)
+    explicit Impl(const std::vector<String> & uris_, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context_)
         : WithContext(context_), uris(uris_), file_progress_callback(context_->getFileProgressCallback())
     {
-        ASTPtr filter_ast;
+        ActionsDAGPtr filter_dag;
         if (!uris.empty())
-            filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, getPathFromUriAndUriWithoutPath(uris[0]).first, getContext());
+            filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, getPathFromUriAndUriWithoutPath(uris[0]).first);
 
-        if (filter_ast)
+        if (filter_dag)
         {
             std::vector<String> paths;
             paths.reserve(uris.size());
             for (const auto & uri : uris)
                 paths.push_back(getPathFromUriAndUriWithoutPath(uri).first);
 
-            VirtualColumnUtils::filterByPathOrFile(uris, paths, query, virtual_columns, getContext(), filter_ast);
+            VirtualColumnUtils::filterByPathOrFile(uris, paths, filter_dag, virtual_columns, getContext());
         }
 
         if (!uris.empty())
@@ -520,13 +551,16 @@ private:
 HDFSSource::DisclosedGlobIterator::DisclosedGlobIterator(const String & uri, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
     : pimpl(std::make_shared<HDFSSource::DisclosedGlobIterator::Impl>(uri, query, virtual_columns, context)) {}
 
+HDFSSource::DisclosedGlobIterator::DisclosedGlobIterator(const String & uri, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
+    : pimpl(std::make_shared<HDFSSource::DisclosedGlobIterator::Impl>(uri, predicate, virtual_columns, context)) {}
+
 StorageHDFS::PathWithInfo HDFSSource::DisclosedGlobIterator::next()
 {
     return pimpl->next();
 }
 
-HDFSSource::URISIterator::URISIterator(const std::vector<String> & uris_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
-    : pimpl(std::make_shared<HDFSSource::URISIterator::Impl>(uris_, query, virtual_columns, context))
+HDFSSource::URISIterator::URISIterator(const std::vector<String> & uris_, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
+    : pimpl(std::make_shared<HDFSSource::URISIterator::Impl>(uris_, predicate, virtual_columns, context))
 {
 }
 
@@ -541,8 +575,8 @@ HDFSSource::HDFSSource(
     ContextPtr context_,
     UInt64 max_block_size_,
     std::shared_ptr<IteratorWrapper> file_iterator_,
-    bool need_only_count_,
-    const SelectQueryInfo & query_info_)
+    bool need_only_count_)
     : ISource(info.source_header, false)
     , WithContext(context_)
     , storage(std::move(storage_))
@@ -553,7 +587,7 @@ HDFSSource::HDFSSource(
     , file_iterator(file_iterator_)
     , columns_description(info.columns_description)
     , need_only_count(need_only_count_)
-    , query_info(query_info_)
 {
     initialize();
 }
@@ -843,7 +877,82 @@ bool StorageHDFS::supportsSubsetOfColumns(const ContextPtr & context_) const
     return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name, context_);
 }
 
-Pipe StorageHDFS::read(
+class ReadFromHDFS : public SourceStepWithFilter
+{
+public:
+    std::string getName() const override { return "ReadFromHDFS"; }
+    void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
+    void applyFilters() override;
+
+    ReadFromHDFS(
+        Block sample_block,
+        std::vector<String> uris_,
+        bool distributed_processing_,
+        NamesAndTypesList virtual_columns_,
+        bool is_path_with_globs_,
+        ReadFromFormatInfo info_,
+        bool need_only_count_,
+        std::shared_ptr<StorageHDFS> storage_,
+        ContextPtr context_,
+        size_t max_block_size_,
+        size_t num_streams_)
+        : SourceStepWithFilter(DataStream{.header = std::move(sample_block)})
+        , uris(std::move(uris_))
+        , distributed_processing(distributed_processing_)
+        , virtual_columns(std::move(virtual_columns_))
+        , is_path_with_globs(is_path_with_globs_)
+        , info(std::move(info_))
+        , need_only_count(need_only_count_)
+        , storage(std::move(storage_))
+        , context(std::move(context_))
+        , max_block_size(max_block_size_)
+        , num_streams(num_streams_)
+    {
+    }
+
+private:
+    std::vector<String> uris;
+    const bool distributed_processing;
+    NamesAndTypesList virtual_columns;
+    bool is_path_with_globs;
+    ReadFromFormatInfo info;
+    const bool need_only_count;
+    std::shared_ptr<StorageHDFS> storage;
+
+    ContextPtr context;
+
+    size_t max_block_size;
+    size_t num_streams;
+
+    std::shared_ptr<HDFSSource::IteratorWrapper> iterator_wrapper;
+
+    void createIterator(const ActionsDAG::Node * predicate);
+};
+
+void ReadFromHDFS::applyFilters()
+{
+    auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(filter_nodes.nodes, {}, context);
+    const ActionsDAG::Node * predicate = nullptr;
+    if (filter_actions_dag)
+        predicate = filter_actions_dag->getOutputs().at(0);
+
+    createIterator(predicate);
+}
+
+void StorageHDFS::read(
+    QueryPlan & query_plan,
     const Names & column_names,
     const StorageSnapshotPtr & storage_snapshot,
     SelectQueryInfo & query_info,
@@ -852,18 +961,44 @@ Pipe StorageHDFS::read(
     size_t max_block_size,
     size_t num_streams)
 {
-    std::shared_ptr<HDFSSource::IteratorWrapper> iterator_wrapper{nullptr};
+    auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(context_), virtual_columns);
+    bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
+        && context_->getSettingsRef().optimize_count_from_files;
+
+    auto this_ptr = std::static_pointer_cast<StorageHDFS>(shared_from_this());
+
+    auto reading = std::make_unique<ReadFromHDFS>(
+        read_from_format_info.source_header,
+        uris,
+        distributed_processing,
+        virtual_columns,
+        is_path_with_globs,
+        std::move(read_from_format_info),
+        need_only_count,
+        std::move(this_ptr),
+        context_,
+        max_block_size,
+        num_streams);
+
+    query_plan.addStep(std::move(reading));
+}
+
+void ReadFromHDFS::createIterator(const ActionsDAG::Node * predicate)
+{
+    if (iterator_wrapper)
+        return;
+
     if (distributed_processing)
     {
         iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>(
-            [callback = context_->getReadTaskCallback()]() -> StorageHDFS::PathWithInfo {
+            [callback = context->getReadTaskCallback()]() -> StorageHDFS::PathWithInfo {
                 return StorageHDFS::PathWithInfo{callback(), std::nullopt};
         });
     }
     else if (is_path_with_globs)
     {
         /// Iterate through disclosed globs and make a source for each file
-        auto glob_iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(uris[0], query_info.query, virtual_columns, context_);
+        auto glob_iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(uris[0], predicate, virtual_columns, context);
         iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([glob_iterator]()
         {
             return glob_iterator->next();
@@ -871,31 +1006,31 @@ Pipe StorageHDFS::read(
     }
     else
     {
-        auto uris_iterator = std::make_shared<HDFSSource::URISIterator>(uris, query_info.query, virtual_columns, context_);
+        auto uris_iterator = std::make_shared<HDFSSource::URISIterator>(uris, predicate, virtual_columns, context);
         iterator_wrapper = std::make_shared<HDFSSource::IteratorWrapper>([uris_iterator]()
         {
             return uris_iterator->next();
         });
     }
+}
 
-    auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(context_), getVirtuals());
-    bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
-        && context_->getSettingsRef().optimize_count_from_files;
+void ReadFromHDFS::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
+{
+    createIterator(nullptr);
 
     Pipes pipes;
-    auto this_ptr = std::static_pointer_cast<StorageHDFS>(shared_from_this());
     for (size_t i = 0; i < num_streams; ++i)
    {
         pipes.emplace_back(std::make_shared<HDFSSource>(
-            read_from_format_info,
-            this_ptr,
-            context_,
+            info,
+            storage,
+            context,
             max_block_size,
             iterator_wrapper,
-            need_only_count,
-            query_info));
+            need_only_count));
     }
-    return Pipe::unitePipes(std::move(pipes));
+    pipeline.init(Pipe::unitePipes(std::move(pipes)));
 }
 
 SinkToStoragePtr StorageHDFS::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context_, bool /*async_insert*/)
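The step above has two entry points into `createIterator()`: the plan optimizer calls `applyFilters()` when it can push a `WHERE` condition down to this step, folding `filter_nodes` into a single `ActionsDAG` whose first output serves as the predicate, while `initializePipeline()` calls `createIterator(nullptr)` as a fallback for plans where nothing was pushed down. The `if (iterator_wrapper) return;` guard makes the second call a no-op, so a pre-filtered iterator is never rebuilt without its predicate. The following self-contained sketch models that control flow; `ReadStep`, `FileIterator`, and `Predicate` are stand-ins invented for illustration, not ClickHouse classes:

```cpp
// Minimal model of the applyFilters()/initializePipeline() interplay.
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

using Predicate = std::function<bool(const std::string & path)>;

struct FileIterator
{
    std::vector<std::string> paths;
    size_t pos = 0;

    std::string next() { return pos < paths.size() ? paths[pos++] : ""; }
};

class ReadStep
{
public:
    explicit ReadStep(std::vector<std::string> all_paths) : paths(std::move(all_paths)) {}

    /// Called by the optimizer only when a WHERE clause could be pushed down.
    void applyFilters(const Predicate & predicate) { createIterator(&predicate); }

    /// Always called when the pipeline is built; reuses the filtered iterator
    /// if applyFilters() already ran, otherwise creates an unfiltered one.
    void initializePipeline()
    {
        createIterator(nullptr);
        for (std::string p = iterator->next(); !p.empty(); p = iterator->next())
            std::cout << "reading " << p << '\n';
    }

private:
    void createIterator(const Predicate * predicate)
    {
        if (iterator)
            return; /// idempotent: keep the (possibly pre-filtered) iterator

        std::vector<std::string> kept;
        for (const auto & p : paths)
            if (!predicate || (*predicate)(p))
                kept.push_back(p);
        iterator = std::make_shared<FileIterator>(FileIterator{std::move(kept)});
    }

    std::vector<std::string> paths;
    std::shared_ptr<FileIterator> iterator;
};

int main()
{
    ReadStep step({"a.csv", "b.csv"});
    step.applyFilters([](const std::string & p) { return p == "b.csv"; });
    step.initializePipeline(); /// prints only "reading b.csv"
}
```

Running it prints only `reading b.csv`: the pruning happened when the filter was applied, and building the pipeline reused the already-filtered iterator.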
diff --git a/src/Storages/HDFS/StorageHDFS.h b/src/Storages/HDFS/StorageHDFS.h
index 18eeb787d77..cee1b674eb7 100644
--- a/src/Storages/HDFS/StorageHDFS.h
+++ b/src/Storages/HDFS/StorageHDFS.h
@@ -51,7 +51,8 @@ public:
 
     String getName() const override { return "HDFS"; }
 
-    Pipe read(
+    void read(
+        QueryPlan & query_plan,
         const Names & column_names,
         const StorageSnapshotPtr & storage_snapshot,
         SelectQueryInfo & query_info,
@@ -115,6 +116,7 @@ public:
     {
     public:
         DisclosedGlobIterator(const String & uri_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
+        DisclosedGlobIterator(const String & uri_, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
         StorageHDFS::PathWithInfo next();
     private:
         class Impl;
@@ -125,7 +127,7 @@ public:
     class URISIterator
     {
     public:
-        URISIterator(const std::vector<String> & uris_, const ASTPtr & query, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
+        URISIterator(const std::vector<String> & uris_, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
         StorageHDFS::PathWithInfo next();
     private:
         class Impl;
@@ -142,8 +144,8 @@ public:
         ContextPtr context_,
         UInt64 max_block_size_,
         std::shared_ptr<IteratorWrapper> file_iterator_,
-        bool need_only_count_,
-        const SelectQueryInfo & query_info_);
+        bool need_only_count_);
 
     String getName() const override;
 
@@ -162,7 +164,7 @@ private:
     ColumnsDescription columns_description;
     bool need_only_count;
     size_t total_rows_in_file = 0;
-    SelectQueryInfo query_info;
 
     std::unique_ptr<ReadBuffer> read_buf;
     std::shared_ptr<IInputFormat> input_format;
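Both iterator classes now exist in two flavors: the legacy overloads taking the query AST, and the new overloads taking a plan-level predicate. A hypothetical call site for the new `DisclosedGlobIterator` overload might look as follows; the URI is invented, `filter_dag`, `virtual_columns`, and `context` are assumed to be in scope, and the empty-path end-of-stream convention is an assumption based on `uris_iter` in the implementation above:

```cpp
// Hypothetical driver for the predicate-based overload declared above.
// A null predicate is valid and simply disables pruning.
const ActionsDAG::Node * predicate = filter_dag ? filter_dag->getOutputs().at(0) : nullptr;

HDFSSource::DisclosedGlobIterator glob_iterator(
    "hdfs://namenode:9000/warehouse/part-*.parquet", predicate, virtual_columns, context);

while (true)
{
    StorageHDFS::PathWithInfo path_with_info = glob_iterator.next();
    if (path_with_info.path.empty())
        break; /// assumed: a default PathWithInfo signals exhaustion
    /// ... create a source for path_with_info ...
}
```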
diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp
index 25bb6691ff6..b040f452410 100644
--- a/src/Storages/StorageFile.cpp
+++ b/src/Storages/StorageFile.cpp
@@ -9,6 +9,7 @@
 #include <...>
 #include <...>
+#include <Interpreters/InterpreterSelectQuery.h>
 #include <...>
 #include <...>
 
@@ -44,6 +45,8 @@
 #include <...>
 #include <...>
 #include <...>
+#include <Processors/QueryPlan/QueryPlan.h>
+#include <Processors/QueryPlan/SourceStepWithFilter.h>
 #include <...>
 #include <...>
 
@@ -947,6 +950,23 @@ StorageFileSource::FilesIterator::FilesIterator(
         VirtualColumnUtils::filterByPathOrFile(files, files, query, virtual_columns, context_, filter_ast);
 }
 
+StorageFileSource::FilesIterator::FilesIterator(
+    const Strings & files_,
+    std::optional<StorageFile::ArchiveInfo> archive_info_,
+    const ActionsDAG::Node * predicate,
+    const NamesAndTypesList & virtual_columns,
+    ContextPtr context_,
+    bool distributed_processing_)
+    : files(files_), archive_info(std::move(archive_info_)), distributed_processing(distributed_processing_), context(context_)
+{
+    ActionsDAGPtr filter_dag;
+    if (!distributed_processing && !archive_info && !files.empty() && !files[0].empty())
+        filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, files[0]);
+
+    if (filter_dag)
+        VirtualColumnUtils::filterByPathOrFile(files, files, filter_dag, virtual_columns, context_);
+}
+
 String StorageFileSource::FilesIterator::next()
 {
     if (distributed_processing)
@@ -974,16 +994,13 @@ const String & StorageFileSource::FilesIterator::getFileNameInArchive()
 StorageFileSource::StorageFileSource(
     const ReadFromFormatInfo & info,
     std::shared_ptr<StorageFile> storage_,
-    const StorageSnapshotPtr & storage_snapshot_,
     ContextPtr context_,
-    const SelectQueryInfo & query_info_,
     UInt64 max_block_size_,
     FilesIteratorPtr files_iterator_,
     std::unique_ptr<ReadBuffer> read_buf_,
     bool need_only_count_)
     : SourceWithKeyCondition(info.source_header, false)
     , storage(std::move(storage_))
-    , storage_snapshot(storage_snapshot_)
     , files_iterator(std::move(files_iterator_))
     , read_buf(std::move(read_buf_))
     , columns_description(info.columns_description)
@@ -991,7 +1008,6 @@ StorageFileSource::StorageFileSource(
     , requested_virtual_columns(info.requested_virtual_columns)
     , block_for_format(info.format_header)
     , context(context_)
-    , query_info(query_info_)
     , max_block_size(max_block_size_)
     , need_only_count(need_only_count_)
 {
@@ -1322,14 +1338,87 @@ std::optional<size_t> StorageFileSource::tryGetNumRowsFromCache(const String & p
     return schema_cache.tryGetNumRows(key, get_last_mod_time);
 }
 
-Pipe StorageFile::read(
+class ReadFromFile : public SourceStepWithFilter
+{
+public:
+    std::string getName() const override { return "ReadFromFile"; }
+    void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
+    void applyFilters() override;
+
+    ReadFromFile(
+        Block sample_block,
+        std::shared_ptr<StorageFile> storage_,
+        std::vector<String> paths_,
+        std::optional<StorageFile::ArchiveInfo> archive_info_,
+        NamesAndTypesList virtual_columns_,
+        bool distributed_processing_,
+        ReadFromFormatInfo info_,
+        const bool need_only_count_,
+        size_t total_bytes_to_read_,
+        ContextPtr context_,
+        size_t max_block_size_,
+        size_t num_streams_)
+        : SourceStepWithFilter(DataStream{.header = std::move(sample_block)})
+        , storage(std::move(storage_))
+        , paths(std::move(paths_))
+        , archive_info(std::move(archive_info_))
+        , virtual_columns(std::move(virtual_columns_))
+        , distributed_processing(distributed_processing_)
+        , info(std::move(info_))
+        , need_only_count(need_only_count_)
+        , total_bytes_to_read(total_bytes_to_read_)
+        , context(std::move(context_))
+        , max_block_size(max_block_size_)
+        , max_num_streams(num_streams_)
+    {
+    }
+
+private:
+    std::shared_ptr<StorageFile> storage;
+
+    std::vector<String> paths;
+    std::optional<StorageFile::ArchiveInfo> archive_info;
+
+    NamesAndTypesList virtual_columns;
+    const bool distributed_processing;
+
+    ReadFromFormatInfo info;
+    const bool need_only_count;
+
+    size_t total_bytes_to_read;
+
+    ContextPtr context;
+
+    size_t max_block_size;
+    const size_t max_num_streams;
+
+    std::shared_ptr<StorageFileSource::FilesIterator> files_iterator;
+
+    void createIterator(const ActionsDAG::Node * predicate);
+};
+
+void ReadFromFile::applyFilters()
+{
+    auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(filter_nodes.nodes, {}, context);
+    const ActionsDAG::Node * predicate = nullptr;
+    if (filter_actions_dag)
+        predicate = filter_actions_dag->getOutputs().at(0);
+
+    createIterator(predicate);
+}
+
+void StorageFile::read(
+    QueryPlan & query_plan,
     const Names & column_names,
     const StorageSnapshotPtr & storage_snapshot,
     SelectQueryInfo & query_info,
     ContextPtr context,
     QueryProcessingStage::Enum /*processed_stage*/,
     size_t max_block_size,
-    const size_t max_num_streams)
+    size_t num_streams)
 {
     if (use_table_fd)
     {
@@ -1346,17 +1435,48 @@ Pipe StorageFile::read(
 
         if (p->size() == 1 && !fs::exists(p->at(0)))
         {
-            if (context->getSettingsRef().engine_file_empty_if_not_exists)
-                return Pipe(std::make_shared<NullSource>(storage_snapshot->getSampleBlockForColumns(column_names)));
-            else
+            if (!context->getSettingsRef().engine_file_empty_if_not_exists)
                 throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File {} doesn't exist", p->at(0));
+
+            auto header = storage_snapshot->getSampleBlockForColumns(column_names);
+            InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info, context);
+            return;
         }
     }
 
-    auto files_iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, archive_info, query_info.query, virtual_columns, context, distributed_processing);
     auto this_ptr = std::static_pointer_cast<StorageFile>(shared_from_this());
 
+    auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(context), getVirtuals());
+    bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
+        && context->getSettingsRef().optimize_count_from_files;
+
+    auto reading = std::make_unique<ReadFromFile>(
+        read_from_format_info.source_header,
+        std::move(this_ptr),
+        paths,
+        archive_info,
+        virtual_columns,
+        distributed_processing,
+        std::move(read_from_format_info),
+        need_only_count,
+        total_bytes_to_read,
+        context,
+        max_block_size,
+        num_streams);
+
+    query_plan.addStep(std::move(reading));
+}
+
+void ReadFromFile::createIterator(const ActionsDAG::Node * predicate)
+{
+    if (files_iterator)
+        return;
+
+    files_iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, archive_info, predicate, virtual_columns, context, distributed_processing);
+}
+
+void ReadFromFile::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
+{
     size_t num_streams = max_num_streams;
 
     size_t files_to_read = 0;
@@ -1377,10 +1497,6 @@ Pipe StorageFile::read(
     if (progress_callback && !archive_info)
         progress_callback(FileProgress(0, total_bytes_to_read));
 
-    auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(context), getVirtuals());
-    bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
-        && context->getSettingsRef().optimize_count_from_files;
-
     for (size_t i = 0; i < num_streams; ++i)
     {
         /// In case of reading from fd we have to check whether we have already created
@@ -1388,22 +1504,20 @@ Pipe StorageFile::read(
         /// If yes, then we should use it in StorageFileSource. Atomic bool flag is needed
         /// to prevent data race in case of parallel reads.
         std::unique_ptr<ReadBuffer> read_buffer;
-        if (has_peekable_read_buffer_from_fd.exchange(false))
-            read_buffer = std::move(peekable_read_buffer_from_fd);
+        if (storage->has_peekable_read_buffer_from_fd.exchange(false))
+            read_buffer = std::move(storage->peekable_read_buffer_from_fd);
 
         pipes.emplace_back(std::make_shared<StorageFileSource>(
-            read_from_format_info,
-            this_ptr,
-            storage_snapshot,
+            info,
+            storage,
             context,
-            query_info,
             max_block_size,
             files_iterator,
             std::move(read_buffer),
             need_only_count));
     }
 
-    return Pipe::unitePipes(std::move(pipes));
+    pipeline.init(Pipe::unitePipes(std::move(pipes)));
 }
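With the fan-out moved into `ReadFromFile::initializePipeline()`, all `num_streams` sources pull from one shared `FilesIterator`, so files are handed out to streams dynamically instead of being partitioned up front. Below is a self-contained model of that sharing; the types are stand-ins, with the hand-out point modeled as an atomic counter:

```cpp
// Model of several pipeline streams draining one shared iterator.
#include <atomic>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>

struct SharedFilesIterator
{
    std::vector<std::string> files;
    std::atomic<size_t> pos{0};

    /// Thread-safe: each call hands out the next unread file ("" when done).
    std::string next()
    {
        size_t i = pos.fetch_add(1);
        return i < files.size() ? files[i] : "";
    }
};

int main()
{
    auto iterator = std::make_shared<SharedFilesIterator>();
    iterator->files = {"a.csv", "b.csv", "c.csv"};

    std::vector<std::thread> streams;
    for (size_t i = 0; i < 2; ++i) /// num_streams = 2
        streams.emplace_back([iterator, i]
        {
            for (auto file = iterator->next(); !file.empty(); file = iterator->next())
                std::cout << "stream " << i << " reads " << file << '\n';
        });
    for (auto & t : streams)
        t.join();
}
```

Whichever stream finishes a file first simply asks the iterator for the next one, which keeps slow and fast files balanced across streams without any static assignment.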
diff --git a/src/Storages/StorageFile.h b/src/Storages/StorageFile.h
index 1fd3f2e0edf..ecb9e01b862 100644
--- a/src/Storages/StorageFile.h
+++ b/src/Storages/StorageFile.h
@@ -53,7 +53,8 @@ public:
 
     std::string getName() const override { return "File"; }
 
-    Pipe read(
+    void read(
+        QueryPlan & query_plan,
         const Names & column_names,
         const StorageSnapshotPtr & storage_snapshot,
         SelectQueryInfo & query_info,
@@ -137,6 +138,7 @@ public:
 protected:
     friend class StorageFileSource;
     friend class StorageFileSink;
+    friend class ReadFromFile;
 
 private:
     void setStorageMetadata(CommonArguments args);
@@ -199,6 +201,14 @@ public:
             ContextPtr context_,
             bool distributed_processing_ = false);
 
+        explicit FilesIterator(
+            const Strings & files_,
+            std::optional<StorageFile::ArchiveInfo> archive_info_,
+            const ActionsDAG::Node * predicate,
+            const NamesAndTypesList & virtual_columns,
+            ContextPtr context_,
+            bool distributed_processing_ = false);
+
         String next();
 
         bool isReadFromArchive() const
@@ -234,9 +244,7 @@ private:
     StorageFileSource(
         const ReadFromFormatInfo & info,
         std::shared_ptr<StorageFile> storage_,
-        const StorageSnapshotPtr & storage_snapshot_,
         ContextPtr context_,
-        const SelectQueryInfo & query_info_,
         UInt64 max_block_size_,
         FilesIteratorPtr files_iterator_,
         std::unique_ptr<ReadBuffer> read_buf_,
@@ -269,7 +277,6 @@ private:
     std::optional<size_t> tryGetNumRowsFromCache(const String & path, time_t last_mod_time) const;
 
     std::shared_ptr<StorageFile> storage;
-    StorageSnapshotPtr storage_snapshot;
     FilesIteratorPtr files_iterator;
     String current_path;
     std::optional<size_t> current_file_size;
@@ -290,7 +297,6 @@ private:
     Block block_for_format;
 
     ContextPtr context;    /// TODO Untangle potential issues with context lifetime.
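A hypothetical use of the new `FilesIterator` constructor declared above; the paths are invented, `predicate`, `virtual_columns`, and `context` are assumed to be in scope, and the empty-string exhaustion convention is an assumption about `next()`:

```cpp
// Candidate files are pruned once, at construction time, using only the
// virtual columns _path and _file; nothing is opened or stat'ed here.
Strings files = {"/data/2023.csv", "/data/2024.csv"};
auto files_iterator = std::make_shared<StorageFileSource::FilesIterator>(
    files,
    /*archive_info_=*/std::nullopt,
    predicate,          /// e.g. built from a pushed-down `_file = '2024.csv'`
    virtual_columns,
    context,
    /*distributed_processing_=*/false);

/// Only "/data/2024.csv" would survive such a predicate; the other file is
/// never handed to any source.
for (String path = files_iterator->next(); !path.empty(); path = files_iterator->next())
{
    /// ... create a source for `path` ...
}
```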
-    SelectQueryInfo query_info;
     UInt64 max_block_size;
 
     bool finished_generate = false;
diff --git a/src/Storages/VirtualColumnUtils.cpp b/src/Storages/VirtualColumnUtils.cpp
index aed06fb0540..7690e160255 100644
--- a/src/Storages/VirtualColumnUtils.cpp
+++ b/src/Storages/VirtualColumnUtils.cpp
@@ -390,6 +390,42 @@ static void addPathAndFileToVirtualColumns(Block & block, const String & path, s
     block.getByName("_idx").column->assumeMutableRef().insert(idx);
 }
 
+ActionsDAGPtr createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const String & path_example)
+{
+    if (!predicate || virtual_columns.empty())
+        return {};
+
+    Block block;
+    for (const auto & column : virtual_columns)
+    {
+        if (column.name == "_file" || column.name == "_path")
+            block.insert({column.type->createColumn(), column.type, column.name});
+    }
+    /// Create a block with one row to construct the filter.
+    /// Append the "_idx" column as the filter result.
+    block.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
+    addPathAndFileToVirtualColumns(block, path_example, 0);
+    return splitFilterDagForAllowedInputs(predicate, block);
+}
+
+ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ActionsDAGPtr & dag, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
+{
+    Block block;
+    for (const auto & column : virtual_columns)
+    {
+        if (column.name == "_file" || column.name == "_path")
+            block.insert({column.type->createColumn(), column.type, column.name});
+    }
+    block.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
+
+    for (size_t i = 0; i != paths.size(); ++i)
+        addPathAndFileToVirtualColumns(block, paths[i], i);
+
+    filterBlockWithDAG(dag, block, context);
+
+    return block.getByName("_idx").column;
+}
+
 ASTPtr createPathAndFileFilterAst(const ASTPtr & query, const NamesAndTypesList & virtual_columns, const String & path_example, const ContextPtr & context)
 {
     if (!query || virtual_columns.empty())
diff --git a/src/Storages/VirtualColumnUtils.h b/src/Storages/VirtualColumnUtils.h
index e22b9742888..4f9636b4213 100644
--- a/src/Storages/VirtualColumnUtils.h
+++ b/src/Storages/VirtualColumnUtils.h
@@ -77,6 +77,25 @@ void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & pa
     sources = std::move(filtered_sources);
 }
 
+ActionsDAGPtr createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const String & path_example);
+
+ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ActionsDAGPtr & dag, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
+
+template <typename T>
+void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & paths, const ActionsDAGPtr & dag, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
+{
+    auto indexes_column = getFilterByPathAndFileIndexes(paths, dag, virtual_columns, context);
+    const auto & indexes = typeid_cast<const ColumnUInt64 &>(*indexes_column).getData();
+    if (indexes.size() == sources.size())
+        return;
+
+    std::vector<T> filtered_sources;
+    filtered_sources.reserve(indexes.size());
+    for (auto index : indexes)
+        filtered_sources.emplace_back(std::move(sources[index]));
+    sources = std::move(filtered_sources);
+}
+
 void addRequestedPathFileAndSizeVirtualsToChunk(
     Chunk & chunk, const NamesAndTypesList & requested_virtual_columns, const String & path, std::optional<size_t> size, const String * filename = nullptr);
 }
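Taken together, the new helpers split the work into two phases: `createPathAndFileFilterDAG` extracts the part of the pushed-down predicate that can be computed from `_path`/`_file` alone (using one example path as a sample row), and `filterByPathOrFile` materializes one `(_path, _file, _idx)` row per candidate, filters that block with the DAG, and keeps only the sources whose `_idx` survived. A hypothetical call sequence, with invented paths and with `predicate`, `virtual_columns`, and `context` assumed to come from the plan step:

```cpp
// Hypothetical end-to-end use of the new DAG-based helpers, instantiated with
// T = String so the path list doubles as its own source vector, just as the
// FilesIterator constructor above uses it.
std::vector<String> paths = {"dir/a.csv", "dir/b.csv", "dir/c.csv"};

/// Phase 1: keep only the sub-DAG of the predicate that needs _path/_file;
/// paths[0] serves as the sample row for building the one-row block.
ActionsDAGPtr dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, paths[0]);

/// Phase 2: one (_path, _file, _idx) row per candidate is filtered through the
/// DAG; surviving _idx values select which sources to keep. For a pushed-down
/// `WHERE _file = 'b.csv'`, paths becomes {"dir/b.csv"}.
if (dag)
    VirtualColumnUtils::filterByPathOrFile(paths, paths, dag, virtual_columns, context);
```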