Merge pull request #25563 from ClickHouse/use-dag-in-key-condition

Use ActionsDAG in KeyCondition
This commit is contained in:
Nikolai Kochetov 2022-08-05 11:22:46 +02:00 committed by GitHub
commit 2fe893ba67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 1083 additions and 361 deletions

View File

@ -144,8 +144,8 @@ endif ()
list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON})
list (APPEND dbms_sources Functions/IFunction.cpp Functions/FunctionFactory.cpp Functions/FunctionHelpers.cpp Functions/extractTimeZoneFromFunctionArguments.cpp Functions/FunctionsLogical.cpp)
list (APPEND dbms_headers Functions/IFunction.h Functions/FunctionFactory.h Functions/FunctionHelpers.h Functions/extractTimeZoneFromFunctionArguments.h Functions/FunctionsLogical.h)
list (APPEND dbms_sources Functions/IFunction.cpp Functions/FunctionFactory.cpp Functions/FunctionHelpers.cpp Functions/extractTimeZoneFromFunctionArguments.cpp Functions/FunctionsLogical.cpp Functions/indexHint.cpp)
list (APPEND dbms_headers Functions/IFunction.h Functions/FunctionFactory.h Functions/FunctionHelpers.h Functions/extractTimeZoneFromFunctionArguments.h Functions/FunctionsLogical.h Functions/indexHint.h)
list (APPEND dbms_sources
AggregateFunctions/IAggregateFunction.cpp

View File

@ -557,6 +557,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, query_plan_enable_optimizations, true, "Apply optimizations to query plan", 0) \
M(UInt64, query_plan_max_optimizations_to_apply, 10000, "Limit the total number of optimizations applied to query plan. If zero, ignored. If limit reached, throw exception", 0) \
M(Bool, query_plan_filter_push_down, true, "Allow to push down filter by predicate query plan step", 0) \
M(Bool, query_plan_optimize_primary_key, true, "Analyze primary key using query plan (instead of AST)", 0) \
M(UInt64, regexp_max_matches_per_row, 1000, "Max matches of any single regexp per row, used to safeguard 'extractAllGroupsHorizontal' against consuming too much memory with greedy RE.", 0) \
\
M(UInt64, limit, 0, "Limit on read rows from the most 'end' result for select query, default 0 means no limit length", 0) \

View File

@ -141,6 +141,8 @@ public:
void getLambdaArgumentTypesImpl(DataTypes & arguments) const override { function->getLambdaArgumentTypes(arguments); }
const IFunction * getFunction() const { return function.get(); }
private:
std::shared_ptr<IFunction> function;
};

View File

