Refactor ActionsChain.

This commit is contained in:
Nikolai Kochetov 2020-08-13 23:17:18 +03:00
parent 246c2cafb7
commit 8e631a98ea
14 changed files with 235 additions and 325 deletions

View File

@ -1,6 +1,5 @@
#include "Common/quoteString.h"
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
#include <Common/PODArray.h>
#include <Core/Row.h>
#include <Functions/FunctionFactory.h>
@ -9,7 +8,6 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <DataTypes/DataTypeSet.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeFunction.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
@ -21,7 +19,6 @@
#include <Columns/ColumnSet.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnsNumber.h>
#include <Storages/StorageSet.h>
@ -546,10 +543,14 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
if (!data.only_consts)
{
String result_name = column_name.get(ast);
data.addAction(ExpressionAction::copyColumn(arg->getColumnName(), result_name));
NameSet joined_columns;
joined_columns.insert(result_name);
data.addAction(ExpressionAction::arrayJoin(std::make_shared<ArrayJoinAction>(joined_columns, false, data.context)));
/// Here we copy argument because arrayJoin removes source column.
/// This makes it possible to remove the source column before arrayJoin if it won't be needed anymore.
/// It could have been possible to implement arrayJoin which keeps source column,
/// but in this case it will always be replicated (as many arrays), which is expensive.
String tmp_name = data.getUniqueName("_array_join_" + arg->getColumnName());
data.addAction(ExpressionAction::copyColumn(arg->getColumnName(), tmp_name));
data.addAction(ExpressionAction::arrayJoin(tmp_name, result_name));
}
return;

View File

@ -48,7 +48,7 @@ void ArrayJoinAction::prepare(Block & sample_block)
}
}
void ArrayJoinAction::execute(Block & block, bool dry_run)
void ArrayJoinAction::execute(Block & block)
{
if (columns.empty())
throw Exception("No arrays to join", ErrorCodes::LOGICAL_ERROR);
@ -105,7 +105,7 @@ void ArrayJoinAction::execute(Block & block, bool dry_run)
Block tmp_block{src_col, {{}, src_col.type, {}}};
function_builder->build({src_col})->execute(tmp_block, {0}, 1, src_col.column->size(), dry_run);
function_builder->build({src_col})->execute(tmp_block, {0}, 1, src_col.column->size());
non_empty_array_columns[name] = tmp_block.safeGetByPosition(1).column;
}
@ -140,31 +140,4 @@ void ArrayJoinAction::execute(Block & block, bool dry_run)
}
}
/// Drops ARRAY JOIN columns that are no longer needed and updates the column sets accordingly.
///
/// A column of `columns` that is absent from `needed_columns` is removed, as long as it is not
/// the last remaining one. Every column that stays is added to `needed_columns` and removed from
/// `unmodified_columns`. A column that stays although it is not needed is additionally put into
/// `final_columns`, so that the number of rows is not lost.
void ArrayJoinAction::finalize(NameSet & needed_columns, NameSet & unmodified_columns, NameSet & final_columns)
{
    auto next_it = columns.begin();
    while (next_it != columns.end())
    {
        /// Advance first, so that erasing the current element does not affect the traversal.
        const auto current_it = next_it++;
        const bool is_needed = needed_columns.count(*current_it) != 0;

        /// Unused columns are not joined, but at least one column is always kept.
        if (!is_needed && columns.size() > 1)
        {
            columns.erase(current_it);
            continue;
        }

        needed_columns.insert(*current_it);
        unmodified_columns.erase(*current_it);

        /// No ARRAY JOIN result is used: forcibly leave this column in the output
        /// so that the number of rows is preserved.
        if (!is_needed)
            final_columns.insert(*current_it);
    }
}
}

View File

@ -29,8 +29,7 @@ public:
ArrayJoinAction(const NameSet & array_joined_columns_, bool array_join_is_left, const Context & context);
void prepare(Block & sample_block);
void execute(Block & block, bool dry_run);
void finalize(NameSet & needed_columns, NameSet & unmodified_columns, NameSet & final_columns);
void execute(Block & block);
};
using ArrayJoinActionPtr = std::shared_ptr<ArrayJoinAction>;

View File

