merge_row_policy: cleanup

This commit is contained in:
Ilya Golshtein 2023-10-24 13:09:57 +00:00
parent 786183ee24
commit 0d27150948

View File

@ -16,7 +16,6 @@
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Interpreters/addTypeConversionToAST.h>
#include <Interpreters/replaceAliasColumnsInQuery.h>
#include <Interpreters/RequiredSourceColumnsVisitor.h>
#include <Planner/Utils.h>
#include <Analyzer/Utils.h>
#include <Analyzer/ConstantNode.h>
@ -46,7 +45,6 @@
#include <base/range.h>
#include <algorithm>
#include <Common/logger_useful.h>
namespace
{
@ -384,23 +382,23 @@ class ReadFromMerge::RowPolicyData
public:
RowPolicyData(RowPolicyFilterPtr, std::shared_ptr<DB::IStorage>, ContextPtr);
/// Add columns that needed for row policies to data stream
/// Add to data stream columns that are needed only for row policies
/// SELECT x from T if T has row policy y=42
/// required y in data pipeline
void extendNames(Names &);
/// Use storage facilities to filter data
/// does not guarantee accuracy, but reduce number of rows
/// optimization
/// does not guarantee accuracy, but reduces number of rows
void addStorageFilter(SourceStepWithFilter *);
/// Create explicit filter transform to stop
/// Create explicit filter transform to exclude
/// rows that are not conform to row level policy
void addFilterTransform(QueryPipelineBuilder &);
private:
static std::string namesDifference(Names && outer_set, Names && inner_set);
RowPolicyFilterPtr row_policy_filter_ptr;
std::string filter_column_name; // complex filer, may contain logic operations
std::string filter_column_name; // complex filter, may contain logic operations
ActionsDAGPtr actions_dag;
ExpressionActionsPtr filter_actions;
};
@ -704,7 +702,6 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
row_policy_data->extendNames(real_column_names);
}
storage->read(plan,
real_column_names,
storage_snapshot,
@ -713,17 +710,6 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
processed_stage,
max_block_size,
UInt32(streams_num));
if (!plan.isInitialized())
return {};
if (row_policy_data)
{
if (auto * source_step_with_filter = dynamic_cast<SourceStepWithFilter*>((plan.getRootNode()->step.get())))
{
row_policy_data->addStorageFilter(source_step_with_filter);
}
}
}
else
{
@ -741,9 +727,17 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
view->getInMemoryMetadataPtr(),
SelectQueryOptions(processed_stage));
interpreter.buildQueryPlan(plan);
}
if (!plan.isInitialized())
return {};
if (row_policy_data)
{
if (auto * source_step_with_filter = dynamic_cast<SourceStepWithFilter*>((plan.getRootNode()->step.get())))
{
row_policy_data->addStorageFilter(source_step_with_filter);
}
}
if (auto * read_from_merge_tree = typeid_cast<ReadFromMergeTree *>(plan.getRootNode()->step.get()))
@ -846,14 +840,7 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
/// Subordinary tables could have different but convertible types, like numeric types of different width.
/// We must return streams with structure equals to structure of Merge table.
/// Besides this we add FilterTransform if it is needed to follow row level policies.
convertingSourceStream(header,
storage_snapshot->metadata,
aliases,
modified_context,
*builder,
processed_stage);
convertingSourceStream(header, storage_snapshot->metadata, aliases, modified_context, *builder, processed_stage);
}
return builder;
@ -878,18 +865,28 @@ ReadFromMerge::RowPolicyData::RowPolicyData(RowPolicyFilterPtr row_policy_filter
actions_dag = expression_analyzer.getActionsDAG(false /* add_aliases */, false /* project_result */);
filter_actions = std::make_shared<ExpressionActions>(actions_dag,
ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes));
filter_column_name = namesDifference(filter_actions->getSampleBlock().getNames(), filter_actions->getRequiredColumns());
const auto & required_columns = filter_actions->getRequiredColumnsWithTypes();
const auto & sample_block_columns = filter_actions->getSampleBlock().getNamesAndTypesList();
NamesAndTypesList added, deleted;
sample_block_columns.getDifference(required_columns, added, deleted);
if (!deleted.empty() || added.size() != 1)
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot determine row level filter");
}
filter_column_name = added.getNames().front();
}
// Add columns that needed _only_ to evaluate row policies
// SELECT x from t if t has row policy that is based on y
void ReadFromMerge::RowPolicyData::extendNames(Names & names)
{
std::sort(names.begin(), names.end());
NameSet added_names;
for (const auto & req_column : filter_actions->getRequiredColumns())
{
if (std::find(names.begin(), names.end(), req_column) == names.end())
if (!std::binary_search(names.begin(), names.end(), req_column))
{
added_names.insert(req_column);
}
@ -926,27 +923,6 @@ void ReadFromMerge::RowPolicyData::addFilterTransform(QueryPipelineBuilder & bui
});
}
/// Find out an item that in outer_set vector, but not in inner_set vector
std::string ReadFromMerge::RowPolicyData::namesDifference(Names && outer_set, Names && inner_set)
{
std::sort(outer_set.begin(), outer_set.end());
std::sort(inner_set.begin(), inner_set.end());
Names result;
std::set_difference(outer_set.begin(), outer_set.end(),
inner_set.begin(), inner_set.end(), std::inserter(result, result.begin()));
if (result.size() != 1)
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot determine row level filter");
}
return result.front();
}
StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(
ContextPtr query_context,
const ASTPtr & query /* = nullptr */,