@ -1,4 +1,4 @@
#include <Functions/IFunction.h>
#include <Functions/indexHint.h>
#include <Functions/FunctionFactory.h>
#include <DataTypes/DataTypesNumber.h>
@ -6,60 +6,6 @@
namespace DB
{
/** The `indexHint` function takes any number of any arguments and always returns one.
*
* This function has a special meaning (see ExpressionAnalyzer, KeyCondition)
* - the expressions inside it are not evaluated;
* - but when analyzing the index (selecting ranges for reading), this function is treated the same way,
* as if instead of using it the expression itself would be.
*
* Example: WHERE something AND indexHint(CounterID = 34)
* - do not read or calculate CounterID = 34, but select ranges in which the CounterID = 34 expression can be true.
*
* The function can be used for debugging purposes, as well as for (hidden from the user) query conversions.
*/
class FunctionIndexHint : public IFunction
{
public:
static constexpr auto name = "indexHint";
static FunctionPtr create(ContextPtr)
{
return std::make_shared<FunctionIndexHint>();
}
bool isVariadic() const override
{
return true;
}
size_t getNumberOfArguments() const override
{
return 0;
}
bool useDefaultImplementationForNulls() const override { return false; }
bool isSuitableForConstantFolding() const override { return false; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
String getName() const override
{
return name;
}
DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
{
return std::make_shared<DataTypeUInt8>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
{
return DataTypeUInt8().createColumnConst(input_rows_count, 1u);
}
};
REGISTER_FUNCTION(IndexHint)
{
factory.registerFunction<FunctionIndexHint>();

70
src/Functions/indexHint.h Normal file
View File

@ -0,0 +1,70 @@
#pragma once
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
class ActionsDAG;
using ActionsDAGPtr = std::shared_ptr<ActionsDAG>;
/** The `indexHint` function takes any number of any arguments and always returns one.
*
* This function has a special meaning (see ExpressionAnalyzer, KeyCondition)
* - the expressions inside it are not evaluated;
* - but when analyzing the index (selecting ranges for reading), this function is treated the same way,
* as if instead of using it the expression itself would be.
*
* Example: WHERE something AND indexHint(CounterID = 34)
* - do not read or calculate CounterID = 34, but select ranges in which the CounterID = 34 expression can be true.
*
* The function can be used for debugging purposes, as well as for (hidden from the user) query conversions.
*/
class FunctionIndexHint : public IFunction
{
public:
static constexpr auto name = "indexHint";
static FunctionPtr create(ContextPtr)
{
return std::make_shared<FunctionIndexHint>();
}
bool isVariadic() const override
{
return true;
}
size_t getNumberOfArguments() const override
{
return 0;
}
bool useDefaultImplementationForNulls() const override { return false; }
bool isSuitableForConstantFolding() const override { return false; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
String getName() const override
{
return name;
}
DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
{
return std::make_shared<DataTypeUInt8>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
{
return DataTypeUInt8().createColumnConst(input_rows_count, 1u);
}
void setActions(ActionsDAGPtr actions_) { actions = std::move(actions_); }
const ActionsDAGPtr & getActions() const { return actions; }
private:
ActionsDAGPtr actions;
};
}

View File

@ -144,6 +144,9 @@ const ActionsDAG::Node & ActionsDAG::addArrayJoin(const Node & child, std::strin
if (!array_type)
throw Exception("ARRAY JOIN requires array argument", ErrorCodes::TYPE_MISMATCH);
if (result_name.empty())
result_name = "arrayJoin(" + child.result_name + ")";
Node node;
node.type = ActionType::ARRAY_JOIN;
node.result_type = array_type->getNestedType();

View File

@ -313,4 +313,10 @@ private:
static ActionsDAGPtr cloneActionsForConjunction(NodeRawConstPtrs conjunction, const ColumnsWithTypeAndName & all_inputs);
};
/// This is an ugly way to bypass impossibility to forward declare ActionDAG::Node.
struct ActionDAGNodes
{
ActionsDAG::NodeRawConstPtrs nodes;
};
}

View File

@ -9,6 +9,7 @@
#include <Functions/grouping.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsMiscellaneous.h>
#include <Functions/indexHint.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
@ -934,8 +935,44 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
/// A special function `indexHint`. Everything that is inside it is not calculated
if (node.name == "indexHint")
{
if (data.only_consts)
return;
/// Here we create a separate DAG for indexHint condition.
/// It will be used only for index analysis.
Data index_hint_data(
data.getContext(),
data.set_size_limit,
data.subquery_depth,
data.source_columns,
std::make_shared<ActionsDAG>(data.source_columns),
data.prepared_sets,
data.subqueries_for_sets,
data.no_subqueries,
data.no_makeset,
data.only_consts,
/*create_source_for_in*/ false,
data.aggregation_keys_info);
NamesWithAliases args;
if (node.arguments)
{
for (const auto & arg : node.arguments->children)
{
visit(arg, index_hint_data);
args.push_back({arg->getColumnNameWithoutAlias(), {}});
}
}
auto dag = index_hint_data.getActions();
dag->project(args);
auto index_hint = std::make_shared<FunctionIndexHint>();
index_hint->setActions(std::move(dag));
// Arguments are removed. We add function instead of constant column to avoid constant folding.
data.addFunction(FunctionFactory::instance().get("indexHint", data.getContext()), {}, column_name);
data.addFunction(std::make_unique<FunctionToOverloadResolverAdaptor>(index_hint), {}, column_name);
return;
}

View File

@ -36,6 +36,7 @@ struct RequiredSourceColumnsData
bool has_table_join = false;
bool has_array_join = false;
bool visit_index_hint = false;
bool addColumnAliasIfAny(const IAST & ast);
void addColumnIdentifier(const ASTIdentifier & node);

View File

@ -52,10 +52,8 @@ bool RequiredSourceColumnsMatcher::needChildVisit(const ASTPtr & node, const AST
if (const auto * f = node->as<ASTFunction>())
{
/// "indexHint" is a special function for index analysis.
/// Everything that is inside it is not calculated. See KeyCondition
/// "lambda" visit children itself.
if (f->name == "indexHint" || f->name == "lambda")
if (f->name == "lambda")
return false;
}
@ -73,6 +71,11 @@ void RequiredSourceColumnsMatcher::visit(const ASTPtr & ast, Data & data)
}
if (auto * t = ast->as<ASTFunction>())
{
/// "indexHint" is a special function for index analysis.
/// Everything that is inside it is not calculated. See KeyCondition
if (!data.visit_index_hint && t->name == "indexHint")
return;
data.addColumnAliasIfAny(*ast);
visit(*t, ast, data);
return;

View File

@ -965,12 +965,13 @@ void TreeRewriterResult::collectSourceColumns(bool add_special)
/// Calculate which columns are required to execute the expression.
/// Then, delete all other columns from the list of available columns.
/// After execution, columns will only contain the list of columns needed to read from the table.
void TreeRewriterResult::collectUsedColumns(const ASTPtr & query, bool is_select)
void TreeRewriterResult::collectUsedColumns(const ASTPtr & query, bool is_select, bool visit_index_hint)
{
/// We calculate required_source_columns with source_columns modifications and swap them on exit
required_source_columns = source_columns;
RequiredSourceColumnsVisitor::Data columns_context;
columns_context.visit_index_hint = visit_index_hint;
RequiredSourceColumnsVisitor(columns_context).visit(query);
NameSet source_column_names;
@ -1307,7 +1308,7 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect(
result.aggregates = getAggregates(query, *select_query);
result.window_function_asts = getWindowFunctions(query, *select_query);
result.expressions_with_window_function = getExpressionsWithWindowFunctions(query);
result.collectUsedColumns(query, true);
result.collectUsedColumns(query, true, settings.query_plan_optimize_primary_key);
result.required_source_columns_before_expanding_alias_columns = result.required_source_columns.getNames();
/// rewrite filters for select query, must go after getArrayJoinedColumns
@ -1331,7 +1332,7 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect(
result.aggregates = getAggregates(query, *select_query);
result.window_function_asts = getWindowFunctions(query, *select_query);
result.expressions_with_window_function = getExpressionsWithWindowFunctions(query);
result.collectUsedColumns(query, true);
result.collectUsedColumns(query, true, settings.query_plan_optimize_primary_key);
}
}
@ -1397,7 +1398,7 @@ TreeRewriterResultPtr TreeRewriter::analyze(
else
assertNoAggregates(query, "in wrong place");
result.collectUsedColumns(query, false);
result.collectUsedColumns(query, false, settings.query_plan_optimize_primary_key);
return std::make_shared<const TreeRewriterResult>(result);
}

View File

@ -88,7 +88,7 @@ struct TreeRewriterResult
bool add_special = true);
void collectSourceColumns(bool add_special);
void collectUsedColumns(const ASTPtr & query, bool is_select);
void collectUsedColumns(const ASTPtr & query, bool is_select, bool visit_index_hint);
Names requiredSourceColumns() const { return required_source_columns.getNames(); }
const Names & requiredSourceColumnsForAccessCheck() const { return required_source_columns_before_expanding_alias_columns; }
NameSet getArrayJoinSourceNameSet() const;

View File

@ -12,6 +12,8 @@ namespace QueryPlanOptimizations
/// This is the main function which optimizes the whole QueryPlan tree.
void optimizeTree(const QueryPlanOptimizationSettings & settings, QueryPlan::Node & root, QueryPlan::Nodes & nodes);
void optimizePrimaryKeyCondition(QueryPlan::Node & root);
/// Optimization is a function applied to QueryPlan::Node.
/// It can read and update subtree of specified node.
/// It return the number of updated layers of subtree if some change happened.

View File

@ -0,0 +1,49 @@
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Storages/StorageMerge.h>
#include <Interpreters/ActionsDAG.h>
#include <stack>
namespace DB::QueryPlanOptimizations
{
void optimizePrimaryKeyCondition(QueryPlan::Node & root)
{
struct Frame
{
QueryPlan::Node * node = nullptr;
size_t next_child = 0;
};
std::stack<Frame> stack;
stack.push({.node = &root});
while (!stack.empty())
{
auto & frame = stack.top();
/// Traverse all children first.
if (frame.next_child < frame.node->children.size())
{
stack.push({.node = frame.node->children[frame.next_child]});
++frame.next_child;
continue;
}
if (auto * filter_step = typeid_cast<FilterStep *>(frame.node->step.get()))
{
auto * child = frame.node->children.at(0);
if (auto * read_from_merge_tree = typeid_cast<ReadFromMergeTree *>(child->step.get()))
read_from_merge_tree->addFilter(filter_step->getExpression(), filter_step->getFilterColumnName());
if (auto * read_from_merge = typeid_cast<ReadFromMerge *>(child->step.get()))
read_from_merge->addFilter(filter_step->getExpression(), filter_step->getFilterColumnName());
}
stack.pop();
}
}
}

View File

@ -434,6 +434,7 @@ void QueryPlan::explainPipeline(WriteBuffer & buffer, const ExplainPipelineOptio
void QueryPlan::optimize(const QueryPlanOptimizationSettings & optimization_settings)
{
QueryPlanOptimizations::optimizeTree(optimization_settings, *root, nodes);
QueryPlanOptimizations::optimizePrimaryKeyCondition(*root);
}
void QueryPlan::explainEstimate(MutableColumns & columns)

View File

@ -103,6 +103,8 @@ public:
std::vector<Node *> children = {};
};
const Node * getRootNode() const { return root; }
using Nodes = std::list<Node>;
private:

View File

@ -834,6 +834,9 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(Merge
{
return selectRangesToRead(
std::move(parts),
prewhere_info,
added_filter,
added_filter_column_name,
storage_snapshot->metadata,
storage_snapshot->getMetadataForQuery(),
query_info,
@ -848,6 +851,9 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(Merge
MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
MergeTreeData::DataPartsVector parts,
const PrewhereInfoPtr & prewhere_info,
const ActionsDAGPtr & added_filter,
const std::string & added_filter_column_name,
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
@ -882,9 +888,31 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
// Build and check if primary key is used when necessary
const auto & primary_key = metadata_snapshot->getPrimaryKey();
Names primary_key_columns = primary_key.column_names;
KeyCondition key_condition(query_info, context, primary_key_columns, primary_key.expression);
std::optional<KeyCondition> key_condition;
if (settings.force_primary_key && key_condition.alwaysUnknownOrTrue())
if (settings.query_plan_optimize_primary_key)
{
ActionDAGNodes nodes;
if (prewhere_info)
{
const auto & node = prewhere_info->prewhere_actions->findInIndex(prewhere_info->prewhere_column_name);
nodes.nodes.push_back(&node);
}
if (added_filter)
{
const auto & node = added_filter->findInIndex(added_filter_column_name);
nodes.nodes.push_back(&node);
}
key_condition.emplace(std::move(nodes), query_info.syntax_analyzer_result, query_info.sets, context, primary_key_columns, primary_key.expression);
}
else
{
key_condition.emplace(query_info.query, query_info.syntax_analyzer_result, query_info.sets, context, primary_key_columns, primary_key.expression);
}
if (settings.force_primary_key && key_condition->alwaysUnknownOrTrue())
{
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{
.result = std::make_exception_ptr(Exception(
@ -892,7 +920,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
"Primary key ({}) is not used and setting 'force_primary_key' is set",
fmt::join(primary_key_columns, ", ")))});
}
LOG_DEBUG(log, "Key condition: {}", key_condition.toString());
LOG_DEBUG(log, "Key condition: {}", key_condition->toString());
const auto & select = query_info.query->as<ASTSelectQuery &>();
@ -915,7 +943,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
select,
metadata_snapshot->getColumns().getAllPhysical(),
parts,
key_condition,
*key_condition,
data,
metadata_snapshot,
context,
@ -940,7 +968,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
metadata_snapshot,
query_info,
context,
key_condition,
*key_condition,
reader_settings,
log,
num_streams,

View File

@ -114,6 +114,12 @@ public:
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeIndexes(JSONBuilder::JSONMap & map) const override;
void addFilter(ActionsDAGPtr expression, std::string column_name)
{
added_filter = std::move(expression);
added_filter_column_name = std::move(column_name);
}
StorageID getStorageID() const { return data.getStorageID(); }
UInt64 getSelectedParts() const { return selected_parts; }
UInt64 getSelectedRows() const { return selected_rows; }
@ -121,6 +127,9 @@ public:
static MergeTreeDataSelectAnalysisResultPtr selectRangesToRead(
MergeTreeData::DataPartsVector parts,
const PrewhereInfoPtr & prewhere_info,
const ActionsDAGPtr & added_filter,
const std::string & added_filter_column_name,
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
@ -151,6 +160,9 @@ private:
PrewhereInfoPtr prewhere_info;
ExpressionActionsSettings actions_settings;
ActionsDAGPtr added_filter;
std::string added_filter_column_name;
StorageSnapshotPtr storage_snapshot;
StorageMetadataPtr metadata_for_reading;

View File

@ -23,6 +23,7 @@
#include <Common/typeid_cast.h>
#include <Common/CurrentThread.h>
#include "Core/SortDescription.h"
#include <QueryPipeline/narrowPipe.h>
#include <Processors/DelayedPortsProcessor.h>
#include <Processors/RowsBeforeLimitCounter.h>
#include <Processors/Sources/RemoteSource.h>
@ -195,6 +196,12 @@ void QueryPipelineBuilder::resize(size_t num_streams, bool force, bool strict)
pipe.resize(num_streams, force, strict);
}
void QueryPipelineBuilder::narrow(size_t size)
{
checkInitializedAndNotCompleted();
narrowPipe(pipe, size);
}
void QueryPipelineBuilder::addTotalsHavingTransform(ProcessorPtr transform)
{
checkInitializedAndNotCompleted();

View File

@ -94,6 +94,11 @@ public:
/// Changes the number of output ports if needed. Adds ResizeTransform.
void resize(size_t num_streams, bool force = false, bool strict = false);
/// Concat some ports to have no more then size outputs.
/// This method is needed for Merge table engine in case of reading from many tables.
/// It prevents opening too many files at the same time.
void narrow(size_t size);
/// Unite several pipelines together. Result pipeline would have common_header structure.
/// If collector is used, it will collect only newly-added processors, but not processors from pipelines.
static QueryPipelineBuilder unitePipelines(

View File

@ -623,7 +623,7 @@ HiveFiles StorageHive::collectHiveFilesFromPartition(
for (size_t i = 0; i < partition_names.size(); ++i)
ranges.emplace_back(fields[i]);
const KeyCondition partition_key_condition(query_info, getContext(), partition_names, partition_minmax_idx_expr);
const KeyCondition partition_key_condition(query_info.query, query_info.syntax_analyzer_result, query_info.sets, getContext(), partition_names, partition_minmax_idx_expr);
if (!partition_key_condition.checkInHyperrectangle(ranges, partition_types).can_be_true)
return {};
}
@ -691,7 +691,7 @@ HiveFilePtr StorageHive::getHiveFileIfNeeded(
if (prune_level >= PruneLevel::File)
{
const KeyCondition hivefile_key_condition(query_info, getContext(), hivefile_name_types.getNames(), hivefile_minmax_idx_expr);
const KeyCondition hivefile_key_condition(query_info.query, query_info.syntax_analyzer_result, query_info.sets, getContext(), hivefile_name_types.getNames(), hivefile_minmax_idx_expr);
if (hive_file->useFileMinMaxIndex())
{
/// Load file level minmax index and apply

File diff suppressed because it is too large Load Diff

View File

@ -17,6 +17,7 @@ class IFunction;
using FunctionBasePtr = std::shared_ptr<IFunctionBase>;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
struct ActionDAGNodes;
/** A field, that can be stored in two representations:
* - A standalone field.
@ -206,7 +207,19 @@ class KeyCondition
public:
/// Does not take into account the SAMPLE section. all_columns - the set of all columns of the table.
KeyCondition(
const SelectQueryInfo & query_info,
const ASTPtr & query,
TreeRewriterResultPtr syntax_analyzer_result,
PreparedSets prepared_sets_,
ContextPtr context,
const Names & key_column_names,
const ExpressionActionsPtr & key_expr,
bool single_point_ = false,
bool strict_ = false);
KeyCondition(
ActionDAGNodes dag_nodes,
TreeRewriterResultPtr syntax_analyzer_result,
PreparedSets prepared_sets_,
ContextPtr context,
const Names & key_column_names,
const ExpressionActionsPtr & key_expr,
@ -342,6 +355,9 @@ private:
public:
static const AtomMap atom_map;
class Tree;
class FunctionTree;
private:
BoolMask checkInRange(
size_t used_key_size,
@ -351,9 +367,9 @@ private:
bool right_bounded,
BoolMask initial_mask) const;
void traverseAST(const ASTPtr & node, ContextPtr context, Block & block_with_constants);
bool tryParseAtomFromAST(const ASTPtr & node, ContextPtr context, Block & block_with_constants, RPNElement & out);
static bool tryParseLogicalOperatorFromAST(const ASTFunction * func, RPNElement & out);
void traverseAST(const Tree & node, ContextPtr context, Block & block_with_constants);
bool tryParseAtomFromAST(const Tree & node, ContextPtr context, Block & block_with_constants, RPNElement & out);
static bool tryParseLogicalOperatorFromAST(const FunctionTree & func, RPNElement & out);
/** Is node the key column
* or expression in which column of key is wrapped by chain of functions,
@ -362,17 +378,17 @@ private:
* and fills chain of possibly-monotonic functions.
*/
bool isKeyPossiblyWrappedByMonotonicFunctions(
const ASTPtr & node,
const Tree & node,
ContextPtr context,
size_t & out_key_column_num,
DataTypePtr & out_key_res_column_type,
MonotonicFunctionsChain & out_functions_chain);
bool isKeyPossiblyWrappedByMonotonicFunctionsImpl(
const ASTPtr & node,
const Tree & node,
size_t & out_key_column_num,
DataTypePtr & out_key_column_type,
std::vector<const ASTFunction *> & out_functions_chain);
std::vector<FunctionTree> & out_functions_chain);
bool transformConstantWithValidFunctions(
const String & expr_name,
@ -383,20 +399,20 @@ private:
std::function<bool(IFunctionBase &, const IDataType &)> always_monotonic) const;
bool canConstantBeWrappedByMonotonicFunctions(
const ASTPtr & node,
const Tree & node,
size_t & out_key_column_num,
DataTypePtr & out_key_column_type,
Field & out_value,
DataTypePtr & out_type);
bool canConstantBeWrappedByFunctions(
const ASTPtr & ast, size_t & out_key_column_num, DataTypePtr & out_key_column_type, Field & out_value, DataTypePtr & out_type);
const Tree & node, size_t & out_key_column_num, DataTypePtr & out_key_column_type, Field & out_value, DataTypePtr & out_type);
/// If it's possible to make an RPNElement
/// that will filter values (possibly tuples) by the content of 'prepared_set',
/// do it and return true.
bool tryPrepareSetIndex(
const ASTs & args,
const FunctionTree & func,
ContextPtr context,
RPNElement & out,
size_t & out_key_column_num);

View File

@ -5136,6 +5136,8 @@ static void selectBestProjection(
const MergeTreeDataSelectExecutor & reader,
const StorageSnapshotPtr & storage_snapshot,
const SelectQueryInfo & query_info,
const ActionsDAGPtr & added_filter,
const std::string & added_filter_column_name,
const Names & required_columns,
ProjectionCandidate & candidate,
ContextPtr query_context,
@ -5166,6 +5168,8 @@ static void selectBestProjection(
storage_snapshot->metadata,
candidate.desc->metadata,
query_info,
added_filter,
added_filter_column_name,
query_context,
settings.max_threads,
max_added_blocks);
@ -5188,6 +5192,8 @@ static void selectBestProjection(
storage_snapshot->metadata,
storage_snapshot->metadata,
query_info, // TODO syntax_analysis_result set in index
added_filter,
added_filter_column_name,
query_context,
settings.max_threads,
max_added_blocks);
@ -5281,7 +5287,7 @@ Block MergeTreeData::getMinMaxCountProjectionBlock(
minmax_columns_types = getMinMaxColumnsTypes(partition_key);
minmax_idx_condition.emplace(
query_info,
query_info.query, query_info.syntax_analyzer_result, query_info.sets,
query_context,
minmax_columns_names,
getMinMaxExpr(partition_key, ExpressionActionsSettings::fromContext(query_context)));
@ -5511,6 +5517,9 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
query_info.sets = std::move(select.getQueryAnalyzer()->getPreparedSets());
query_info.subquery_for_sets = std::move(select.getQueryAnalyzer()->getSubqueriesForSets());
query_info.prewhere_info = analysis_result.prewhere_info;
const auto & before_where = analysis_result.before_where;
const auto & where_column_name = analysis_result.where_column_name;
bool can_use_aggregate_projection = true;
/// If the first stage of the query pipeline is more complex than Aggregating - Expression - Filter - ReadFromStorage,
@ -5780,6 +5789,8 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
metadata_snapshot,
metadata_snapshot,
query_info,
before_where,
where_column_name,
query_context,
settings.max_threads,
max_added_blocks);
@ -5811,6 +5822,8 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
metadata_snapshot,
metadata_snapshot,
query_info,
before_where,
where_column_name,
query_context,
settings.max_threads,
max_added_blocks);
@ -5836,6 +5849,8 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
reader,
storage_snapshot,
query_info,
before_where,
where_column_name,
analysis_result.required_columns,
candidate,
query_context,
@ -5856,6 +5871,8 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
reader,
storage_snapshot,
query_info,
before_where,
where_column_name,
analysis_result.required_columns,
candidate,
query_context,
@ -5885,6 +5902,8 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
selected_candidate->aggregate_descriptions = select.getQueryAnalyzer()->aggregates();
}
/// Just in case, reset prewhere info calculated from projection.
query_info.prewhere_info.reset();
return *selected_candidate;
}

View File

@ -770,7 +770,7 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition(
minmax_columns_types = data.getMinMaxColumnsTypes(partition_key);
minmax_idx_condition.emplace(
query_info, context, minmax_columns_names, data.getMinMaxExpr(partition_key, ExpressionActionsSettings::fromContext(context)));
query_info.query, query_info.syntax_analyzer_result, query_info.sets, context, minmax_columns_names, data.getMinMaxExpr(partition_key, ExpressionActionsSettings::fromContext(context)));
partition_pruner.emplace(metadata_snapshot, query_info, context, false /* strict */);
if (settings.force_index_by_date && (minmax_idx_condition->alwaysUnknownOrTrue() && partition_pruner->isUseless()))
@ -1273,6 +1273,8 @@ MergeTreeDataSelectAnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMar
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const ActionsDAGPtr & added_filter,
const std::string & added_filter_column_name,
ContextPtr context,
unsigned num_streams,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read) const
@ -1292,6 +1294,9 @@ MergeTreeDataSelectAnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMar
return ReadFromMergeTree::selectRangesToRead(
std::move(parts),
query_info.prewhere_info,
added_filter,
added_filter_column_name,
metadata_snapshot_base,
metadata_snapshot,
query_info,

View File

@ -60,6 +60,8 @@ public:
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const ActionsDAGPtr & added_filter,
const std::string & added_filter_column_name,
ContextPtr context,
unsigned num_streams,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read = nullptr) const;

View File

@ -161,7 +161,7 @@ MergeTreeIndexConditionMinMax::MergeTreeIndexConditionMinMax(
const SelectQueryInfo & query,
ContextPtr context)
: index_data_types(index.data_types)
, condition(query, context, index.column_names, index.expression)
, condition(query.query, query.syntax_analyzer_result, query.sets, context, index.column_names, index.expression)
{
}

View File

@ -27,7 +27,8 @@ public:
PartitionPruner(const StorageMetadataPtr & metadata, const SelectQueryInfo & query_info, ContextPtr context, bool strict)
: partition_key(MergeTreePartition::adjustPartitionKey(metadata, context))
, partition_condition(
query_info, context, partition_key.column_names, partition_key.expression, true /* single_point */, strict)
query_info.query, query_info.syntax_analyzer_result, query_info.sets,
context, partition_key.column_names, partition_key.expression, true /* single_point */, strict)
, useless(strict ? partition_condition.anyUnknownOrAlwaysTrue() : partition_condition.alwaysUnknownOrTrue())
{
}

View File

@ -23,6 +23,8 @@
#include <Columns/ColumnString.h>
#include <Common/typeid_cast.h>
#include <Common/checkStackSize.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/Transforms/MaterializingTransform.h>
@ -216,7 +218,7 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(
}
SelectQueryInfo StorageMerge::getModifiedQueryInfo(
SelectQueryInfo getModifiedQueryInfo(
const SelectQueryInfo & query_info, ContextPtr modified_context, const StorageID & current_storage_id, bool is_merge_engine)
{
SelectQueryInfo modified_query_info = query_info;
@ -248,7 +250,11 @@ void StorageMerge::read(
const size_t max_block_size,
unsigned num_streams)
{
Pipes pipes;
/** Just in case, turn off optimization "transfer to PREWHERE",
* since there is no certainty that it works when one of table is MergeTree and other is not.
*/
auto modified_context = Context::createCopy(local_context);
modified_context->setSetting("optimize_move_to_prewhere", false);
bool has_database_virtual_column = false;
bool has_table_virtual_column = false;
@ -265,53 +271,8 @@ void StorageMerge::read(
real_column_names.push_back(column_name);
}
/** Just in case, turn off optimization "transfer to PREWHERE",
* since there is no certainty that it works when one of table is MergeTree and other is not.
*/
auto modified_context = Context::createCopy(local_context);
modified_context->setSetting("optimize_move_to_prewhere", false);
/// What will be result structure depending on query processed stage in source tables?
Block header = getHeaderForProcessingStage(column_names, storage_snapshot, query_info, local_context, processed_stage);
/** First we make list of selected tables to find out its size.
* This is necessary to correctly pass the recommended number of threads to each table.
*/
StorageListWithLocks selected_tables
= getSelectedTables(local_context, query_info.query, has_database_virtual_column, has_table_virtual_column);
query_plan.addInterpreterContext(modified_context);
QueryPlanResourceHolder resources;
if (selected_tables.empty())
{
auto modified_query_info = getModifiedQueryInfo(query_info, modified_context, getStorageID(), false);
/// FIXME: do we support sampling in this case?
auto pipe = createSources(
resources,
{},
modified_query_info,
processed_stage,
max_block_size,
header,
{},
{},
real_column_names,
modified_context,
0,
has_database_virtual_column,
has_table_virtual_column);
IStorage::readFromPipe(query_plan, std::move(pipe), column_names, storage_snapshot, query_info, local_context, getName());
return;
}
size_t tables_count = selected_tables.size();
Float64 num_streams_multiplier
= std::min(static_cast<unsigned>(tables_count), std::max(1U, static_cast<unsigned>(local_context->getSettingsRef().max_streams_multiplier_for_merge_tables)));
num_streams *= num_streams_multiplier;
size_t remaining_streams = num_streams;
= getSelectedTables(modified_context, query_info.query, has_database_virtual_column, has_table_virtual_column);
InputOrderInfoPtr input_sorting_info;
if (query_info.order_optimizer)
@ -320,7 +281,7 @@ void StorageMerge::read(
{
auto storage_ptr = std::get<1>(*it);
auto storage_metadata_snapshot = storage_ptr->getInMemoryMetadataPtr();
auto current_info = query_info.order_optimizer->getInputOrder(storage_metadata_snapshot, local_context);
auto current_info = query_info.order_optimizer->getInputOrder(storage_metadata_snapshot, modified_context);
if (it == selected_tables.begin())
input_sorting_info = current_info;
else if (!current_info || (input_sorting_info && *current_info != *input_sorting_info))
@ -333,7 +294,95 @@ void StorageMerge::read(
query_info.input_order_info = input_sorting_info;
}
auto sample_block = getInMemoryMetadataPtr()->getSampleBlock();
query_plan.addInterpreterContext(modified_context);
/// What will be result structure depending on query processed stage in source tables?
Block common_header = getHeaderForProcessingStage(column_names, storage_snapshot, query_info, local_context, processed_stage);
auto step = std::make_unique<ReadFromMerge>(
common_header,
std::move(selected_tables),
real_column_names,
has_database_virtual_column,
has_table_virtual_column,
max_block_size,
num_streams,
shared_from_this(),
storage_snapshot,
query_info,
std::move(modified_context),
processed_stage);
query_plan.addStep(std::move(step));
}
ReadFromMerge::ReadFromMerge(
Block common_header_,
StorageListWithLocks selected_tables_,
Names column_names_,
bool has_database_virtual_column_,
bool has_table_virtual_column_,
size_t max_block_size,
size_t num_streams,
StoragePtr storage,
StorageSnapshotPtr storage_snapshot,
const SelectQueryInfo & query_info_,
ContextMutablePtr context_,
QueryProcessingStage::Enum processed_stage)
: ISourceStep(DataStream{.header = common_header_})
, required_max_block_size(max_block_size)
, requested_num_streams(num_streams)
, common_header(std::move(common_header_))
, selected_tables(std::move(selected_tables_))
, column_names(std::move(column_names_))
, has_database_virtual_column(has_database_virtual_column_)
, has_table_virtual_column(has_table_virtual_column_)
, storage_merge(std::move(storage))
, merge_storage_snapshot(std::move(storage_snapshot))
, query_info(query_info_)
, context(std::move(context_))
, common_processed_stage(processed_stage)
{
}
void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
if (selected_tables.empty())
{
pipeline.init(Pipe(std::make_shared<NullSource>(output_stream->header)));
return;
}
size_t tables_count = selected_tables.size();
Float64 num_streams_multiplier
= std::min(static_cast<unsigned>(tables_count), std::max(1U, static_cast<unsigned>(context->getSettingsRef().max_streams_multiplier_for_merge_tables)));
size_t num_streams = requested_num_streams * num_streams_multiplier;
size_t remaining_streams = num_streams;
InputOrderInfoPtr input_sorting_info;
if (query_info.order_optimizer)
{
for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it)
{
auto storage_ptr = std::get<1>(*it);
auto storage_metadata_snapshot = storage_ptr->getInMemoryMetadataPtr();
auto current_info = query_info.order_optimizer->getInputOrder(storage_metadata_snapshot, context);
if (it == selected_tables.begin())
input_sorting_info = current_info;
else if (!current_info || (input_sorting_info && *current_info != *input_sorting_info))
input_sorting_info.reset();
if (!input_sorting_info)
break;
}
query_info.input_order_info = input_sorting_info;
}
auto sample_block = merge_storage_snapshot->getMetadataForQuery()->getSampleBlock();
std::vector<std::unique_ptr<QueryPipelineBuilder>> pipelines;
QueryPlanResourceHolder resources;
for (const auto & table : selected_tables)
{
@ -351,20 +400,20 @@ void StorageMerge::read(
Aliases aliases;
auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr();
auto storage_columns = storage_metadata_snapshot->getColumns();
auto nested_storage_snaphsot = storage->getStorageSnapshot(storage_metadata_snapshot, local_context);
auto nested_storage_snaphsot = storage->getStorageSnapshot(storage_metadata_snapshot, context);
auto modified_query_info = getModifiedQueryInfo(query_info, modified_context, storage->getStorageID(), storage->as<StorageMerge>());
auto syntax_result = TreeRewriter(local_context).analyzeSelect(
auto modified_query_info = getModifiedQueryInfo(query_info, context, storage->getStorageID(), storage->as<StorageMerge>());
auto syntax_result = TreeRewriter(context).analyzeSelect(
modified_query_info.query, TreeRewriterResult({}, storage, nested_storage_snaphsot));
Names column_names_as_aliases;
bool with_aliases = processed_stage == QueryProcessingStage::FetchColumns && !storage_columns.getAliases().empty();
bool with_aliases = common_processed_stage == QueryProcessingStage::FetchColumns && !storage_columns.getAliases().empty();
if (with_aliases)
{
ASTPtr required_columns_expr_list = std::make_shared<ASTExpressionList>();
ASTPtr column_expr;
for (const auto & column : real_column_names)
for (const auto & column : column_names)
{
const auto column_default = storage_columns.getDefault(column);
bool is_alias = column_default && column_default->kind == ColumnDefaultKind::Alias;
@ -373,11 +422,11 @@ void StorageMerge::read(
{
column_expr = column_default->expression->clone();
replaceAliasColumnsInQuery(column_expr, storage_metadata_snapshot->getColumns(),
syntax_result->array_join_result_to_source, local_context);
syntax_result->array_join_result_to_source, context);
auto column_description = storage_columns.get(column);
column_expr = addTypeConversionToAST(std::move(column_expr), column_description.type->getName(),
storage_metadata_snapshot->getColumns().getAll(), local_context);
storage_metadata_snapshot->getColumns().getAll(), context);
column_expr = setAlias(column_expr, column);
auto type = sample_block.getByName(column).type;
@ -389,54 +438,55 @@ void StorageMerge::read(
required_columns_expr_list->children.emplace_back(std::move(column_expr));
}
syntax_result = TreeRewriter(local_context).analyze(
required_columns_expr_list, storage_columns.getAllPhysical(), storage, storage->getStorageSnapshot(storage_metadata_snapshot, local_context));
syntax_result = TreeRewriter(context).analyze(
required_columns_expr_list, storage_columns.getAllPhysical(), storage, storage->getStorageSnapshot(storage_metadata_snapshot, context));
auto alias_actions = ExpressionAnalyzer(required_columns_expr_list, syntax_result, local_context).getActionsDAG(true);
auto alias_actions = ExpressionAnalyzer(required_columns_expr_list, syntax_result, context).getActionsDAG(true);
column_names_as_aliases = alias_actions->getRequiredColumns().getNames();
if (column_names_as_aliases.empty())
column_names_as_aliases.push_back(ExpressionActions::getSmallestColumn(storage_metadata_snapshot->getColumns().getAllPhysical()));
}
auto source_pipe = createSources(
resources,
auto source_pipeline = createSources(
nested_storage_snaphsot,
modified_query_info,
processed_stage,
max_block_size,
header,
common_processed_stage,
required_max_block_size,
common_header,
aliases,
table,
column_names_as_aliases.empty() ? real_column_names : column_names_as_aliases,
modified_context,
current_streams,
has_database_virtual_column,
has_table_virtual_column);
column_names_as_aliases.empty() ? column_names : column_names_as_aliases,
context,
current_streams);
if (!source_pipe.empty())
if (source_pipeline && source_pipeline->initialized())
{
query_plan.addStorageHolder(std::get<1>(table));
query_plan.addTableLock(std::get<2>(table));
resources.storage_holders.push_back(std::get<1>(table));
resources.table_locks.push_back(std::get<2>(table));
pipes.emplace_back(std::move(source_pipe));
pipelines.emplace_back(std::move(source_pipeline));
}
}
auto pipe = Pipe::unitePipes(std::move(pipes));
if (pipelines.empty())
{
pipeline.init(Pipe(std::make_shared<NullSource>(output_stream->header)));
return;
}
if (!pipe.empty() && !query_info.input_order_info)
pipeline = QueryPipelineBuilder::unitePipelines(std::move(pipelines));
if (!query_info.input_order_info)
// It's possible to have many tables read from merge, resize(num_streams) might open too many files at the same time.
// Using narrowPipe instead. But in case of reading in order of primary key, we cannot do it,
// because narrowPipe doesn't preserve order.
narrowPipe(pipe, num_streams);
pipeline.narrow(num_streams);
IStorage::readFromPipe(query_plan, std::move(pipe), column_names, storage_snapshot, query_info, local_context, getName());
query_plan.addResources(std::move(resources));
pipeline.addResources(std::move(resources));
}
Pipe StorageMerge::createSources(
QueryPlanResourceHolder & resources,
QueryPipelineBuilderPtr ReadFromMerge::createSources(
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & modified_query_info,
const QueryProcessingStage::Enum & processed_stage,
@ -447,25 +497,19 @@ Pipe StorageMerge::createSources(
Names & real_column_names,
ContextMutablePtr modified_context,
size_t streams_num,
bool has_database_virtual_column,
bool has_table_virtual_column,
bool concat_streams)
{
const auto & [database_name, storage, _, table_name] = storage_with_lock;
auto & modified_select = modified_query_info.query->as<ASTSelectQuery &>();
Pipe pipe;
QueryPipelineBuilderPtr builder;
if (!storage)
{
auto builder = InterpreterSelectQuery(
return std::make_unique<QueryPipelineBuilder>(InterpreterSelectQuery(
modified_query_info.query, modified_context,
Pipe(std::make_shared<SourceFromSingleChunk>(header)),
SelectQueryOptions(processed_stage).analyze()).buildQueryPipeline();
pipe = QueryPipelineBuilder::getPipe(std::move(builder), resources);
return pipe;
SelectQueryOptions(processed_stage).analyze()).buildQueryPipeline());
}
if (!modified_select.final() && storage->needRewriteQueryWithFinal(real_column_names))
@ -497,11 +541,12 @@ Pipe StorageMerge::createSources(
if (!plan.isInitialized())
return {};
auto builder = plan.buildQueryPipeline(
if (auto * read_from_merge_tree = typeid_cast<ReadFromMergeTree *>(plan.getRootNode()->step.get()))
read_from_merge_tree->addFilter(added_filter, added_filter_column_name);
builder = plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(modified_context),
BuildQueryPipelineSettings::fromContext(modified_context));
pipe = QueryPipelineBuilder::getPipe(std::move(*builder), resources);
}
else if (processed_stage > storage_stage)
{
@ -515,28 +560,27 @@ Pipe StorageMerge::createSources(
InterpreterSelectQuery interpreter{
modified_query_info.query, modified_context, SelectQueryOptions(processed_stage).ignoreProjections()};
pipe = QueryPipelineBuilder::getPipe(interpreter.buildQueryPipeline(), resources);
builder = std::make_unique<QueryPipelineBuilder>(interpreter.buildQueryPipeline());
/** Materialization is needed, since from distributed storage the constants come materialized.
* If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
* And this is not allowed, since all code is based on the assumption that in the block stream all types are the same.
*/
pipe.addSimpleTransform([](const Block & stream_header) { return std::make_shared<MaterializingTransform>(stream_header); });
builder->addSimpleTransform([](const Block & stream_header) { return std::make_shared<MaterializingTransform>(stream_header); });
}
if (!pipe.empty())
if (builder->initialized())
{
if (concat_streams && pipe.numOutputPorts() > 1)
if (concat_streams && builder->getNumStreams() > 1)
{
// It's possible to have many tables read from merge, resize(1) might open too many files at the same time.
// Using concat instead.
pipe.addTransform(std::make_shared<ConcatProcessor>(pipe.getHeader(), pipe.numOutputPorts()));
builder->addTransform(std::make_shared<ConcatProcessor>(builder->getHeader(), builder->getNumStreams()));
}
/// Add virtual columns if we don't already have them.
Block pipe_header = pipe.getHeader();
Block pipe_header = builder->getHeader();
if (has_database_virtual_column && !pipe_header.has("_database"))
{
@ -550,7 +594,7 @@ Pipe StorageMerge::createSources(
std::move(adding_column_dag),
ExpressionActionsSettings::fromContext(modified_context, CompileExpressions::yes));
pipe.addSimpleTransform([&](const Block & stream_header)
builder->addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<ExpressionTransform>(stream_header, adding_column_actions);
});
@ -568,7 +612,7 @@ Pipe StorageMerge::createSources(
std::move(adding_column_dag),
ExpressionActionsSettings::fromContext(modified_context, CompileExpressions::yes));
pipe.addSimpleTransform([&](const Block & stream_header)
builder->addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<ExpressionTransform>(stream_header, adding_column_actions);
});
@ -576,10 +620,10 @@ Pipe StorageMerge::createSources(
/// Subordinary tables could have different but convertible types, like numeric types of different width.
/// We must return streams with structure equals to structure of Merge table.
convertingSourceStream(header, storage_snapshot->metadata, aliases, modified_context, modified_query_info.query, pipe, processed_stage);
convertingSourceStream(header, storage_snapshot->metadata, aliases, modified_context, modified_query_info.query, *builder, processed_stage);
}
return pipe;
return builder;
}
StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(
@ -747,19 +791,19 @@ void StorageMerge::alter(
setInMemoryMetadata(storage_metadata);
}
void StorageMerge::convertingSourceStream(
void ReadFromMerge::convertingSourceStream(
const Block & header,
const StorageMetadataPtr & metadata_snapshot,
const Aliases & aliases,
ContextPtr local_context,
ASTPtr & query,
Pipe & pipe,
QueryPipelineBuilder & builder,
QueryProcessingStage::Enum processed_stage)
{
Block before_block_header = pipe.getHeader();
Block before_block_header = builder.getHeader();
auto storage_sample_block = metadata_snapshot->getSampleBlock();
auto pipe_columns = pipe.getHeader().getNamesAndTypesList();
auto pipe_columns = builder.getHeader().getNamesAndTypesList();
for (const auto & alias : aliases)
{
@ -772,21 +816,21 @@ void StorageMerge::convertingSourceStream(
auto actions_dag = expression_analyzer.getActionsDAG(true, false);
auto actions = std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes));
pipe.addSimpleTransform([&](const Block & stream_header)
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<ExpressionTransform>(stream_header, actions);
});
}
{
auto convert_actions_dag = ActionsDAG::makeConvertingActions(pipe.getHeader().getColumnsWithTypeAndName(),
auto convert_actions_dag = ActionsDAG::makeConvertingActions(builder.getHeader().getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto actions = std::make_shared<ExpressionActions>(
convert_actions_dag,
ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes));
pipe.addSimpleTransform([&](const Block & stream_header)
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<ExpressionTransform>(stream_header, actions);
});
@ -809,7 +853,7 @@ void StorageMerge::convertingSourceStream(
if (!header_column.type->equals(*before_column.type.get()))
{
NamesAndTypesList source_columns = metadata_snapshot->getSampleBlock().getNamesAndTypesList();
auto virtual_column = *getVirtuals().tryGetByName("_table");
auto virtual_column = *storage_merge->getVirtuals().tryGetByName("_table");
source_columns.emplace_back(NameAndTypePair{virtual_column.name, virtual_column.type});
auto syntax_result = TreeRewriter(local_context).analyze(where_expression, source_columns);
ExpressionActionsPtr actions = ExpressionAnalyzer{where_expression, syntax_result, local_context}.getActions(false, false);

View File

@ -1,7 +1,9 @@
#pragma once
#include <Common/OptimizedRegularExpression.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/IStorage.h>
#include <Processors/QueryPlan/ISourceStep.h>
namespace DB
@ -105,7 +107,62 @@ private:
NamesAndTypesList getVirtuals() const override;
ColumnSizeByName getColumnSizes() const override;
protected:
ColumnsDescription getColumnsDescriptionFromSourceTables() const;
friend class ReadFromMerge;
};
class ReadFromMerge final : public ISourceStep
{
public:
static constexpr auto name = "ReadFromMerge";
String getName() const override { return name; }
using StorageWithLockAndName = std::tuple<String, StoragePtr, TableLockHolder, String>;
using StorageListWithLocks = std::list<StorageWithLockAndName>;
using DatabaseTablesIterators = std::vector<DatabaseTablesIteratorPtr>;
ReadFromMerge(
Block common_header_,
StorageListWithLocks selected_tables_,
Names column_names_,
bool has_database_virtual_column_,
bool has_table_virtual_column_,
size_t max_block_size,
size_t num_streams,
StoragePtr storage,
StorageSnapshotPtr storage_snapshot,
const SelectQueryInfo & query_info_,
ContextMutablePtr context_,
QueryProcessingStage::Enum processed_stage);
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void addFilter(ActionsDAGPtr expression, std::string column_name)
{
added_filter = std::move(expression);
added_filter_column_name = std::move(column_name);
}
private:
const size_t required_max_block_size;
const size_t requested_num_streams;
const Block common_header;
StorageListWithLocks selected_tables;
Names column_names;
bool has_database_virtual_column;
bool has_table_virtual_column;
StoragePtr storage_merge;
StorageSnapshotPtr merge_storage_snapshot;
SelectQueryInfo query_info;
ContextMutablePtr context;
QueryProcessingStage::Enum common_processed_stage;
ActionsDAGPtr added_filter;
std::string added_filter_column_name;
struct AliasData
{
String name;
@ -115,8 +172,7 @@ protected:
using Aliases = std::vector<AliasData>;
Pipe createSources(
QueryPlanResourceHolder & resources,
QueryPipelineBuilderPtr createSources(
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
const QueryProcessingStage::Enum & processed_stage,
@ -127,19 +183,12 @@ protected:
Names & real_column_names,
ContextMutablePtr modified_context,
size_t streams_num,
bool has_database_virtual_column,
bool has_table_virtual_column,
bool concat_streams = false);
void convertingSourceStream(
const Block & header, const StorageMetadataPtr & metadata_snapshot, const Aliases & aliases,
ContextPtr context, ASTPtr & query,
Pipe & pipe, QueryProcessingStage::Enum processed_stage);
static SelectQueryInfo getModifiedQueryInfo(
const SelectQueryInfo & query_info, ContextPtr modified_context, const StorageID & current_storage_id, bool is_merge_engine);
ColumnsDescription getColumnsDescriptionFromSourceTables() const;
QueryPipelineBuilder & builder, QueryProcessingStage::Enum processed_stage);
};
}

View File

@ -1,2 +1,2 @@
SleepFunctionCalls: 4 (increment)
SleepFunctionMicroseconds: 400000 (increment)
SleepFunctionCalls: 3 (increment)
SleepFunctionMicroseconds: 300000 (increment)