@ -1,19 +1,19 @@
#include <Interpreters/Set.h>
#include <Common/ProfileEvents.h>
#include <Common/SipHash.h>
#include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionJIT.h>
#include <Interpreters/TableJoin.h>
#include <Interpreters/Context.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnArray.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <optional>
#include <Columns/ColumnSet.h>
#include <Functions/FunctionHelpers.h>
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
@ -37,6 +37,7 @@ namespace ErrorCodes
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
extern const int TOO_MANY_TEMPORARY_COLUMNS;
extern const int TOO_MANY_TEMPORARY_NON_CONST_COLUMNS;
extern const int TYPE_MISMATCH;
}
/// Read comment near usage
@ -47,9 +48,6 @@ Names ExpressionAction::getNeededColumns() const
{
Names res = argument_names;
if (array_join)
res.insert(res.end(), array_join->columns.begin(), array_join->columns.end());
if (table_join)
res.insert(res.end(), table_join->keyNamesLeft().begin(), table_join->keyNamesLeft().end());
@ -143,11 +141,15 @@ ExpressionAction ExpressionAction::addAliases(const NamesWithAliases & aliased_c
return a;
}
ExpressionAction ExpressionAction::arrayJoin(ArrayJoinActionPtr array_join_)
ExpressionAction ExpressionAction::arrayJoin(std::string source_name, std::string result_name)
{
if (source_name == result_name)
throw Exception("ARRAY JOIN action should have different source and result names", ErrorCodes::LOGICAL_ERROR);
ExpressionAction a;
a.type = ARRAY_JOIN;
a.array_join = std::move(array_join_);
a.source_name = std::move(source_name);
a.result_name = std::move(result_name);
return a;
}
@ -243,7 +245,18 @@ void ExpressionAction::prepare(Block & sample_block, const Settings & settings,
case ARRAY_JOIN:
{
array_join->prepare(sample_block);
ColumnWithTypeAndName current = sample_block.getByName(source_name);
sample_block.erase(source_name);
const DataTypeArray * array_type = typeid_cast<const DataTypeArray *>(&*current.type);
if (!array_type)
throw Exception("ARRAY JOIN requires array argument", ErrorCodes::TYPE_MISMATCH);
current.name = result_name;
current.type = array_type->getNestedType();
current.column = nullptr; /// Result is never const
sample_block.insert(std::move(current));
break;
}
@ -369,7 +382,23 @@ void ExpressionAction::execute(Block & block, bool dry_run) const
case ARRAY_JOIN:
{
array_join->execute(block, dry_run);
auto source = block.getByName(source_name);
block.erase(source_name);
source.column = source.column->convertToFullColumnIfConst();
const ColumnArray * array = typeid_cast<const ColumnArray *>(source.column.get());
if (!array)
throw Exception("ARRAY JOIN of not array: " + source_name, ErrorCodes::TYPE_MISMATCH);
for (auto & column : block)
column.column = column.column->replicate(array->getOffsets());
source.column = array->getDataPtr();
source.type = assert_cast<const DataTypeArray &>(*source.type).getNestedType();
source.name = result_name;
block.insert(std::move(source));
break;
}
@ -478,13 +507,7 @@ std::string ExpressionAction::toString() const
break;
case ARRAY_JOIN:
ss << (array_join->is_left ? "LEFT " : "") << "ARRAY JOIN ";
for (NameSet::const_iterator it = array_join->columns.begin(); it != array_join->columns.end(); ++it)
{
if (it != array_join->columns.begin())
ss << ", ";
ss << *it;
}
ss << "ARRAY JOIN " << source_name << " -> " << result_name;
break;
case JOIN:
@ -597,9 +620,6 @@ void ExpressionActions::addImpl(ExpressionAction action, Names & new_names)
if (!action.result_name.empty())
new_names.push_back(action.result_name);
if (action.array_join)
new_names.insert(new_names.end(), action.array_join->columns.begin(), action.array_join->columns.end());
/// Compiled functions are custom functions and they don't need building
if (action.type == ExpressionAction::APPLY_FUNCTION && !action.is_function_compiled)
{
@ -631,51 +651,6 @@ void ExpressionActions::prependProjectInput()
actions.insert(actions.begin(), ExpressionAction::project(getRequiredColumns()));
}
void ExpressionActions::prependArrayJoin(const ExpressionAction & action, const Block & sample_block_before)
{
if (action.type != ExpressionAction::ARRAY_JOIN)
throw Exception("ARRAY_JOIN action expected", ErrorCodes::LOGICAL_ERROR);
NameSet array_join_set(action.array_join->columns.begin(), action.array_join->columns.end());
for (auto & it : input_columns)
{
if (array_join_set.count(it.name))
{
array_join_set.erase(it.name);
it.type = std::make_shared<DataTypeArray>(it.type);
}
}
for (const std::string & name : array_join_set)
{
input_columns.emplace_back(name, sample_block_before.getByName(name).type);
actions.insert(actions.begin(), ExpressionAction::removeColumn(name));
}
actions.insert(actions.begin(), action);
optimizeArrayJoin();
}
bool ExpressionActions::popUnusedArrayJoin(const Names & required_columns, ExpressionAction & out_action)
{
if (actions.empty() || actions.back().type != ExpressionAction::ARRAY_JOIN)
return false;
NameSet required_set(required_columns.begin(), required_columns.end());
for (const std::string & name : actions.back().array_join->columns)
{
if (required_set.count(name))
return false;
}
for (const std::string & name : actions.back().array_join->columns)
{
DataTypePtr & type = sample_block.getByName(name).type;
type = std::make_shared<DataTypeArray>(type);
}
out_action = actions.back();
actions.pop_back();
return true;
}
void ExpressionActions::execute(Block & block, bool dry_run) const
{
for (const auto & action : actions)
@ -809,7 +784,18 @@ void ExpressionActions::finalize(const Names & output_columns)
}
else if (action.type == ExpressionAction::ARRAY_JOIN)
{
action.array_join->finalize(needed_columns, unmodified_columns, final_columns);
/// We need source anyway, in order to calculate number of rows correctly.
needed_columns.insert(action.source_name);
unmodified_columns.erase(action.result_name);
needed_columns.erase(action.result_name);
/// Note: technically, if result of arrayJoin is not needed,
/// we may remove all the columns and lose the number of rows here.
/// However, I cannot imagine how it is possible.
/// For "big" ARRAY JOIN it could have happened in query like
/// SELECT count() FROM table ARRAY JOIN x
/// Now, "big" ARRAY JOIN is moved to separate pipeline step,
/// and arrayJoin(x) is an expression whose result can't be lost.
}
else
{
@ -946,7 +932,7 @@ void ExpressionActions::finalize(const Names & output_columns)
auto process = [&] (const String & name)
{
auto refcount = --columns_refcount[name];
if (refcount <= 0)
if (refcount <= 0 && action.type != ExpressionAction::ARRAY_JOIN)
{
new_actions.push_back(ExpressionAction::removeColumn(name));
if (sample_block.has(name))
@ -1046,8 +1032,6 @@ void ExpressionActions::optimizeArrayJoin()
if (!actions[i].result_name.empty())
array_joined_columns.insert(actions[i].result_name);
if (actions[i].array_join)
array_joined_columns.insert(actions[i].array_join->columns.begin(), actions[i].array_join->columns.end());
array_join_dependencies.insert(needed.begin(), needed.end());
}
@ -1106,7 +1090,7 @@ ExpressionActionsPtr ExpressionActions::splitActionsBeforeArrayJoin(const NameSe
/// Do not split action if input depends only on array joined columns.
if (split_actions->input_columns.empty())
return split_actions;
return nullptr;
/// Actions which depend on ARRAY JOIN result.
NameSet array_join_dependent_columns = array_joined_columns;
@ -1162,8 +1146,6 @@ ExpressionActionsPtr ExpressionActions::splitActionsBeforeArrayJoin(const NameSe
/// Add result of this action to array_join_dependent_columns too.
if (!action.result_name.empty())
array_join_dependent_columns.insert(action.result_name);
if (action.array_join)
array_join_dependent_columns.insert(action.array_join->columns.begin(), action.array_join->columns.end());
/// Add arguments of this action to array_join_dependent_columns_arguments.
auto needed = action.getNeededColumns();
@ -1191,7 +1173,7 @@ ExpressionActionsPtr ExpressionActions::splitActionsBeforeArrayJoin(const NameSe
/// Return empty actions if nothing was separated. Keep `this` unchanged.
if (split_actions->getActions().empty())
return split_actions;
return nullptr;
std::swap(actions, new_actions);
@ -1313,9 +1295,8 @@ UInt128 ExpressionAction::ActionHash::operator()(const ExpressionAction & action
hash.update(arg_name);
break;
case ARRAY_JOIN:
hash.update(action.array_join->is_left);
for (const auto & col : action.array_join->columns)
hash.update(col);
hash.update(action.result_name);
hash.update(action.source_name);
break;
case JOIN:
for (const auto & col : action.table_join->columnsAddedByJoin())
@ -1371,15 +1352,9 @@ bool ExpressionAction::operator==(const ExpressionAction & other) const
return false;
}
bool same_array_join = !array_join && !other.array_join;
if (array_join && other.array_join)
same_array_join = (array_join->columns == other.array_join->columns) &&
(array_join->is_left == other.array_join->is_left);
return source_name == other.source_name
&& result_name == other.result_name
&& argument_names == other.argument_names
&& same_array_join
&& TableJoin::sameJoin(table_join.get(), other.table_join.get())
&& projection == other.projection
&& is_function_compiled == other.is_function_compiled;
@ -1452,12 +1427,11 @@ std::string ExpressionActionsChain::dumpChain() const
return ss.str();
}
ExpressionActionsChain::Step::Step(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_)
: kind(Kind::ARRAY_JOIN)
, array_join(std::move(array_join_))
, columns_after_array_join(std::move(required_columns_))
ExpressionActionsChain::ArrayJoinLink::ArrayJoinLink(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_)
: array_join(std::move(array_join_))
, result_columns(std::move(required_columns_))
{
for (auto & column : columns_after_array_join)
for (auto & column : result_columns)
{
required_columns.emplace_back(NameAndTypePair(column.name, column.type));
@ -1471,54 +1445,68 @@ ExpressionActionsChain::Step::Step(ArrayJoinActionPtr array_join_, ColumnsWithTy
}
}
void ExpressionActionsChain::ArrayJoinLink::finalize(const Names & required_output_)
{
NamesAndTypesList new_required_columns;
ColumnsWithTypeAndName new_result_columns;
NameSet names(required_output_.begin(), required_output_.end());
for (const auto & column : result_columns)
{
if (array_join->columns.count(column.name) != 0 || names.count(column.name) != 0)
new_result_columns.emplace_back(column);
}
for (const auto & column : required_columns)
{
if (array_join->columns.count(column.name) != 0 || names.count(column.name) != 0)
new_required_columns.emplace_back(column);
}
std::swap(required_columns, new_required_columns);
std::swap(result_columns, new_result_columns);
}
ExpressionActionsChain::Step::Step(ArrayJoinActionPtr array_join, ColumnsWithTypeAndName required_columns)
: link(ArrayJoinLink(std::move(array_join), std::move(required_columns)))
{
}
/// Calls `callback` with the alternative currently stored in `ptr->link` and returns its result as `Res`.
/// Throws LOGICAL_ERROR if the variant holds neither of the known link types.
template <typename Res, typename Ptr, typename Callback>
static Res dispatch(Ptr * ptr, Callback && callback)
{
    if (auto * actions_link = std::get_if<ExpressionActionsChain::ExpressionActionsLink>(&ptr->link))
        return callback(*actions_link);

    if (auto * array_join_link = std::get_if<ExpressionActionsChain::ArrayJoinLink>(&ptr->link))
        return callback(*array_join_link);

    throw Exception("Unknown variant in ExpressionActionsChain step", ErrorCodes::LOGICAL_ERROR);
}
/// Returns the required columns reported by the link stored in this step.
const NamesAndTypesList & ExpressionActionsChain::Step::getRequiredColumns() const
{
    return dispatch<const NamesAndTypesList &>(
        this,
        [](auto & current_link) -> const NamesAndTypesList & { return current_link.getRequiredColumns(); });
}
/// Returns the result columns reported by the link stored in this step.
const ColumnsWithTypeAndName & ExpressionActionsChain::Step::getResultColumns() const
{
    return dispatch<const ColumnsWithTypeAndName &>(
        this,
        [](auto & current_link) -> const ColumnsWithTypeAndName & { return current_link.getResultColumns(); });
}
void ExpressionActionsChain::Step::finalize(const Names & required_output_)
{
switch (kind)
{
case Kind::ACTIONS:
{
actions->finalize(required_output_);
return;
}
case Kind::ARRAY_JOIN:
{
NamesAndTypesList new_required_columns;
ColumnsWithTypeAndName new_result_columns;
NameSet names(required_output_.begin(), required_output_.end());
for (const auto & column : columns_after_array_join)
{
if (array_join->columns.count(column.name) != 0 || names.count(column.name) != 0)
new_result_columns.emplace_back(column);
}
for (const auto & column : required_columns)
{
if (array_join->columns.count(column.name) != 0 || names.count(column.name) != 0)
new_required_columns.emplace_back(column);
}
std::swap(required_columns, new_required_columns);
std::swap(columns_after_array_join, new_result_columns);
return;
}
}
dispatch<void>(this, [&required_output_](auto & x) { x.finalize(required_output_); });
}
void ExpressionActionsChain::Step::prependProjectInput() const
{
switch (kind)
{
case Kind::ACTIONS:
{
actions->prependProjectInput();
return;
}
case Kind::ARRAY_JOIN:
{
/// TODO: remove unused columns before ARRAY JOIN ?
return;
}
}
dispatch<void>(this, [](auto & x) { x.prependProjectInput(); });
}
/// Returns the textual description produced by the link stored in this step.
std::string ExpressionActionsChain::Step::dump() const
{
    auto describe = [](auto & current_link) { return current_link.dump(); };
    return dispatch<std::string>(this, describe);
}
}

View File

@ -9,9 +9,10 @@
#include <unordered_map>
#include <unordered_set>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Interpreters/ArrayJoinAction.h>
#include <DataTypes/DataTypeArray.h>
#include <variant>
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
@ -45,6 +46,9 @@ using DataTypePtr = std::shared_ptr<const IDataType>;
class ExpressionActions;
class CompiledExpressionCache;
class ArrayJoinAction;
using ArrayJoinActionPtr = std::shared_ptr<ArrayJoinAction>;
/** Action on the block.
*/
struct ExpressionAction
@ -60,10 +64,9 @@ public:
APPLY_FUNCTION,
/** Replaces the specified columns with arrays into columns with elements.
* Duplicates the values in the remaining columns by the number of elements in the arrays.
* Arrays must be parallel (have the same lengths).
*/
/// Replaces the source array column with a column of its elements.
/// Duplicates the values in the remaining columns by the number of elements in the arrays.
/// Source column is removed from block.
ARRAY_JOIN,
JOIN,
@ -76,7 +79,7 @@ public:
Type type{};
/// For ADD/REMOVE/COPY_COLUMN.
/// For ADD/REMOVE/ARRAY_JOIN/COPY_COLUMN.
std::string source_name;
std::string result_name;
DataTypePtr result_type;
@ -98,9 +101,6 @@ public:
Names argument_names;
bool is_function_compiled = false;
/// For ARRAY JOIN
ArrayJoinActionPtr array_join;
/// For JOIN
std::shared_ptr<const TableJoin> table_join;
JoinPtr join;
@ -118,7 +118,7 @@ public:
static ExpressionAction project(const NamesWithAliases & projected_columns_);
static ExpressionAction project(const Names & projected_columns_);
static ExpressionAction addAliases(const NamesWithAliases & aliased_columns_);
static ExpressionAction arrayJoin(ArrayJoinActionPtr array_join_);
static ExpressionAction arrayJoin(std::string source_name, std::string result_name);
static ExpressionAction ordinaryJoin(std::shared_ptr<TableJoin> table_join, JoinPtr join);
/// Which columns are necessary to perform this action.
@ -177,15 +177,8 @@ public:
/// Adds to the beginning the removal of all extra columns.
void prependProjectInput();
/// Add the specified ARRAY JOIN action to the beginning. Change the appropriate input types to arrays.
/// If there are unknown columns in the ARRAY JOIN list, take their types from sample_block, and immediately after ARRAY JOIN remove them.
void prependArrayJoin(const ExpressionAction & action, const Block & sample_block_before);
/// If the last action is ARRAY JOIN, and it does not affect the columns from required_columns, discard and return it.
/// Change the corresponding output types to arrays.
bool popUnusedArrayJoin(const Names & required_columns, ExpressionAction & out_action);
/// Splits actions into two parts. Returned half may be swapped with ARRAY JOIN.
/// Returns nullptr if no actions may be moved before ARRAY JOIN.
ExpressionActionsPtr splitActionsBeforeArrayJoin(const NameSet & array_joined_columns);
/// - Adds actions to delete all but the specified columns.
@ -202,8 +195,8 @@ public:
Names getRequiredColumns() const
{
Names names;
for (NamesAndTypesList::const_iterator it = input_columns.begin(); it != input_columns.end(); ++it)
names.push_back(it->name);
for (const auto & input : input_columns)
names.push_back(input.name);
return names;
}
@ -294,21 +287,36 @@ struct ExpressionActionsChain
{
explicit ExpressionActionsChain(const Context & context_) : context(context_) {}
struct Step
struct ExpressionActionsLink
{
enum class Kind
{
ACTIONS,
ARRAY_JOIN,
};
ExpressionActionsPtr actions;
Kind kind;
const NamesAndTypesList & getRequiredColumns() const { return actions->getRequiredColumnsWithTypes(); }
const ColumnsWithTypeAndName & getResultColumns() const { return actions->getSampleBlock().getColumnsWithTypeAndName(); }
void finalize(const Names & required_output_) const { actions->finalize(required_output_); }
void prependProjectInput() const { actions->prependProjectInput(); }
std::string dump() const { return actions->dumpActions(); }
};
struct ArrayJoinLink
{
ArrayJoinActionPtr array_join;
NamesAndTypesList required_columns;
ColumnsWithTypeAndName columns_after_array_join;
ColumnsWithTypeAndName result_columns;
ArrayJoinLink(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_);
const NamesAndTypesList & getRequiredColumns() const { return required_columns; }
const ColumnsWithTypeAndName & getResultColumns() const { return result_columns; }
void finalize(const Names & required_output_);
void prependProjectInput() const {} /// TODO: remove unused columns before ARRAY JOIN ?
static std::string dump() { return "ARRAY JOIN"; }
};
struct Step
{
std::variant<ExpressionActionsLink, ArrayJoinLink> link;
ExpressionActionsPtr actions;
/// Columns that were added to the block before the current step, in addition to the previous step's output.
NameSet additional_input;
/// Columns which are required in the result of current step.
@ -319,61 +327,24 @@ struct ExpressionActionsChain
std::vector<bool> can_remove_required_output;
public:
explicit Step(const ExpressionActionsPtr & actions_ = nullptr, const Names & required_output_ = Names())
: kind(Kind::ACTIONS)
, actions(actions_)
explicit Step(ExpressionActionsPtr actions, const Names & required_output_ = Names())
: link(ExpressionActionsLink{std::move(actions)})
, required_output(required_output_)
{
}
explicit Step(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_);
NamesAndTypesList getRequiredColumns() const
{
switch (kind)
{
case Kind::ACTIONS:
return actions->getRequiredColumnsWithTypes();
case Kind::ARRAY_JOIN:
return required_columns;
}
__builtin_unreachable();
}
ColumnsWithTypeAndName getResultColumns() const
{
switch (kind)
{
case Kind::ACTIONS:
return actions->getSampleBlock().getColumnsWithTypeAndName();
case Kind::ARRAY_JOIN:
return columns_after_array_join;
}
__builtin_unreachable();
}
explicit Step(ArrayJoinActionPtr array_join, ColumnsWithTypeAndName required_columns);
const NamesAndTypesList & getRequiredColumns() const;
const ColumnsWithTypeAndName & getResultColumns() const;
/// Remove unused result and update required columns
void finalize(const Names & required_output_);
/// Add projections to expression
void prependProjectInput() const;
std::string dump() const;
std::string dump() const
{
switch (kind)
{
case Kind::ACTIONS:
{
return actions->dumpActions();
}
case Kind::ARRAY_JOIN:
{
return "ARRAY JOIN";
}
}
__builtin_unreachable();
}
ExpressionActionsPtr & actions() { return std::get<ExpressionActionsLink>(link).actions; }
const ExpressionActionsPtr & actions() const { return std::get<ExpressionActionsLink>(link).actions; }
};
using Steps = std::vector<Step>;
@ -399,7 +370,7 @@ struct ExpressionActionsChain
throw Exception("Empty ExpressionActionsChain", ErrorCodes::LOGICAL_ERROR);
}
return steps.back().actions;
return steps.back().actions();
}
Step & getLastStep()

View File

@ -1,30 +1,20 @@
#include <Poco/Util/Application.h>
#include <Poco/String.h>
#include <Core/Block.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTQualifiedAsterisk.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTOrderByElement.h>
#include <Parsers/formatAST.h>
#include <Parsers/DumpASTNode.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Columns/IColumn.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/LogicalExpressionsOptimizer.h>
#include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Interpreters/Set.h>
#include <Interpreters/TableJoin.h>
@ -42,20 +32,14 @@
#include <Storages/StorageJoin.h>
#include <DataStreams/copyData.h>
#include <DataStreams/IBlockInputStream.h>
#include <Dictionaries/IDictionary.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Common/typeid_cast.h>
#include <Common/StringUtils/StringUtils.h>
#include <ext/range.h>
#include <DataTypes/DataTypeFactory.h>
#include <Functions/FunctionsMiscellaneous.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Interpreters/interpretSubquery.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
#include <Interpreters/misc.h>
@ -180,7 +164,11 @@ void ExpressionAnalyzer::analyzeAggregation()
{
getRootActionsNoMakeSet(array_join_expression_list, true, temp_actions, false);
if (auto array_join = addMultipleArrayJoinAction(temp_actions, is_array_join_left))
temp_actions->add(ExpressionAction::arrayJoin(array_join));
{
auto sample_block = temp_actions->getSampleBlock();
array_join->prepare(sample_block);
temp_actions = std::make_shared<ExpressionActions>(sample_block.getColumnsWithTypeAndName(), context);
}
for (auto & column : temp_actions->getSampleBlock().getNamesAndTypesList())
if (syntax->array_join_result_to_source.count(column.name))
@ -480,10 +468,10 @@ ArrayJoinActionPtr SelectQueryExpressionAnalyzer::appendArrayJoin(ExpressionActi
ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns());
getRootActions(array_join_expression_list, only_types, step.actions);
getRootActions(array_join_expression_list, only_types, step.actions());
before_array_join = chain.getLastActions();
auto array_join = addMultipleArrayJoinAction(step.actions, is_array_join_left);
auto array_join = addMultipleArrayJoinAction(step.actions(), is_array_join_left);
chain.steps.push_back(ExpressionActionsChain::Step(array_join, step.getResultColumns()));
@ -501,7 +489,7 @@ bool SelectQueryExpressionAnalyzer::appendJoinLeftKeys(ExpressionActionsChain &
{
ExpressionActionsChain::Step & step = chain.lastStep(columns_after_array_join);
getRootActions(analyzedJoin().leftKeysList(), only_types, step.actions);
getRootActions(analyzedJoin().leftKeysList(), only_types, step.actions());
return true;
}
@ -511,7 +499,7 @@ bool SelectQueryExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain)
ExpressionActionsChain::Step & step = chain.lastStep(columns_after_array_join);
addJoinAction(step.actions, table_join);
addJoinAction(step.actions(), table_join);
return true;
}
@ -635,12 +623,12 @@ bool SelectQueryExpressionAnalyzer::appendPrewhere(
return false;
auto & step = chain.lastStep(sourceColumns());
getRootActions(select_query->prewhere(), only_types, step.actions);
getRootActions(select_query->prewhere(), only_types, step.actions());
String prewhere_column_name = select_query->prewhere()->getColumnName();
step.required_output.push_back(prewhere_column_name);
step.can_remove_required_output.push_back(true);
auto filter_type = step.actions->getSampleBlock().getByName(prewhere_column_name).type;
auto filter_type = step.actions()->getSampleBlock().getByName(prewhere_column_name).type;
if (!filter_type->canBeUsedInBooleanContext())
throw Exception("Invalid type for filter in PREWHERE: " + filter_type->getName(),
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
@ -664,7 +652,7 @@ bool SelectQueryExpressionAnalyzer::appendPrewhere(
}
}
auto names = step.actions->getSampleBlock().getNames();
auto names = step.actions()->getSampleBlock().getNames();
NameSet name_set(names.begin(), names.end());
for (const auto & column : sourceColumns())
@ -672,7 +660,7 @@ bool SelectQueryExpressionAnalyzer::appendPrewhere(
name_set.erase(column.name);
Names required_output(name_set.begin(), name_set.end());
step.actions->finalize(required_output);
step.actions()->finalize(required_output);
}
{
@ -683,8 +671,8 @@ bool SelectQueryExpressionAnalyzer::appendPrewhere(
/// 2. Store side columns which were calculated during prewhere actions execution if they are used.
/// Example: select F(A) prewhere F(A) > 0. F(A) can be saved from prewhere step.
/// 3. Check if we can remove filter column at prewhere step. If we can, action will store single REMOVE_COLUMN.
ColumnsWithTypeAndName columns = step.actions->getSampleBlock().getColumnsWithTypeAndName();
auto required_columns = step.actions->getRequiredColumns();
ColumnsWithTypeAndName columns = step.actions()->getSampleBlock().getColumnsWithTypeAndName();
auto required_columns = step.actions()->getRequiredColumns();
NameSet prewhere_input_names(required_columns.begin(), required_columns.end());
NameSet unused_source_columns;
@ -709,7 +697,7 @@ void SelectQueryExpressionAnalyzer::appendPreliminaryFilter(ExpressionActionsCha
ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns());
// FIXME: assert(filter_info);
step.actions = std::move(actions);
step = ExpressionActionsChain::Step(std::move(actions));
step.required_output.push_back(std::move(column_name));
step.can_remove_required_output = {true};
@ -729,9 +717,9 @@ bool SelectQueryExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain,
step.required_output.push_back(where_column_name);
step.can_remove_required_output = {true};
getRootActions(select_query->where(), only_types, step.actions);
getRootActions(select_query->where(), only_types, step.actions());
auto filter_type = step.actions->getSampleBlock().getByName(where_column_name).type;
auto filter_type = step.actions()->getSampleBlock().getByName(where_column_name).type;
if (!filter_type->canBeUsedInBooleanContext())
throw Exception("Invalid type for filter in WHERE: " + filter_type->getName(),
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
@ -753,7 +741,7 @@ bool SelectQueryExpressionAnalyzer::appendGroupBy(ExpressionActionsChain & chain
for (const auto & ast : asts)
{
step.required_output.emplace_back(ast->getColumnName());
getRootActions(ast, only_types, step.actions);
getRootActions(ast, only_types, step.actions());
}
if (optimize_aggregation_in_order)
@ -793,7 +781,7 @@ void SelectQueryExpressionAnalyzer::appendAggregateFunctionsArguments(Expression
/// TODO: data.aggregates -> aggregates()
for (const ASTFunction * node : data.aggregates)
for (auto & argument : node->arguments->children)
getRootActions(argument, only_types, step.actions);
getRootActions(argument, only_types, step.actions());
}
bool SelectQueryExpressionAnalyzer::appendHaving(ExpressionActionsChain & chain, bool only_types)
@ -806,7 +794,7 @@ bool SelectQueryExpressionAnalyzer::appendHaving(ExpressionActionsChain & chain,
ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns);
step.required_output.push_back(select_query->having()->getColumnName());
getRootActions(select_query->having(), only_types, step.actions);
getRootActions(select_query->having(), only_types, step.actions());
return true;
}
@ -817,7 +805,7 @@ void SelectQueryExpressionAnalyzer::appendSelect(ExpressionActionsChain & chain,
ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns);
getRootActions(select_query->select(), only_types, step.actions);
getRootActions(select_query->select(), only_types, step.actions());
for (const auto & child : select_query->select()->children)
step.required_output.push_back(child->getColumnName());
@ -833,7 +821,7 @@ bool SelectQueryExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain
ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns);
getRootActions(select_query->orderBy(), only_types, step.actions);
getRootActions(select_query->orderBy(), only_types, step.actions());
for (auto & child : select_query->orderBy()->children)
{
@ -864,7 +852,7 @@ bool SelectQueryExpressionAnalyzer::appendLimitBy(ExpressionActionsChain & chain
ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns);
getRootActions(select_query->limitBy(), only_types, step.actions);
getRootActions(select_query->limitBy(), only_types, step.actions());
NameSet aggregated_names;
for (const auto & column : aggregated_columns)
@ -929,14 +917,14 @@ void SelectQueryExpressionAnalyzer::appendProjectResult(ExpressionActionsChain &
}
}
step.actions->add(ExpressionAction::project(result_columns));
step.actions()->add(ExpressionAction::project(result_columns));
}
/// Appends the actions needed to compute `expr` to the last step of `chain`
/// and registers the expression's column name as a required output of that step.
/// `only_types` is forwarded unchanged to getRootActions.
void ExpressionAnalyzer::appendExpression(ExpressionActionsChain & chain, const ASTPtr & expr, bool only_types)
{
/// The last step is requested with sourceColumns() as its argument.
ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns());
/// NOTE(review): the two calls below look like the before/after forms of one statement
/// in this diff (`step.actions` member vs. `step.actions()` accessor) — confirm against the applied file.
getRootActions(expr, only_types, step.actions);
getRootActions(expr, only_types, step.actions());
step.required_output.push_back(expr->getColumnName());
}
@ -1077,7 +1065,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
if (query_analyzer.appendPrewhere(chain, !first_stage, additional_required_columns_after_prewhere))
{
prewhere_info = std::make_shared<PrewhereInfo>(
chain.steps.front().actions, query.prewhere()->getColumnName());
chain.steps.front().actions(), query.prewhere()->getColumnName());
if (allowEarlyConstantFolding(*prewhere_info->prewhere_actions, settings))
{

View File

@ -1,5 +1,4 @@
#include <Interpreters/InterpreterSystemQuery.h>
#include <Interpreters/InterpreterSystemQuery.h>
#include <Common/DNSResolver.h>
#include <Common/ActionLock.h>
#include <Common/typeid_cast.h>

View File

@ -623,7 +623,7 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> &
actions_chain.finalize();
/// Propagate information about columns needed as input.
for (const auto & column : actions_chain.steps.front().actions->getRequiredColumnsWithTypes())
for (const auto & column : actions_chain.steps.front().actions()->getRequiredColumnsWithTypes())
prepared_stages[i - 1].output_columns.insert(column.name);
}
@ -667,12 +667,12 @@ BlockInputStreamPtr MutationsInterpreter::addStreamsForLaterStages(const std::ve
if (i < stage.filter_column_names.size())
{
/// Execute DELETEs.
in = std::make_shared<FilterBlockInputStream>(in, step.actions, stage.filter_column_names[i]);
in = std::make_shared<FilterBlockInputStream>(in, step.actions(), stage.filter_column_names[i]);
}
else
{
/// Execute UPDATE or final projection.
in = std::make_shared<ExpressionBlockInputStream>(in, step.actions);
in = std::make_shared<ExpressionBlockInputStream>(in, step.actions());
}
}

View File

@ -36,16 +36,17 @@ ExpressionStep::ExpressionStep(const DataStream & input_stream_, ExpressionActio
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
}
/// Replaces the step's single input stream and rebuilds its output stream.
/// With `keep_header == true` the current output header is moved into the new output stream;
/// otherwise the header is recomputed from the new input header with Transform::transformHeader.
/// NOTE(review): this span interleaves the pre-change lines (the `Block result_header` signature and
/// the `res_header` uses) with their replacements — confirm against the applied file.
void ExpressionStep::updateInputStream(DataStream input_stream, Block result_header)
void ExpressionStep::updateInputStream(DataStream input_stream, bool keep_header)
{
/// The header is chosen before output_stream is reassigned below, because it may be moved out of it.
Block out_header = keep_header ? std::move(output_stream->header)
: Transform::transformHeader(input_stream.header, expression);
output_stream = createOutputStream(
input_stream,
res_header ? res_header : Transform::transformHeader(input_stream.header, expression),
std::move(out_header),
getDataStreamTraits());
/// The step keeps exactly one input: drop the old one and store the new stream.
input_streams.clear();
input_streams.emplace_back(std::move(input_stream));
res_header = std::move(result_header);
}
void ExpressionStep::transformPipeline(QueryPipeline & pipeline)
@ -56,11 +57,12 @@ void ExpressionStep::transformPipeline(QueryPipeline & pipeline)
return std::make_shared<Transform>(header, expression, on_totals);
});
if (res_header && !blocksHaveEqualStructure(res_header, output_stream->header))
if (!blocksHaveEqualStructure(pipeline.getHeader(), output_stream->header))
{
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(header, res_header, ConvertingTransform::MatchColumnsMode::Name);
return std::make_shared<ConvertingTransform>(header, output_stream->header,
ConvertingTransform::MatchColumnsMode::Name);
});
}
}

View File

@ -21,7 +21,7 @@ public:
void transformPipeline(QueryPipeline & pipeline) override;
void updateInputStream(DataStream input_stream, Block result_header);
void updateInputStream(DataStream input_stream, bool keep_header);
void describeActions(FormatSettings & settings) const override;
@ -29,7 +29,6 @@ public:
private:
ExpressionActionsPtr expression;
Block res_header;
};
/// TODO: add separate step for join.

