mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 08:02:02 +00:00
Build actions dag to evaluate missing defaults.
This commit is contained in:
parent
c12f2d460e
commit
d9aa1096cf
@ -171,7 +171,12 @@ Block AddingDefaultsBlockInputStream::readImpl()
|
||||
if (!evaluate_block.columns())
|
||||
evaluate_block.insert({ColumnConst::create(ColumnUInt8::create(1, 0), res.rows()), std::make_shared<DataTypeUInt8>(), "_dummy"});
|
||||
|
||||
evaluateMissingDefaults(evaluate_block, header.getNamesAndTypesList(), columns, context, false);
|
||||
auto dag = createFillingMissingDefaultsExpression(evaluate_block, header.getNamesAndTypesList(), columns, context, false);
|
||||
if (dag)
|
||||
{
|
||||
auto actions = std::make_shared<ExpressionActions>(std::move(dag));
|
||||
actions->execute(evaluate_block);
|
||||
}
|
||||
|
||||
std::unordered_map<size_t, MutableColumnPtr> mixed_columns;
|
||||
|
||||
|
@ -11,6 +11,7 @@ namespace ErrorCodes
|
||||
{
|
||||
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
||||
extern const int ILLEGAL_COLUMN;
|
||||
extern const int TOO_FEW_ARGUMENTS_FOR_FUNCTION;
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -35,33 +36,54 @@ public:
|
||||
|
||||
size_t getNumberOfArguments() const override
|
||||
{
|
||||
return 2;
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool isVariadic() const override { return true; }
|
||||
|
||||
bool useDefaultImplementationForNulls() const override { return false; }
|
||||
|
||||
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
|
||||
{
|
||||
const DataTypeArray * array_type = checkAndGetDataType<DataTypeArray>(arguments[1].get());
|
||||
if (!array_type)
|
||||
throw Exception("Second argument for function " + getName() + " must be array.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
if (arguments.size() < 2)
|
||||
throw Exception(ErrorCodes::TOO_FEW_ARGUMENTS_FOR_FUNCTION,
|
||||
"Function {} expect at leas two arguments, got {}", getName(), arguments.size());
|
||||
|
||||
for (size_t i = 1; i < arguments.size(); ++i)
|
||||
{
|
||||
const DataTypeArray * array_type = checkAndGetDataType<DataTypeArray>(arguments[i].get());
|
||||
if (!array_type)
|
||||
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
|
||||
"Argument {} for function {} must be array.",
|
||||
i + 1, getName());
|
||||
}
|
||||
return std::make_shared<DataTypeArray>(arguments[0]);
|
||||
}
|
||||
|
||||
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const override
|
||||
{
|
||||
ColumnPtr first_column = arguments[0].column;
|
||||
const ColumnArray * array_column = checkAndGetColumn<ColumnArray>(arguments[1].column.get());
|
||||
ColumnPtr temp_column;
|
||||
if (!array_column)
|
||||
ColumnPtr offsets;
|
||||
|
||||
for (size_t i = 1; i < arguments.size(); ++i)
|
||||
{
|
||||
const auto * const_array_column = checkAndGetColumnConst<ColumnArray>(arguments[1].column.get());
|
||||
if (!const_array_column)
|
||||
throw Exception("Unexpected column for replicate", ErrorCodes::ILLEGAL_COLUMN);
|
||||
temp_column = const_array_column->convertToFullColumn();
|
||||
array_column = checkAndGetColumn<ColumnArray>(temp_column.get());
|
||||
const ColumnArray * array_column = checkAndGetColumn<ColumnArray>(arguments[i].column.get());
|
||||
ColumnPtr temp_column;
|
||||
if (!array_column)
|
||||
{
|
||||
const auto * const_array_column = checkAndGetColumnConst<ColumnArray>(arguments[i].column.get());
|
||||
if (!const_array_column)
|
||||
throw Exception("Unexpected column for replicate", ErrorCodes::ILLEGAL_COLUMN);
|
||||
temp_column = const_array_column->convertToFullColumn();
|
||||
array_column = checkAndGetColumn<ColumnArray>(temp_column.get());
|
||||
}
|
||||
|
||||
if (!offsets || offsets->empty())
|
||||
offsets = array_column->getOffsetsPtr();
|
||||
}
|
||||
return ColumnArray::create(first_column->replicate(array_column->getOffsets())->convertToFullColumnIfConst(), array_column->getOffsetsPtr());
|
||||
|
||||
const auto & offsets_data = assert_cast<const ColumnArray::ColumnOffsets &>(*offsets).getData();
|
||||
return ColumnArray::create(first_column->replicate(offsets_data)->convertToFullColumnIfConst(), offsets);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -618,6 +618,25 @@ bool ActionsDAG::trivial() const
|
||||
return true;
|
||||
}
|
||||
|
||||
void ActionsDAG::addMaterializingOutputActions()
|
||||
{
|
||||
FunctionOverloadResolverPtr func_builder_materialize =
|
||||
std::make_shared<FunctionOverloadResolverAdaptor>(
|
||||
std::make_unique<DefaultOverloadResolver>(
|
||||
std::make_shared<FunctionMaterialize>()));
|
||||
|
||||
Index new_index;
|
||||
for (auto * node : index)
|
||||
{
|
||||
auto & name = node->result_name;
|
||||
node = &addFunction(func_builder_materialize, {node}, {}, true);
|
||||
node = &addAlias(*node, name, true);
|
||||
new_index.insert(node);
|
||||
}
|
||||
|
||||
index.swap(new_index);
|
||||
}
|
||||
|
||||
ActionsDAGPtr ActionsDAG::makeConvertingActions(
|
||||
const ColumnsWithTypeAndName & source,
|
||||
const ColumnsWithTypeAndName & result,
|
||||
|
@ -231,6 +231,9 @@ public:
|
||||
|
||||
ActionsDAGPtr clone() const;
|
||||
|
||||
/// For apply materialize() function for every output.
|
||||
/// Also add aliases so the result names remain unchanged.
|
||||
void addMaterializingOutputActions();
|
||||
|
||||
enum class MatchColumnsMode
|
||||
{
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <Interpreters/inplaceBlockConversions.h>
|
||||
#include <Core/Block.h>
|
||||
#include <Storages/ColumnsDescription.h>
|
||||
#include <Interpreters/ExpressionActions.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -74,7 +75,12 @@ Block addMissingDefaults(
|
||||
}
|
||||
|
||||
/// Computes explicitly specified values by default and materialized columns.
|
||||
evaluateMissingDefaults(res, required_columns, columns, context);
|
||||
auto dag = createFillingMissingDefaultsExpression(res, required_columns, columns, context);
|
||||
if (dag)
|
||||
{
|
||||
auto actions = std::make_shared<ExpressionActions>(std::move(dag));
|
||||
actions->execute(res);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
|
@ -24,7 +24,7 @@ namespace
|
||||
{
|
||||
|
||||
/// Add all required expressions for missing columns calculation
|
||||
void addDefaultRequiredExpressionsRecursively(Block & block, const String & required_column, const ColumnsDescription & columns, ASTPtr default_expr_list_accum, NameSet & added_columns)
|
||||
void addDefaultRequiredExpressionsRecursively(const Block & block, const String & required_column, const ColumnsDescription & columns, ASTPtr default_expr_list_accum, NameSet & added_columns)
|
||||
{
|
||||
checkStackSize();
|
||||
if (block.has(required_column) || added_columns.count(required_column))
|
||||
@ -52,7 +52,7 @@ void addDefaultRequiredExpressionsRecursively(Block & block, const String & requ
|
||||
}
|
||||
}
|
||||
|
||||
ASTPtr defaultRequiredExpressions(Block & block, const NamesAndTypesList & required_columns, const ColumnsDescription & columns)
|
||||
ASTPtr defaultRequiredExpressions(const Block & block, const NamesAndTypesList & required_columns, const ColumnsDescription & columns)
|
||||
{
|
||||
ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
|
||||
|
||||
@ -87,67 +87,32 @@ ASTPtr convertRequiredExpressions(Block & block, const NamesAndTypesList & requi
|
||||
return conversion_expr_list;
|
||||
}
|
||||
|
||||
void executeExpressionsOnBlock(
|
||||
Block & block,
|
||||
ActionsDAGPtr createFillingMissingDefaultsExpression(
|
||||
const Block & header,
|
||||
ASTPtr expr_list,
|
||||
bool save_unneeded_columns,
|
||||
const NamesAndTypesList & required_columns,
|
||||
const Context & context)
|
||||
{
|
||||
if (!expr_list)
|
||||
return;
|
||||
return nullptr;
|
||||
|
||||
if (!save_unneeded_columns)
|
||||
{
|
||||
auto syntax_result = TreeRewriter(context).analyze(expr_list, block.getNamesAndTypesList());
|
||||
ExpressionAnalyzer{expr_list, syntax_result, context}.getActions(true)->execute(block);
|
||||
return;
|
||||
}
|
||||
|
||||
/** ExpressionAnalyzer eliminates "unused" columns, in order to ensure their safety
|
||||
* we are going to operate on a copy instead of the original block */
|
||||
Block copy_block{block};
|
||||
|
||||
auto syntax_result = TreeRewriter(context).analyze(expr_list, block.getNamesAndTypesList());
|
||||
auto syntax_result = TreeRewriter(context).analyze(expr_list, header.getNamesAndTypesList());
|
||||
auto expression_analyzer = ExpressionAnalyzer{expr_list, syntax_result, context};
|
||||
auto required_source_columns = syntax_result->requiredSourceColumns();
|
||||
auto rows_was = copy_block.rows();
|
||||
auto dag = expression_analyzer.getActionsDAG(true, !save_unneeded_columns);
|
||||
|
||||
// Delete all not needed columns in DEFAULT expression.
|
||||
// They can intersect with columns added in PREWHERE
|
||||
// test 00950_default_prewhere
|
||||
// CLICKHOUSE-4523
|
||||
for (const auto & delete_column : copy_block.getNamesAndTypesList())
|
||||
if (save_unneeded_columns)
|
||||
{
|
||||
if (std::find(required_source_columns.begin(), required_source_columns.end(), delete_column.name) == required_source_columns.end())
|
||||
{
|
||||
copy_block.erase(delete_column.name);
|
||||
}
|
||||
Names required_names;
|
||||
required_names.resize(required_columns.size());
|
||||
for (const auto & column : required_columns)
|
||||
required_names.push_back(column.name);
|
||||
|
||||
dag->removeUnusedActions(required_names);
|
||||
dag->addMaterializingOutputActions();
|
||||
}
|
||||
|
||||
if (copy_block.columns() == 0)
|
||||
{
|
||||
// Add column to indicate block size in execute()
|
||||
copy_block.insert({DataTypeUInt8().createColumnConst(rows_was, 0u), std::make_shared<DataTypeUInt8>(), "__dummy"});
|
||||
}
|
||||
|
||||
expression_analyzer.getActions(true)->execute(copy_block);
|
||||
|
||||
/// move evaluated columns to the original block, materializing them at the same time
|
||||
size_t pos = 0;
|
||||
for (auto col = required_columns.begin(); col != required_columns.end(); ++col, ++pos)
|
||||
{
|
||||
if (copy_block.has(col->name))
|
||||
{
|
||||
auto evaluated_col = copy_block.getByName(col->name);
|
||||
evaluated_col.column = evaluated_col.column->convertToFullColumnIfConst();
|
||||
|
||||
if (block.has(col->name))
|
||||
block.getByName(col->name) = std::move(evaluated_col);
|
||||
else
|
||||
block.insert(pos, std::move(evaluated_col));
|
||||
}
|
||||
}
|
||||
return dag;
|
||||
}
|
||||
|
||||
}
|
||||
@ -157,19 +122,25 @@ void performRequiredConversions(Block & block, const NamesAndTypesList & require
|
||||
ASTPtr conversion_expr_list = convertRequiredExpressions(block, required_columns);
|
||||
if (conversion_expr_list->children.empty())
|
||||
return;
|
||||
executeExpressionsOnBlock(block, conversion_expr_list, true, required_columns, context);
|
||||
|
||||
if (auto dag = createFillingMissingDefaultsExpression(block, conversion_expr_list, true, required_columns, context))
|
||||
{
|
||||
auto expression = std::make_shared<ExpressionActions>(std::move(dag));
|
||||
expression->execute(block);
|
||||
}
|
||||
}
|
||||
|
||||
void evaluateMissingDefaults(Block & block,
|
||||
ActionsDAGPtr createFillingMissingDefaultsExpression(
|
||||
const Block & header,
|
||||
const NamesAndTypesList & required_columns,
|
||||
const ColumnsDescription & columns,
|
||||
const Context & context, bool save_unneeded_columns)
|
||||
{
|
||||
if (!columns.hasDefaults())
|
||||
return;
|
||||
return nullptr;
|
||||
|
||||
ASTPtr default_expr_list = defaultRequiredExpressions(block, required_columns, columns);
|
||||
executeExpressionsOnBlock(block, default_expr_list, save_unneeded_columns, required_columns, context);
|
||||
ASTPtr expr_list = defaultRequiredExpressions(header, required_columns, columns);
|
||||
return createFillingMissingDefaultsExpression(header, expr_list, save_unneeded_columns, required_columns, context);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -12,9 +12,13 @@ class Context;
|
||||
class NamesAndTypesList;
|
||||
class ColumnsDescription;
|
||||
|
||||
class ActionsDAG;
|
||||
using ActionsDAGPtr = std::shared_ptr<ActionsDAG>;
|
||||
|
||||
/// Adds missing defaults to block according to required_columns
|
||||
/// using columns description
|
||||
void evaluateMissingDefaults(Block & block,
|
||||
ActionsDAGPtr createFillingMissingDefaultsExpression(
|
||||
const Block & header,
|
||||
const NamesAndTypesList & required_columns,
|
||||
const ColumnsDescription & columns,
|
||||
const Context & context, bool save_unneeded_columns = true);
|
||||
|
22
src/Processors/QueryPlan/Optimizations/filterPushDown.cpp
Normal file
22
src/Processors/QueryPlan/Optimizations/filterPushDown.cpp
Normal file
@ -0,0 +1,22 @@
|
||||
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
|
||||
#include <Processors/QueryPlan/FilterStep.h>
|
||||
|
||||
namespace DB::QueryPlanOptimizations
|
||||
{
|
||||
|
||||
size_t tryPushDownLimit(QueryPlan::Node * node, QueryPlan::Nodes &)
|
||||
{
|
||||
auto * filter_step = typeid_cast<FilterStep *>(node->step.get());
|
||||
if (!filter_step)
|
||||
return 0;
|
||||
|
||||
QueryPlan::Node * child_node = node->children.front();
|
||||
auto & child = child_node->step;
|
||||
|
||||
if (const auto * adding_const_column = typeid_cast<const AddingConstColumnStep *>(child.get()))
|
||||
{
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -186,7 +186,13 @@ void IMergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
|
||||
additional_columns.insert({res_columns[pos], name_and_type->type, name_and_type->name});
|
||||
}
|
||||
|
||||
DB::evaluateMissingDefaults(additional_columns, columns, metadata_snapshot->getColumns(), storage.global_context);
|
||||
auto dag = DB::createFillingMissingDefaultsExpression(
|
||||
additional_columns, columns, metadata_snapshot->getColumns(), storage.global_context);
|
||||
if (dag)
|
||||
{
|
||||
auto actions = std::make_shared<ExpressionActions>(std::move(dag));
|
||||
actions->execute(additional_columns);
|
||||
}
|
||||
|
||||
/// Move columns from block.
|
||||
name_and_type = columns.begin();
|
||||
|
Loading…
Reference in New Issue
Block a user