diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp index 61f2132cfeb..7df8b8cc6c1 100644 --- a/src/Storages/StorageMerge.cpp +++ b/src/Storages/StorageMerge.cpp @@ -16,7 +16,6 @@ #include #include #include -#include #include #include #include @@ -46,7 +45,6 @@ #include #include -#include namespace { @@ -384,23 +382,23 @@ class ReadFromMerge::RowPolicyData public: RowPolicyData(RowPolicyFilterPtr, std::shared_ptr, ContextPtr); - /// Add columns that needed for row policies to data stream - /// SELECT x from T if T has row policy y=42 - /// required y in data pipeline + /// Add to data stream columns that are needed only for row policies + /// SELECT x from T if T has row policy y=42 + /// required y in data pipeline void extendNames(Names &); /// Use storage facilities to filter data - /// does not guarantee accuracy, but reduce number of rows + /// optimization + /// does not guarantee accuracy, but reduces number of rows void addStorageFilter(SourceStepWithFilter *); - /// Create explicit filter transform to stop + /// Create explicit filter transform to exclude /// rows that are not conform to row level policy void addFilterTransform(QueryPipelineBuilder &); private: - static std::string namesDifference(Names && outer_set, Names && inner_set); RowPolicyFilterPtr row_policy_filter_ptr; - std::string filter_column_name; // complex filer, may contain logic operations + std::string filter_column_name; // complex filter, may contain logic operations ActionsDAGPtr actions_dag; ExpressionActionsPtr filter_actions; }; @@ -704,7 +702,6 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources( row_policy_data->extendNames(real_column_names); } - storage->read(plan, real_column_names, storage_snapshot, @@ -713,17 +710,6 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources( processed_stage, max_block_size, UInt32(streams_num)); - - if (!plan.isInitialized()) - return {}; - - if (row_policy_data) - { - if (auto * source_step_with_filter = dynamic_cast((plan.getRootNode()->step.get()))) - { - row_policy_data->addStorageFilter(source_step_with_filter); - } - } } else { @@ -741,9 +727,17 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources( view->getInMemoryMetadataPtr(), SelectQueryOptions(processed_stage)); interpreter.buildQueryPlan(plan); + } - if (!plan.isInitialized()) - return {}; + if (!plan.isInitialized()) + return {}; + + if (row_policy_data) + { + if (auto * source_step_with_filter = dynamic_cast((plan.getRootNode()->step.get()))) + { + row_policy_data->addStorageFilter(source_step_with_filter); + } } if (auto * read_from_merge_tree = typeid_cast(plan.getRootNode()->step.get())) @@ -846,14 +840,7 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources( /// Subordinary tables could have different but convertible types, like numeric types of different width. /// We must return streams with structure equals to structure of Merge table. - /// Besides this we add FilterTransform if it is needed to follow row level policies. - - convertingSourceStream(header, - storage_snapshot->metadata, - aliases, - modified_context, - *builder, - processed_stage); + convertingSourceStream(header, storage_snapshot->metadata, aliases, modified_context, *builder, processed_stage); } return builder; @@ -878,18 +865,28 @@ ReadFromMerge::RowPolicyData::RowPolicyData(RowPolicyFilterPtr row_policy_filter actions_dag = expression_analyzer.getActionsDAG(false /* add_aliases */, false /* project_result */); filter_actions = std::make_shared(actions_dag, ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes)); - filter_column_name = namesDifference(filter_actions->getSampleBlock().getNames(), filter_actions->getRequiredColumns()); + const auto & required_columns = filter_actions->getRequiredColumnsWithTypes(); + const auto & sample_block_columns = filter_actions->getSampleBlock().getNamesAndTypesList(); + + NamesAndTypesList added, deleted; + sample_block_columns.getDifference(required_columns, added, deleted); + if (!deleted.empty() || added.size() != 1) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Cannot determine row level filter"); + } + + filter_column_name = added.getNames().front(); } -// Add columns that needed _only_ to evaluate row policies -// SELECT x from t if t has row policy that is based on y void ReadFromMerge::RowPolicyData::extendNames(Names & names) { + std::sort(names.begin(), names.end()); NameSet added_names; for (const auto & req_column : filter_actions->getRequiredColumns()) { - if (std::find(names.begin(), names.end(), req_column) == names.end()) + if (!std::binary_search(names.begin(), names.end(), req_column)) { added_names.insert(req_column); } @@ -926,27 +923,6 @@ void ReadFromMerge::RowPolicyData::addFilterTransform(QueryPipelineBuilder & bui }); } -/// Find out an item that in outer_set vector, but not in inner_set vector -std::string ReadFromMerge::RowPolicyData::namesDifference(Names && outer_set, Names && inner_set) -{ - std::sort(outer_set.begin(), outer_set.end()); - std::sort(inner_set.begin(), inner_set.end()); - - Names result; - - std::set_difference(outer_set.begin(), outer_set.end(), - inner_set.begin(), inner_set.end(), std::inserter(result, result.begin())); - - if (result.size() != 1) - { - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Cannot determine row level filter"); - } - - return result.front(); -} - - StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables( ContextPtr query_context, const ASTPtr & query /* = nullptr */,