View File

@ -41,16 +41,19 @@ FilterStep::FilterStep(
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
}
void FilterStep::updateInputStream(DataStream input_stream, Block result_header)
void FilterStep::updateInputStream(DataStream input_stream, bool keep_header)
{
Block out_header = std::move(output_stream->header);
if (keep_header)
out_header = FilterTransform::transformHeader(input_stream.header, expression, filter_column_name, remove_filter_column);
output_stream = createOutputStream(
input_stream,
res_header ? res_header : FilterTransform::transformHeader(input_stream.header, expression, filter_column_name, remove_filter_column),
std::move(out_header),
getDataStreamTraits());
input_streams.clear();
input_streams.emplace_back(std::move(input_stream));
res_header = std::move(result_header);
}
void FilterStep::transformPipeline(QueryPipeline & pipeline)
@ -61,11 +64,11 @@ void FilterStep::transformPipeline(QueryPipeline & pipeline)
return std::make_shared<FilterTransform>(header, expression, filter_column_name, remove_filter_column, on_totals);
});
if (res_header && !blocksHaveEqualStructure(res_header, output_stream->header))
if (!blocksHaveEqualStructure(pipeline.getHeader(), output_stream->header))
{
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(header, res_header, ConvertingTransform::MatchColumnsMode::Name);
return std::make_shared<ConvertingTransform>(header, output_stream->header, ConvertingTransform::MatchColumnsMode::Name);
});
}
}

