merge_row_policy: cleanup, remove some debug output

This commit is contained in:
Ilya Golshtein 2023-06-14 21:31:17 +00:00
parent 978a535849
commit da5f607242
3 changed files with 43 additions and 202 deletions

View File

@ -93,14 +93,12 @@
#include <Common/typeid_cast.h>
#include <Common/ProfileEvents.h>
#include <Common/logger_useful.h>
namespace ProfileEvents
{
extern const Event SelectQueriesWithSubqueries;
extern const Event QueriesWithSubqueries;
}
#pragma GCC diagnostic ignored "-Wold-style-cast"
namespace DB
{
@ -134,9 +132,6 @@ FilterDAGInfoPtr generateFilterActions(
Names & prerequisite_columns,
PreparedSetsPtr prepared_sets)
{
LOG_TRACE(&Poco::Logger::get("generateFilterActions"), "top of");
auto filter_info = std::make_shared<FilterDAGInfo>();
const auto & db_name = table_id.getDatabaseName();
@ -551,15 +546,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
std::shared_ptr<TableJoin> table_join = joined_tables.makeTableJoin(query);
if (storage)
{
LOG_TRACE(&Poco::Logger::get("InterpretSelectQuery ctor"), " {}, table name: {}, calling getRowPolicyFilter", (void*)this, table_id.getTableName());
row_policy_filter = context->getRowPolicyFilter(table_id.getDatabaseName(), table_id.getTableName(), RowPolicyFilterType::SELECT_FILTER);
}
else
{
LOG_TRACE(&Poco::Logger::get("InterpretSelectQuery ctor"), " {}, no storage", (void*)this);
}
StorageView * view = nullptr;
if (storage)
@ -863,8 +850,6 @@ InterpreterSelectQuery::InterpreterSelectQuery(
/// Add prewhere actions with alias columns and record needed columns from storage.
if (storage)
{
LOG_TRACE(log, "calling addPrewhereAliasActions");
addPrewhereAliasActions();
analysis_result.required_columns = required_columns;
}
@ -960,8 +945,6 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
analysis_result = ExpressionAnalysisResult(
*query_analyzer, metadata_snapshot, first_stage, second_stage, options.only_analyze, filter_info, additional_filter_info, source_header);
LOG_TRACE(log, "getSampleBlockImpl {} : source_header after ExpressionAnalysisResult {}", (void*) this, source_header.dumpStructure());
if (options.to_stage == QueryProcessingStage::Enum::FetchColumns)
{
@ -971,12 +954,8 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
{
header = analysis_result.prewhere_info->prewhere_actions->updateHeader(header);
if (analysis_result.prewhere_info->remove_prewhere_column)
{
LOG_TRACE(log, "getSampleBlockImpl {} : erasing column {}", (void*) this, analysis_result.prewhere_info->prewhere_column_name);
header.erase(analysis_result.prewhere_info->prewhere_column_name);
}
}
LOG_TRACE(log, "getSampleBlockImpl {} : returning header", (void*) this);
return header;
}
@ -1462,7 +1441,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
/// Read the data from Storage. from_stage - to what stage the request was completed in Storage.
executeFetchColumns(from_stage, query_plan);
LOG_TRACE(log, "executeImpl {}, {} -> {}", (void*) this, QueryProcessingStage::toString(from_stage), QueryProcessingStage::toString(options.to_stage));
LOG_TRACE(log, "{} -> {}", QueryProcessingStage::toString(from_stage), QueryProcessingStage::toString(options.to_stage));
}
if (query_info.projection && query_info.projection->input_order_info && query_info.input_order_info)
@ -1528,16 +1507,12 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
// Thus, we don't actually need to check if projection is active.
if (!query_info.projection && expressions.filter_info)
{
LOG_TRACE(log, "executeImpl, adding Row-level security filter; column_name {}, block {}",
expressions.filter_info->column_name, query_plan.getCurrentDataStream().header.dumpStructure());
auto row_level_security_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
expressions.filter_info->actions,
expressions.filter_info->column_name,
expressions.filter_info->do_remove_column);
row_level_security_step->setStepDescription("Row-level security filter");
query_plan.addStep(std::move(row_level_security_step));
}
@ -2075,16 +2050,11 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
auto & expressions = analysis_result;
if (expressions.filter_info)
{
LOG_TRACE(&Poco::Logger::get("addPrewhereAliasActions"), " {}, expressions.filter_info", (void*)this);
if (!expressions.prewhere_info)
{
const bool does_storage_support_prewhere = !input_pipe && storage && storage->supportsPrewhere();
LOG_TRACE(&Poco::Logger::get("addPrewhereAliasActions"), " {}, expressions.filter_info 1 - does_storage_support_prewhere {} shouldMoveToPrewhere() {}",
(void*)this, does_storage_support_prewhere, shouldMoveToPrewhere());
if (does_storage_support_prewhere && shouldMoveToPrewhere())
{
LOG_TRACE(&Poco::Logger::get("addPrewhereAliasActions"), " {}, expressions.filter_info 1.5", (void*)this);
/// Execute row level filter in prewhere as a part of "move to prewhere" optimization.
expressions.prewhere_info = std::make_shared<PrewhereInfo>(
std::move(expressions.filter_info->actions),
@ -2097,21 +2067,12 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
}
else
{
LOG_TRACE(&Poco::Logger::get("addPrewhereAliasActions"), " {}, expressions.filter_info 2", (void*)this);
/// Add row level security actions to prewhere.
expressions.prewhere_info->row_level_filter = std::move(expressions.filter_info->actions);
expressions.prewhere_info->row_level_column_name = std::move(expressions.filter_info->column_name);
expressions.prewhere_info->row_level_filter->projectInput(false);
expressions.filter_info = nullptr;
}
if (expressions.prewhere_info)
{
LOG_TRACE(&Poco::Logger::get("addPrewhereAliasActions"), " {} dump: {}", (void*)this, expressions.prewhere_info->dump());
}
else
{
LOG_TRACE(&Poco::Logger::get("addPrewhereAliasActions"), " no prewhere_info");
}
}
auto & prewhere_info = analysis_result.prewhere_info;

View File

@ -312,7 +312,7 @@ public:
const auto & primary_key = storage_snapshot->metadata->getPrimaryKey();
const Names & primary_key_column_names = primary_key.column_names;
KeyCondition key_condition(filter, context, primary_key_column_names, primary_key.expression, NameSet{});
LOG_DEBUG(log, "ReadFromPart (MergeTreeSequentialSource) Key condition: {}", key_condition.toString());
LOG_DEBUG(log, "Key condition: {}", key_condition.toString());
if (!key_condition.alwaysFalse())
mark_ranges = MergeTreeDataSelectExecutor::markRangesFromPKRange(

View File

@ -47,10 +47,6 @@
#include <Common/logger_useful.h>
#pragma GCC diagnostic ignored "-Wunused-parameter"
#pragma GCC diagnostic ignored "-Wunused-but-set-variable"
namespace
{
@ -270,7 +266,6 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(
/// (see removeJoin())
///
/// And for this we need to return FetchColumns.
LOG_TRACE(&Poco::Logger::get("StorageMerge::getQueryProcessingStage"), "to_stage {}", to_stage);
if (const auto * select = query_info.query->as<ASTSelectQuery>(); select && hasJoin(*select))
return QueryProcessingStage::FetchColumns;
@ -295,7 +290,6 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(
stage_in_source_tables,
table->getQueryProcessingStage(local_context, to_stage,
table->getStorageSnapshot(table->getInMemoryMetadataPtr(), local_context), query_info));
LOG_TRACE(&Poco::Logger::get("StorageMerge::getQueryProcessingStage"), "stage_in_source_tables {}", stage_in_source_tables);
}
iterator->next();
@ -322,9 +316,6 @@ void StorageMerge::read(
auto modified_context = Context::createCopy(local_context);
// modified_context->setSetting("optimize_move_to_prewhere", false);
LOG_TRACE(&Poco::Logger::get("StorageMerge::read"), "processed_stage {}", QueryProcessingStage::toString(processed_stage));
bool has_database_virtual_column = false;
bool has_table_virtual_column = false;
Names real_column_names;
@ -339,7 +330,6 @@ void StorageMerge::read(
else
{
real_column_names.push_back(column_name);
LOG_TRACE(&Poco::Logger::get("StorageMerge::read"), "column_name {}", column_name);
}
}
@ -494,7 +484,6 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu
bool with_aliases = common_processed_stage == QueryProcessingStage::FetchColumns && !storage_columns.getAliases().empty();
if (with_aliases)
{
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::initializePipeline"), "with_aliases");
ASTPtr required_columns_expr_list = std::make_shared<ASTExpressionList>();
ASTPtr column_expr;
@ -534,8 +523,6 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu
}
}
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::initializePipeline"), "table name: {}", storage->getStorageID().getTableName());
auto source_pipeline = createSources(
nested_storage_snaphsot,
modified_query_info,
@ -668,8 +655,6 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
storage_snapshot,
modified_query_info);
#pragma GCC diagnostic ignored "-Wunreachable-code"
#pragma GCC diagnostic ignored "-Wunused-variable"
if (processed_stage <= storage_stage || (allow_experimental_analyzer && processed_stage == QueryProcessingStage::FetchColumns))
{
/// If there are only virtual columns in query, you must request at least one other column.
@ -679,12 +664,9 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
QueryPlan & plan = child_plans.emplace_back();
StorageView * view = dynamic_cast<StorageView *>(storage.get());
bool direct_read = false;
if (!view || allow_experimental_analyzer)
// if (!view || allow_experimental_analyzer)
{
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::createSources"), "direct storage->read");
storage->read(plan,
real_column_names,
storage_snapshot,
@ -693,28 +675,45 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
processed_stage,
max_block_size,
UInt32(streams_num));
direct_read = true;
if (!plan.isInitialized())
return {};
if (auto * source_step_with_filter = dynamic_cast<SourceStepWithFilter*>((plan.getRootNode()->step.get())))
{
auto row_policy_filter = modified_context->getRowPolicyFilter(database_name, table_name, RowPolicyFilterType::SELECT_FILTER);
if (row_policy_filter)
{
ASTPtr expr = row_policy_filter->expression;
auto syntax_result = TreeRewriter(modified_context).analyze(expr, header.getNamesAndTypesList());
auto expression_analyzer = ExpressionAnalyzer{row_policy_filter->expression, syntax_result, modified_context};
auto filter_dag_ptr = expression_analyzer.getActionsDAG(true, false);
auto filter_actions = std::make_shared<ExpressionActions>(filter_dag_ptr, ExpressionActionsSettings::fromContext(modified_context, CompileExpressions::yes));
auto required_columns = filter_actions->getRequiredColumns();
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertinfSourceStream"), "filter_actions_dag: {},<> {}, <> {}",
filter_actions->getActionsDAG().dumpNames(), filter_actions->getActionsDAG().dumpDAG(), filter_actions->getSampleBlock().dumpStructure());
auto fa_actions_columns_sorted = filter_actions->getSampleBlock().getNames();
std::sort(fa_actions_columns_sorted.begin(), fa_actions_columns_sorted.end());
Names required_columns_sorted = required_columns;
std::sort(required_columns_sorted.begin(), required_columns_sorted.end());
Names filter_columns;
std::set_difference(fa_actions_columns_sorted.begin(), fa_actions_columns_sorted.end(),
required_columns.begin(), required_columns.end(),
std::inserter(filter_columns, filter_columns.begin()));
source_step_with_filter->addFilter(filter_dag_ptr, filter_columns.front());
}
}
}
// else if (!view)
// {
// /// For view storage, we need to rewrite the `modified_query_info.view_query` to optimize read.
// /// The most intuitive way is to use InterpreterSelectQuery.
// /// Intercept the settings
// modified_context->setSetting("max_threads", streams_num);
// modified_context->setSetting("max_streams_to_max_threads_ratio", 1);
// modified_context->setSetting("max_block_size", max_block_size);
// LOG_TRACE(&Poco::Logger::get("ReadFromMerge::createSources"), "creating InterpreterSelectQuery 1.0");
// InterpreterSelectQuery interpreter(modified_query_info.query,
// modified_context,
// storage,
// storage->getInMemoryMetadataPtr(), // view->getInMemoryMetadataPtr(),
// // SelectQueryOptions(/* processed_stage*/));
// SelectQueryOptions(processed_stage));
// // SelectQueryOptions(QueryProcessingStage::WithMergeableState));
// interpreter.buildQueryPlan(plan);
// }
else
{
/// For view storage, we need to rewrite the `modified_query_info.view_query` to optimize read.
@ -725,90 +724,25 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
modified_context->setSetting("max_streams_to_max_threads_ratio", 1);
modified_context->setSetting("max_block_size", max_block_size);
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::createSources"), "creating InterpreterSelectQuery 1.5");
InterpreterSelectQuery interpreter(modified_query_info.query,
modified_context,
storage,
view->getInMemoryMetadataPtr(),
SelectQueryOptions(processed_stage));
interpreter.buildQueryPlan(plan);
}
if (!plan.isInitialized())
return {};
}
if (auto * read_from_merge_tree = typeid_cast<ReadFromMergeTree *>(plan.getRootNode()->step.get()))
{
size_t filters_dags_size = filter_dags.size();
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::createSources"), "ReadFromMergeTree detected, DAG size {}", filters_dags_size);
for (size_t i = 0; i < filters_dags_size; ++i)
{
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::createSources"), "adding filter {}", filter_dags[i]->dumpDAG());
read_from_merge_tree->addFilter(filter_dags[i], filter_nodes.nodes[i]);
}
}
if (auto * source_step_with_filter = dynamic_cast<SourceStepWithFilter*>((plan.getRootNode()->step.get())))
{
auto row_policy_filter = modified_context->getRowPolicyFilter(database_name, table_name, RowPolicyFilterType::SELECT_FILTER);
if (row_policy_filter)
{
// row_policy_filter->expression
// auto pipe_columns = builder.getHeader().getNamesAndTypesList();
ASTPtr expr = row_policy_filter->expression;
// auto * select_ast = expr /* query_ast */ ->as<ASTSelectQuery>();
// assert(select_ast);
// select_ast->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared<ASTExpressionList>());
// auto expr_list = select_ast->select();
// expr_list->children.push_back(expr);
// String filter_column_name = expr_list->children.at(0)->getColumnName();
// LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertinfSourceStream"), "filter_column_name: {} ", filter_column_name);
auto syntax_result = TreeRewriter(modified_context).analyze(expr, header/*builder->getHeader().*/.getNamesAndTypesList() /* pipe_columns*/);
// auto syntax_result = TreeRewriter(local_context).analyze(expr, NamesAndTypesList());
auto expression_analyzer = ExpressionAnalyzer{row_policy_filter->expression, syntax_result, modified_context};
auto filter_dag_ptr = expression_analyzer.getActionsDAG(true, false);
auto filter_actions = std::make_shared<ExpressionActions>(filter_dag_ptr, ExpressionActionsSettings::fromContext(modified_context, CompileExpressions::yes));
auto required_columns = filter_actions->getRequiredColumns();
for (const auto & req_col : required_columns)
{
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertinfSourceStream"), "req_col: {}", req_col);
}
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertinfSourceStream"), "filter_actions_dag: {},<> {}, <> {}",
filter_actions->getActionsDAG().dumpNames(), filter_actions->getActionsDAG().dumpDAG(), filter_actions->getSampleBlock().dumpStructure());
auto fa_actions_columns_sorted = filter_actions->getSampleBlock().getNames();
std::sort(fa_actions_columns_sorted.begin(), fa_actions_columns_sorted.end());
Names required_columns_sorted = required_columns;
std::sort(required_columns_sorted.begin(), required_columns_sorted.end());
Names filter_columns;
std::set_difference(fa_actions_columns_sorted.begin(), fa_actions_columns_sorted.end(),
required_columns.begin(), required_columns.end(),
std::inserter(filter_columns, filter_columns.begin()));
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::createSources"), "SourceStepWithFilter detected");
auto found_column = filter_dag_ptr->tryFindInOutputs(filter_columns.front());
assert(found_column);
// LOG_TRACE(&Poco::Logger::get("ReadFromMerge::createSources"), "found column {}", found_column->dumpDAG());
source_step_with_filter->addFilter(/* filter_actions */ filter_dag_ptr, filter_columns.front());
}
}
builder = plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(modified_context),
BuildQueryPipelineSettings::fromContext(modified_context));
@ -834,7 +768,6 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
{
modified_select.replaceDatabaseAndTable(database_name, table_name);
/// TODO: Find a way to support projections for StorageMerge
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::createSources"), "creating InterpreterSelectQuery 2");
InterpreterSelectQuery interpreter{modified_query_info.query,
modified_context,
SelectQueryOptions(processed_stage).ignoreProjections()};
@ -1114,16 +1047,6 @@ void ReadFromMerge::convertingSourceStream(
convert_actions_match_columns_mode = ActionsDAG::MatchColumnsMode::Position;
for (const auto & column_with_type_and_name : builder.getHeader().getColumnsWithTypeAndName())
{
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertinfSourceStream"), "column name: {} (builder.getHeader().getColumnsWithTypeAndName())", column_with_type_and_name.name);
}
for (const auto & column_with_type_and_name : header.getColumnsWithTypeAndName())
{
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertingSourceStream"), "column name: {} (header.getColumnsWithTypeAndName())", column_with_type_and_name.name);
}
auto convert_actions_dag = ActionsDAG::makeConvertingActions(builder.getHeader().getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),
convert_actions_match_columns_mode);
@ -1147,36 +1070,14 @@ void ReadFromMerge::convertingSourceStream(
if (row_policy_filter)
{
// row_policy_filter->expression
// auto pipe_columns = builder.getHeader().getNamesAndTypesList();
ASTPtr expr = row_policy_filter->expression;
// auto * select_ast = expr /* query_ast */ ->as<ASTSelectQuery>();
// assert(select_ast);
// select_ast->setExpression(ASTSelectQuery::Expression::SELECT, std::make_shared<ASTExpressionList>());
// auto expr_list = select_ast->select();
// expr_list->children.push_back(expr);
// String filter_column_name = expr_list->children.at(0)->getColumnName();
// LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertinfSourceStream"), "filter_column_name: {} ", filter_column_name);
auto syntax_result = TreeRewriter(local_context).analyze(expr, pipe_columns);
// auto syntax_result = TreeRewriter(local_context).analyze(expr, NamesAndTypesList());
auto expression_analyzer = ExpressionAnalyzer{row_policy_filter->expression, syntax_result, local_context};
auto actions_dag = expression_analyzer.getActionsDAG(true, false);
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertinfSourceStream"), "actions_dag: {},<> {}", actions_dag->dumpNames(), actions_dag->dumpDAG());
auto filter_actions = std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes));
auto required_columns = filter_actions->getRequiredColumns();
for (const auto & req_col : required_columns)
{
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertinfSourceStream"), "req_col: {}", req_col);
}
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertinfSourceStream"), "filter_actions_dag: {},<> {}, <> {}",
filter_actions->getActionsDAG().dumpNames(), filter_actions->getActionsDAG().dumpDAG(), filter_actions->getSampleBlock().dumpStructure());
@ -1195,32 +1096,11 @@ void ReadFromMerge::convertingSourceStream(
required_columns.begin(), required_columns.end(),
std::inserter(filter_columns, filter_columns.begin()));
for (const auto & filter_column : filter_columns)
{
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertinfSourceStream"), "filter_column: {}", filter_column);
}
// Block block;
// block = filter_actions->getActionsDAG().updateHeader(std::move(block));
// LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertingSourceStream"), "block from updateHeader {}", block.dumpStructure());
builder.addSimpleTransform([&](const Block & stream_header)
{
LOG_TRACE(&Poco::Logger::get("ReadFromMerge::convertinfSourceStream"), "stream_header {}", stream_header.dumpStructure());
return std::make_shared<FilterTransform>(stream_header, filter_actions, filter_columns.front(), true /* remove fake column */);
});
// auto row_level_filter_step = std::make_unique<FilterStep>(
// query_plan.getCurrentDataStream(),
// expressions.prewhere_info->row_level_filter,
// expressions.prewhere_info->row_level_column_name,
// true);
// row_level_filter_step->setStepDescription("Row-level security filter (PREWHERE)");
// query_plan.addStep(std::move(row_level_filter_step));
}
}
}