mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-21 01:00:48 +00:00
Merge pull request #13611 from ClickHouse/array-join-processor
Refactor ARRAY JOIN
This commit is contained in:
commit
7c0fcb2039
@ -1,6 +1,5 @@
|
||||
#include "Common/quoteString.h"
|
||||
#include <Common/quoteString.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/PODArray.h>
|
||||
#include <Core/Row.h>
|
||||
|
||||
#include <Functions/FunctionFactory.h>
|
||||
@ -9,7 +8,6 @@
|
||||
#include <AggregateFunctions/AggregateFunctionFactory.h>
|
||||
|
||||
#include <DataTypes/DataTypeSet.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/DataTypeFunction.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypeTuple.h>
|
||||
@ -21,7 +19,6 @@
|
||||
|
||||
#include <Columns/ColumnSet.h>
|
||||
#include <Columns/ColumnConst.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
|
||||
#include <Storages/StorageSet.h>
|
||||
|
||||
@ -546,10 +543,14 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
|
||||
if (!data.only_consts)
|
||||
{
|
||||
String result_name = column_name.get(ast);
|
||||
data.addAction(ExpressionAction::copyColumn(arg->getColumnName(), result_name));
|
||||
NameSet joined_columns;
|
||||
joined_columns.insert(result_name);
|
||||
data.addAction(ExpressionAction::arrayJoin(joined_columns, false, data.context));
|
||||
/// Here we copy argument because arrayJoin removes source column.
|
||||
/// This makes it possible to remove the source column before arrayJoin if it is not needed anymore.
|
||||
|
||||
/// It could have been possible to implement arrayJoin which keeps source column,
|
||||
/// but in this case it will always be replicated (as many arrays), which is expensive.
|
||||
String tmp_name = data.getUniqueName("_array_join_" + arg->getColumnName());
|
||||
data.addAction(ExpressionAction::copyColumn(arg->getColumnName(), tmp_name));
|
||||
data.addAction(ExpressionAction::arrayJoin(tmp_name, result_name));
|
||||
}
|
||||
|
||||
return;
|
||||
|
@ -48,7 +48,7 @@ void ArrayJoinAction::prepare(Block & sample_block)
|
||||
}
|
||||
}
|
||||
|
||||
void ArrayJoinAction::execute(Block & block, bool dry_run)
|
||||
void ArrayJoinAction::execute(Block & block)
|
||||
{
|
||||
if (columns.empty())
|
||||
throw Exception("No arrays to join", ErrorCodes::LOGICAL_ERROR);
|
||||
@ -105,7 +105,7 @@ void ArrayJoinAction::execute(Block & block, bool dry_run)
|
||||
|
||||
Block tmp_block{src_col, {{}, src_col.type, {}}};
|
||||
|
||||
function_builder->build({src_col})->execute(tmp_block, {0}, 1, src_col.column->size(), dry_run);
|
||||
function_builder->build({src_col})->execute(tmp_block, {0}, 1, src_col.column->size());
|
||||
non_empty_array_columns[name] = tmp_block.safeGetByPosition(1).column;
|
||||
}
|
||||
|
||||
@ -140,31 +140,4 @@ void ArrayJoinAction::execute(Block & block, bool dry_run)
|
||||
}
|
||||
}
|
||||
|
||||
/// Drops ARRAY JOIN columns that nobody needs anymore and registers the remaining ones.
///
/// For every column that stays in `columns`:
///  - it is added to `needed_columns`;
///  - it is removed from `unmodified_columns`;
///  - if it was not needed originally, it is also added to `final_columns`.
/// At least one column is always kept, so that the number of rows is not lost.
void ArrayJoinAction::finalize(NameSet & needed_columns, NameSet & unmodified_columns, NameSet & final_columns)
{
    auto it = columns.begin();
    while (it != columns.end())
    {
        const bool is_needed = needed_columns.count(*it) != 0;

        /// An unused column may be dropped only while some other column still remains.
        if (!is_needed && columns.size() > 1)
        {
            it = columns.erase(it);
            continue;
        }

        needed_columns.insert(*it);
        unmodified_columns.erase(*it);

        /// No ARRAY JOIN result is used: forcibly leave this column at the output
        /// in order to keep the number of rows.
        if (!is_needed)
            final_columns.insert(*it);

        ++it;
    }
}
|
||||
|
||||
}
|
||||
|
@ -12,8 +12,9 @@ class Context;
|
||||
class IFunctionOverloadResolver;
|
||||
using FunctionOverloadResolverPtr = std::shared_ptr<IFunctionOverloadResolver>;
|
||||
|
||||
struct ArrayJoinAction
|
||||
class ArrayJoinAction
|
||||
{
|
||||
public:
|
||||
NameSet columns;
|
||||
bool is_left = false;
|
||||
bool is_unaligned = false;
|
||||
@ -28,8 +29,9 @@ struct ArrayJoinAction
|
||||
|
||||
ArrayJoinAction(const NameSet & array_joined_columns_, bool array_join_is_left, const Context & context);
|
||||
void prepare(Block & sample_block);
|
||||
void execute(Block & block, bool dry_run);
|
||||
void finalize(NameSet & needed_columns, NameSet & unmodified_columns, NameSet & final_columns);
|
||||
void execute(Block & block);
|
||||
};
|
||||
|
||||
using ArrayJoinActionPtr = std::shared_ptr<ArrayJoinAction>;
|
||||
|
||||
}
|
||||
|
@ -1,19 +1,19 @@
|
||||
#include <Interpreters/Set.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <Common/SipHash.h>
|
||||
#include <Interpreters/ArrayJoinAction.h>
|
||||
#include <Interpreters/ExpressionActions.h>
|
||||
#include <Interpreters/ExpressionJIT.h>
|
||||
#include <Interpreters/TableJoin.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <Columns/ColumnArray.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <Functions/FunctionFactory.h>
|
||||
#include <Functions/IFunction.h>
|
||||
#include <optional>
|
||||
#include <Columns/ColumnSet.h>
|
||||
#include <Functions/FunctionHelpers.h>
|
||||
|
||||
#if !defined(ARCADIA_BUILD)
|
||||
# include "config_core.h"
|
||||
@ -37,6 +37,7 @@ namespace ErrorCodes
|
||||
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
|
||||
extern const int TOO_MANY_TEMPORARY_COLUMNS;
|
||||
extern const int TOO_MANY_TEMPORARY_NON_CONST_COLUMNS;
|
||||
extern const int TYPE_MISMATCH;
|
||||
}
|
||||
|
||||
/// Read comment near usage
|
||||
@ -47,9 +48,6 @@ Names ExpressionAction::getNeededColumns() const
|
||||
{
|
||||
Names res = argument_names;
|
||||
|
||||
if (array_join)
|
||||
res.insert(res.end(), array_join->columns.begin(), array_join->columns.end());
|
||||
|
||||
if (table_join)
|
||||
res.insert(res.end(), table_join->keyNamesLeft().begin(), table_join->keyNamesLeft().end());
|
||||
|
||||
@ -143,11 +141,15 @@ ExpressionAction ExpressionAction::addAliases(const NamesWithAliases & aliased_c
|
||||
return a;
|
||||
}
|
||||
|
||||
ExpressionAction ExpressionAction::arrayJoin(const NameSet & array_joined_columns, bool array_join_is_left, const Context & context)
|
||||
ExpressionAction ExpressionAction::arrayJoin(std::string source_name, std::string result_name)
|
||||
{
|
||||
if (source_name == result_name)
|
||||
throw Exception("ARRAY JOIN action should have different source and result names", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
ExpressionAction a;
|
||||
a.type = ARRAY_JOIN;
|
||||
a.array_join = std::make_shared<ArrayJoinAction>(array_joined_columns, array_join_is_left, context);
|
||||
a.source_name = std::move(source_name);
|
||||
a.result_name = std::move(result_name);
|
||||
return a;
|
||||
}
|
||||
|
||||
@ -243,7 +245,18 @@ void ExpressionAction::prepare(Block & sample_block, const Settings & settings,
|
||||
|
||||
case ARRAY_JOIN:
|
||||
{
|
||||
array_join->prepare(sample_block);
|
||||
ColumnWithTypeAndName current = sample_block.getByName(source_name);
|
||||
sample_block.erase(source_name);
|
||||
|
||||
const DataTypeArray * array_type = typeid_cast<const DataTypeArray *>(&*current.type);
|
||||
if (!array_type)
|
||||
throw Exception("ARRAY JOIN requires array argument", ErrorCodes::TYPE_MISMATCH);
|
||||
|
||||
current.name = result_name;
|
||||
current.type = array_type->getNestedType();
|
||||
current.column = nullptr; /// Result is never const
|
||||
sample_block.insert(std::move(current));
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
@ -369,7 +382,23 @@ void ExpressionAction::execute(Block & block, bool dry_run) const
|
||||
|
||||
case ARRAY_JOIN:
|
||||
{
|
||||
array_join->execute(block, dry_run);
|
||||
auto source = block.getByName(source_name);
|
||||
block.erase(source_name);
|
||||
source.column = source.column->convertToFullColumnIfConst();
|
||||
|
||||
const ColumnArray * array = typeid_cast<const ColumnArray *>(source.column.get());
|
||||
if (!array)
|
||||
throw Exception("ARRAY JOIN of not array: " + source_name, ErrorCodes::TYPE_MISMATCH);
|
||||
|
||||
for (auto & column : block)
|
||||
column.column = column.column->replicate(array->getOffsets());
|
||||
|
||||
source.column = array->getDataPtr();
|
||||
source.type = assert_cast<const DataTypeArray &>(*source.type).getNestedType();
|
||||
source.name = result_name;
|
||||
|
||||
block.insert(std::move(source));
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
@ -478,13 +507,7 @@ std::string ExpressionAction::toString() const
|
||||
break;
|
||||
|
||||
case ARRAY_JOIN:
|
||||
ss << (array_join->is_left ? "LEFT " : "") << "ARRAY JOIN ";
|
||||
for (NameSet::const_iterator it = array_join->columns.begin(); it != array_join->columns.end(); ++it)
|
||||
{
|
||||
if (it != array_join->columns.begin())
|
||||
ss << ", ";
|
||||
ss << *it;
|
||||
}
|
||||
ss << "ARRAY JOIN " << source_name << " -> " << result_name;
|
||||
break;
|
||||
|
||||
case JOIN:
|
||||
@ -597,9 +620,6 @@ void ExpressionActions::addImpl(ExpressionAction action, Names & new_names)
|
||||
if (!action.result_name.empty())
|
||||
new_names.push_back(action.result_name);
|
||||
|
||||
if (action.array_join)
|
||||
new_names.insert(new_names.end(), action.array_join->columns.begin(), action.array_join->columns.end());
|
||||
|
||||
/// Compiled functions are custom functions and they don't need building
|
||||
if (action.type == ExpressionAction::APPLY_FUNCTION && !action.is_function_compiled)
|
||||
{
|
||||
@ -631,51 +651,6 @@ void ExpressionActions::prependProjectInput()
|
||||
actions.insert(actions.begin(), ExpressionAction::project(getRequiredColumns()));
|
||||
}
|
||||
|
||||
void ExpressionActions::prependArrayJoin(const ExpressionAction & action, const Block & sample_block_before)
|
||||
{
|
||||
if (action.type != ExpressionAction::ARRAY_JOIN)
|
||||
throw Exception("ARRAY_JOIN action expected", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
NameSet array_join_set(action.array_join->columns.begin(), action.array_join->columns.end());
|
||||
for (auto & it : input_columns)
|
||||
{
|
||||
if (array_join_set.count(it.name))
|
||||
{
|
||||
array_join_set.erase(it.name);
|
||||
it.type = std::make_shared<DataTypeArray>(it.type);
|
||||
}
|
||||
}
|
||||
for (const std::string & name : array_join_set)
|
||||
{
|
||||
input_columns.emplace_back(name, sample_block_before.getByName(name).type);
|
||||
actions.insert(actions.begin(), ExpressionAction::removeColumn(name));
|
||||
}
|
||||
|
||||
actions.insert(actions.begin(), action);
|
||||
optimizeArrayJoin();
|
||||
}
|
||||
|
||||
|
||||
bool ExpressionActions::popUnusedArrayJoin(const Names & required_columns, ExpressionAction & out_action)
|
||||
{
|
||||
if (actions.empty() || actions.back().type != ExpressionAction::ARRAY_JOIN)
|
||||
return false;
|
||||
NameSet required_set(required_columns.begin(), required_columns.end());
|
||||
for (const std::string & name : actions.back().array_join->columns)
|
||||
{
|
||||
if (required_set.count(name))
|
||||
return false;
|
||||
}
|
||||
for (const std::string & name : actions.back().array_join->columns)
|
||||
{
|
||||
DataTypePtr & type = sample_block.getByName(name).type;
|
||||
type = std::make_shared<DataTypeArray>(type);
|
||||
}
|
||||
out_action = actions.back();
|
||||
actions.pop_back();
|
||||
return true;
|
||||
}
|
||||
|
||||
void ExpressionActions::execute(Block & block, bool dry_run) const
|
||||
{
|
||||
for (const auto & action : actions)
|
||||
@ -809,7 +784,18 @@ void ExpressionActions::finalize(const Names & output_columns)
|
||||
}
|
||||
else if (action.type == ExpressionAction::ARRAY_JOIN)
|
||||
{
|
||||
action.array_join->finalize(needed_columns, unmodified_columns, final_columns);
|
||||
/// We need source anyway, in order to calculate number of rows correctly.
|
||||
needed_columns.insert(action.source_name);
|
||||
unmodified_columns.erase(action.result_name);
|
||||
needed_columns.erase(action.result_name);
|
||||
|
||||
/// Note: technically, if result of arrayJoin is not needed,
|
||||
/// we may remove all the columns and lose the number of rows here.
|
||||
/// However, I cannot imagine how it is possible.
|
||||
/// For "big" ARRAY JOIN it could have happened in query like
|
||||
/// SELECT count() FROM table ARRAY JOIN x
|
||||
/// Now, "big" ARRAY JOIN is moved to separate pipeline step,
|
||||
/// and arrayJoin(x) is an expression which result can't be lost.
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -946,7 +932,7 @@ void ExpressionActions::finalize(const Names & output_columns)
|
||||
auto process = [&] (const String & name)
|
||||
{
|
||||
auto refcount = --columns_refcount[name];
|
||||
if (refcount <= 0)
|
||||
if (refcount <= 0 && action.type != ExpressionAction::ARRAY_JOIN)
|
||||
{
|
||||
new_actions.push_back(ExpressionAction::removeColumn(name));
|
||||
if (sample_block.has(name))
|
||||
@ -1046,8 +1032,6 @@ void ExpressionActions::optimizeArrayJoin()
|
||||
|
||||
if (!actions[i].result_name.empty())
|
||||
array_joined_columns.insert(actions[i].result_name);
|
||||
if (actions[i].array_join)
|
||||
array_joined_columns.insert(actions[i].array_join->columns.begin(), actions[i].array_join->columns.end());
|
||||
|
||||
array_join_dependencies.insert(needed.begin(), needed.end());
|
||||
}
|
||||
@ -1077,6 +1061,134 @@ void ExpressionActions::optimizeArrayJoin()
|
||||
}
|
||||
}
|
||||
|
||||
/// Moves every action that does not depend on `array_joined_columns` out of `this`
/// into a newly created ExpressionActions, which is returned.
///
/// After a successful split, the inputs of `this` are the output of the returned actions
/// plus the array joined inputs, i.e. the resulting chain is
///     Expression (returned split_actions) -> ArrayJoin (array_joined_columns) -> Expression (this)
///
/// Returns nullptr and keeps `this` unchanged if all inputs are array joined columns
/// or if no action could be moved.
ExpressionActionsPtr ExpressionActions::splitActionsBeforeArrayJoin(const NameSet & array_joined_columns)
{
    /// Create new actions.
    /// Copy from this because we don't have context.
    /// TODO: remove context from constructor?
    auto split_actions = std::make_shared<ExpressionActions>(*this);
    split_actions->actions.clear();
    split_actions->sample_block.clear();
    split_actions->input_columns.clear();

    /// We are going to move as many actions as we can from this to split_actions.
    /// We can move all actions which do not depend on array_joined_columns
    /// (with some exceptions for PROJECT and REMOVE_COLUMN).

    /// Use the same inputs for split_actions, except array_joined_columns.
    for (const auto & input_column : input_columns)
    {
        if (array_joined_columns.count(input_column.name) == 0)
        {
            split_actions->input_columns.emplace_back(input_column);
            split_actions->sample_block.insert(ColumnWithTypeAndName(nullptr, input_column.type, input_column.name));
        }
    }

    /// Do not split actions if input depends only on array joined columns.
    if (split_actions->input_columns.empty())
        return nullptr;

    /// Columns which depend on ARRAY JOIN result.
    NameSet array_join_dependent_columns = array_joined_columns;
    /// Arguments of actions which depend on ARRAY JOIN result.
    /// These columns can't be deleted in split_actions.
    NameSet array_join_dependent_columns_arguments;

    /// We create new_actions list for `this`.
    /// Each current action is either kept in new_actions or added to split_actions.
    Actions new_actions;
    for (const auto & action : actions)
    {
        /// Exception for PROJECT.
        /// It removes columns, so it would remove split_actions output which may be needed for actions from `this`.
        /// So, we replace it with ADD_ALIASES.
        /// Usually, PROJECT is added to the beginning of actions in order to remove unused output of previous actions.
        /// We skip it now, but will prependProjectInput at the end.
        if (action.type == ExpressionAction::PROJECT)
        {
            /// Each alias has separate dependencies, so we split this action into two parts.
            NamesWithAliases split_aliases;
            NamesWithAliases depend_aliases;
            for (const auto & pair : action.projection)
            {
                /// Skip if it is not an alias.
                if (pair.second.empty())
                    continue;

                /// `pair` refers to a const element, so it is copied (std::move would be a silent copy anyway).
                if (array_join_dependent_columns.count(pair.first))
                {
                    array_join_dependent_columns.insert(pair.second);
                    depend_aliases.emplace_back(pair);
                }
                else
                    split_aliases.emplace_back(pair);
            }

            if (!split_aliases.empty())
                split_actions->add(ExpressionAction::addAliases(split_aliases));

            if (!depend_aliases.empty())
                new_actions.emplace_back(ExpressionAction::addAliases(depend_aliases));

            continue;
        }

        /// Calculate the arguments once: they are used both for the check and for bookkeeping below.
        const auto needed = action.getNeededColumns();

        bool depends_on_array_join = false;
        for (const auto & column : needed)
        {
            if (array_join_dependent_columns.count(column) != 0)
            {
                depends_on_array_join = true;
                break;
            }
        }

        if (depends_on_array_join)
        {
            /// Add result of this action to array_join_dependent_columns too.
            if (!action.result_name.empty())
                array_join_dependent_columns.insert(action.result_name);

            /// Add arguments of this action to array_join_dependent_columns_arguments.
            array_join_dependent_columns_arguments.insert(needed.begin(), needed.end());

            new_actions.emplace_back(action);
        }
        else if (action.type == ExpressionAction::REMOVE_COLUMN)
        {
            /// Exception for REMOVE_COLUMN.
            /// We cannot move it to split_actions if any action remaining in `this` needs that column.
            if (array_join_dependent_columns_arguments.count(action.source_name))
                new_actions.emplace_back(action);
            else
                split_actions->add(action);
        }
        else
            split_actions->add(action);
    }

    /// Return empty actions if nothing was separated. Keep `this` unchanged.
    if (split_actions->getActions().empty())
        return nullptr;

    std::swap(actions, new_actions);

    /// Collect inputs from ARRAY JOIN.
    /// Moving out of input_columns is fine: the list is overwritten right below.
    NamesAndTypesList inputs_from_array_join;
    for (auto & column : input_columns)
        if (array_joined_columns.count(column.name))
            inputs_from_array_join.emplace_back(std::move(column));

    /// Fix inputs for `this`.
    /// It is output of split_actions + inputs from ARRAY JOIN.
    input_columns = split_actions->getSampleBlock().getNamesAndTypesList();
    input_columns.insert(input_columns.end(), inputs_from_array_join.begin(), inputs_from_array_join.end());

    /// Remove not needed columns.
    if (!actions.empty())
        prependProjectInput();

    return split_actions;
}
|
||||
|
||||
JoinPtr ExpressionActions::getTableJoinAlgo() const
|
||||
{
|
||||
@ -1178,9 +1290,8 @@ UInt128 ExpressionAction::ActionHash::operator()(const ExpressionAction & action
|
||||
hash.update(arg_name);
|
||||
break;
|
||||
case ARRAY_JOIN:
|
||||
hash.update(action.array_join->is_left);
|
||||
for (const auto & col : action.array_join->columns)
|
||||
hash.update(col);
|
||||
hash.update(action.result_name);
|
||||
hash.update(action.source_name);
|
||||
break;
|
||||
case JOIN:
|
||||
for (const auto & col : action.table_join->columnsAddedByJoin())
|
||||
@ -1236,15 +1347,9 @@ bool ExpressionAction::operator==(const ExpressionAction & other) const
|
||||
return false;
|
||||
}
|
||||
|
||||
bool same_array_join = !array_join && !other.array_join;
|
||||
if (array_join && other.array_join)
|
||||
same_array_join = (array_join->columns == other.array_join->columns) &&
|
||||
(array_join->is_left == other.array_join->is_left);
|
||||
|
||||
return source_name == other.source_name
|
||||
&& result_name == other.result_name
|
||||
&& argument_names == other.argument_names
|
||||
&& same_array_join
|
||||
&& TableJoin::sameJoin(table_join.get(), other.table_join.get())
|
||||
&& projection == other.projection
|
||||
&& is_function_compiled == other.is_function_compiled;
|
||||
@ -1255,8 +1360,8 @@ void ExpressionActionsChain::addStep()
|
||||
if (steps.empty())
|
||||
throw Exception("Cannot add action to empty ExpressionActionsChain", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
ColumnsWithTypeAndName columns = steps.back().actions->getSampleBlock().getColumnsWithTypeAndName();
|
||||
steps.push_back(Step(std::make_shared<ExpressionActions>(columns, context)));
|
||||
ColumnsWithTypeAndName columns = steps.back()->getResultColumns();
|
||||
steps.push_back(std::make_unique<ExpressionActionsStep>(std::make_shared<ExpressionActions>(columns, context)));
|
||||
}
|
||||
|
||||
void ExpressionActionsChain::finalize()
|
||||
@ -1264,16 +1369,16 @@ void ExpressionActionsChain::finalize()
|
||||
/// Finalize all steps. Right to left to define unnecessary input columns.
|
||||
for (int i = static_cast<int>(steps.size()) - 1; i >= 0; --i)
|
||||
{
|
||||
Names required_output = steps[i].required_output;
|
||||
Names required_output = steps[i]->required_output;
|
||||
std::unordered_map<String, size_t> required_output_indexes;
|
||||
for (size_t j = 0; j < required_output.size(); ++j)
|
||||
required_output_indexes[required_output[j]] = j;
|
||||
auto & can_remove_required_output = steps[i].can_remove_required_output;
|
||||
auto & can_remove_required_output = steps[i]->can_remove_required_output;
|
||||
|
||||
if (i + 1 < static_cast<int>(steps.size()))
|
||||
{
|
||||
const NameSet & additional_input = steps[i + 1].additional_input;
|
||||
for (const auto & it : steps[i + 1].actions->getRequiredColumnsWithTypes())
|
||||
const NameSet & additional_input = steps[i + 1]->additional_input;
|
||||
for (const auto & it : steps[i + 1]->getRequiredColumns())
|
||||
{
|
||||
if (additional_input.count(it.name) == 0)
|
||||
{
|
||||
@ -1285,27 +1390,19 @@ void ExpressionActionsChain::finalize()
|
||||
}
|
||||
}
|
||||
}
|
||||
steps[i].actions->finalize(required_output);
|
||||
}
|
||||
|
||||
/// When possible, move the ARRAY JOIN from earlier steps to later steps.
|
||||
for (size_t i = 1; i < steps.size(); ++i)
|
||||
{
|
||||
ExpressionAction action;
|
||||
if (steps[i - 1].actions->popUnusedArrayJoin(steps[i - 1].required_output, action))
|
||||
steps[i].actions->prependArrayJoin(action, steps[i - 1].actions->getSampleBlock());
|
||||
steps[i]->finalize(required_output);
|
||||
}
|
||||
|
||||
/// Adding the ejection of unnecessary columns to the beginning of each step.
|
||||
for (size_t i = 1; i < steps.size(); ++i)
|
||||
{
|
||||
size_t columns_from_previous = steps[i - 1].actions->getSampleBlock().columns();
|
||||
size_t columns_from_previous = steps[i - 1]->getResultColumns().size();
|
||||
|
||||
/// If unnecessary columns are formed at the output of the previous step, we'll add them to the beginning of this step.
|
||||
/// Except when we drop all the columns and lose the number of rows in the block.
|
||||
if (!steps[i].actions->getRequiredColumnsWithTypes().empty()
|
||||
&& columns_from_previous > steps[i].actions->getRequiredColumnsWithTypes().size())
|
||||
steps[i].actions->prependProjectInput();
|
||||
if (!steps[i]->getResultColumns().empty()
|
||||
&& columns_from_previous > steps[i]->getRequiredColumns().size())
|
||||
steps[i]->prependProjectInput();
|
||||
}
|
||||
}
|
||||
|
||||
@ -1317,12 +1414,62 @@ std::string ExpressionActionsChain::dumpChain() const
|
||||
{
|
||||
ss << "step " << i << "\n";
|
||||
ss << "required output:\n";
|
||||
for (const std::string & name : steps[i].required_output)
|
||||
for (const std::string & name : steps[i]->required_output)
|
||||
ss << name << "\n";
|
||||
ss << "\n" << steps[i].actions->dumpActions() << "\n";
|
||||
ss << "\n" << steps[i]->dump() << "\n";
|
||||
}
|
||||
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
/// Creates a chain step for ARRAY JOIN.
///
/// `required_columns_` are the columns at the input of the step. They are stored both as
/// required_columns (unchanged name and type) and as result_columns, where every column
/// listed in `array_join_->columns` gets the nested type of its array and loses its column data.
///
/// Throws TYPE_MISMATCH if a column listed for ARRAY JOIN is not of Array type.
ExpressionActionsChain::ArrayJoinStep::ArrayJoinStep(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_, Names required_output_)
    : Step(std::move(required_output_))
    , array_join(std::move(array_join_))
    , result_columns(std::move(required_columns_))
{
    for (auto & column : result_columns)
    {
        /// The step input keeps the original type; only the result below is changed.
        required_columns.emplace_back(NameAndTypePair(column.name, column.type));

        if (array_join->columns.count(column.name) > 0)
        {
            const auto * array = typeid_cast<const DataTypeArray *>(column.type.get());
            /// The pointer form of typeid_cast yields nullptr for a non-array type; do not dereference it.
            if (!array)
                throw Exception("ARRAY JOIN of not array: " + column.name, ErrorCodes::TYPE_MISMATCH);

            column.type = array->getNestedType();
            /// Arrays are materialized
            column.column = nullptr;
        }
    }
}
|
||||
|
||||
void ExpressionActionsChain::ArrayJoinStep::finalize(const Names & required_output_)
|
||||
{
|
||||
NamesAndTypesList new_required_columns;
|
||||
ColumnsWithTypeAndName new_result_columns;
|
||||
|
||||
NameSet names(required_output_.begin(), required_output_.end());
|
||||
for (const auto & column : result_columns)
|
||||
{
|
||||
if (array_join->columns.count(column.name) != 0 || names.count(column.name) != 0)
|
||||
new_result_columns.emplace_back(column);
|
||||
}
|
||||
for (const auto & column : required_columns)
|
||||
{
|
||||
if (array_join->columns.count(column.name) != 0 || names.count(column.name) != 0)
|
||||
new_required_columns.emplace_back(column);
|
||||
}
|
||||
|
||||
std::swap(required_columns, new_required_columns);
|
||||
std::swap(result_columns, new_result_columns);
|
||||
}
|
||||
|
||||
/// Returns the actions of this step.
/// Only ExpressionActionsStep has actions; for any other step type LOGICAL_ERROR is thrown
/// instead of dereferencing the null pointer produced by the failed cast.
ExpressionActionsPtr & ExpressionActionsChain::Step::actions()
{
    auto * expression_step = typeid_cast<ExpressionActionsStep *>(this);
    if (!expression_step)
        throw Exception("Step::actions() is available only for ExpressionActionsStep", ErrorCodes::LOGICAL_ERROR);

    return expression_step->actions;
}
|
||||
|
||||
/// Returns the actions of this step (const overload).
/// Only ExpressionActionsStep has actions; for any other step type LOGICAL_ERROR is thrown
/// instead of dereferencing the null pointer produced by the failed cast.
const ExpressionActionsPtr & ExpressionActionsChain::Step::actions() const
{
    const auto * expression_step = typeid_cast<const ExpressionActionsStep *>(this);
    if (!expression_step)
        throw Exception("Step::actions() is available only for ExpressionActionsStep", ErrorCodes::LOGICAL_ERROR);

    return expression_step->actions;
}
|
||||
|
||||
}
|
||||
|
@ -9,7 +9,9 @@
|
||||
#include <unordered_map>
|
||||
#include <unordered_set>
|
||||
#include <Parsers/ASTTablesInSelectQuery.h>
|
||||
#include <Interpreters/ArrayJoinAction.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
|
||||
#include <variant>
|
||||
|
||||
#if !defined(ARCADIA_BUILD)
|
||||
# include "config_core.h"
|
||||
@ -44,6 +46,9 @@ using DataTypePtr = std::shared_ptr<const IDataType>;
|
||||
class ExpressionActions;
|
||||
class CompiledExpressionCache;
|
||||
|
||||
class ArrayJoinAction;
|
||||
using ArrayJoinActionPtr = std::shared_ptr<ArrayJoinAction>;
|
||||
|
||||
/** Action on the block.
|
||||
*/
|
||||
struct ExpressionAction
|
||||
@ -59,10 +64,9 @@ public:
|
||||
|
||||
APPLY_FUNCTION,
|
||||
|
||||
/** Replaces the specified columns with arrays into columns with elements.
|
||||
* Duplicates the values in the remaining columns by the number of elements in the arrays.
|
||||
* Arrays must be parallel (have the same lengths).
|
||||
*/
|
||||
/// Replaces the source column with array into column with elements.
|
||||
/// Duplicates the values in the remaining columns by the number of elements in the arrays.
|
||||
/// Source column is removed from block.
|
||||
ARRAY_JOIN,
|
||||
|
||||
JOIN,
|
||||
@ -75,7 +79,7 @@ public:
|
||||
|
||||
Type type{};
|
||||
|
||||
/// For ADD/REMOVE/COPY_COLUMN.
|
||||
/// For ADD/REMOVE/ARRAY_JOIN/COPY_COLUMN.
|
||||
std::string source_name;
|
||||
std::string result_name;
|
||||
DataTypePtr result_type;
|
||||
@ -97,9 +101,6 @@ public:
|
||||
Names argument_names;
|
||||
bool is_function_compiled = false;
|
||||
|
||||
/// For ARRAY JOIN
|
||||
std::shared_ptr<ArrayJoinAction> array_join;
|
||||
|
||||
/// For JOIN
|
||||
std::shared_ptr<const TableJoin> table_join;
|
||||
JoinPtr join;
|
||||
@ -117,7 +118,7 @@ public:
|
||||
static ExpressionAction project(const NamesWithAliases & projected_columns_);
|
||||
static ExpressionAction project(const Names & projected_columns_);
|
||||
static ExpressionAction addAliases(const NamesWithAliases & aliased_columns_);
|
||||
static ExpressionAction arrayJoin(const NameSet & array_joined_columns, bool array_join_is_left, const Context & context);
|
||||
static ExpressionAction arrayJoin(std::string source_name, std::string result_name);
|
||||
static ExpressionAction ordinaryJoin(std::shared_ptr<TableJoin> table_join, JoinPtr join);
|
||||
|
||||
/// Which columns necessary to perform this action.
|
||||
@ -143,6 +144,8 @@ private:
|
||||
void execute(Block & block, bool dry_run) const;
|
||||
};
|
||||
|
||||
class ExpressionActions;
|
||||
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
|
||||
|
||||
/** Contains a sequence of actions on the block.
|
||||
*/
|
||||
@ -174,13 +177,9 @@ public:
|
||||
/// Adds to the beginning the removal of all extra columns.
|
||||
void prependProjectInput();
|
||||
|
||||
/// Add the specified ARRAY JOIN action to the beginning. Change the appropriate input types to arrays.
|
||||
/// If there are unknown columns in the ARRAY JOIN list, take their types from sample_block, and immediately after ARRAY JOIN remove them.
|
||||
void prependArrayJoin(const ExpressionAction & action, const Block & sample_block_before);
|
||||
|
||||
/// If the last action is ARRAY JOIN, and it does not affect the columns from required_columns, discard and return it.
|
||||
/// Change the corresponding output types to arrays.
|
||||
bool popUnusedArrayJoin(const Names & required_columns, ExpressionAction & out_action);
|
||||
/// Splits actions into two parts. Returned half may be swapped with ARRAY JOIN.
|
||||
/// Returns nullptr if no actions may be moved before ARRAY JOIN.
|
||||
ExpressionActionsPtr splitActionsBeforeArrayJoin(const NameSet & array_joined_columns);
|
||||
|
||||
/// - Adds actions to delete all but the specified columns.
|
||||
/// - Removes unused input columns.
|
||||
@ -196,8 +195,8 @@ public:
|
||||
Names getRequiredColumns() const
|
||||
{
|
||||
Names names;
|
||||
for (NamesAndTypesList::const_iterator it = input_columns.begin(); it != input_columns.end(); ++it)
|
||||
names.push_back(it->name);
|
||||
for (const auto & input : input_columns)
|
||||
names.push_back(input.name);
|
||||
return names;
|
||||
}
|
||||
|
||||
@ -274,8 +273,6 @@ private:
|
||||
void optimizeArrayJoin();
|
||||
};
|
||||
|
||||
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
|
||||
|
||||
|
||||
/** The sequence of transformations over the block.
|
||||
* It is assumed that the result of each step is fed to the input of the next step.
|
||||
@ -288,11 +285,14 @@ using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
|
||||
*/
|
||||
struct ExpressionActionsChain
|
||||
{
|
||||
ExpressionActionsChain(const Context & context_)
|
||||
: context(context_) {}
|
||||
explicit ExpressionActionsChain(const Context & context_) : context(context_) {}
|
||||
|
||||
|
||||
struct Step
|
||||
{
|
||||
ExpressionActionsPtr actions;
|
||||
virtual ~Step() = default;
|
||||
explicit Step(Names required_output_) : required_output(std::move(required_output_)) {}
|
||||
|
||||
/// Columns were added to the block before current step in addition to prev step output.
|
||||
NameSet additional_input;
|
||||
/// Columns which are required in the result of current step.
|
||||
@ -302,11 +302,72 @@ struct ExpressionActionsChain
|
||||
/// If not empty, has the same size with required_output; is filled in finalize().
|
||||
std::vector<bool> can_remove_required_output;
|
||||
|
||||
Step(const ExpressionActionsPtr & actions_ = nullptr, const Names & required_output_ = Names())
|
||||
: actions(actions_), required_output(required_output_) {}
|
||||
virtual const NamesAndTypesList & getRequiredColumns() const = 0;
|
||||
virtual const ColumnsWithTypeAndName & getResultColumns() const = 0;
|
||||
/// Remove unused result and update required columns
|
||||
virtual void finalize(const Names & required_output_) = 0;
|
||||
/// Add projections to expression
|
||||
virtual void prependProjectInput() const = 0;
|
||||
virtual std::string dump() const = 0;
|
||||
|
||||
/// Only for ExpressionActionsStep
|
||||
ExpressionActionsPtr & actions();
|
||||
const ExpressionActionsPtr & actions() const;
|
||||
};
|
||||
|
||||
using Steps = std::vector<Step>;
|
||||
struct ExpressionActionsStep : public Step
|
||||
{
|
||||
ExpressionActionsPtr actions;
|
||||
|
||||
explicit ExpressionActionsStep(ExpressionActionsPtr actions_, Names required_output_ = Names())
|
||||
: Step(std::move(required_output_))
|
||||
, actions(std::move(actions_))
|
||||
{
|
||||
}
|
||||
|
||||
const NamesAndTypesList & getRequiredColumns() const override
|
||||
{
|
||||
return actions->getRequiredColumnsWithTypes();
|
||||
}
|
||||
|
||||
const ColumnsWithTypeAndName & getResultColumns() const override
|
||||
{
|
||||
return actions->getSampleBlock().getColumnsWithTypeAndName();
|
||||
}
|
||||
|
||||
void finalize(const Names & required_output_) override
|
||||
{
|
||||
actions->finalize(required_output_);
|
||||
}
|
||||
|
||||
void prependProjectInput() const override
|
||||
{
|
||||
actions->prependProjectInput();
|
||||
}
|
||||
|
||||
std::string dump() const override
|
||||
{
|
||||
return actions->dumpActions();
|
||||
}
|
||||
};
|
||||
|
||||
struct ArrayJoinStep : public Step
|
||||
{
|
||||
ArrayJoinActionPtr array_join;
|
||||
NamesAndTypesList required_columns;
|
||||
ColumnsWithTypeAndName result_columns;
|
||||
|
||||
ArrayJoinStep(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_, Names required_output_);
|
||||
|
||||
const NamesAndTypesList & getRequiredColumns() const override { return required_columns; }
|
||||
const ColumnsWithTypeAndName & getResultColumns() const override { return result_columns; }
|
||||
void finalize(const Names & required_output_) override;
|
||||
void prependProjectInput() const override {} /// TODO: remove unused columns before ARRAY JOIN ?
|
||||
std::string dump() const override { return "ARRAY JOIN"; }
|
||||
};
|
||||
|
||||
using StepPtr = std::unique_ptr<Step>;
|
||||
using Steps = std::vector<StepPtr>;
|
||||
|
||||
const Context & context;
|
||||
Steps steps;
|
||||
@ -329,7 +390,7 @@ struct ExpressionActionsChain
|
||||
throw Exception("Empty ExpressionActionsChain", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
return steps.back().actions;
|
||||
return steps.back()->actions();
|
||||
}
|
||||
|
||||
Step & getLastStep()
|
||||
@ -337,14 +398,14 @@ struct ExpressionActionsChain
|
||||
if (steps.empty())
|
||||
throw Exception("Empty ExpressionActionsChain", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
return steps.back();
|
||||
return *steps.back();
|
||||
}
|
||||
|
||||
Step & lastStep(const NamesAndTypesList & columns)
|
||||
{
|
||||
if (steps.empty())
|
||||
steps.emplace_back(std::make_shared<ExpressionActions>(columns, context));
|
||||
return steps.back();
|
||||
steps.emplace_back(std::make_unique<ExpressionActionsStep>(std::make_shared<ExpressionActions>(columns, context)));
|
||||
return *steps.back();
|
||||
}
|
||||
|
||||
std::string dumpChain() const;
|
||||
|
@ -1,30 +1,20 @@
|
||||
#include <Poco/Util/Application.h>
|
||||
#include <Poco/String.h>
|
||||
|
||||
#include <Core/Block.h>
|
||||
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Parsers/ASTQualifiedAsterisk.h>
|
||||
#include <Parsers/ASTExpressionList.h>
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
#include <Parsers/ASTSelectWithUnionQuery.h>
|
||||
#include <Parsers/ASTSubquery.h>
|
||||
#include <Parsers/ASTOrderByElement.h>
|
||||
#include <Parsers/formatAST.h>
|
||||
#include <Parsers/DumpASTNode.h>
|
||||
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <DataTypes/NestedUtils.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/DataTypeLowCardinality.h>
|
||||
|
||||
#include <Columns/IColumn.h>
|
||||
|
||||
#include <Interpreters/ExpressionAnalyzer.h>
|
||||
#include <Interpreters/ExpressionActions.h>
|
||||
#include <Interpreters/LogicalExpressionsOptimizer.h>
|
||||
#include <Interpreters/ArrayJoinAction.h>
|
||||
#include <Interpreters/ExternalDictionariesLoader.h>
|
||||
#include <Interpreters/Set.h>
|
||||
#include <Interpreters/TableJoin.h>
|
||||
@ -42,20 +32,14 @@
|
||||
#include <Storages/StorageJoin.h>
|
||||
|
||||
#include <DataStreams/copyData.h>
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
|
||||
#include <Dictionaries/IDictionary.h>
|
||||
#include <Dictionaries/DictionaryStructure.h>
|
||||
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/StringUtils/StringUtils.h>
|
||||
|
||||
#include <ext/range.h>
|
||||
#include <DataTypes/DataTypeFactory.h>
|
||||
#include <Functions/FunctionsMiscellaneous.h>
|
||||
#include <Parsers/ExpressionListParsers.h>
|
||||
#include <Parsers/parseQuery.h>
|
||||
#include <Parsers/queryToString.h>
|
||||
#include <Interpreters/interpretSubquery.h>
|
||||
#include <Interpreters/DatabaseAndTableWithAlias.h>
|
||||
#include <Interpreters/misc.h>
|
||||
@ -173,25 +157,37 @@ void ExpressionAnalyzer::analyzeAggregation()
|
||||
|
||||
if (select_query)
|
||||
{
|
||||
NamesAndTypesList array_join_columns;
|
||||
|
||||
bool is_array_join_left;
|
||||
ASTPtr array_join_expression_list = select_query->arrayJoinExpressionList(is_array_join_left);
|
||||
if (array_join_expression_list)
|
||||
if (ASTPtr array_join_expression_list = select_query->arrayJoinExpressionList(is_array_join_left))
|
||||
{
|
||||
getRootActionsNoMakeSet(array_join_expression_list, true, temp_actions, false);
|
||||
addMultipleArrayJoinAction(temp_actions, is_array_join_left);
|
||||
if (auto array_join = addMultipleArrayJoinAction(temp_actions, is_array_join_left))
|
||||
{
|
||||
auto sample_block = temp_actions->getSampleBlock();
|
||||
array_join->prepare(sample_block);
|
||||
temp_actions = std::make_shared<ExpressionActions>(sample_block.getColumnsWithTypeAndName(), context);
|
||||
}
|
||||
|
||||
array_join_columns.clear();
|
||||
for (auto & column : temp_actions->getSampleBlock().getNamesAndTypesList())
|
||||
if (syntax->array_join_result_to_source.count(column.name))
|
||||
array_join_columns.emplace_back(column);
|
||||
}
|
||||
|
||||
columns_after_array_join = sourceColumns();
|
||||
columns_after_array_join.insert(columns_after_array_join.end(), array_join_columns.begin(), array_join_columns.end());
|
||||
|
||||
const ASTTablesInSelectQueryElement * join = select_query->join();
|
||||
if (join)
|
||||
{
|
||||
getRootActionsNoMakeSet(analyzedJoin().leftKeysList(), true, temp_actions, false);
|
||||
addJoinAction(temp_actions);
|
||||
}
|
||||
|
||||
columns_after_join = columns_after_array_join;
|
||||
const auto & added_by_join = analyzedJoin().columnsAddedByJoin();
|
||||
columns_after_join.insert(columns_after_join.end(), added_by_join.begin(), added_by_join.end());
|
||||
}
|
||||
|
||||
has_aggregation = makeAggregateDescriptions(temp_actions);
|
||||
@ -281,16 +277,6 @@ void ExpressionAnalyzer::initGlobalSubqueriesAndExternalTables(bool do_global)
|
||||
}
|
||||
|
||||
|
||||
NamesAndTypesList ExpressionAnalyzer::sourceWithJoinedColumns() const
|
||||
{
|
||||
auto result_columns = sourceColumns();
|
||||
result_columns.insert(result_columns.end(), array_join_columns.begin(), array_join_columns.end());
|
||||
result_columns.insert(result_columns.end(),
|
||||
analyzedJoin().columnsAddedByJoin().begin(), analyzedJoin().columnsAddedByJoin().end());
|
||||
return result_columns;
|
||||
}
|
||||
|
||||
|
||||
void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name)
|
||||
{
|
||||
auto set_key = PreparedSetKey::forSubquery(*subquery_or_table_name);
|
||||
@ -374,7 +360,7 @@ void SelectQueryExpressionAnalyzer::makeSetsForIndex(const ASTPtr & node)
|
||||
}
|
||||
else
|
||||
{
|
||||
ExpressionActionsPtr temp_actions = std::make_shared<ExpressionActions>(sourceWithJoinedColumns(), context);
|
||||
ExpressionActionsPtr temp_actions = std::make_shared<ExpressionActions>(columns_after_join, context);
|
||||
getRootActions(left_in_operand, true, temp_actions);
|
||||
|
||||
Block sample_block_with_calculated_columns = temp_actions->getSampleBlock();
|
||||
@ -455,7 +441,7 @@ const ASTSelectQuery * SelectQueryExpressionAnalyzer::getAggregatingQuery() cons
|
||||
}
|
||||
|
||||
/// "Big" ARRAY JOIN.
|
||||
void ExpressionAnalyzer::addMultipleArrayJoinAction(ExpressionActionsPtr & actions, bool array_join_is_left) const
|
||||
ArrayJoinActionPtr ExpressionAnalyzer::addMultipleArrayJoinAction(ExpressionActionsPtr & actions, bool array_join_is_left) const
|
||||
{
|
||||
NameSet result_columns;
|
||||
for (const auto & result_source : syntax->array_join_result_to_source)
|
||||
@ -468,25 +454,32 @@ void ExpressionAnalyzer::addMultipleArrayJoinAction(ExpressionActionsPtr & actio
|
||||
result_columns.insert(result_source.first);
|
||||
}
|
||||
|
||||
actions->add(ExpressionAction::arrayJoin(result_columns, array_join_is_left, context));
|
||||
return std::make_shared<ArrayJoinAction>(result_columns, array_join_is_left, context);
|
||||
}
|
||||
|
||||
bool SelectQueryExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, bool only_types)
|
||||
ArrayJoinActionPtr SelectQueryExpressionAnalyzer::appendArrayJoin(ExpressionActionsChain & chain, ExpressionActionsPtr & before_array_join, bool only_types)
|
||||
{
|
||||
const auto * select_query = getSelectQuery();
|
||||
|
||||
bool is_array_join_left;
|
||||
ASTPtr array_join_expression_list = select_query->arrayJoinExpressionList(is_array_join_left);
|
||||
if (!array_join_expression_list)
|
||||
return false;
|
||||
return nullptr;
|
||||
|
||||
ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns());
|
||||
|
||||
getRootActions(array_join_expression_list, only_types, step.actions);
|
||||
getRootActions(array_join_expression_list, only_types, step.actions());
|
||||
|
||||
addMultipleArrayJoinAction(step.actions, is_array_join_left);
|
||||
before_array_join = chain.getLastActions();
|
||||
auto array_join = addMultipleArrayJoinAction(step.actions(), is_array_join_left);
|
||||
|
||||
return true;
|
||||
chain.steps.push_back(std::make_unique<ExpressionActionsChain::ArrayJoinStep>(
|
||||
array_join, step.getResultColumns(),
|
||||
Names())); /// Required output is empty because all array joined columns are kept by step.
|
||||
|
||||
chain.addStep();
|
||||
|
||||
return array_join;
|
||||
}
|
||||
|
||||
void ExpressionAnalyzer::addJoinAction(ExpressionActionsPtr & actions, JoinPtr join) const
|
||||
@ -496,9 +489,9 @@ void ExpressionAnalyzer::addJoinAction(ExpressionActionsPtr & actions, JoinPtr j
|
||||
|
||||
bool SelectQueryExpressionAnalyzer::appendJoinLeftKeys(ExpressionActionsChain & chain, bool only_types)
|
||||
{
|
||||
ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns());
|
||||
ExpressionActionsChain::Step & step = chain.lastStep(columns_after_array_join);
|
||||
|
||||
getRootActions(analyzedJoin().leftKeysList(), only_types, step.actions);
|
||||
getRootActions(analyzedJoin().leftKeysList(), only_types, step.actions());
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -506,9 +499,9 @@ bool SelectQueryExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain)
|
||||
{
|
||||
JoinPtr table_join = makeTableJoin(*syntax->ast_join);
|
||||
|
||||
ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns());
|
||||
ExpressionActionsChain::Step & step = chain.lastStep(columns_after_array_join);
|
||||
|
||||
addJoinAction(step.actions, table_join);
|
||||
addJoinAction(step.actions(), table_join);
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -632,12 +625,12 @@ bool SelectQueryExpressionAnalyzer::appendPrewhere(
|
||||
return false;
|
||||
|
||||
auto & step = chain.lastStep(sourceColumns());
|
||||
getRootActions(select_query->prewhere(), only_types, step.actions);
|
||||
getRootActions(select_query->prewhere(), only_types, step.actions());
|
||||
String prewhere_column_name = select_query->prewhere()->getColumnName();
|
||||
step.required_output.push_back(prewhere_column_name);
|
||||
step.can_remove_required_output.push_back(true);
|
||||
|
||||
auto filter_type = step.actions->getSampleBlock().getByName(prewhere_column_name).type;
|
||||
auto filter_type = step.actions()->getSampleBlock().getByName(prewhere_column_name).type;
|
||||
if (!filter_type->canBeUsedInBooleanContext())
|
||||
throw Exception("Invalid type for filter in PREWHERE: " + filter_type->getName(),
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
|
||||
@ -661,7 +654,7 @@ bool SelectQueryExpressionAnalyzer::appendPrewhere(
|
||||
}
|
||||
}
|
||||
|
||||
auto names = step.actions->getSampleBlock().getNames();
|
||||
auto names = step.actions()->getSampleBlock().getNames();
|
||||
NameSet name_set(names.begin(), names.end());
|
||||
|
||||
for (const auto & column : sourceColumns())
|
||||
@ -669,7 +662,7 @@ bool SelectQueryExpressionAnalyzer::appendPrewhere(
|
||||
name_set.erase(column.name);
|
||||
|
||||
Names required_output(name_set.begin(), name_set.end());
|
||||
step.actions->finalize(required_output);
|
||||
step.actions()->finalize(required_output);
|
||||
}
|
||||
|
||||
{
|
||||
@ -680,8 +673,8 @@ bool SelectQueryExpressionAnalyzer::appendPrewhere(
|
||||
/// 2. Store side columns which were calculated during prewhere actions execution if they are used.
|
||||
/// Example: select F(A) prewhere F(A) > 0. F(A) can be saved from prewhere step.
|
||||
/// 3. Check if we can remove filter column at prewhere step. If we can, action will store single REMOVE_COLUMN.
|
||||
ColumnsWithTypeAndName columns = step.actions->getSampleBlock().getColumnsWithTypeAndName();
|
||||
auto required_columns = step.actions->getRequiredColumns();
|
||||
ColumnsWithTypeAndName columns = step.actions()->getSampleBlock().getColumnsWithTypeAndName();
|
||||
auto required_columns = step.actions()->getRequiredColumns();
|
||||
NameSet prewhere_input_names(required_columns.begin(), required_columns.end());
|
||||
NameSet unused_source_columns;
|
||||
|
||||
@ -694,8 +687,9 @@ bool SelectQueryExpressionAnalyzer::appendPrewhere(
|
||||
}
|
||||
}
|
||||
|
||||
chain.steps.emplace_back(std::make_shared<ExpressionActions>(std::move(columns), context));
|
||||
chain.steps.back().additional_input = std::move(unused_source_columns);
|
||||
chain.steps.emplace_back(std::make_unique<ExpressionActionsChain::ExpressionActionsStep>(
|
||||
std::make_shared<ExpressionActions>(std::move(columns), context)));
|
||||
chain.steps.back()->additional_input = std::move(unused_source_columns);
|
||||
}
|
||||
|
||||
return true;
|
||||
@ -706,7 +700,7 @@ void SelectQueryExpressionAnalyzer::appendPreliminaryFilter(ExpressionActionsCha
|
||||
ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns());
|
||||
|
||||
// FIXME: assert(filter_info);
|
||||
step.actions = std::move(actions);
|
||||
step.actions() = std::move(actions);
|
||||
step.required_output.push_back(std::move(column_name));
|
||||
step.can_remove_required_output = {true};
|
||||
|
||||
@ -720,15 +714,15 @@ bool SelectQueryExpressionAnalyzer::appendWhere(ExpressionActionsChain & chain,
|
||||
if (!select_query->where())
|
||||
return false;
|
||||
|
||||
ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns());
|
||||
ExpressionActionsChain::Step & step = chain.lastStep(columns_after_join);
|
||||
|
||||
auto where_column_name = select_query->where()->getColumnName();
|
||||
step.required_output.push_back(where_column_name);
|
||||
step.can_remove_required_output = {true};
|
||||
|
||||
getRootActions(select_query->where(), only_types, step.actions);
|
||||
getRootActions(select_query->where(), only_types, step.actions());
|
||||
|
||||
auto filter_type = step.actions->getSampleBlock().getByName(where_column_name).type;
|
||||
auto filter_type = step.actions()->getSampleBlock().getByName(where_column_name).type;
|
||||
if (!filter_type->canBeUsedInBooleanContext())
|
||||
throw Exception("Invalid type for filter in WHERE: " + filter_type->getName(),
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
|
||||
@ -744,21 +738,20 @@ bool SelectQueryExpressionAnalyzer::appendGroupBy(ExpressionActionsChain & chain
|
||||
if (!select_query->groupBy())
|
||||
return false;
|
||||
|
||||
ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns());
|
||||
ExpressionActionsChain::Step & step = chain.lastStep(columns_after_join);
|
||||
|
||||
ASTs asts = select_query->groupBy()->children;
|
||||
for (const auto & ast : asts)
|
||||
{
|
||||
step.required_output.emplace_back(ast->getColumnName());
|
||||
getRootActions(ast, only_types, step.actions);
|
||||
getRootActions(ast, only_types, step.actions());
|
||||
}
|
||||
|
||||
if (optimize_aggregation_in_order)
|
||||
{
|
||||
auto all_columns = sourceWithJoinedColumns();
|
||||
for (auto & child : asts)
|
||||
{
|
||||
group_by_elements_actions.emplace_back(std::make_shared<ExpressionActions>(all_columns, context));
|
||||
group_by_elements_actions.emplace_back(std::make_shared<ExpressionActions>(columns_after_join, context));
|
||||
getRootActions(child, only_types, group_by_elements_actions.back());
|
||||
}
|
||||
}
|
||||
@ -770,7 +763,7 @@ void SelectQueryExpressionAnalyzer::appendAggregateFunctionsArguments(Expression
|
||||
{
|
||||
const auto * select_query = getAggregatingQuery();
|
||||
|
||||
ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns());
|
||||
ExpressionActionsChain::Step & step = chain.lastStep(columns_after_join);
|
||||
|
||||
for (const auto & desc : aggregate_descriptions)
|
||||
for (const auto & name : desc.argument_names)
|
||||
@ -791,7 +784,7 @@ void SelectQueryExpressionAnalyzer::appendAggregateFunctionsArguments(Expression
|
||||
/// TODO: data.aggregates -> aggregates()
|
||||
for (const ASTFunction * node : data.aggregates)
|
||||
for (auto & argument : node->arguments->children)
|
||||
getRootActions(argument, only_types, step.actions);
|
||||
getRootActions(argument, only_types, step.actions());
|
||||
}
|
||||
|
||||
bool SelectQueryExpressionAnalyzer::appendHaving(ExpressionActionsChain & chain, bool only_types)
|
||||
@ -804,7 +797,7 @@ bool SelectQueryExpressionAnalyzer::appendHaving(ExpressionActionsChain & chain,
|
||||
ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns);
|
||||
|
||||
step.required_output.push_back(select_query->having()->getColumnName());
|
||||
getRootActions(select_query->having(), only_types, step.actions);
|
||||
getRootActions(select_query->having(), only_types, step.actions());
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -815,7 +808,7 @@ void SelectQueryExpressionAnalyzer::appendSelect(ExpressionActionsChain & chain,
|
||||
|
||||
ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns);
|
||||
|
||||
getRootActions(select_query->select(), only_types, step.actions);
|
||||
getRootActions(select_query->select(), only_types, step.actions());
|
||||
|
||||
for (const auto & child : select_query->select()->children)
|
||||
step.required_output.push_back(child->getColumnName());
|
||||
@ -831,7 +824,7 @@ bool SelectQueryExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain
|
||||
|
||||
ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns);
|
||||
|
||||
getRootActions(select_query->orderBy(), only_types, step.actions);
|
||||
getRootActions(select_query->orderBy(), only_types, step.actions());
|
||||
|
||||
for (auto & child : select_query->orderBy()->children)
|
||||
{
|
||||
@ -844,10 +837,9 @@ bool SelectQueryExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain
|
||||
|
||||
if (optimize_read_in_order)
|
||||
{
|
||||
auto all_columns = sourceWithJoinedColumns();
|
||||
for (auto & child : select_query->orderBy()->children)
|
||||
{
|
||||
order_by_elements_actions.emplace_back(std::make_shared<ExpressionActions>(all_columns, context));
|
||||
order_by_elements_actions.emplace_back(std::make_shared<ExpressionActions>(columns_after_join, context));
|
||||
getRootActions(child, only_types, order_by_elements_actions.back());
|
||||
}
|
||||
}
|
||||
@ -863,7 +855,7 @@ bool SelectQueryExpressionAnalyzer::appendLimitBy(ExpressionActionsChain & chain
|
||||
|
||||
ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns);
|
||||
|
||||
getRootActions(select_query->limitBy(), only_types, step.actions);
|
||||
getRootActions(select_query->limitBy(), only_types, step.actions());
|
||||
|
||||
NameSet aggregated_names;
|
||||
for (const auto & column : aggregated_columns)
|
||||
@ -928,14 +920,14 @@ void SelectQueryExpressionAnalyzer::appendProjectResult(ExpressionActionsChain &
|
||||
}
|
||||
}
|
||||
|
||||
step.actions->add(ExpressionAction::project(result_columns));
|
||||
step.actions()->add(ExpressionAction::project(result_columns));
|
||||
}
|
||||
|
||||
|
||||
void ExpressionAnalyzer::appendExpression(ExpressionActionsChain & chain, const ASTPtr & expr, bool only_types)
|
||||
{
|
||||
ExpressionActionsChain::Step & step = chain.lastStep(sourceColumns());
|
||||
getRootActions(expr, only_types, step.actions);
|
||||
getRootActions(expr, only_types, step.actions());
|
||||
step.required_output.push_back(expr->getColumnName());
|
||||
}
|
||||
|
||||
@ -1076,7 +1068,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
|
||||
if (query_analyzer.appendPrewhere(chain, !first_stage, additional_required_columns_after_prewhere))
|
||||
{
|
||||
prewhere_info = std::make_shared<PrewhereInfo>(
|
||||
chain.steps.front().actions, query.prewhere()->getColumnName());
|
||||
chain.steps.front()->actions(), query.prewhere()->getColumnName());
|
||||
|
||||
if (allowEarlyConstantFolding(*prewhere_info->prewhere_actions, settings))
|
||||
{
|
||||
@ -1093,7 +1085,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
|
||||
chain.addStep();
|
||||
}
|
||||
|
||||
query_analyzer.appendArrayJoin(chain, only_types || !first_stage);
|
||||
array_join = query_analyzer.appendArrayJoin(chain, before_array_join, only_types || !first_stage);
|
||||
|
||||
if (query_analyzer.hasTableJoin())
|
||||
{
|
||||
@ -1119,7 +1111,7 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
|
||||
{
|
||||
Block before_where_sample;
|
||||
if (chain.steps.size() > 1)
|
||||
before_where_sample = chain.steps[chain.steps.size() - 2].actions->getSampleBlock();
|
||||
before_where_sample = Block(chain.steps[chain.steps.size() - 2]->getResultColumns());
|
||||
else
|
||||
before_where_sample = source_header;
|
||||
if (sanitizeBlock(before_where_sample))
|
||||
@ -1200,7 +1192,7 @@ void ExpressionAnalysisResult::finalize(const ExpressionActionsChain & chain, co
|
||||
{
|
||||
if (hasPrewhere())
|
||||
{
|
||||
const ExpressionActionsChain::Step & step = chain.steps.at(0);
|
||||
const ExpressionActionsChain::Step & step = *chain.steps.at(0);
|
||||
prewhere_info->remove_prewhere_column = step.can_remove_required_output.at(0);
|
||||
|
||||
Names columns_to_remove;
|
||||
@ -1225,10 +1217,10 @@ void ExpressionAnalysisResult::finalize(const ExpressionActionsChain & chain, co
|
||||
else if (hasFilter())
|
||||
{
|
||||
/// Can't have prewhere and filter set simultaneously
|
||||
filter_info->do_remove_column = chain.steps.at(0).can_remove_required_output.at(0);
|
||||
filter_info->do_remove_column = chain.steps.at(0)->can_remove_required_output.at(0);
|
||||
}
|
||||
if (hasWhere())
|
||||
remove_where_filter = chain.steps.at(where_step_num).can_remove_required_output.at(0);
|
||||
remove_where_filter = chain.steps.at(where_step_num)->can_remove_required_output.at(0);
|
||||
}
|
||||
|
||||
void ExpressionAnalysisResult::removeExtraColumns() const
|
||||
|
@ -34,6 +34,9 @@ struct ASTTablesInSelectQueryElement;
|
||||
struct StorageInMemoryMetadata;
|
||||
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
|
||||
|
||||
class ArrayJoinAction;
|
||||
using ArrayJoinActionPtr = std::shared_ptr<ArrayJoinAction>;
|
||||
|
||||
/// Create columns in block or return false if not possible
|
||||
bool sanitizeBlock(Block & block, bool throw_if_cannot_create_column = false);
|
||||
|
||||
@ -43,9 +46,12 @@ struct ExpressionAnalyzerData
|
||||
SubqueriesForSets subqueries_for_sets;
|
||||
PreparedSets prepared_sets;
|
||||
|
||||
/// Columns after ARRAY JOIN. If there is no ARRAY JOIN, it's source_columns.
|
||||
NamesAndTypesList columns_after_array_join;
|
||||
/// Columns after ARRAY JOIN and JOIN. If there is no JOIN, it's columns_after_array_join.
|
||||
NamesAndTypesList columns_after_join;
|
||||
/// Columns after ARRAY JOIN, JOIN, and/or aggregation.
|
||||
NamesAndTypesList aggregated_columns;
|
||||
NamesAndTypesList array_join_columns;
|
||||
|
||||
bool has_aggregation = false;
|
||||
NamesAndTypesList aggregation_keys;
|
||||
@ -128,12 +134,10 @@ protected:
|
||||
const TableJoin & analyzedJoin() const { return *syntax->analyzed_join; }
|
||||
const NamesAndTypesList & sourceColumns() const { return syntax->required_source_columns; }
|
||||
const std::vector<const ASTFunction *> & aggregates() const { return syntax->aggregates; }
|
||||
NamesAndTypesList sourceWithJoinedColumns() const;
|
||||
|
||||
/// Find global subqueries in the GLOBAL IN/JOIN sections. Fills in external_tables.
|
||||
void initGlobalSubqueriesAndExternalTables(bool do_global);
|
||||
|
||||
void addMultipleArrayJoinAction(ExpressionActionsPtr & actions, bool is_left) const;
|
||||
ArrayJoinActionPtr addMultipleArrayJoinAction(ExpressionActionsPtr & actions, bool is_left) const;
|
||||
|
||||
void addJoinAction(ExpressionActionsPtr & actions, JoinPtr = {}) const;
|
||||
|
||||
@ -175,6 +179,8 @@ struct ExpressionAnalysisResult
|
||||
bool optimize_read_in_order = false;
|
||||
bool optimize_aggregation_in_order = false;
|
||||
|
||||
ExpressionActionsPtr before_array_join;
|
||||
ArrayJoinActionPtr array_join;
|
||||
ExpressionActionsPtr before_join;
|
||||
ExpressionActionsPtr join;
|
||||
ExpressionActionsPtr before_where;
|
||||
@ -305,7 +311,7 @@ private:
|
||||
*/
|
||||
|
||||
/// Before aggregation:
|
||||
bool appendArrayJoin(ExpressionActionsChain & chain, bool only_types);
|
||||
ArrayJoinActionPtr appendArrayJoin(ExpressionActionsChain & chain, ExpressionActionsPtr & before_array_join, bool only_types);
|
||||
bool appendJoinLeftKeys(ExpressionActionsChain & chain, bool only_types);
|
||||
bool appendJoin(ExpressionActionsChain & chain);
|
||||
/// Add preliminary rows filtration. Actions are created in other expression analyzer to prevent any possible alias injection.
|
||||
|
@ -34,6 +34,7 @@
|
||||
#include <Processors/Transforms/ExpressionTransform.h>
|
||||
#include <Processors/Transforms/InflatingExpressionTransform.h>
|
||||
#include <Processors/Transforms/AggregatingTransform.h>
|
||||
#include <Processors/QueryPlan/ArrayJoinStep.h>
|
||||
#include <Processors/QueryPlan/ReadFromStorageStep.h>
|
||||
#include <Processors/QueryPlan/ExpressionStep.h>
|
||||
#include <Processors/QueryPlan/FilterStep.h>
|
||||
@ -862,6 +863,25 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
|
||||
query_plan.addStep(std::move(row_level_security_step));
|
||||
}
|
||||
|
||||
if (expressions.before_array_join)
|
||||
{
|
||||
QueryPlanStepPtr before_array_join_step = std::make_unique<ExpressionStep>(
|
||||
query_plan.getCurrentDataStream(),
|
||||
expressions.before_array_join);
|
||||
before_array_join_step->setStepDescription("Before ARRAY JOIN");
|
||||
query_plan.addStep(std::move(before_array_join_step));
|
||||
}
|
||||
|
||||
if (expressions.array_join)
|
||||
{
|
||||
QueryPlanStepPtr array_join_step = std::make_unique<ArrayJoinStep>(
|
||||
query_plan.getCurrentDataStream(),
|
||||
expressions.array_join);
|
||||
|
||||
array_join_step->setStepDescription("ARRAY JOIN");
|
||||
query_plan.addStep(std::move(array_join_step));
|
||||
}
|
||||
|
||||
if (expressions.before_join)
|
||||
{
|
||||
QueryPlanStepPtr before_join_step = std::make_unique<ExpressionStep>(
|
||||
|
@ -623,7 +623,7 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> &
|
||||
actions_chain.finalize();
|
||||
|
||||
/// Propagate information about columns needed as input.
|
||||
for (const auto & column : actions_chain.steps.front().actions->getRequiredColumnsWithTypes())
|
||||
for (const auto & column : actions_chain.steps.front()->actions()->getRequiredColumnsWithTypes())
|
||||
prepared_stages[i - 1].output_columns.insert(column.name);
|
||||
}
|
||||
|
||||
@ -667,12 +667,12 @@ BlockInputStreamPtr MutationsInterpreter::addStreamsForLaterStages(const std::ve
|
||||
if (i < stage.filter_column_names.size())
|
||||
{
|
||||
/// Execute DELETEs.
|
||||
in = std::make_shared<FilterBlockInputStream>(in, step.actions, stage.filter_column_names[i]);
|
||||
in = std::make_shared<FilterBlockInputStream>(in, step->actions(), stage.filter_column_names[i]);
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Execute UPDATE or final projection.
|
||||
in = std::make_shared<ExpressionBlockInputStream>(in, step.actions);
|
||||
in = std::make_shared<ExpressionBlockInputStream>(in, step->actions());
|
||||
}
|
||||
}
|
||||
|
||||
|
83
src/Processors/QueryPlan/ArrayJoinStep.cpp
Normal file
83
src/Processors/QueryPlan/ArrayJoinStep.cpp
Normal file
@ -0,0 +1,83 @@
|
||||
#include <Processors/QueryPlan/ArrayJoinStep.h>
|
||||
#include <Processors/Transforms/ArrayJoinTransform.h>
|
||||
#include <Processors/Transforms/ConvertingTransform.h>
|
||||
#include <Processors/QueryPipeline.h>
|
||||
#include <Interpreters/ArrayJoinAction.h>
|
||||
#include <IO/Operators.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Traits reported by the ARRAY JOIN step to ITransformingStep.
/// Only the number of streams is declared as preserved; distinct columns,
/// sorting and the number of rows are all declared as not preserved.
static ITransformingStep::Traits getTraits()
{
    ITransformingStep::Traits traits
    {
        /// Data stream traits.
        {
            .preserves_distinct_columns = false,
            .returns_single_stream = false,
            .preserves_number_of_streams = true,
            .preserves_sorting = false,
        },
        /// Transform traits.
        {
            .preserves_number_of_rows = false,
        }
    };
    return traits;
}
|
||||
|
||||
/// Builds the step for the given input stream.
/// The output header is obtained by applying ArrayJoinTransform::transformHeader
/// to the input header with the supplied action.
ArrayJoinStep::ArrayJoinStep(const DataStream & input_stream_, ArrayJoinActionPtr array_join_)
    : ITransformingStep(
        input_stream_,
        /// The base class is initialised before the members, so array_join_
        /// is still valid here; it is moved into the member only afterwards.
        ArrayJoinTransform::transformHeader(input_stream_.header, array_join_),
        getTraits())
    , array_join(std::move(array_join_))
{
}
|
||||
|
||||
/// Replaces the input stream of this step and recomputes the output stream from it.
/// `result_header` is remembered in res_header; transformPipeline() later converts
/// the output to it if the structures differ.
void ArrayJoinStep::updateInputStream(DataStream input_stream, Block result_header)
{
    /// Compute the new output header while input_stream is still intact (it is moved below).
    auto joined_header = ArrayJoinTransform::transformHeader(input_stream.header, array_join);
    output_stream = createOutputStream(input_stream, std::move(joined_header), getDataStreamTraits());

    /// The step keeps exactly one input: the new stream.
    input_streams.clear();
    input_streams.emplace_back(std::move(input_stream));

    res_header = std::move(result_header);
}
|
||||
|
||||
/// Adds an ArrayJoinTransform to every stream of the pipeline and, if a result
/// header was requested via updateInputStream() and differs in structure from
/// the step's output header, a ConvertingTransform that matches columns by name.
void ArrayJoinStep::transformPipeline(QueryPipeline & pipeline)
{
    pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
    {
        /// The transform is told whether it is working on the totals stream.
        return std::make_shared<ArrayJoinTransform>(
            header, array_join, stream_type == QueryPipeline::StreamType::Totals);
    });

    /// Conversion is needed only when a result header is set and it does not match the output.
    const bool need_conversion = res_header && !blocksHaveEqualStructure(res_header, output_stream->header);
    if (!need_conversion)
        return;

    pipeline.addSimpleTransform([&](const Block & header)
    {
        return std::make_shared<ConvertingTransform>(header, res_header, ConvertingTransform::MatchColumnsMode::Name);
    });
}
|
||||
|
||||
/// Writes a one-line description of the step to settings.out:
/// "<indent>[LEFT ]ARRAY JOIN col1, col2, ...\n",
/// where the indent is settings.offset spaces.
void ArrayJoinStep::describeActions(FormatSettings & settings) const
{
    auto & out = settings.out;

    out << String(settings.offset, ' ') << (array_join->is_left ? "LEFT " : "") << "ARRAY JOIN ";

    /// Empty before the first column, ", " before every following one.
    const char * separator = "";
    for (const auto & column : array_join->columns)
    {
        out << separator << column;
        separator = ", ";
    }

    out << '\n';
}
|
||||
|
||||
}
|
29
src/Processors/QueryPlan/ArrayJoinStep.h
Normal file
29
src/Processors/QueryPlan/ArrayJoinStep.h
Normal file
@ -0,0 +1,29 @@
|
||||
#pragma once
|
||||
#include <Processors/QueryPlan/ITransformingStep.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ArrayJoinAction;
|
||||
using ArrayJoinActionPtr = std::shared_ptr<ArrayJoinAction>;
|
||||
|
||||
class ArrayJoinStep : public ITransformingStep
|
||||
{
|
||||
public:
|
||||
explicit ArrayJoinStep(const DataStream & input_stream_, ArrayJoinActionPtr array_join_);
|
||||
String getName() const override { return "ArrayJoin"; }
|
||||
|
||||
void transformPipeline(QueryPipeline & pipeline) override;
|
||||
|
||||
void describeActions(FormatSettings & settings) const override;
|
||||
|
||||
void updateInputStream(DataStream input_stream, Block result_header);
|
||||
|
||||
const ArrayJoinActionPtr & arrayJoin() const { return array_join; }
|
||||
|
||||
private:
|
||||
ArrayJoinActionPtr array_join;
|
||||
Block res_header;
|
||||
};
|
||||
|
||||
}
|
@ -1,6 +1,7 @@
|
||||
#include <Processors/QueryPlan/ExpressionStep.h>
|
||||
#include <Processors/Transforms/ExpressionTransform.h>
|
||||
#include <Processors/QueryPipeline.h>
|
||||
#include <Processors/Transforms/ConvertingTransform.h>
|
||||
#include <Processors/Transforms/InflatingExpressionTransform.h>
|
||||
#include <Interpreters/ExpressionActions.h>
|
||||
#include <IO/Operators.h>
|
||||
@ -35,6 +36,19 @@ ExpressionStep::ExpressionStep(const DataStream & input_stream_, ExpressionActio
|
||||
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
|
||||
}
|
||||
|
||||
/// Re-attach the step to a new input stream.
/// If `keep_header` is true the current output header is reused as is,
/// otherwise it is recomputed by applying the expression to the new input header.
void ExpressionStep::updateInputStream(DataStream input_stream, bool keep_header)
{
    Block out_header;
    if (keep_header)
        out_header = std::move(output_stream->header);
    else
        out_header = Transform::transformHeader(input_stream.header, expression);

    output_stream = createOutputStream(input_stream, std::move(out_header), getDataStreamTraits());

    /// The step has exactly one input: drop the old one and take the new one.
    input_streams.clear();
    input_streams.emplace_back(std::move(input_stream));
}
|
||||
|
||||
void ExpressionStep::transformPipeline(QueryPipeline & pipeline)
|
||||
{
|
||||
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
|
||||
@ -42,6 +56,15 @@ void ExpressionStep::transformPipeline(QueryPipeline & pipeline)
|
||||
bool on_totals = stream_type == QueryPipeline::StreamType::Totals;
|
||||
return std::make_shared<Transform>(header, expression, on_totals);
|
||||
});
|
||||
|
||||
if (!blocksHaveEqualStructure(pipeline.getHeader(), output_stream->header))
|
||||
{
|
||||
pipeline.addSimpleTransform([&](const Block & header)
|
||||
{
|
||||
return std::make_shared<ConvertingTransform>(header, output_stream->header,
|
||||
ConvertingTransform::MatchColumnsMode::Name);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
static void doDescribeActions(const ExpressionActionsPtr & expression, IQueryPlanStep::FormatSettings & settings)
|
||||
|
@ -21,8 +21,12 @@ public:
|
||||
|
||||
void transformPipeline(QueryPipeline & pipeline) override;
|
||||
|
||||
void updateInputStream(DataStream input_stream, bool keep_header);
|
||||
|
||||
void describeActions(FormatSettings & settings) const override;
|
||||
|
||||
const ExpressionActionsPtr & getExpression() const { return expression; }
|
||||
|
||||
private:
|
||||
ExpressionActionsPtr expression;
|
||||
};
|
||||
|
@ -1,6 +1,7 @@
|
||||
#include <Processors/QueryPlan/FilterStep.h>
|
||||
#include <Processors/Transforms/FilterTransform.h>
|
||||
#include <Processors/QueryPipeline.h>
|
||||
#include <Processors/Transforms/ConvertingTransform.h>
|
||||
#include <Interpreters/ExpressionActions.h>
|
||||
#include <IO/Operators.h>
|
||||
|
||||
@ -40,6 +41,21 @@ FilterStep::FilterStep(
|
||||
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
|
||||
}
|
||||
|
||||
/// Re-attach the step to a new input stream.
/// If `keep_header` is true the current output header is reused as is,
/// otherwise it is recomputed by applying the filter expression to the new input header.
///
/// The condition used to be inverted (the header was recomputed when `keep_header` was true
/// and kept when it was false), which contradicted both the parameter name and
/// ExpressionStep::updateInputStream.
void FilterStep::updateInputStream(DataStream input_stream, bool keep_header)
{
    Block out_header = keep_header
        ? std::move(output_stream->header)
        : FilterTransform::transformHeader(input_stream.header, expression, filter_column_name, remove_filter_column);

    output_stream = createOutputStream(
        input_stream,
        std::move(out_header),
        getDataStreamTraits());

    /// The step has exactly one input: drop the old one and take the new one.
    input_streams.clear();
    input_streams.emplace_back(std::move(input_stream));
}
|
||||
|
||||
void FilterStep::transformPipeline(QueryPipeline & pipeline)
|
||||
{
|
||||
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
|
||||
@ -47,6 +63,14 @@ void FilterStep::transformPipeline(QueryPipeline & pipeline)
|
||||
bool on_totals = stream_type == QueryPipeline::StreamType::Totals;
|
||||
return std::make_shared<FilterTransform>(header, expression, filter_column_name, remove_filter_column, on_totals);
|
||||
});
|
||||
|
||||
if (!blocksHaveEqualStructure(pipeline.getHeader(), output_stream->header))
|
||||
{
|
||||
pipeline.addSimpleTransform([&](const Block & header)
|
||||
{
|
||||
return std::make_shared<ConvertingTransform>(header, output_stream->header, ConvertingTransform::MatchColumnsMode::Name);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void FilterStep::describeActions(FormatSettings & settings) const
|
||||
|
@ -20,8 +20,14 @@ public:
|
||||
String getName() const override { return "Filter"; }
|
||||
void transformPipeline(QueryPipeline & pipeline) override;
|
||||
|
||||
void updateInputStream(DataStream input_stream, bool keep_header);
|
||||
|
||||
void describeActions(FormatSettings & settings) const override;
|
||||
|
||||
const ExpressionActionsPtr & getExpression() const { return expression; }
|
||||
const String & getFilterColumnName() const { return filter_column_name; }
|
||||
bool removesFilterColumn() const { return remove_filter_column; }
|
||||
|
||||
private:
|
||||
ExpressionActionsPtr expression;
|
||||
String filter_column_name;
|
||||
|
@ -3,6 +3,8 @@
|
||||
#include <Processors/QueryPipeline.h>
|
||||
#include <IO/WriteBuffer.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <Interpreters/ExpressionActions.h>
|
||||
#include <Interpreters/ArrayJoinAction.h>
|
||||
#include <stack>
|
||||
#include <Processors/QueryPlan/LimitStep.h>
|
||||
#include "MergingSortedStep.h"
|
||||
@ -10,6 +12,9 @@
|
||||
#include "MergeSortingStep.h"
|
||||
#include "PartialSortingStep.h"
|
||||
#include "TotalsHavingStep.h"
|
||||
#include "ExpressionStep.h"
|
||||
#include "ArrayJoinStep.h"
|
||||
#include "FilterStep.h"
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -408,6 +413,64 @@ static void tryPushDownLimit(QueryPlanStepPtr & parent, QueryPlan::Node * child_
|
||||
parent.swap(child);
|
||||
}
|
||||
|
||||
/// Move ARRAY JOIN up if possible.
|
||||
static void tryLiftUpArrayJoin(QueryPlan::Node * parent_node, QueryPlan::Node * child_node, QueryPlan::Nodes & nodes)
|
||||
{
|
||||
auto & parent = parent_node->step;
|
||||
auto & child = child_node->step;
|
||||
auto * expression_step = typeid_cast<ExpressionStep *>(parent.get());
|
||||
auto * filter_step = typeid_cast<FilterStep *>(parent.get());
|
||||
auto * array_join_step = typeid_cast<ArrayJoinStep *>(child.get());
|
||||
|
||||
if (!(expression_step || filter_step) || !array_join_step)
|
||||
return;
|
||||
|
||||
const auto & array_join = array_join_step->arrayJoin();
|
||||
const auto & expression = expression_step ? expression_step->getExpression()
|
||||
: filter_step->getExpression();
|
||||
|
||||
auto split_actions = expression->splitActionsBeforeArrayJoin(array_join->columns);
|
||||
|
||||
/// No actions can be moved before ARRAY JOIN.
|
||||
if (!split_actions)
|
||||
return;
|
||||
|
||||
/// All actions was moved before ARRAY JOIN. Swap Expression and ArrayJoin.
|
||||
if (expression->getActions().empty())
|
||||
{
|
||||
auto expected_header = parent->getOutputStream().header;
|
||||
|
||||
/// Expression/Filter -> ArrayJoin
|
||||
std::swap(parent, child);
|
||||
/// ArrayJoin -> Expression/Filter
|
||||
|
||||
if (expression_step)
|
||||
child = std::make_unique<ExpressionStep>(child_node->children.at(0)->step->getOutputStream(),
|
||||
std::move(split_actions));
|
||||
else
|
||||
child = std::make_unique<FilterStep>(child_node->children.at(0)->step->getOutputStream(),
|
||||
std::move(split_actions),
|
||||
filter_step->getFilterColumnName(),
|
||||
filter_step->removesFilterColumn());
|
||||
|
||||
array_join_step->updateInputStream(child->getOutputStream(), expected_header);
|
||||
return;
|
||||
}
|
||||
|
||||
/// Add new expression step before ARRAY JOIN.
|
||||
/// Expression/Filter -> ArrayJoin -> Something
|
||||
auto & node = nodes.emplace_back();
|
||||
node.children.swap(child_node->children);
|
||||
child_node->children.emplace_back(&node);
|
||||
/// Expression/Filter -> ArrayJoin -> node -> Something
|
||||
|
||||
node.step = std::make_unique<ExpressionStep>(node.children.at(0)->step->getOutputStream(),
|
||||
std::move(split_actions));
|
||||
array_join_step->updateInputStream(node.step->getOutputStream(), {});
|
||||
expression_step ? expression_step->updateInputStream(array_join_step->getOutputStream(), true)
|
||||
: filter_step->updateInputStream(array_join_step->getOutputStream(), true);
|
||||
}
|
||||
|
||||
void QueryPlan::optimize()
|
||||
{
|
||||
struct Frame
|
||||
@ -436,7 +499,13 @@ void QueryPlan::optimize()
|
||||
++frame.next_child;
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Last entrance, try lift up.
|
||||
if (frame.node->children.size() == 1)
|
||||
tryLiftUpArrayJoin(frame.node, frame.node->children.front(), nodes);
|
||||
|
||||
stack.pop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -69,11 +69,10 @@ public:
|
||||
std::vector<Node *> children = {};
|
||||
};
|
||||
|
||||
private:
|
||||
|
||||
using Nodes = std::list<Node>;
|
||||
Nodes nodes;
|
||||
|
||||
private:
|
||||
Nodes nodes;
|
||||
Node * root = nullptr;
|
||||
|
||||
void checkInitialized() const;
|
||||
|
37
src/Processors/Transforms/ArrayJoinTransform.cpp
Normal file
37
src/Processors/Transforms/ArrayJoinTransform.cpp
Normal file
@ -0,0 +1,37 @@
|
||||
#include <Processors/Transforms/ArrayJoinTransform.h>
|
||||
#include <Interpreters/ArrayJoinAction.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
/// Compute the header produced by ARRAY JOIN for the given input header.
/// `header` is taken by value, so the action runs on a private copy which is then returned.
Block ArrayJoinTransform::transformHeader(Block header, const ArrayJoinActionPtr & array_join)
{
    auto & action = *array_join;
    action.execute(header);
    return header;
}
|
||||
|
||||
/// Create the transform for the input header `header_`; the output header is obtained
/// with transformHeader(). The totals flag is accepted but currently ignored.
/// Note: the base class is initialized (and therefore reads `array_join_`) before the
/// `array_join` member takes ownership of it via std::move.
ArrayJoinTransform::ArrayJoinTransform(const Block & header_, ArrayJoinActionPtr array_join_, bool /*on_totals_*/)
    : ISimpleTransform(header_, transformHeader(header_, array_join_), false)
    , array_join(std::move(array_join_))
{
    /// TODO: decide how totals should be handled. The check below is disabled for now.
    // if (on_totals_)
    //     throw Exception("ARRAY JOIN is not supported for totals", ErrorCodes::LOGICAL_ERROR);
}
|
||||
|
||||
/// Apply ARRAY JOIN to one chunk in place: the chunk's columns are moved into a block
/// shaped like the input header, the action is executed on that block, and the resulting
/// columns together with the new row count are put back into the chunk.
void ArrayJoinTransform::transform(Chunk & chunk)
{
    const Block & input_header = getInputPort().getHeader();
    auto block = input_header.cloneWithColumns(chunk.detachColumns());

    array_join->execute(block);

    const auto num_rows = block.rows();
    chunk.setColumns(block.getColumns(), num_rows);
}
|
||||
|
||||
}
|
30
src/Processors/Transforms/ArrayJoinTransform.h
Normal file
30
src/Processors/Transforms/ArrayJoinTransform.h
Normal file
@ -0,0 +1,30 @@
|
||||
#pragma once
|
||||
#include <Processors/ISimpleTransform.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ArrayJoinAction;
|
||||
using ArrayJoinActionPtr = std::shared_ptr<ArrayJoinAction>;
|
||||
|
||||
/// Execute ARRAY JOIN
|
||||
class ArrayJoinTransform : public ISimpleTransform
|
||||
{
|
||||
public:
|
||||
ArrayJoinTransform(
|
||||
const Block & header_,
|
||||
ArrayJoinActionPtr array_join_,
|
||||
bool on_totals_ = false);
|
||||
|
||||
String getName() const override { return "ArrayJoinTransform"; }
|
||||
|
||||
static Block transformHeader(Block header, const ArrayJoinActionPtr & array_join);
|
||||
|
||||
protected:
|
||||
void transform(Chunk & chunk) override;
|
||||
|
||||
private:
|
||||
ArrayJoinActionPtr array_join;
|
||||
};
|
||||
|
||||
}
|
@ -88,6 +88,7 @@ SRCS(
|
||||
QueryPipeline.cpp
|
||||
QueryPlan/AddingDelayedSourceStep.cpp
|
||||
QueryPlan/AggregatingStep.cpp
|
||||
QueryPlan/ArrayJoinStep.cpp
|
||||
QueryPlan/ConvertingStep.cpp
|
||||
QueryPlan/CreatingSetsStep.cpp
|
||||
QueryPlan/CubeStep.cpp
|
||||
@ -124,6 +125,7 @@ SRCS(
|
||||
Transforms/AddingSelectorTransform.cpp
|
||||
Transforms/AggregatingInOrderTransform.cpp
|
||||
Transforms/AggregatingTransform.cpp
|
||||
Transforms/ArrayJoinTransform.cpp
|
||||
Transforms/ConvertingTransform.cpp
|
||||
Transforms/CopyTransform.cpp
|
||||
Transforms/CreatingSetsTransform.cpp
|
||||
|
Loading…
Reference in New Issue
Block a user