Merge pull request #23743 from ClickHouse/refactor-join-step

Refactor join step
This commit is contained in:
Nikolai Kochetov 2021-05-02 21:17:23 +03:00 committed by GitHub
commit 08f10dced0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 672 additions and 262 deletions

View File

@ -26,6 +26,8 @@
#include <Interpreters/DictionaryReader.h>
#include <Interpreters/Context.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/parseAggregateFunctionParameters.h>
@ -119,11 +121,14 @@ bool sanitizeBlock(Block & block, bool throw_if_cannot_create_column)
return true;
}
ExpressionAnalyzerData::~ExpressionAnalyzerData() = default;
ExpressionAnalyzer::ExtractedSettings::ExtractedSettings(const Settings & settings_)
: use_index_for_in_with_subqueries(settings_.use_index_for_in_with_subqueries)
, size_limits_for_set(settings_.max_rows_in_set, settings_.max_bytes_in_set, settings_.set_overflow_mode)
{}
ExpressionAnalyzer::~ExpressionAnalyzer() = default;
ExpressionAnalyzer::ExpressionAnalyzer(
const ASTPtr & query_,
@ -746,11 +751,11 @@ static JoinPtr tryGetStorageJoin(std::shared_ptr<TableJoin> analyzed_join)
return {};
}
static ExpressionActionsPtr createJoinedBlockActions(ContextPtr context, const TableJoin & analyzed_join)
static ActionsDAGPtr createJoinedBlockActions(ContextPtr context, const TableJoin & analyzed_join)
{
ASTPtr expression_list = analyzed_join.rightKeysList();
auto syntax_result = TreeRewriter(context).analyze(expression_list, analyzed_join.columnsFromJoinedTable());
return ExpressionAnalyzer(expression_list, syntax_result, context).getActions(true, false);
return ExpressionAnalyzer(expression_list, syntax_result, context).getActionsDAG(true, false);
}
static bool allowDictJoin(StoragePtr joined_storage, ContextPtr context, String & dict_name, String & key_name)
@ -802,67 +807,77 @@ JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(
const ASTTablesInSelectQueryElement & join_element, const ColumnsWithTypeAndName & left_sample_columns)
{
/// Two JOINs are not supported with the same subquery, but different USINGs.
auto join_hash = join_element.getTreeHash();
String join_subquery_id = toString(join_hash.first) + "_" + toString(join_hash.second);
SubqueryForSet & subquery_for_join = subqueries_for_sets[join_subquery_id];
if (joined_plan)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table join was already created for query");
/// Use StorageJoin if any.
if (!subquery_for_join.join)
subquery_for_join.join = tryGetStorageJoin(syntax->analyzed_join);
JoinPtr join = tryGetStorageJoin(syntax->analyzed_join);
if (!subquery_for_join.join)
if (!join)
{
/// Actions which need to be calculated on joined block.
ExpressionActionsPtr joined_block_actions = createJoinedBlockActions(getContext(), analyzedJoin());
auto joined_block_actions = createJoinedBlockActions(getContext(), analyzedJoin());
Names original_right_columns;
if (!subquery_for_join.source)
NamesWithAliases required_columns_with_aliases = analyzedJoin().getRequiredColumns(
Block(joined_block_actions->getResultColumns()), joined_block_actions->getRequiredColumns().getNames());
for (auto & pr : required_columns_with_aliases)
original_right_columns.push_back(pr.first);
/** For GLOBAL JOINs (in the case, for example, of the push method for executing GLOBAL subqueries), the following occurs
* - in the addExternalStorage function, the JOIN (SELECT ...) subquery is replaced with JOIN _data1,
* in the subquery_for_set object this subquery is exposed as source and the temporary table _data1 as the `table`.
* - this function shows the expression JOIN _data1.
*/
auto interpreter = interpretSubquery(join_element.table_expression, getContext(), original_right_columns, query_options);
{
NamesWithAliases required_columns_with_aliases = analyzedJoin().getRequiredColumns(
joined_block_actions->getSampleBlock(), joined_block_actions->getRequiredColumns());
for (auto & pr : required_columns_with_aliases)
original_right_columns.push_back(pr.first);
joined_plan = std::make_unique<QueryPlan>();
interpreter->buildQueryPlan(*joined_plan);
/** For GLOBAL JOINs (in the case, for example, of the push method for executing GLOBAL subqueries), the following occurs
* - in the addExternalStorage function, the JOIN (SELECT ...) subquery is replaced with JOIN _data1,
* in the subquery_for_set object this subquery is exposed as source and the temporary table _data1 as the `table`.
* - this function shows the expression JOIN _data1.
*/
auto interpreter = interpretSubquery(join_element.table_expression, getContext(), original_right_columns, query_options);
auto sample_block = interpreter->getSampleBlock();
subquery_for_join.makeSource(interpreter, std::move(required_columns_with_aliases));
auto rename_dag = std::make_unique<ActionsDAG>(sample_block.getColumnsWithTypeAndName());
for (const auto & name_with_alias : required_columns_with_aliases)
{
if (sample_block.has(name_with_alias.first))
{
auto pos = sample_block.getPositionByName(name_with_alias.first);
const auto & alias = rename_dag->addAlias(*rename_dag->getInputs()[pos], name_with_alias.second);
rename_dag->getIndex()[pos] = &alias;
}
}
auto rename_step = std::make_unique<ExpressionStep>(joined_plan->getCurrentDataStream(), std::move(rename_dag));
rename_step->setStepDescription("Rename joined columns");
joined_plan->addStep(std::move(rename_step));
}
/// TODO You do not need to set this up when JOIN is only needed on remote servers.
subquery_for_join.addJoinActions(joined_block_actions); /// changes subquery_for_join.sample_block inside
auto joined_actions_step = std::make_unique<ExpressionStep>(joined_plan->getCurrentDataStream(), std::move(joined_block_actions));
joined_actions_step->setStepDescription("Joined actions");
joined_plan->addStep(std::move(joined_actions_step));
const ColumnsWithTypeAndName & right_sample_columns = subquery_for_join.sample_block.getColumnsWithTypeAndName();
const ColumnsWithTypeAndName & right_sample_columns = joined_plan->getCurrentDataStream().header.getColumnsWithTypeAndName();
bool need_convert = syntax->analyzed_join->applyJoinKeyConvert(left_sample_columns, right_sample_columns);
if (need_convert)
subquery_for_join.addJoinActions(std::make_shared<ExpressionActions>(
syntax->analyzed_join->rightConvertingActions(),
ExpressionActionsSettings::fromContext(getContext())));
{
auto converting_step = std::make_unique<ExpressionStep>(joined_plan->getCurrentDataStream(), syntax->analyzed_join->rightConvertingActions());
converting_step->setStepDescription("Convert joined columns");
joined_plan->addStep(std::move(converting_step));
}
subquery_for_join.join = makeJoin(syntax->analyzed_join, subquery_for_join.sample_block, getContext());
join = makeJoin(syntax->analyzed_join, joined_plan->getCurrentDataStream().header, getContext());
/// Do not make subquery for join over dictionary.
if (syntax->analyzed_join->dictionary_reader)
{
JoinPtr join = subquery_for_join.join;
subqueries_for_sets.erase(join_subquery_id);
return join;
}
joined_plan.reset();
}
else
{
const ColumnsWithTypeAndName & right_sample_columns = subquery_for_join.sample_block.getColumnsWithTypeAndName();
bool need_convert = syntax->analyzed_join->applyJoinKeyConvert(left_sample_columns, right_sample_columns);
if (need_convert)
subquery_for_join.addJoinActions(std::make_shared<ExpressionActions>(syntax->analyzed_join->rightConvertingActions()));
}
syntax->analyzed_join->applyJoinKeyConvert(left_sample_columns, {});
return subquery_for_join.join;
return join;
}
ActionsDAGPtr SelectQueryExpressionAnalyzer::appendPrewhere(
@ -1345,6 +1360,11 @@ ExpressionActionsPtr ExpressionAnalyzer::getConstActions(const ColumnsWithTypeAn
return std::make_shared<ExpressionActions>(actions, ExpressionActionsSettings::fromContext(getContext()));
}
std::unique_ptr<QueryPlan> SelectQueryExpressionAnalyzer::getJoinedPlan()
{
return std::move(joined_plan);
}
ActionsDAGPtr SelectQueryExpressionAnalyzer::simpleSelectActions()
{
ExpressionActionsChain new_chain(getContext());

View File

@ -47,9 +47,13 @@ bool sanitizeBlock(Block & block, bool throw_if_cannot_create_column = false);
/// ExpressionAnalyzer sources, intermediates and results. It splits data and logic, allows to test them separately.
struct ExpressionAnalyzerData
{
~ExpressionAnalyzerData();
SubqueriesForSets subqueries_for_sets;
PreparedSets prepared_sets;
std::unique_ptr<QueryPlan> joined_plan;
/// Columns after ARRAY JOIN. If there is no ARRAY JOIN, it's source_columns.
NamesAndTypesList columns_after_array_join;
/// Columns after Columns after ARRAY JOIN and JOIN. If there is no JOIN, it's columns_after_array_join.
@ -99,6 +103,8 @@ public:
: ExpressionAnalyzer(query_, syntax_analyzer_result_, context_, 0, false, {})
{}
~ExpressionAnalyzer();
void appendExpression(ExpressionActionsChain & chain, const ASTPtr & expr, bool only_types);
/// If `ast` is not a SELECT query, just gets all the actions to evaluate the expression.
@ -293,6 +299,7 @@ public:
const AggregateDescriptions & aggregates() const { return aggregate_descriptions; }
const PreparedSets & getPreparedSets() const { return prepared_sets; }
std::unique_ptr<QueryPlan> getJoinedPlan();
/// Tables that will need to be sent to remote servers for distributed query processing.
const TemporaryTablesMapping & getExternalTables() const { return external_tables; }

View File

@ -1504,6 +1504,7 @@ BlockInputStreamPtr HashJoin::createStreamWithNonJoinedRows(const Block & result
void HashJoin::reuseJoinedData(const HashJoin & join)
{
data = join.data;
from_storage_join = true;
joinDispatch(kind, strictness, data->maps, [this](auto kind_, auto strictness_, auto & map)
{
used_flags.reinit<kind_, strictness_>(map.getBufferSizeInCells(data->type) + 1);

View File

@ -159,6 +159,8 @@ public:
void joinTotals(Block & block) const override;
bool isFilled() const override { return from_storage_join || data->type == Type::DICT; }
/** For RIGHT and FULL JOINs.
* A stream that will contain default values from left table, joined with rows from right table, that was not joined before.
* Use only after all calls to joinBlock was done.
@ -344,6 +346,9 @@ private:
ASTTableJoin::Kind kind;
ASTTableJoin::Strictness strictness;
/// This join was created from StorageJoin and it is already filled.
bool from_storage_join = false;
/// Names of key columns in right-side table (in the order they appear in ON/USING clause). @note It could contain duplicates.
const Names & key_names_right;

View File

@ -41,6 +41,10 @@ public:
virtual size_t getTotalByteCount() const = 0;
virtual bool alwaysReturnsEmptySet() const { return false; }
/// StorageJoin/Dictionary is already filled. No need to call addJoinedBlock.
/// Different query plan is used for such joins.
virtual bool isFilled() const { return false; }
virtual BlockInputStreamPtr createStreamWithNonJoinedRows(const Block &, UInt64) const { return {}; }
};

View File

@ -47,6 +47,7 @@
#include <Processors/QueryPlan/FillingStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/FinishSortingStep.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPlan/LimitByStep.h>
#include <Processors/QueryPlan/LimitStep.h>
#include <Processors/QueryPlan/MergeSortingStep.h>
@ -1117,14 +1118,37 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
if (expressions.hasJoin())
{
QueryPlanStepPtr join_step = std::make_unique<JoinStep>(
query_plan.getCurrentDataStream(),
expressions.join,
expressions.join_has_delayed_stream,
settings.max_block_size);
if (expressions.join->isFilled())
{
QueryPlanStepPtr filled_join_step = std::make_unique<FilledJoinStep>(
query_plan.getCurrentDataStream(),
expressions.join,
settings.max_block_size);
join_step->setStepDescription("JOIN");
query_plan.addStep(std::move(join_step));
filled_join_step->setStepDescription("JOIN");
query_plan.addStep(std::move(filled_join_step));
}
else
{
auto joined_plan = query_analyzer->getJoinedPlan();
if (!joined_plan)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no joined plan for query");
QueryPlanStepPtr join_step = std::make_unique<JoinStep>(
query_plan.getCurrentDataStream(),
joined_plan->getCurrentDataStream(),
expressions.join,
settings.max_block_size);
join_step->setStepDescription("JOIN");
std::vector<QueryPlanPtr> plans;
plans.emplace_back(std::make_unique<QueryPlan>(std::move(query_plan)));
plans.emplace_back(std::move(joined_plan));
query_plan = QueryPlan();
query_plan.unitePlans(std::move(join_step), {std::move(plans)});
}
}
if (expressions.hasWhere())

View File

@ -1,9 +1,6 @@
#include <Interpreters/SubqueryForSet.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/MergeJoin.h>
#include <Interpreters/ExpressionActions.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Interpreters/PreparedSets.h>
namespace DB
{
@ -13,65 +10,4 @@ SubqueryForSet::~SubqueryForSet() = default;
SubqueryForSet::SubqueryForSet(SubqueryForSet &&) = default;
SubqueryForSet & SubqueryForSet::operator= (SubqueryForSet &&) = default;
void SubqueryForSet::makeSource(std::shared_ptr<InterpreterSelectWithUnionQuery> & interpreter,
NamesWithAliases && joined_block_aliases_)
{
joined_block_aliases = std::move(joined_block_aliases_);
source = std::make_unique<QueryPlan>();
interpreter->buildQueryPlan(*source);
sample_block = interpreter->getSampleBlock();
renameColumns(sample_block);
}
void SubqueryForSet::renameColumns(Block & block)
{
for (const auto & name_with_alias : joined_block_aliases)
{
if (block.has(name_with_alias.first))
{
auto pos = block.getPositionByName(name_with_alias.first);
auto column = block.getByPosition(pos);
block.erase(pos);
column.name = name_with_alias.second;
block.insert(std::move(column));
}
}
}
void SubqueryForSet::addJoinActions(ExpressionActionsPtr actions)
{
actions->execute(sample_block);
if (joined_block_actions == nullptr)
{
joined_block_actions = actions;
}
else
{
auto new_dag = ActionsDAG::merge(
std::move(*joined_block_actions->getActionsDAG().clone()),
std::move(*actions->getActionsDAG().clone()));
joined_block_actions = std::make_shared<ExpressionActions>(new_dag, actions->getSettings());
}
}
bool SubqueryForSet::insertJoinedBlock(Block & block)
{
renameColumns(block);
if (joined_block_actions)
joined_block_actions->execute(block);
return join->addJoinedBlock(block);
}
void SubqueryForSet::setTotals(Block totals)
{
if (join)
{
renameColumns(totals);
join->setTotals(totals);
}
}
}

View File

@ -2,19 +2,16 @@
#include <Core/Block.h>
#include <Storages/IStorage_fwd.h>
#include <Parsers/IAST.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/PreparedSets.h>
namespace DB
{
class InterpreterSelectWithUnionQuery;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class QueryPlan;
class Set;
using SetPtr = std::shared_ptr<Set>;
/// Information on what to do when executing a subquery in the [GLOBAL] IN/JOIN section.
struct SubqueryForSet
{
@ -28,28 +25,10 @@ struct SubqueryForSet
/// If set, build it from result.
SetPtr set;
JoinPtr join;
/// Apply this actions to joined block.
ExpressionActionsPtr joined_block_actions;
Block sample_block; /// source->getHeader() + column renames
/// If set, put the result into the table.
/// This is a temporary table for transferring to remote servers for distributed query processing.
StoragePtr table;
void makeSource(std::shared_ptr<InterpreterSelectWithUnionQuery> & interpreter,
NamesWithAliases && joined_block_aliases_);
void addJoinActions(ExpressionActionsPtr actions);
bool insertJoinedBlock(Block & block);
void setTotals(Block totals);
private:
NamesWithAliases joined_block_aliases; /// Rename column from joined block from this list.
/// Rename source right table column names into qualified column names if they conflicts with left table ones.
void renameColumns(Block & block);
};
/// ID of subquery -> what to do with it.

View File

@ -19,7 +19,6 @@
#include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{

View File

@ -7,6 +7,7 @@
#include <Processors/Transforms/CreatingSetsTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Processors/Transforms/JoiningTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Executors/PipelineExecutor.h>
@ -96,6 +97,12 @@ void QueryPipeline::addTransform(ProcessorPtr transform)
pipe.addTransform(std::move(transform));
}
void QueryPipeline::addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes)
{
checkInitializedAndNotCompleted();
pipe.addTransform(std::move(transform), totals, extremes);
}
void QueryPipeline::transform(const Transformer & transformer)
{
checkInitializedAndNotCompleted();
@ -255,6 +262,96 @@ QueryPipeline QueryPipeline::unitePipelines(
return pipeline;
}
std::unique_ptr<QueryPipeline> QueryPipeline::joinPipelines(
std::unique_ptr<QueryPipeline> left,
std::unique_ptr<QueryPipeline> right,
JoinPtr join,
size_t max_block_size,
Processors * collected_processors)
{
left->checkInitializedAndNotCompleted();
right->checkInitializedAndNotCompleted();
/// Extremes before join are useless. They will be calculated after if needed.
left->pipe.dropExtremes();
right->pipe.dropExtremes();
left->pipe.collected_processors = collected_processors;
right->pipe.collected_processors = collected_processors;
/// In case joined subquery has totals, and we don't, add default chunk to totals.
bool default_totals = false;
if (!left->hasTotals() && right->hasTotals())
{
left->addDefaultTotals();
default_totals = true;
}
/// (left) ──────┐
/// ╞> Joining ─> (joined)
/// (left) ─┐┌───┘
/// └┼───┐
/// (right) ┐ (totals) ──┼─┐ ╞> Joining ─> (joined)
/// ╞> Resize ┐ ╓─┘┌┼─┘
/// (right) ┘ │ ╟──┘└─┐
/// ╞> FillingJoin ─> Resize ╣ ╞> Joining ─> (totals)
/// (totals) ─────────┘ ╙─────┘
size_t num_streams = left->getNumStreams();
right->resize(1);
auto adding_joined = std::make_shared<FillingRightJoinSideTransform>(right->getHeader(), join);
InputPort * totals_port = nullptr;
if (right->hasTotals())
totals_port = adding_joined->addTotalsPort();
right->addTransform(std::move(adding_joined), totals_port, nullptr);
size_t num_streams_including_totals = num_streams + (left->hasTotals() ? 1 : 0);
right->resize(num_streams_including_totals);
/// This counter is needed for every Joining except totals, to decide which Joining will generate non joined rows.
auto finish_counter = std::make_shared<JoiningTransform::FinishCounter>(num_streams);
auto lit = left->pipe.output_ports.begin();
auto rit = right->pipe.output_ports.begin();
for (size_t i = 0; i < num_streams; ++i)
{
auto joining = std::make_shared<JoiningTransform>(left->getHeader(), join, max_block_size, false, default_totals, finish_counter);
connect(**lit, joining->getInputs().front());
connect(**rit, joining->getInputs().back());
*lit = &joining->getOutputs().front();
++lit;
++rit;
if (collected_processors)
collected_processors->emplace_back(joining);
left->pipe.processors.emplace_back(std::move(joining));
}
if (left->hasTotals())
{
auto joining = std::make_shared<JoiningTransform>(left->getHeader(), join, max_block_size, true, default_totals);
connect(*left->pipe.totals_port, joining->getInputs().front());
connect(**rit, joining->getInputs().back());
left->pipe.totals_port = &joining->getOutputs().front();
++rit;
if (collected_processors)
collected_processors->emplace_back(joining);
left->pipe.processors.emplace_back(std::move(joining));
}
left->pipe.processors.insert(left->pipe.processors.end(), right->pipe.processors.begin(), right->pipe.processors.end());
left->pipe.holder = std::move(right->pipe.holder);
left->pipe.header = left->pipe.output_ports.front()->getHeader();
return left;
}
void QueryPipeline::addCreatingSetsTransform(const Block & res_header, SubqueryForSet subquery_for_set, const SizeLimits & limits, ContextPtr context)
{

View File

@ -27,6 +27,9 @@ struct SizeLimits;
struct ExpressionActionsSettings;
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
class QueryPipeline
{
public:
@ -52,6 +55,7 @@ public:
void addSimpleTransform(const Pipe::ProcessorGetterWithStreamKind & getter);
/// Add transform with getNumStreams() input ports.
void addTransform(ProcessorPtr transform);
void addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes);
using Transformer = std::function<Processors(OutputPortRawPtrs ports)>;
/// Transform pipeline in general way.
@ -90,6 +94,15 @@ public:
size_t max_threads_limit = 0,
Processors * collected_processors = nullptr);
/// Join two pipelines together using JoinPtr.
/// If collector is used, it will collect only newly-added processors, but not processors from pipelines.
static std::unique_ptr<QueryPipeline> joinPipelines(
std::unique_ptr<QueryPipeline> left,
std::unique_ptr<QueryPipeline> right,
JoinPtr join,
size_t max_block_size,
Processors * collected_processors = nullptr);
/// Add other pipeline and execute it before current one.
/// Pipeline must have empty header, it should not generate any chunk.
/// This is used for CreatingSets.

View File

@ -55,8 +55,8 @@ void CreatingSetStep::describeActions(FormatSettings & settings) const
settings.out << prefix;
if (subquery_for_set.set)
settings.out << "Set: ";
else if (subquery_for_set.join)
settings.out << "Join: ";
// else if (subquery_for_set.join)
// settings.out << "Join: ";
settings.out << description << '\n';
}
@ -65,8 +65,8 @@ void CreatingSetStep::describeActions(JSONBuilder::JSONMap & map) const
{
if (subquery_for_set.set)
map.add("Set", description);
else if (subquery_for_set.join)
map.add("Join", description);
// else if (subquery_for_set.join)
// map.add("Join", description);
}
@ -134,8 +134,6 @@ void addCreatingSetsStep(
continue;
auto plan = std::move(set.source);
std::string type = (set.join != nullptr) ? "JOIN"
: "subquery";
auto creating_set = std::make_unique<CreatingSetStep>(
plan->getCurrentDataStream(),
@ -143,7 +141,7 @@ void addCreatingSetsStep(
std::move(set),
limits,
context);
creating_set->setStepDescription("Create set for " + type);
creating_set->setStepDescription("Create set for subquery");
plan->addStep(std::move(creating_set));
input_streams.emplace_back(plan->getCurrentDataStream());

View File

@ -28,22 +28,6 @@ static ITransformingStep::Traits getTraits(const ActionsDAGPtr & actions)
};
}
static ITransformingStep::Traits getJoinTraits()
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = false,
}
};
}
ExpressionStep::ExpressionStep(const DataStream & input_stream_, ActionsDAGPtr actions_dag_)
: ITransformingStep(
input_stream_,
@ -118,47 +102,4 @@ void ExpressionStep::describeActions(JSONBuilder::JSONMap & map) const
map.add("Expression", expression->toTree());
}
JoinStep::JoinStep(const DataStream & input_stream_, JoinPtr join_, bool has_non_joined_rows_, size_t max_block_size_)
: ITransformingStep(
input_stream_,
Transform::transformHeader(input_stream_.header, join_),
getJoinTraits())
, join(std::move(join_))
, has_non_joined_rows(has_non_joined_rows_)
, max_block_size(max_block_size_)
{
}
void JoinStep::transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &)
{
/// In case joined subquery has totals, and we don't, add default chunk to totals.
bool add_default_totals = false;
if (!pipeline.hasTotals())
{
pipeline.addDefaultTotals();
add_default_totals = true;
}
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
{
bool on_totals = stream_type == QueryPipeline::StreamType::Totals;
return std::make_shared<Transform>(header, join, on_totals, add_default_totals);
});
if (has_non_joined_rows)
{
const Block & join_result_sample = pipeline.getHeader();
auto stream = std::make_shared<LazyNonJoinedBlockInputStream>(*join, join_result_sample, max_block_size);
auto source = std::make_shared<SourceFromInputStream>(std::move(stream));
source->setQueryPlanStep(this);
pipeline.addDelayedStream(source);
/// Now, after adding delayed stream, it has implicit dependency on other port.
/// Here we add resize processor to remove this dependency.
/// Otherwise, if we add MergeSorting + MergingSorted transform to pipeline, we could get `Pipeline stuck`
pipeline.resize(pipeline.getNumStreams(), true);
}
}
}

View File

@ -7,9 +7,6 @@ namespace DB
class ActionsDAG;
using ActionsDAGPtr = std::shared_ptr<ActionsDAG>;
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
class ExpressionTransform;
class JoiningTransform;
@ -36,23 +33,4 @@ private:
ActionsDAGPtr actions_dag;
};
/// TODO: add separate step for join.
class JoinStep : public ITransformingStep
{
public:
using Transform = JoiningTransform;
explicit JoinStep(const DataStream & input_stream_, JoinPtr join_, bool has_non_joined_rows_, size_t max_block_size_);
String getName() const override { return "Join"; }
void transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &) override;
const JoinPtr & getJoin() const { return join; }
private:
JoinPtr join;
bool has_non_joined_rows;
size_t max_block_size;
};
}

View File

@ -0,0 +1,89 @@
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/JoiningTransform.h>
#include <Interpreters/IJoin.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
JoinStep::JoinStep(
const DataStream & left_stream_,
const DataStream & right_stream_,
JoinPtr join_,
size_t max_block_size_)
: join(std::move(join_))
, max_block_size(max_block_size_)
{
input_streams = {left_stream_, right_stream_};
output_stream = DataStream
{
.header = JoiningTransform::transformHeader(left_stream_.header, join),
};
}
QueryPipelinePtr JoinStep::updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings &)
{
if (pipelines.size() != 2)
throw Exception(ErrorCodes::LOGICAL_ERROR, "JoinStep expect two input steps");
return QueryPipeline::joinPipelines(std::move(pipelines[0]), std::move(pipelines[1]), join, max_block_size, &processors);
}
void JoinStep::describePipeline(FormatSettings & settings) const
{
IQueryPlanStep::describePipeline(processors, settings);
}
static ITransformingStep::Traits getStorageJoinTraits()
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = false,
}
};
}
FilledJoinStep::FilledJoinStep(const DataStream & input_stream_, JoinPtr join_, size_t max_block_size_)
: ITransformingStep(
input_stream_,
JoiningTransform::transformHeader(input_stream_.header, join_),
getStorageJoinTraits())
, join(std::move(join_))
, max_block_size(max_block_size_)
{
if (!join->isFilled())
throw Exception(ErrorCodes::LOGICAL_ERROR, "FilledJoinStep expects Join to be filled");
}
void FilledJoinStep::transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &)
{
bool default_totals = false;
if (!pipeline.hasTotals() && join->hasTotals())
{
pipeline.addDefaultTotals();
default_totals = true;
}
auto finish_counter = std::make_shared<JoiningTransform::FinishCounter>(pipeline.getNumStreams());
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
{
bool on_totals = stream_type == QueryPipeline::StreamType::Totals;
auto counter = on_totals ? nullptr : finish_counter;
return std::make_shared<JoiningTransform>(header, join, max_block_size, on_totals, default_totals, counter);
});
}
}

View File

@ -0,0 +1,50 @@
#pragma once
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPlan/ITransformingStep.h>
namespace DB
{
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
/// Join two data streams.
class JoinStep : public IQueryPlanStep
{
public:
JoinStep(
const DataStream & left_stream_,
const DataStream & right_stream_,
JoinPtr join_,
size_t max_block_size_);
String getName() const override { return "Join"; }
QueryPipelinePtr updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings &) override;
void describePipeline(FormatSettings & settings) const override;
const JoinPtr & getJoin() const { return join; }
private:
JoinPtr join;
size_t max_block_size;
Processors processors;
};
/// Special step for the case when Join is already filled.
/// For StorageJoin and Dictionary.
class FilledJoinStep : public ITransformingStep
{
public:
FilledJoinStep(const DataStream & input_stream_, JoinPtr join_, size_t max_block_size_);
String getName() const override { return "FilledJoin"; }
void transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &) override;
private:
JoinPtr join;
size_t max_block_size;
};
}

View File

@ -3,6 +3,7 @@
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/CubeStep.h>
@ -72,8 +73,8 @@ static size_t tryAddNewFilterStep(
/// Add new Filter step before Aggregating.
/// Expression/Filter -> Aggregating -> Something
auto & node = nodes.emplace_back();
node.children.swap(child_node->children);
child_node->children.emplace_back(&node);
node.children.emplace_back(&node);
std::swap(node.children[0], child_node->children[0]);
/// Expression/Filter -> Aggregating -> Filter -> Something
/// New filter column is the first one.

View File

@ -45,8 +45,6 @@ void CreatingSetsTransform::startSubquery()
{
if (subquery.set)
LOG_TRACE(log, "Creating set.");
if (subquery.join)
LOG_TRACE(log, "Creating join.");
if (subquery.table)
LOG_TRACE(log, "Filling temporary table.");
@ -54,10 +52,9 @@ void CreatingSetsTransform::startSubquery()
table_out = subquery.table->write({}, subquery.table->getInMemoryMetadataPtr(), getContext());
done_with_set = !subquery.set;
done_with_join = !subquery.join;
done_with_table = !subquery.table;
if (done_with_set && done_with_join && done_with_table)
if (done_with_set /*&& done_with_join*/ && done_with_table)
throw Exception("Logical error: nothing to do with subquery", ErrorCodes::LOGICAL_ERROR);
if (table_out)
@ -72,8 +69,6 @@ void CreatingSetsTransform::finishSubquery()
if (subquery.set)
LOG_DEBUG(log, "Created Set with {} entries from {} rows in {} sec.", subquery.set->getTotalRowCount(), read_rows, seconds);
if (subquery.join)
LOG_DEBUG(log, "Created Join with {} entries from {} rows in {} sec.", subquery.join->getTotalRowCount(), read_rows, seconds);
if (subquery.table)
LOG_DEBUG(log, "Created Table with {} rows in {} sec.", read_rows, seconds);
}
@ -81,12 +76,6 @@ void CreatingSetsTransform::finishSubquery()
{
LOG_DEBUG(log, "Subquery has empty result.");
}
if (totals)
subquery.setTotals(getInputPort().getHeader().cloneWithColumns(totals.detachColumns()));
else
/// Set empty totals anyway, it is needed for MergeJoin.
subquery.setTotals({});
}
void CreatingSetsTransform::init()
@ -111,12 +100,6 @@ void CreatingSetsTransform::consume(Chunk chunk)
done_with_set = true;
}
if (!done_with_join)
{
if (!subquery.insertJoinedBlock(block))
done_with_join = true;
}
if (!done_with_table)
{
block = materializeBlock(block);
@ -130,7 +113,7 @@ void CreatingSetsTransform::consume(Chunk chunk)
done_with_table = true;
}
if (done_with_set && done_with_join && done_with_table)
if (done_with_set && done_with_table)
finishConsume();
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <DataStreams/SizeLimits.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/SubqueryForSet.h>
#include <Processors/IAccumulatingTransform.h>
@ -43,7 +44,7 @@ private:
Stopwatch watch;
bool done_with_set = true;
bool done_with_join = true;
//bool done_with_join = true;
bool done_with_table = true;
SizeLimits network_transfer_limits;

View File

@ -1,10 +1,17 @@
#include <Processors/Transforms/JoiningTransform.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
Block JoiningTransform::transformHeader(Block header, const JoinPtr & join)
{
ExtraBlockPtr tmp;
@ -12,13 +19,128 @@ Block JoiningTransform::transformHeader(Block header, const JoinPtr & join)
return header;
}
JoiningTransform::JoiningTransform(Block input_header, JoinPtr join_,
bool on_totals_, bool default_totals_)
: ISimpleTransform(input_header, transformHeader(input_header, join_), on_totals_)
JoiningTransform::JoiningTransform(
Block input_header,
JoinPtr join_,
size_t max_block_size_,
bool on_totals_,
bool default_totals_,
FinishCounterPtr finish_counter_)
: IProcessor({input_header}, {transformHeader(input_header, join_)})
, join(std::move(join_))
, on_totals(on_totals_)
, default_totals(default_totals_)
{}
, finish_counter(std::move(finish_counter_))
, max_block_size(max_block_size_)
{
if (!join->isFilled())
inputs.emplace_back(Block(), this);
}
IProcessor::Status JoiningTransform::prepare()
{
auto & output = outputs.front();
/// Check can output.
if (output.isFinished() || stop_reading)
{
output.finish();
for (auto & input : inputs)
input.close();
return Status::Finished;
}
if (!output.canPush())
{
for (auto & input : inputs)
input.setNotNeeded();
return Status::PortFull;
}
/// Output if has data.
if (has_output)
{
output.push(std::move(output_chunk));
has_output = false;
return Status::PortFull;
}
if (inputs.size() > 1)
{
auto & last_in = inputs.back();
if (!last_in.isFinished())
{
last_in.setNeeded();
if (last_in.hasData())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No data is expected from second JoiningTransform port");
return Status::NeedData;
}
}
if (has_input)
return Status::Ready;
auto & input = inputs.front();
if (input.isFinished())
{
if (process_non_joined)
return Status::Ready;
output.finish();
return Status::Finished;
}
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
input_chunk = input.pull(true);
has_input = true;
return Status::Ready;
}
void JoiningTransform::work()
{
if (has_input)
{
transform(input_chunk);
output_chunk.swap(input_chunk);
has_input = not_processed != nullptr;
has_output = !output_chunk.empty();
}
else
{
if (!non_joined_stream)
{
if (!finish_counter || !finish_counter->isLast())
{
process_non_joined = false;
return;
}
non_joined_stream = join->createStreamWithNonJoinedRows(outputs.front().getHeader(), max_block_size);
if (!non_joined_stream)
{
process_non_joined = false;
return;
}
}
auto block = non_joined_stream->read();
if (!block)
{
process_non_joined = false;
return;
}
auto rows = block.rows();
output_chunk.setColumns(block.getColumns(), rows);
has_output = true;
}
}
void JoiningTransform::transform(Chunk & chunk)
{
@ -28,7 +150,7 @@ void JoiningTransform::transform(Chunk & chunk)
if (join->alwaysReturnsEmptySet() && !on_totals)
{
stopReading();
stop_reading = true;
chunk.clear();
return;
}
@ -42,7 +164,7 @@ void JoiningTransform::transform(Chunk & chunk)
auto cols = chunk.detachColumns();
for (auto & col : cols)
col = col->cloneResized(1);
block = getInputPort().getHeader().cloneWithColumns(std::move(cols));
block = inputs.front().getHeader().cloneWithColumns(std::move(cols));
/// Drop totals if both out stream and joined stream doesn't have ones.
/// See comment in ExpressionTransform.h
@ -61,29 +183,122 @@ void JoiningTransform::transform(Chunk & chunk)
Block JoiningTransform::readExecute(Chunk & chunk)
{
Block res;
// std::cerr << "=== Chunk rows " << chunk.getNumRows() << " cols " << chunk.getNumColumns() << std::endl;
if (!not_processed)
{
// std::cerr << "!not_processed " << std::endl;
if (chunk.hasColumns())
res = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
res = inputs.front().getHeader().cloneWithColumns(chunk.detachColumns());
if (res)
join->joinBlock(res, not_processed);
}
else if (not_processed->empty()) /// There's not processed data inside expression.
{
// std::cerr << "not_processed->empty() " << std::endl;
if (chunk.hasColumns())
res = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
res = inputs.front().getHeader().cloneWithColumns(chunk.detachColumns());
not_processed.reset();
join->joinBlock(res, not_processed);
}
else
{
// std::cerr << "not not_processed->empty() " << std::endl;
res = std::move(not_processed->block);
join->joinBlock(res, not_processed);
}
// std::cerr << "Res block rows " << res.rows() << " cols " << res.columns() << std::endl;
return res;
}
FillingRightJoinSideTransform::FillingRightJoinSideTransform(Block input_header, JoinPtr join_)
: IProcessor({input_header}, {Block()})
, join(std::move(join_))
{}
InputPort * FillingRightJoinSideTransform::addTotalsPort()
{
if (inputs.size() > 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Totals port was already added to FillingRightJoinSideTransform");
return &inputs.emplace_back(inputs.front().getHeader(), this);
}
IProcessor::Status FillingRightJoinSideTransform::prepare()
{
auto & output = outputs.front();
/// Check can output.
if (output.isFinished())
{
for (auto & input : inputs)
input.close();
return Status::Finished;
}
if (!output.canPush())
{
for (auto & input : inputs)
input.setNotNeeded();
return Status::PortFull;
}
auto & input = inputs.front();
if (stop_reading)
{
input.close();
}
else if (!input.isFinished())
{
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
chunk = input.pull(true);
return Status::Ready;
}
if (inputs.size() > 1)
{
auto & totals_input = inputs.back();
if (!totals_input.isFinished())
{
totals_input.setNeeded();
if (!totals_input.hasData())
return Status::NeedData;
chunk = totals_input.pull(true);
for_totals = true;
return Status::Ready;
}
}
else if (!set_totals)
{
chunk.setColumns(inputs.front().getHeader().cloneEmpty().getColumns(), 0);
for_totals = true;
return Status::Ready;
}
output.finish();
return Status::Finished;
}
void FillingRightJoinSideTransform::work()
{
auto block = inputs.front().getHeader().cloneWithColumns(chunk.detachColumns());
if (for_totals)
join->setTotals(block);
else
stop_reading = !join->addJoinedBlock(block);
set_totals = for_totals;
}
}

View File

@ -1,5 +1,5 @@
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Processors/IProcessor.h>
namespace DB
@ -8,21 +8,63 @@ namespace DB
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
class JoiningTransform : public ISimpleTransform
class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
/// Join rows to chunk form left table.
/// This transform usually has two input ports and one output.
/// First input is for data from left table.
/// Second input has empty header and is connected with FillingRightJoinSide.
/// We can process left table only when Join is filled. Second input is used to signal that FillingRightJoinSide is finished.
class JoiningTransform : public IProcessor
{
public:
JoiningTransform(Block input_header, JoinPtr join_,
bool on_totals_ = false, bool default_totals_ = false);
/// Count streams and check which is last.
/// The last one should process non-joined rows.
class FinishCounter
{
public:
explicit FinishCounter(size_t total_) : total(total_) {}
bool isLast()
{
return finished.fetch_add(1) + 1 >= total;
}
private:
const size_t total;
std::atomic<size_t> finished{0};
};
using FinishCounterPtr = std::shared_ptr<FinishCounter>;
JoiningTransform(
Block input_header,
JoinPtr join_,
size_t max_block_size_,
bool on_totals_ = false,
bool default_totals_ = false,
FinishCounterPtr finish_counter_ = nullptr);
String getName() const override { return "JoiningTransform"; }
static Block transformHeader(Block header, const JoinPtr & join);
Status prepare() override;
void work() override;
protected:
void transform(Chunk & chunk) override;
bool needInputData() const override { return !not_processed; }
void transform(Chunk & chunk);
private:
Chunk input_chunk;
Chunk output_chunk;
bool has_input = false;
bool has_output = false;
bool stop_reading = false;
bool process_non_joined = true;
JoinPtr join;
bool on_totals;
/// This flag means that we have manually added totals to our pipeline.
@ -33,7 +75,33 @@ private:
ExtraBlockPtr not_processed;
FinishCounterPtr finish_counter;
BlockInputStreamPtr non_joined_stream;
size_t max_block_size;
Block readExecute(Chunk & chunk);
};
/// Fills Join with block from right table.
/// Has single input and single output port.
/// Output port has empty header. It is closed when al data is inserted in join.
class FillingRightJoinSideTransform : public IProcessor
{
public:
FillingRightJoinSideTransform(Block input_header, JoinPtr join_);
String getName() const override { return "FillingRightJoinSide"; }
InputPort * addTotalsPort();
Status prepare() override;
void work() override;
private:
JoinPtr join;
Chunk chunk;
bool stop_reading = false;
bool for_totals = false;
bool set_totals = false;
};
}

View File

@ -107,6 +107,7 @@ SRCS(
QueryPlan/IQueryPlanStep.cpp
QueryPlan/ISourceStep.cpp
QueryPlan/ITransformingStep.cpp
QueryPlan/JoinStep.cpp
QueryPlan/LimitByStep.cpp
QueryPlan/LimitStep.cpp
QueryPlan/MergeSortingStep.cpp