Merge pull request #14585 from ClickHouse/join-processor

Remove Join from ExpressionActions
This commit is contained in:
alexey-milovidov 2020-09-08 23:56:09 +03:00 committed by GitHub
commit f2e9a09b11
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 169 additions and 271 deletions

View File

@ -18,7 +18,7 @@ String ExpressionBlockInputStream::getName() const { return "Expression"; }
Block ExpressionBlockInputStream::getTotals()
{
totals = children.back()->getTotals();
expression->executeOnTotals(totals);
expression->execute(totals);
return totals;
}
@ -30,14 +30,6 @@ Block ExpressionBlockInputStream::getHeader() const
Block ExpressionBlockInputStream::readImpl()
{
if (!initialized)
{
if (expression->resultIsAlwaysEmpty())
return {};
initialized = true;
}
Block res = children.back()->read();
if (res)
expression->execute(res);

View File

@ -25,7 +25,6 @@ public:
Block getHeader() const override;
protected:
bool initialized = false;
ExpressionActionsPtr expression;
Block readImpl() override;

View File

@ -54,7 +54,7 @@ String FilterBlockInputStream::getName() const { return "Filter"; }
Block FilterBlockInputStream::getTotals()
{
totals = children.back()->getTotals();
expression->executeOnTotals(totals);
expression->execute(totals);
return totals;
}

View File

@ -207,7 +207,7 @@ public:
{
/// Check that expression does not contain unusual actions that will break blocks structure.
for (const auto & action : expression_actions->getActions())
if (action.type == ExpressionAction::Type::JOIN || action.type == ExpressionAction::Type::ARRAY_JOIN)
if (action.type == ExpressionAction::Type::ARRAY_JOIN)
throw Exception("Expression with arrayJoin or other unusual action cannot be captured", ErrorCodes::BAD_ARGUMENTS);
std::unordered_map<std::string, DataTypePtr> arguments_map;

View File

@ -11,6 +11,7 @@
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <Functions/IFunction.h>
#include <optional>
#include <Columns/ColumnSet.h>
@ -153,14 +154,6 @@ ExpressionAction ExpressionAction::arrayJoin(std::string source_name, std::strin
return a;
}
ExpressionAction ExpressionAction::ordinaryJoin(std::shared_ptr<TableJoin> table_join, JoinPtr join)
{
ExpressionAction a;
a.type = JOIN;
a.table_join = table_join;
a.join = join;
return a;
}
void ExpressionAction::prepare(Block & sample_block, const Settings & settings, NameSet & names_not_for_constant_folding)
{
@ -260,12 +253,6 @@ void ExpressionAction::prepare(Block & sample_block, const Settings & settings,
break;
}
case JOIN:
{
table_join->addJoinedColumnsAndCorrectNullability(sample_block);
break;
}
case PROJECT:
{
Block new_block;
@ -336,19 +323,6 @@ void ExpressionAction::prepare(Block & sample_block, const Settings & settings,
}
}
void ExpressionAction::execute(Block & block, ExtraBlockPtr & not_processed) const
{
switch (type)
{
case JOIN:
join->joinBlock(block, not_processed);
break;
default:
throw Exception("Unexpected expression call", ErrorCodes::LOGICAL_ERROR);
}
}
void ExpressionAction::execute(Block & block, bool dry_run) const
{
size_t input_rows_count = block.rows();
@ -402,9 +376,6 @@ void ExpressionAction::execute(Block & block, bool dry_run) const
break;
}
case JOIN:
throw Exception("Unexpected JOIN expression call", ErrorCodes::LOGICAL_ERROR);
case PROJECT:
{
Block new_block;
@ -463,14 +434,6 @@ void ExpressionAction::execute(Block & block, bool dry_run) const
}
}
void ExpressionAction::executeOnTotals(Block & block) const
{
if (type != JOIN)
execute(block, false);
else
join->joinTotals(block);
}
std::string ExpressionAction::toString() const
{
@ -510,17 +473,6 @@ std::string ExpressionAction::toString() const
ss << "ARRAY JOIN " << source_name << " -> " << result_name;
break;
case JOIN:
ss << "JOIN ";
for (NamesAndTypesList::const_iterator it = table_join->columnsAddedByJoin().begin();
it != table_join->columnsAddedByJoin().end(); ++it)
{
if (it != table_join->columnsAddedByJoin().begin())
ss << ", ";
ss << it->name;
}
break;
case PROJECT: [[fallthrough]];
case ADD_ALIASES:
ss << (type == PROJECT ? "PROJECT " : "ADD_ALIASES ");
@ -660,53 +612,15 @@ void ExpressionActions::execute(Block & block, bool dry_run) const
}
}
void ExpressionActions::execute(Block & block, ExtraBlockPtr & not_processed) const
{
if (actions.size() != 1)
throw Exception("Continuation over multiple expressions is not supported", ErrorCodes::LOGICAL_ERROR);
actions[0].execute(block, not_processed);
checkLimits(block);
}
bool ExpressionActions::hasJoinOrArrayJoin() const
bool ExpressionActions::hasArrayJoin() const
{
for (const auto & action : actions)
if (action.type == ExpressionAction::JOIN || action.type == ExpressionAction::ARRAY_JOIN)
if (action.type == ExpressionAction::ARRAY_JOIN)
return true;
return false;
}
bool ExpressionActions::hasTotalsInJoin() const
{
for (const auto & action : actions)
if (action.table_join && action.join->hasTotals())
return true;
return false;
}
void ExpressionActions::executeOnTotals(Block & block) const
{
/// If there is `totals` in the subquery for JOIN, but we do not have totals, then take the block with the default values instead of `totals`.
if (!block)
{
if (hasTotalsInJoin())
{
for (const auto & name_and_type : input_columns)
{
auto column = name_and_type.type->createColumn();
column->insertDefault();
block.insert(ColumnWithTypeAndName(std::move(column), name_and_type.type, name_and_type.name));
}
}
else
return; /// There's nothing to JOIN.
}
for (const auto & action : actions)
action.executeOnTotals(block);
}
std::string ExpressionActions::getSmallestColumn(const NamesAndTypesList & columns)
{
@ -1190,28 +1104,6 @@ ExpressionActionsPtr ExpressionActions::splitActionsBeforeArrayJoin(const NameSe
return split_actions;
}
JoinPtr ExpressionActions::getTableJoinAlgo() const
{
for (const auto & action : actions)
if (action.join)
return action.join;
return {};
}
bool ExpressionActions::resultIsAlwaysEmpty() const
{
/// Check that has join which returns empty result.
for (const auto & action : actions)
{
if (action.type == action.JOIN && action.join && action.join->alwaysReturnsEmptySet())
return true;
}
return false;
}
bool ExpressionActions::checkColumnIsAlwaysFalse(const String & column_name) const
{
@ -1293,10 +1185,6 @@ UInt128 ExpressionAction::ActionHash::operator()(const ExpressionAction & action
hash.update(action.result_name);
hash.update(action.source_name);
break;
case JOIN:
for (const auto & col : action.table_join->columnsAddedByJoin())
hash.update(col.name);
break;
case PROJECT:
for (const auto & pair_of_strs : action.projection)
{
@ -1422,8 +1310,8 @@ std::string ExpressionActionsChain::dumpChain() const
return ss.str();
}
ExpressionActionsChain::ArrayJoinStep::ArrayJoinStep(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_, Names required_output_)
: Step(std::move(required_output_))
ExpressionActionsChain::ArrayJoinStep::ArrayJoinStep(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_)
: Step({})
, array_join(std::move(array_join_))
, result_columns(std::move(required_columns_))
{
@ -1462,6 +1350,52 @@ void ExpressionActionsChain::ArrayJoinStep::finalize(const Names & required_outp
std::swap(result_columns, new_result_columns);
}
/// Creates the chain step that represents a JOIN.
/// `required_columns_` are the columns available before the JOIN; they seed the result columns,
/// which are then extended with the columns the JOIN adds.
ExpressionActionsChain::JoinStep::JoinStep(
    std::shared_ptr<TableJoin> analyzed_join_,
    JoinPtr join_,
    ColumnsWithTypeAndName required_columns_)
    : Step({})
    , analyzed_join(std::move(analyzed_join_))
    , join(std::move(join_))
    , result_columns(std::move(required_columns_))
{
    /// Record the inputs first: at this point result_columns still holds only the incoming columns.
    for (const auto & input_column : result_columns)
        required_columns.emplace_back(input_column.name, input_column.type);

    /// Only now let the join description append its columns and adjust nullability.
    analyzed_join->addJoinedColumnsAndCorrectNullability(result_columns);
}
void ExpressionActionsChain::JoinStep::finalize(const Names & required_output_)
{
/// We need to update required and result columns by removing unused ones.
NamesAndTypesList new_required_columns;
ColumnsWithTypeAndName new_result_columns;
/// That's an input columns we need.
NameSet required_names(required_output_.begin(), required_output_.end());
for (const auto & name : analyzed_join->keyNamesLeft())
required_names.emplace(name);
for (const auto & column : required_columns)
{
if (required_names.count(column.name) != 0)
new_required_columns.emplace_back(column);
}
/// Result will also contain joined columns.
for (const auto & column : analyzed_join->columnsAddedByJoin())
required_names.emplace(column.name);
for (const auto & column : result_columns)
{
if (required_names.count(column.name) != 0)
new_result_columns.emplace_back(column);
}
std::swap(required_columns, new_required_columns);
std::swap(result_columns, new_result_columns);
}
ExpressionActionsPtr & ExpressionActionsChain::Step::actions()
{
return typeid_cast<ExpressionActionsStep *>(this)->actions;

View File

@ -69,8 +69,6 @@ public:
/// Source column is removed from block.
ARRAY_JOIN,
JOIN,
/// Reorder and rename the columns, delete the extra ones. The same column names are allowed in the result.
PROJECT,
/// Add columns with alias names. These columns are the same as the non-aliased ones. Use PROJECT if you need to modify them.
@ -119,7 +117,6 @@ public:
static ExpressionAction project(const Names & projected_columns_);
static ExpressionAction addAliases(const NamesWithAliases & aliased_columns_);
static ExpressionAction arrayJoin(std::string source_name, std::string result_name);
static ExpressionAction ordinaryJoin(std::shared_ptr<TableJoin> table_join, JoinPtr join);
/// Which columns necessary to perform this action.
Names getNeededColumns() const;
@ -137,10 +134,6 @@ private:
friend class ExpressionActions;
void prepare(Block & sample_block, const Settings & settings, NameSet & names_not_for_constant_folding);
void executeOnTotals(Block & block) const;
/// Executes action on block (modify it). Block could be split in case of JOIN. Then not_processed block is created.
void execute(Block & block, ExtraBlockPtr & not_processed) const;
void execute(Block & block, bool dry_run) const;
};
@ -205,18 +198,7 @@ public:
/// Execute the expression on the block. The block must contain all the columns returned by getRequiredColumns.
void execute(Block & block, bool dry_run = false) const;
/// Execute the expression on the block with continuation. This method in only supported for single JOIN.
void execute(Block & block, ExtraBlockPtr & not_processed) const;
bool hasJoinOrArrayJoin() const;
/// Check if joined subquery has totals.
bool hasTotalsInJoin() const;
/** Execute the expression on the block of total values.
* Almost the same as `execute`. The difference is only when JOIN is executed.
*/
void executeOnTotals(Block & block) const;
bool hasArrayJoin() const;
/// Obtain a sample block that contains the names and types of result columns.
const Block & getSampleBlock() const { return sample_block; }
@ -225,14 +207,8 @@ public:
static std::string getSmallestColumn(const NamesAndTypesList & columns);
JoinPtr getTableJoinAlgo() const;
const Settings & getSettings() const { return settings; }
/// Check if result block has no rows. True if it's definite, false if we can't say for sure.
/// Call it only after subqueries for join were executed.
bool resultIsAlwaysEmpty() const;
/// Check if column is always zero. True if it's definite, false if we can't say for sure.
/// Call it only after subqueries for sets were executed.
bool checkColumnIsAlwaysFalse(const String & column_name) const;
@ -357,7 +333,7 @@ struct ExpressionActionsChain
NamesAndTypesList required_columns;
ColumnsWithTypeAndName result_columns;
ArrayJoinStep(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_, Names required_output_);
ArrayJoinStep(ArrayJoinActionPtr array_join_, ColumnsWithTypeAndName required_columns_);
const NamesAndTypesList & getRequiredColumns() const override { return required_columns; }
const ColumnsWithTypeAndName & getResultColumns() const override { return result_columns; }
@ -366,6 +342,22 @@ struct ExpressionActionsChain
std::string dump() const override { return "ARRAY JOIN"; }
};
/// Step of the expression actions chain that stands for a JOIN.
/// It carries no ExpressionActions of its own; it only tracks which columns
/// enter the JOIN and which columns come out of it.
struct JoinStep : public Step
{
/// Description of the JOIN; used to add the joined columns and to look up the left key names.
std::shared_ptr<TableJoin> analyzed_join;
/// Join object stored alongside the description (presumably the join algorithm - confirm against callers).
JoinPtr join;
/// Names and types of the columns this step needs as input.
NamesAndTypesList required_columns;
/// Columns available after the step: the input columns plus the columns added by the JOIN.
ColumnsWithTypeAndName result_columns;
/// `required_columns_` are the columns available before the JOIN.
JoinStep(std::shared_ptr<TableJoin> analyzed_join_, JoinPtr join_, ColumnsWithTypeAndName required_columns_);
const NamesAndTypesList & getRequiredColumns() const override { return required_columns; }
const ColumnsWithTypeAndName & getResultColumns() const override { return result_columns; }
/// Removes columns that are not needed given the required output of the step.
void finalize(const Names & required_output_) override;
/// Intentionally a no-op for now.
void prependProjectInput() const override {} /// TODO: remove unused columns before JOIN ?
/// Short label for this step.
std::string dump() const override { return "JOIN"; }
};
using StepPtr = std::unique_ptr<Step>;
using Steps = std::vector<StepPtr>;

View File

@ -182,7 +182,9 @@ void ExpressionAnalyzer::analyzeAggregation()
if (join)
{
getRootActionsNoMakeSet(analyzedJoin().leftKeysList(), true, temp_actions, false);
addJoinAction(temp_actions);
auto sample_columns = temp_actions->getSampleBlock().getColumnsWithTypeAndName();
analyzedJoin().addJoinedColumnsAndCorrectNullability(sample_columns);
temp_actions = std::make_shared<ExpressionActions>(sample_columns, context);
}
columns_after_join = columns_after_array_join;
@ -474,19 +476,13 @@ ArrayJoinActionPtr SelectQueryExpressionAnalyzer::appendArrayJoin(ExpressionActi
auto array_join = addMultipleArrayJoinAction(step.actions(), is_array_join_left);
chain.steps.push_back(std::make_unique<ExpressionActionsChain::ArrayJoinStep>(
array_join, step.getResultColumns(),
Names())); /// Required output is empty because all array joined columns are kept by step.
array_join, step.getResultColumns()));
chain.addStep();
return array_join;
}
void ExpressionAnalyzer::addJoinAction(ExpressionActionsPtr & actions, JoinPtr join) const
{
actions->add(ExpressionAction::ordinaryJoin(syntax->analyzed_join, join));
}
bool SelectQueryExpressionAnalyzer::appendJoinLeftKeys(ExpressionActionsChain & chain, bool only_types)
{
ExpressionActionsChain::Step & step = chain.lastStep(columns_after_array_join);
@ -495,14 +491,17 @@ bool SelectQueryExpressionAnalyzer::appendJoinLeftKeys(ExpressionActionsChain &
return true;
}
bool SelectQueryExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain)
JoinPtr SelectQueryExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain)
{
JoinPtr table_join = makeTableJoin(*syntax->ast_join);
ExpressionActionsChain::Step & step = chain.lastStep(columns_after_array_join);
addJoinAction(step.actions(), table_join);
return true;
chain.steps.push_back(std::make_unique<ExpressionActionsChain::JoinStep>(
syntax->analyzed_join, table_join, step.getResultColumns()));
chain.addStep();
return table_join;
}
static JoinPtr tryGetStorageJoin(std::shared_ptr<TableJoin> analyzed_join)
@ -1091,15 +1090,8 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
{
query_analyzer.appendJoinLeftKeys(chain, only_types || !first_stage);
before_join = chain.getLastActions(true);
if (before_join)
chain.addStep();
query_analyzer.appendJoin(chain);
join = chain.getLastActions();
if (!join)
throw Exception("No expected JOIN", ErrorCodes::LOGICAL_ERROR);
before_join = chain.getLastActions();
join = query_analyzer.appendJoin(chain);
chain.addStep();
}
@ -1150,9 +1142,8 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
if (hasJoin())
{
/// You may find it strange but we support read_in_order for HashJoin and do not support for MergeJoin.
auto join_algo = join->getTableJoinAlgo();
bool has_delayed_stream = query_analyzer.analyzedJoin().needStreamWithNonJoinedRows();
join_allow_read_in_order = typeid_cast<HashJoin *>(join_algo.get()) && !has_delayed_stream;
join_allow_read_in_order = typeid_cast<HashJoin *>(join.get()) && !has_delayed_stream;
}
optimize_read_in_order =
@ -1242,8 +1233,8 @@ void ExpressionAnalysisResult::checkActions() const
{
if (actions)
for (const auto & action : actions->getActions())
if (action.type == ExpressionAction::Type::JOIN || action.type == ExpressionAction::Type::ARRAY_JOIN)
throw Exception("PREWHERE cannot contain ARRAY JOIN or JOIN action", ErrorCodes::ILLEGAL_PREWHERE);
if (action.type == ExpressionAction::Type::ARRAY_JOIN)
throw Exception("PREWHERE cannot contain ARRAY JOIN action", ErrorCodes::ILLEGAL_PREWHERE);
};
check_actions(prewhere_info->prewhere_actions);

View File

@ -139,8 +139,6 @@ protected:
ArrayJoinActionPtr addMultipleArrayJoinAction(ExpressionActionsPtr & actions, bool is_left) const;
void addJoinAction(ExpressionActionsPtr & actions, JoinPtr = {}) const;
void getRootActions(const ASTPtr & ast, bool no_subqueries, ExpressionActionsPtr & actions, bool only_consts = false);
/** Similar to getRootActions but do not make sets when analyzing IN functions. It's used in
@ -182,7 +180,7 @@ struct ExpressionAnalysisResult
ExpressionActionsPtr before_array_join;
ArrayJoinActionPtr array_join;
ExpressionActionsPtr before_join;
ExpressionActionsPtr join;
JoinPtr join;
ExpressionActionsPtr before_where;
ExpressionActionsPtr before_aggregation;
ExpressionActionsPtr before_having;
@ -313,7 +311,7 @@ private:
/// Before aggregation:
ArrayJoinActionPtr appendArrayJoin(ExpressionActionsChain & chain, ExpressionActionsPtr & before_array_join, bool only_types);
bool appendJoinLeftKeys(ExpressionActionsChain & chain, bool only_types);
bool appendJoin(ExpressionActionsChain & chain);
JoinPtr appendJoin(ExpressionActionsChain & chain);
/// Add preliminary rows filtration. Actions are created in other expression analyzer to prevent any possible alias injection.
void appendPreliminaryFilter(ExpressionActionsChain & chain, ExpressionActionsPtr actions, String column_name);
/// remove_filter is set in ExpressionActionsChain::finalize();

View File

@ -599,7 +599,6 @@ static std::vector<std::unordered_set<std::optional<size_t>>> getActionsDependen
case ExpressionAction::ADD_COLUMN:
case ExpressionAction::COPY_COLUMN:
case ExpressionAction::ARRAY_JOIN:
case ExpressionAction::JOIN:
{
Names columns = actions[i].getNeededColumns();
for (const auto & column : columns)

View File

@ -32,7 +32,7 @@
#include <Processors/Pipe.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/InflatingExpressionTransform.h>
#include <Processors/Transforms/JoiningTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/ReadFromStorageStep.h>
@ -913,12 +913,12 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
if (expressions.hasJoin())
{
Block join_result_sample;
JoinPtr join = expressions.join->getTableJoinAlgo();
JoinPtr join = expressions.join;
join_result_sample = InflatingExpressionTransform::transformHeader(
join_result_sample = JoiningTransform::transformHeader(
query_plan.getCurrentDataStream().header, expressions.join);
QueryPlanStepPtr join_step = std::make_unique<InflatingExpressionStep>(
QueryPlanStepPtr join_step = std::make_unique<JoinStep>(
query_plan.getCurrentDataStream(),
expressions.join);

View File

@ -4,6 +4,7 @@
#include <Core/Settings.h>
#include <Core/Block.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Common/StringUtils/StringUtils.h>
@ -228,9 +229,9 @@ void TableJoin::addJoinedColumn(const NameAndTypePair & joined_column)
columns_added_by_join.push_back(joined_column);
}
void TableJoin::addJoinedColumnsAndCorrectNullability(Block & sample_block) const
void TableJoin::addJoinedColumnsAndCorrectNullability(ColumnsWithTypeAndName & columns) const
{
for (auto & col : sample_block)
for (auto & col : columns)
{
/// Materialize column.
/// Column is not empty if it is constant, but after Join all constants will be materialized.
@ -249,7 +250,7 @@ void TableJoin::addJoinedColumnsAndCorrectNullability(Block & sample_block) cons
if (rightBecomeNullable(res_type))
res_type = makeNullable(res_type);
sample_block.insert(ColumnWithTypeAndName(nullptr, res_type, col.name));
columns.emplace_back(nullptr, res_type, col.name);
}
}

View File

@ -22,6 +22,9 @@ struct DatabaseAndTableWithAlias;
class Block;
class DictionaryReader;
struct ColumnWithTypeAndName;
using ColumnsWithTypeAndName = std::vector<ColumnWithTypeAndName>;
struct Settings;
class IVolume;
@ -133,7 +136,7 @@ public:
bool leftBecomeNullable(const DataTypePtr & column_type) const;
bool rightBecomeNullable(const DataTypePtr & column_type) const;
void addJoinedColumn(const NameAndTypePair & joined_column);
void addJoinedColumnsAndCorrectNullability(Block & sample_block) const;
void addJoinedColumnsAndCorrectNullability(ColumnsWithTypeAndName & columns) const;
void setAsofInequality(ASOF::Inequality inequality) { asof_inequality = inequality; }
ASOF::Inequality getAsofInequality() { return asof_inequality; }

View File

@ -2,7 +2,7 @@
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Transforms/InflatingExpressionTransform.h>
#include <Processors/Transforms/JoiningTransform.h>
#include <Interpreters/ExpressionActions.h>
#include <IO/Operators.h>
@ -14,13 +14,29 @@ static ITransformingStep::Traits getTraits(const ExpressionActionsPtr & expressi
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = !expression->hasJoinOrArrayJoin(),
.preserves_distinct_columns = !expression->hasArrayJoin(),
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = !expression->hasJoinOrArrayJoin(),
.preserves_sorting = !expression->hasArrayJoin(),
},
{
.preserves_number_of_rows = !expression->hasJoinOrArrayJoin(),
.preserves_number_of_rows = !expression->hasArrayJoin(),
}
};
}
/// Traits of the JOIN step. Unlike a plain expression, a JOIN is declared as preserving
/// neither distinct columns, nor sorting, nor the number of rows; only the number of streams is kept.
static ITransformingStep::Traits getJoinTraits()
{
    ITransformingStep::Traits join_traits
    {
        {
            .preserves_distinct_columns = false,
            .returns_single_stream = false,
            .preserves_number_of_streams = true,
            .preserves_sorting = false,
        },
        {
            .preserves_number_of_rows = false,
        }
    };

    return join_traits;
}
@ -51,10 +67,9 @@ void ExpressionStep::updateInputStream(DataStream input_stream, bool keep_header
void ExpressionStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
pipeline.addSimpleTransform([&](const Block & header)
{
bool on_totals = stream_type == QueryPipeline::StreamType::Totals;
return std::make_shared<Transform>(header, expression, on_totals);
return std::make_shared<Transform>(header, expression);
});
if (!blocksHaveEqualStructure(pipeline.getHeader(), output_stream->header))
@ -86,17 +101,17 @@ void ExpressionStep::describeActions(FormatSettings & settings) const
doDescribeActions(expression, settings);
}
InflatingExpressionStep::InflatingExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_)
JoinStep::JoinStep(const DataStream & input_stream_, JoinPtr join_)
: ITransformingStep(
input_stream_,
Transform::transformHeader(input_stream_.header, expression_),
getTraits(expression_))
, expression(std::move(expression_))
Transform::transformHeader(input_stream_.header, join_),
getJoinTraits())
, join(std::move(join_))
{
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
}
void InflatingExpressionStep::transformPipeline(QueryPipeline & pipeline)
void JoinStep::transformPipeline(QueryPipeline & pipeline)
{
/// In case joined subquery has totals, and we don't, add default chunk to totals.
bool add_default_totals = false;
@ -109,13 +124,8 @@ void InflatingExpressionStep::transformPipeline(QueryPipeline & pipeline)
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
{
bool on_totals = stream_type == QueryPipeline::StreamType::Totals;
return std::make_shared<Transform>(header, expression, on_totals, add_default_totals);
return std::make_shared<Transform>(header, join, on_totals, add_default_totals);
});
}
void InflatingExpressionStep::describeActions(FormatSettings & settings) const
{
doDescribeActions(expression, settings);
}
}

View File

@ -7,8 +7,11 @@ namespace DB
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
class ExpressionTransform;
class InflatingExpressionTransform;
class JoiningTransform;
/// Calculates specified expression. See ExpressionTransform.
class ExpressionStep : public ITransformingStep
@ -32,20 +35,18 @@ private:
};
/// TODO: add separate step for join.
class InflatingExpressionStep : public ITransformingStep
class JoinStep : public ITransformingStep
{
public:
using Transform = InflatingExpressionTransform;
using Transform = JoiningTransform;
explicit InflatingExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_);
String getName() const override { return "InflatingExpression"; }
explicit JoinStep(const DataStream & input_stream_, JoinPtr join_);
String getName() const override { return "Join"; }
void transformPipeline(QueryPipeline & pipeline) override;
void describeActions(FormatSettings & settings) const override;
private:
ExpressionActionsPtr expression;
JoinPtr join;
};
}

View File

@ -13,7 +13,7 @@ static ITransformingStep::Traits getTraits(const ExpressionActionsPtr & expressi
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = !expression->hasJoinOrArrayJoin(), /// I suppose it actually never happens
.preserves_distinct_columns = !expression->hasArrayJoin(), /// I suppose it actually never happens
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,

View File

@ -12,33 +12,17 @@ Block ExpressionTransform::transformHeader(Block header, const ExpressionActions
}
ExpressionTransform::ExpressionTransform(const Block & header_, ExpressionActionsPtr expression_, bool on_totals_)
: ISimpleTransform(header_, transformHeader(header_, expression_), on_totals_)
ExpressionTransform::ExpressionTransform(const Block & header_, ExpressionActionsPtr expression_)
: ISimpleTransform(header_, transformHeader(header_, expression_), false)
, expression(std::move(expression_))
, on_totals(on_totals_)
{
}
void ExpressionTransform::transform(Chunk & chunk)
{
if (!initialized)
{
initialized = true;
if (expression->resultIsAlwaysEmpty() && !on_totals)
{
stopReading();
chunk.clear();
return;
}
}
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
if (on_totals)
expression->executeOnTotals(block);
else
expression->execute(block);
expression->execute(block);
auto num_rows = block.rows();
chunk.setColumns(block.getColumns(), num_rows);

View File

@ -17,8 +17,7 @@ class ExpressionTransform : public ISimpleTransform
public:
ExpressionTransform(
const Block & header_,
ExpressionActionsPtr expression_,
bool on_totals_ = false);
ExpressionActionsPtr expression_);
String getName() const override { return "ExpressionTransform"; }
@ -29,8 +28,6 @@ protected:
private:
ExpressionActionsPtr expression;
bool on_totals;
bool initialized = false;
};
}

View File

@ -103,10 +103,7 @@ void FilterTransform::transform(Chunk & chunk)
Block block = getInputPort().getHeader().cloneWithColumns(columns);
columns.clear();
if (on_totals)
expression->executeOnTotals(block);
else
expression->execute(block);
expression->execute(block);
num_rows_before_filtration = block.rows();
columns = block.getColumns();

View File

@ -1,32 +1,32 @@
#include <Processors/Transforms/InflatingExpressionTransform.h>
#include <Processors/Transforms/JoiningTransform.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
namespace DB
{
Block InflatingExpressionTransform::transformHeader(Block header, const ExpressionActionsPtr & expression)
Block JoiningTransform::transformHeader(Block header, const JoinPtr & join)
{
ExtraBlockPtr tmp;
expression->execute(header, tmp);
join->joinBlock(header, tmp);
return header;
}
InflatingExpressionTransform::InflatingExpressionTransform(Block input_header, ExpressionActionsPtr expression_,
bool on_totals_, bool default_totals_)
: ISimpleTransform(input_header, transformHeader(input_header, expression_), on_totals_)
, expression(std::move(expression_))
JoiningTransform::JoiningTransform(Block input_header, JoinPtr join_,
bool on_totals_, bool default_totals_)
: ISimpleTransform(input_header, transformHeader(input_header, join_), on_totals_)
, join(std::move(join_))
, on_totals(on_totals_)
, default_totals(default_totals_)
{}
void InflatingExpressionTransform::transform(Chunk & chunk)
void JoiningTransform::transform(Chunk & chunk)
{
if (!initialized)
{
initialized = true;
if (expression->resultIsAlwaysEmpty() && !on_totals)
if (join->alwaysReturnsEmptySet() && !on_totals)
{
stopReading();
chunk.clear();
@ -42,10 +42,10 @@ void InflatingExpressionTransform::transform(Chunk & chunk)
/// Drop totals if neither our stream nor the joined stream has them.
/// See comment in ExpressionTransform.h
if (default_totals && !expression->hasTotalsInJoin())
if (default_totals && !join->hasTotals())
return;
expression->executeOnTotals(block);
join->joinTotals(block);
}
else
block = readExecute(chunk);
@ -54,7 +54,7 @@ void InflatingExpressionTransform::transform(Chunk & chunk)
chunk.setColumns(block.getColumns(), num_rows);
}
Block InflatingExpressionTransform::readExecute(Chunk & chunk)
Block JoiningTransform::readExecute(Chunk & chunk)
{
Block res;
@ -64,7 +64,7 @@ Block InflatingExpressionTransform::readExecute(Chunk & chunk)
res = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
if (res)
expression->execute(res, not_processed);
join->joinBlock(res, not_processed);
}
else if (not_processed->empty()) /// There's not processed data inside expression.
{
@ -72,12 +72,12 @@ Block InflatingExpressionTransform::readExecute(Chunk & chunk)
res = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
not_processed.reset();
expression->execute(res, not_processed);
join->joinBlock(res, not_processed);
}
else
{
res = std::move(not_processed->block);
expression->execute(res, not_processed);
join->joinBlock(res, not_processed);
}
return res;
}

View File

@ -5,25 +5,25 @@
namespace DB
{
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
class InflatingExpressionTransform : public ISimpleTransform
class JoiningTransform : public ISimpleTransform
{
public:
InflatingExpressionTransform(Block input_header, ExpressionActionsPtr expression_,
bool on_totals_ = false, bool default_totals_ = false);
JoiningTransform(Block input_header, JoinPtr join_,
bool on_totals_ = false, bool default_totals_ = false);
/// Name reported for this processor. Matches the class name after the rename from InflatingExpressionTransform.
String getName() const override { return "JoiningTransform"; }
static Block transformHeader(Block header, const ExpressionActionsPtr & expression);
static Block transformHeader(Block header, const JoinPtr & join);
protected:
void transform(Chunk & chunk) override;
bool needInputData() const override { return !not_processed; }
private:
ExpressionActionsPtr expression;
JoinPtr join;
bool on_totals;
/// This flag means that we have manually added totals to our pipeline.
/// It may happen if the joined subquery has totals, but our stream doesn't.

View File

@ -137,7 +137,7 @@ SRCS(
Transforms/FillingTransform.cpp
Transforms/FilterTransform.cpp
Transforms/FinishSortingTransform.cpp
Transforms/InflatingExpressionTransform.cpp
Transforms/JoiningTransform.cpp
Transforms/LimitByTransform.cpp
Transforms/LimitsCheckingTransform.cpp
Transforms/MaterializingTransform.cpp

View File

@ -484,8 +484,8 @@ Block validateColumnsDefaultsAndGetSampleBlock(ASTPtr default_expr_list, const N
auto syntax_analyzer_result = TreeRewriter(context).analyze(default_expr_list, all_columns);
const auto actions = ExpressionAnalyzer(default_expr_list, syntax_analyzer_result, context).getActions(true);
for (const auto & action : actions->getActions())
if (action.type == ExpressionAction::Type::JOIN || action.type == ExpressionAction::Type::ARRAY_JOIN)
throw Exception("Unsupported default value that requires ARRAY JOIN or JOIN action", ErrorCodes::THERE_IS_NO_DEFAULT_VALUE);
if (action.type == ExpressionAction::Type::ARRAY_JOIN)
throw Exception("Unsupported default value that requires ARRAY JOIN action", ErrorCodes::THERE_IS_NO_DEFAULT_VALUE);
return actions->getSampleBlock();
}