View File

@ -20,7 +20,7 @@ public:
String getName() const override { return "Filter"; }
void transformPipeline(QueryPipeline & pipeline) override;
void updateInputStream(DataStream input_stream, Block result_header);
void updateInputStream(DataStream input_stream, bool keep_header);
void describeActions(FormatSettings & settings) const override;
@ -32,7 +32,6 @@ private:
ExpressionActionsPtr expression;
String filter_column_name;
bool remove_filter_column;
Block res_header;
};
}

View File

@ -413,6 +413,7 @@ static void tryPushDownLimit(QueryPlanStepPtr & parent, QueryPlan::Node * child_
parent.swap(child);
}
/// Move ARRAY JOIN up if possible.
static void tryLiftUpArrayJoin(QueryPlan::Node * parent_node, QueryPlan::Node * child_node, QueryPlan::Nodes & nodes)
{
auto & parent = parent_node->step;
@ -431,17 +432,18 @@ static void tryLiftUpArrayJoin(QueryPlan::Node * parent_node, QueryPlan::Node *
auto split_actions = expression->splitActionsBeforeArrayJoin(array_join->columns);
/// No actions can be moved before ARRAY JOIN.
if (split_actions->getActions().empty())
if (!split_actions)
return;
auto expected_header = parent->getOutputStream().header;
/// All actions were moved before ARRAY JOIN. Swap Expression and ArrayJoin.
if (expression->getActions().empty())
{
/// Expression -> ArrayJoin
auto expected_header = parent->getOutputStream().header;
/// Expression/Filter -> ArrayJoin
std::swap(parent, child);
/// ArrayJoin -> Expression
/// ArrayJoin -> Expression/Filter
if (expression_step)
child = std::make_unique<ExpressionStep>(child_node->children.at(0)->step->getOutputStream(),
std::move(split_actions));
@ -461,26 +463,12 @@ static void tryLiftUpArrayJoin(QueryPlan::Node * parent_node, QueryPlan::Node *
node.children.swap(child_node->children);
child_node->children.emplace_back(&node);
/// Expression/Filter -> ArrayJoin -> node -> Something
// if (filter_step && split_actions->getSampleBlock().has(filter_step->getFilterColumnName()))
// {
// /// Filter -> ArrayJoin -> node -> Something
// node.step = std::make_unique<FilterStep>(node.children.at(0)->step->getOutputStream(),
// std::move(split_actions),
// filter_step->getFilterColumnName(),
// filter_step->removesFilterColumn());
//
// array_join_step->updateInputStream(node.step->getOutputStream());
//
// parent = std::make_unique<ExpressionStep>(array_join_step->getOutputStream(),
// filter_step->getExpression());
// /// Expression -> ArrayJoin -> Filter -> Something
// }
node.step = std::make_unique<ExpressionStep>(node.children.at(0)->step->getOutputStream(),
std::move(split_actions));
array_join_step->updateInputStream(node.step->getOutputStream(), {});
expression_step ? expression_step->updateInputStream(array_join_step->getOutputStream(), expected_header)
: filter_step->updateInputStream(array_join_step->getOutputStream(), expected_header);
expression_step ? expression_step->updateInputStream(array_join_step->getOutputStream(), true)
: filter_step->updateInputStream(array_join_step->getOutputStream(), true);
}
void QueryPlan::optimize()

View File

@ -11,7 +11,7 @@ namespace ErrorCodes
/// Computes the output header of the transform: runs `array_join` over the header block
/// (taken by value, so the caller's block is not modified) and returns the result.
Block ArrayJoinTransform::transformHeader(Block header, const ArrayJoinActionPtr & array_join)
{
/// NOTE(review): the two calls below look like the before/after forms of one statement
/// in this diff (the `dry_run` argument of execute was dropped) — confirm against the applied file.
array_join->execute(header, true);
array_join->execute(header);
return header;
}
@ -30,7 +30,7 @@ ArrayJoinTransform::ArrayJoinTransform(
/// Applies `array_join` to one chunk in place.
void ArrayJoinTransform::transform(Chunk & chunk)
{
/// Rebuild a Block from the input port header and the columns detached from the chunk.
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
/// NOTE(review): the two calls below look like the before/after forms of one statement
/// in this diff (the `dry_run` argument of execute was dropped) — confirm against the applied file.
array_join->execute(block, false);
array_join->execute(block);
/// Put the resulting columns back into the chunk together with the block's row count.
chunk.setColumns(block.getColumns(), block.rows());
}