Merge pull request #58255 from ClickHouse/filter-virtual-columns-storage-merge

Refactor StorageMerge virtual columns filtering.
This commit is contained in:
Nikolai Kochetov 2024-01-03 16:09:49 +01:00 committed by GitHub
commit 7834519212
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 206 additions and 174 deletions

View File

@ -115,6 +115,7 @@ public:
explicit ActionsDAG(const ColumnsWithTypeAndName & inputs_);
const Nodes & getNodes() const { return nodes; }
static Nodes detachNodes(ActionsDAG && dag) { return std::move(dag.nodes); }
const NodeRawConstPtrs & getOutputs() const { return outputs; }
/** Output nodes can contain any column returned from DAG.
* You may manually change it if needed.

View File

@ -80,7 +80,6 @@ namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NOT_IMPLEMENTED;
extern const int ILLEGAL_PREWHERE;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int SAMPLING_NOT_SUPPORTED;
extern const int ALTER_OF_COLUMN_IS_FORBIDDEN;
@ -88,6 +87,20 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
StorageMerge::DatabaseNameOrRegexp::DatabaseNameOrRegexp(
const String & source_database_name_or_regexp_,
bool database_is_regexp_,
std::optional<OptimizedRegularExpression> source_database_regexp_,
std::optional<OptimizedRegularExpression> source_table_regexp_,
std::optional<DBToTableSetMap> source_databases_and_tables_)
: source_database_name_or_regexp(source_database_name_or_regexp_)
, database_is_regexp(database_is_regexp_)
, source_database_regexp(std::move(source_database_regexp_))
, source_table_regexp(std::move(source_table_regexp_))
, source_databases_and_tables(std::move(source_databases_and_tables_))
{
}
StorageMerge::StorageMerge(
const StorageID & table_id_,
const ColumnsDescription & columns_,
@ -98,10 +111,11 @@ StorageMerge::StorageMerge(
ContextPtr context_)
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
, source_database_regexp(source_database_name_or_regexp_)
, source_databases_and_tables(source_databases_and_tables_)
, source_database_name_or_regexp(source_database_name_or_regexp_)
, database_is_regexp(database_is_regexp_)
, database_name_or_regexp(
source_database_name_or_regexp_,
database_is_regexp_,
source_database_name_or_regexp_, {},
source_databases_and_tables_)
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_.empty() ? getColumnsDescriptionFromSourceTables() : columns_);
@ -119,10 +133,11 @@ StorageMerge::StorageMerge(
ContextPtr context_)
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
, source_database_regexp(source_database_name_or_regexp_)
, source_table_regexp(source_table_regexp_)
, source_database_name_or_regexp(source_database_name_or_regexp_)
, database_is_regexp(database_is_regexp_)
, database_name_or_regexp(
source_database_name_or_regexp_,
database_is_regexp_,
source_database_name_or_regexp_,
source_table_regexp_, {})
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_.empty() ? getColumnsDescriptionFromSourceTables() : columns_);
@ -130,6 +145,11 @@ StorageMerge::StorageMerge(
setInMemoryMetadata(storage_metadata);
}
StorageMerge::DatabaseTablesIterators StorageMerge::getDatabaseIterators(ContextPtr context_) const
{
return database_name_or_regexp.getDatabaseIterators(context_);
}
ColumnsDescription StorageMerge::getColumnsDescriptionFromSourceTables() const
{
auto table = getFirstTable([](auto && t) { return t; });
@ -141,7 +161,7 @@ ColumnsDescription StorageMerge::getColumnsDescriptionFromSourceTables() const
template <typename F>
StoragePtr StorageMerge::getFirstTable(F && predicate) const
{
auto database_table_iterators = getDatabaseIterators(getContext());
auto database_table_iterators = database_name_or_regexp.getDatabaseIterators(getContext());
for (auto & iterator : database_table_iterators)
{
@ -236,7 +256,6 @@ std::optional<NameSet> StorageMerge::supportedPrewhereColumns() const
return supported_columns;
}
QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(
ContextPtr local_context,
QueryProcessingStage::Enum to_stage,
@ -255,7 +274,7 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(
auto stage_in_source_tables = QueryProcessingStage::FetchColumns;
DatabaseTablesIterators database_table_iterators = getDatabaseIterators(local_context);
DatabaseTablesIterators database_table_iterators = database_name_or_regexp.getDatabaseIterators(local_context);
size_t selected_table_size = 0;
@ -297,45 +316,6 @@ void StorageMerge::read(
*/
auto modified_context = Context::createCopy(local_context);
modified_context->setSetting("optimize_move_to_prewhere", false);
bool has_database_virtual_column = false;
bool has_table_virtual_column = false;
Names real_column_names;
real_column_names.reserve(column_names.size());
for (const auto & column_name : column_names)
{
if (column_name == "_database" && isVirtualColumn(column_name, storage_snapshot->metadata))
has_database_virtual_column = true;
else if (column_name == "_table" && isVirtualColumn(column_name, storage_snapshot->metadata))
has_table_virtual_column = true;
else
real_column_names.push_back(column_name);
}
StorageListWithLocks selected_tables
= getSelectedTables(modified_context, query_info.query, has_database_virtual_column, has_table_virtual_column);
InputOrderInfoPtr input_sorting_info;
if (query_info.order_optimizer)
{
for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it)
{
auto storage_ptr = std::get<1>(*it);
auto storage_metadata_snapshot = storage_ptr->getInMemoryMetadataPtr();
auto current_info = query_info.order_optimizer->getInputOrder(storage_metadata_snapshot, modified_context);
if (it == selected_tables.begin())
input_sorting_info = current_info;
else if (!current_info || (input_sorting_info && *current_info != *input_sorting_info))
input_sorting_info.reset();
if (!input_sorting_info)
break;
}
query_info.input_order_info = input_sorting_info;
}
query_plan.addInterpreterContext(modified_context);
/// What will be result structure depending on query processed stage in source tables?
@ -343,10 +323,7 @@ void StorageMerge::read(
auto step = std::make_unique<ReadFromMerge>(
common_header,
std::move(selected_tables),
real_column_names,
has_database_virtual_column,
has_table_virtual_column,
column_names,
max_block_size,
num_streams,
shared_from_this(),
@ -358,43 +335,9 @@ void StorageMerge::read(
query_plan.addStep(std::move(step));
}
/// An object of this helper class is created
/// when processing a Merge table data source (subordinary table)
/// that has row policies
/// to guarantee that these row policies are applied
class ReadFromMerge::RowPolicyData
{
public:
RowPolicyData(RowPolicyFilterPtr, std::shared_ptr<DB::IStorage>, ContextPtr);
/// Add to data stream columns that are needed only for row policies
/// SELECT x from T if T has row policy y=42
/// required y in data pipeline
void extendNames(Names &) const;
/// Use storage facilities to filter data
/// optimization
/// does not guarantee accuracy, but reduces number of rows
void addStorageFilter(SourceStepWithFilter *) const;
/// Create explicit filter transform to exclude
/// rows that are not conform to row level policy
void addFilterTransform(QueryPipelineBuilder &) const;
private:
std::string filter_column_name; // complex filter, may contain logic operations
ActionsDAGPtr actions_dag;
ExpressionActionsPtr filter_actions;
StorageMetadataPtr storage_metadata_snapshot;
};
ReadFromMerge::ReadFromMerge(
Block common_header_,
StorageListWithLocks selected_tables_,
Names column_names_,
bool has_database_virtual_column_,
bool has_table_virtual_column_,
Names all_column_names_,
size_t max_block_size,
size_t num_streams,
StoragePtr storage,
@ -406,21 +349,19 @@ ReadFromMerge::ReadFromMerge(
, required_max_block_size(max_block_size)
, requested_num_streams(num_streams)
, common_header(std::move(common_header_))
, selected_tables(std::move(selected_tables_))
, column_names(std::move(column_names_))
, has_database_virtual_column(has_database_virtual_column_)
, has_table_virtual_column(has_table_virtual_column_)
, all_column_names(std::move(all_column_names_))
, storage_merge(std::move(storage))
, merge_storage_snapshot(std::move(storage_snapshot))
, query_info(query_info_)
, context(std::move(context_))
, common_processed_stage(processed_stage)
{
createChildPlans();
}
void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
filterTablesAndCreateChildrenPlans();
if (selected_tables.empty())
{
pipeline.init(Pipe(std::make_shared<NullSource>(output_stream->header)));
@ -430,13 +371,10 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu
QueryPlanResourceHolder resources;
std::vector<std::unique_ptr<QueryPipelineBuilder>> pipelines;
chassert(selected_tables.size() == child_plans.size());
chassert(selected_tables.size() == table_aliases.size());
chassert(selected_tables.size() == table_row_policy_data_opts.size());
auto table_it = selected_tables.begin();
for (size_t i = 0; i < selected_tables.size(); ++i, ++table_it)
{
auto & plan = child_plans.at(i);
auto & child_plan = child_plans->at(i);
const auto & table = *table_it;
const auto storage = std::get<1>(table);
@ -446,13 +384,13 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu
auto modified_query_info = getModifiedQueryInfo(query_info, context, table, nested_storage_snaphsot);
auto source_pipeline = createSources(
plan,
child_plan.plan,
nested_storage_snaphsot,
modified_query_info,
common_processed_stage,
common_header,
table_aliases.at(i),
table_row_policy_data_opts.at(i),
child_plan.table_aliases,
child_plan.row_policy_data_opt,
table,
context);
@ -490,10 +428,37 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu
pipeline.addResources(std::move(resources));
}
void ReadFromMerge::createChildPlans()
void ReadFromMerge::filterTablesAndCreateChildrenPlans()
{
if (child_plans)
return;
has_database_virtual_column = false;
has_table_virtual_column = false;
column_names.clear();
column_names.reserve(column_names.size());
for (const auto & column_name : all_column_names)
{
if (column_name == "_database" && storage_merge->isVirtualColumn(column_name, merge_storage_snapshot->metadata))
has_database_virtual_column = true;
else if (column_name == "_table" && storage_merge->isVirtualColumn(column_name, merge_storage_snapshot->metadata))
has_table_virtual_column = true;
else
column_names.push_back(column_name);
}
selected_tables = getSelectedTables(context, has_database_virtual_column, has_table_virtual_column);
child_plans = createChildrenPlans(query_info);
}
std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQueryInfo & query_info_) const
{
if (selected_tables.empty())
return;
return {};
std::vector<ChildPlan> res;
size_t tables_count = selected_tables.size();
Float64 num_streams_multiplier
@ -503,7 +468,7 @@ void ReadFromMerge::createChildPlans()
if (order_info)
{
query_info.input_order_info = order_info;
query_info_.input_order_info = order_info;
}
else if (query_info.order_optimizer)
{
@ -522,7 +487,7 @@ void ReadFromMerge::createChildPlans()
break;
}
query_info.input_order_info = input_sorting_info;
query_info_.input_order_info = input_sorting_info;
}
for (const auto & table : selected_tables)
@ -542,8 +507,10 @@ void ReadFromMerge::createChildPlans()
if (sampling_requested && !storage->supportsSampling())
throw Exception(ErrorCodes::SAMPLING_NOT_SUPPORTED, "Illegal SAMPLE: table {} doesn't support sampling", storage->getStorageID().getNameForLogs());
auto & aliases = table_aliases.emplace_back();
auto & row_policy_data_opt = table_row_policy_data_opts.emplace_back();
res.emplace_back();
auto & aliases = res.back().table_aliases;
auto & row_policy_data_opt = res.back().row_policy_data_opt;
auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr();
auto nested_storage_snaphsot = storage->getStorageSnapshot(storage_metadata_snapshot, context);
@ -616,7 +583,7 @@ void ReadFromMerge::createChildPlans()
}
}
child_plans.emplace_back(createPlanForTable(
res.back().plan = createPlanForTable(
nested_storage_snaphsot,
modified_query_info,
common_processed_stage,
@ -625,8 +592,10 @@ void ReadFromMerge::createChildPlans()
column_names_as_aliases.empty() ? std::move(real_column_names) : std::move(column_names_as_aliases),
row_policy_data_opt,
context,
current_streams));
current_streams);
}
return res;
}
SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const SelectQueryInfo & query_info,
@ -804,7 +773,7 @@ QueryPlan ReadFromMerge::createPlanForTable(
Names && real_column_names,
const RowPolicyDataOpt & row_policy_data_opt,
ContextMutablePtr modified_context,
size_t streams_num)
size_t streams_num) const
{
const auto & [database_name, storage, _, table_name] = storage_with_lock;
@ -967,21 +936,14 @@ void ReadFromMerge::RowPolicyData::addFilterTransform(QueryPipelineBuilder & bui
});
}
StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(
StorageMerge::StorageListWithLocks ReadFromMerge::getSelectedTables(
ContextPtr query_context,
const ASTPtr & query /* = nullptr */,
bool filter_by_database_virtual_column /* = false */,
bool filter_by_table_virtual_column /* = false */) const
bool filter_by_database_virtual_column,
bool filter_by_table_virtual_column) const
{
/// FIXME: filtering does not work with allow_experimental_analyzer due to
/// different column names there (it has "table_name._table" not just
/// "_table")
assert(!filter_by_database_virtual_column || !filter_by_table_virtual_column || query);
const Settings & settings = query_context->getSettingsRef();
StorageListWithLocks selected_tables;
DatabaseTablesIterators database_table_iterators = getDatabaseIterators(getContext());
StorageListWithLocks res;
DatabaseTablesIterators database_table_iterators = assert_cast<StorageMerge &>(*storage_merge).getDatabaseIterators(query_context);
MutableColumnPtr database_name_virtual_column;
MutableColumnPtr table_name_virtual_column;
@ -1005,13 +967,10 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(
if (!storage)
continue;
if (query && query->as<ASTSelectQuery>()->prewhere() && !storage->supportsPrewhere())
throw Exception(ErrorCodes::ILLEGAL_PREWHERE, "Storage {} doesn't support PREWHERE.", storage->getName());
if (storage.get() != this)
if (storage.get() != storage_merge.get())
{
auto table_lock = storage->lockForShare(query_context->getCurrentQueryId(), settings.lock_acquire_timeout);
selected_tables.emplace_back(iterator->databaseName(), storage, std::move(table_lock), iterator->name());
res.emplace_back(iterator->databaseName(), storage, std::move(table_lock), iterator->name());
if (filter_by_table_virtual_column)
table_name_virtual_column->insert(iterator->name());
}
@ -1020,33 +979,42 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(
}
}
if (!filter_by_database_virtual_column && !filter_by_table_virtual_column)
return res;
auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(filter_nodes.nodes, {}, context);
if (!filter_actions_dag)
return res;
const auto * predicate = filter_actions_dag->getOutputs().at(0);
if (filter_by_database_virtual_column)
{
/// Filter names of selected tables if there is a condition on "_database" virtual column in WHERE clause
Block virtual_columns_block
= Block{ColumnWithTypeAndName(std::move(database_name_virtual_column), std::make_shared<DataTypeString>(), "_database")};
VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, query_context);
VirtualColumnUtils::filterBlockWithPredicate(predicate, virtual_columns_block, query_context);
auto values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_database");
/// Remove unused databases from the list
selected_tables.remove_if([&](const auto & elem) { return values.find(std::get<0>(elem)) == values.end(); });
res.remove_if([&](const auto & elem) { return values.find(std::get<0>(elem)) == values.end(); });
}
if (filter_by_table_virtual_column)
{
/// Filter names of selected tables if there is a condition on "_table" virtual column in WHERE clause
Block virtual_columns_block = Block{ColumnWithTypeAndName(std::move(table_name_virtual_column), std::make_shared<DataTypeString>(), "_table")};
VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, query_context);
VirtualColumnUtils::filterBlockWithPredicate(predicate, virtual_columns_block, query_context);
auto values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_table");
/// Remove unused tables from the list
selected_tables.remove_if([&](const auto & elem) { return values.find(std::get<3>(elem)) == values.end(); });
res.remove_if([&](const auto & elem) { return values.find(std::get<3>(elem)) == values.end(); });
}
return selected_tables;
return res;
}
DatabaseTablesIteratorPtr StorageMerge::getDatabaseIterator(const String & database_name, ContextPtr local_context) const
DatabaseTablesIteratorPtr StorageMerge::DatabaseNameOrRegexp::getDatabaseIterator(const String & database_name, ContextPtr local_context) const
{
auto database = DatabaseCatalog::instance().getDatabase(database_name);
@ -1066,7 +1034,7 @@ DatabaseTablesIteratorPtr StorageMerge::getDatabaseIterator(const String & datab
return database->getTablesIterator(local_context, table_name_match);
}
StorageMerge::DatabaseTablesIterators StorageMerge::getDatabaseIterators(ContextPtr local_context) const
StorageMerge::DatabaseTablesIterators StorageMerge::DatabaseNameOrRegexp::getDatabaseIterators(ContextPtr local_context) const
{
try
{
@ -1191,8 +1159,16 @@ void ReadFromMerge::convertAndFilterSourceStream(
});
}
const ReadFromMerge::StorageListWithLocks & ReadFromMerge::getSelectedTables()
{
filterTablesAndCreateChildrenPlans();
return selected_tables;
}
bool ReadFromMerge::requestReadingInOrder(InputOrderInfoPtr order_info_)
{
filterTablesAndCreateChildrenPlans();
/// Disable read-in-order optimization for reverse order with final.
/// Otherwise, it can lead to incorrect final behavior because the implementation may rely on the reading in direct order).
if (order_info_->direction != 1 && InterpreterSelectQuery::isQueryWithFinal(query_info))
@ -1205,9 +1181,9 @@ bool ReadFromMerge::requestReadingInOrder(InputOrderInfoPtr order_info_)
};
bool ok = true;
for (const auto & plan : child_plans)
if (plan.isInitialized())
ok &= recursivelyApplyToReadingSteps(plan.getRootNode(), request_read_in_order);
for (const auto & child_plan : *child_plans)
if (child_plan.plan.isInitialized())
ok &= recursivelyApplyToReadingSteps(child_plan.plan.getRootNode(), request_read_in_order);
if (!ok)
return false;
@ -1234,9 +1210,11 @@ void ReadFromMerge::applyFilters(const QueryPlan & plan) const
void ReadFromMerge::applyFilters()
{
for (const auto & plan : child_plans)
if (plan.isInitialized())
applyFilters(plan);
filterTablesAndCreateChildrenPlans();
for (const auto & child_plan : *child_plans)
if (child_plan.plan.isInitialized())
applyFilters(child_plan.plan);
}
IStorage::ColumnSizeByName StorageMerge::getColumnSizes() const

View File

@ -12,6 +12,9 @@ namespace DB
struct QueryPlanResourceHolder;
struct RowPolicyFilter;
using RowPolicyFilterPtr = std::shared_ptr<const RowPolicyFilter>;
/** A table that represents the union of an arbitrary number of other tables.
* All tables must have the same structure.
*/
@ -78,24 +81,36 @@ public:
std::optional<UInt64> totalRows(const Settings & settings) const override;
std::optional<UInt64> totalBytes(const Settings & settings) const override;
using DatabaseTablesIterators = std::vector<DatabaseTablesIteratorPtr>;
DatabaseTablesIterators getDatabaseIterators(ContextPtr context) const;
private:
/// (Database, Table, Lock, TableName)
using StorageWithLockAndName = std::tuple<String, StoragePtr, TableLockHolder, String>;
using StorageListWithLocks = std::list<StorageWithLockAndName>;
struct DatabaseNameOrRegexp
{
String source_database_name_or_regexp;
bool database_is_regexp = false;
std::optional<OptimizedRegularExpression> source_database_regexp;
std::optional<OptimizedRegularExpression> source_table_regexp;
std::optional<DBToTableSetMap> source_databases_and_tables;
String source_database_name_or_regexp;
bool database_is_regexp = false;
DatabaseNameOrRegexp(
const String & source_database_name_or_regexp_,
bool database_is_regexp_,
std::optional<OptimizedRegularExpression> source_database_regexp_,
std::optional<OptimizedRegularExpression> source_table_regexp_,
std::optional<DBToTableSetMap> source_databases_and_tables_);
/// (Database, Table, Lock, TableName)
using StorageWithLockAndName = std::tuple<String, StoragePtr, TableLockHolder, String>;
using StorageListWithLocks = std::list<StorageWithLockAndName>;
using DatabaseTablesIterators = std::vector<DatabaseTablesIteratorPtr>;
DatabaseTablesIteratorPtr getDatabaseIterator(const String & database_name, ContextPtr context) const;
StorageMerge::StorageListWithLocks getSelectedTables(
ContextPtr query_context,
const ASTPtr & query = nullptr,
bool filter_by_database_virtual_column = false,
bool filter_by_table_virtual_column = false) const;
DatabaseTablesIterators getDatabaseIterators(ContextPtr context) const;
};
DatabaseNameOrRegexp database_name_or_regexp;
template <typename F>
StoragePtr getFirstTable(F && predicate) const;
@ -103,10 +118,6 @@ private:
template <typename F>
void forEachTable(F && func) const;
DatabaseTablesIteratorPtr getDatabaseIterator(const String & database_name, ContextPtr context) const;
DatabaseTablesIterators getDatabaseIterators(ContextPtr context) const;
NamesAndTypesList getVirtuals() const override;
ColumnSizeByName getColumnSizes() const override;
@ -132,10 +143,7 @@ public:
ReadFromMerge(
Block common_header_,
StorageListWithLocks selected_tables_,
Names column_names_,
bool has_database_virtual_column_,
bool has_table_virtual_column_,
Names all_column_names_,
size_t max_block_size,
size_t num_streams,
StoragePtr storage,
@ -146,7 +154,7 @@ public:
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
const StorageListWithLocks & getSelectedTables() const { return selected_tables; }
const StorageListWithLocks & getSelectedTables();
/// Returns `false` if requested reading cannot be performed.
bool requestReadingInOrder(InputOrderInfoPtr order_info_);
@ -159,16 +167,13 @@ private:
const Block common_header;
StorageListWithLocks selected_tables;
Names all_column_names;
Names column_names;
bool has_database_virtual_column;
bool has_table_virtual_column;
StoragePtr storage_merge;
StorageSnapshotPtr merge_storage_snapshot;
/// Store read plan for each child table.
/// It's needed to guarantee lifetime for child steps to be the same as for this step (mainly for EXPLAIN PIPELINE).
std::vector<QueryPlan> child_plans;
SelectQueryInfo query_info;
ContextMutablePtr context;
QueryProcessingStage::Enum common_processed_stage;
@ -184,14 +189,52 @@ private:
using Aliases = std::vector<AliasData>;
class RowPolicyData;
/// An object of this helper class is created
/// when processing a Merge table data source (subordinary table)
/// that has row policies
/// to guarantee that these row policies are applied
class RowPolicyData
{
public:
RowPolicyData(RowPolicyFilterPtr, std::shared_ptr<DB::IStorage>, ContextPtr);
/// Add to data stream columns that are needed only for row policies
/// SELECT x from T if T has row policy y=42
/// required y in data pipeline
void extendNames(Names &) const;
/// Use storage facilities to filter data
/// optimization
/// does not guarantee accuracy, but reduces number of rows
void addStorageFilter(SourceStepWithFilter *) const;
/// Create explicit filter transform to exclude
/// rows that are not conform to row level policy
void addFilterTransform(QueryPipelineBuilder &) const;
private:
std::string filter_column_name; // complex filter, may contain logic operations
ActionsDAGPtr actions_dag;
ExpressionActionsPtr filter_actions;
StorageMetadataPtr storage_metadata_snapshot;
};
using RowPolicyDataOpt = std::optional<RowPolicyData>;
std::vector<Aliases> table_aliases;
struct ChildPlan
{
QueryPlan plan;
Aliases table_aliases;
RowPolicyDataOpt row_policy_data_opt;
};
std::vector<RowPolicyDataOpt> table_row_policy_data_opts;
/// Store read plan for each child table.
/// It's needed to guarantee lifetime for child steps to be the same as for this step (mainly for EXPLAIN PIPELINE).
std::optional<std::vector<ChildPlan>> child_plans;
void createChildPlans();
std::vector<ChildPlan> createChildrenPlans(SelectQueryInfo & query_info_) const;
void filterTablesAndCreateChildrenPlans();
void applyFilters(const QueryPlan & plan) const;
@ -204,7 +247,7 @@ private:
Names && real_column_names,
const RowPolicyDataOpt & row_policy_data_opt,
ContextMutablePtr modified_context,
size_t streams_num);
size_t streams_num) const;
QueryPipelineBuilderPtr createSources(
QueryPlan & plan,
@ -231,6 +274,11 @@ private:
ContextPtr context,
QueryPipelineBuilder & builder,
QueryProcessingStage::Enum processed_stage);
StorageMerge::StorageListWithLocks getSelectedTables(
ContextPtr query_context,
bool filter_by_database_virtual_column,
bool filter_by_table_virtual_column) const;
};
}

View File

@ -502,9 +502,12 @@ static const ActionsDAG::Node * splitFilterNodeForAllowedInputs(
const ActionsDAG::Node * res = node_copy.children.front();
/// Expression like (not_allowed AND 256) can't be resuced to (and(256)) because AND requires
/// at least two arguments; also it can't be reduced to (256) because result type is different.
/// TODO: add CAST here
if (!res->result_type->equals(*node->result_type))
return nullptr;
{
ActionsDAG tmp_dag;
res = &tmp_dag.addCast(*res, node->result_type, {});
additional_nodes.splice(additional_nodes.end(), ActionsDAG::detachNodes(std::move(tmp_dag)));
}
return res;
}

View File

@ -13,6 +13,8 @@ CREATE TABLE numbers5 ENGINE = MergeTree ORDER BY number AS SELECT number FROM n
SELECT count() FROM merge(currentDatabase(), '^numbers\\d+$');
SELECT DISTINCT count() FROM merge(currentDatabase(), '^numbers\\d+$') GROUP BY number;
SET optimize_aggregation_in_order = 0; -- FIXME : in order may happen before filter push down
SET max_rows_to_read = 1000;
SET max_threads = 'auto';