#include <Processors/QueryPlan/JoinStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Transforms/JoiningTransform.h>
#include <Interpreters/IJoin.h>
#include <Common/typeid_cast.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

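/// Plan step that joins the output of two child plans. The output header is derived
/// from the left input's header by JoiningTransform::transformHeader.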
JoinStep::JoinStep(
    const DataStream & left_stream_,
    const DataStream & right_stream_,
    JoinPtr join_,
    size_t max_block_size_,
    size_t max_streams_,
    bool keep_left_read_in_order_)
    : join(std::move(join_)), max_block_size(max_block_size_), max_streams(max_streams_), keep_left_read_in_order(keep_left_read_in_order_)
{
    input_streams = {left_stream_, right_stream_};
    output_stream = DataStream
    {
        .header = JoiningTransform::transformHeader(left_stream_.header, join),
    };
}

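/// Glues the two child pipelines together. YShaped joins merge both inputs symmetrically;
/// otherwise the right pipeline is used to fill the join and the left pipeline flows through it.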
QueryPipelineBuilderPtr JoinStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &)
{
    if (pipelines.size() != 2)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "JoinStep expects two input steps");

    if (join->pipelineType() == JoinPipelineType::YShaped)
    {
        auto joined_pipeline = QueryPipelineBuilder::joinPipelinesYShaped(
            std::move(pipelines[0]), std::move(pipelines[1]), join, output_stream->header, max_block_size, &processors);
        joined_pipeline->resize(max_streams);
        return joined_pipeline;
    }

    return QueryPipelineBuilder::joinPipelinesRightLeft(
        std::move(pipelines[0]),
        std::move(pipelines[1]),
        join,
        output_stream->header,
        max_block_size,
        max_streams,
        keep_left_read_in_order,
        &processors);
}

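/// Push down to the right input is allowed only for YShaped joins.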
bool JoinStep::allowPushDownToRight() const
{
    return join->pipelineType() == JoinPipelineType::YShaped;
}

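/// Lists the processors created by this step when the pipeline is described.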
void JoinStep::describePipeline(FormatSettings & settings) const
{
    IQueryPlanStep::describePipeline(processors, settings);
}

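/// Replaces one of the inputs (idx 0 = left, otherwise right). Only a left-side change
/// affects the output header, which is built from the left input.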
void JoinStep::updateInputStream(const DataStream & new_input_stream_, size_t idx)
{
    if (idx == 0)
    {
        input_streams = {new_input_stream_, input_streams.at(1)};
        output_stream = DataStream
        {
            .header = JoiningTransform::transformHeader(new_input_stream_.header, join),
        };
    }
    else
    {
        input_streams = {input_streams.at(0), new_input_stream_};
    }
}

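/// Traits of FilledJoinStep: the join keeps the number of streams but may change
/// the number of rows, distinctness and sorting.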
static ITransformingStep::Traits getStorageJoinTraits()
{
    return ITransformingStep::Traits
    {
        {
            .preserves_distinct_columns = false,
            .returns_single_stream = false,
            .preserves_number_of_streams = true,
            .preserves_sorting = false,
        },
        {
            .preserves_number_of_rows = false,
        }
    };
}

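/// Join step for the case where the right-hand side is already filled
/// (e.g. a table with the Join engine), so only the left input has a plan.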
FilledJoinStep::FilledJoinStep(const DataStream & input_stream_, JoinPtr join_, size_t max_block_size_)
    : ITransformingStep(
        input_stream_,
        JoiningTransform::transformHeader(input_stream_.header, join_),
        getStorageJoinTraits())
    , join(std::move(join_))
    , max_block_size(max_block_size_)
{
    if (!join->isFilled())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "FilledJoinStep expects Join to be filled");
}

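/// Adds a JoiningTransform to every stream. If the join carries totals and the pipeline
/// has none, default totals are added first; a shared FinishCounter lets the transforms
/// know when all streams have finished.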
void FilledJoinStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
    bool default_totals = false;
    if (!pipeline.hasTotals() && join->getTotals())
    {
        pipeline.addDefaultTotals();
        default_totals = true;
    }

    auto finish_counter = std::make_shared<JoiningTransform::FinishCounter>(pipeline.getNumStreams());

    pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type)
    {
        bool on_totals = stream_type == QueryPipelineBuilder::StreamType::Totals;
        auto counter = on_totals ? nullptr : finish_counter;
        return std::make_shared<JoiningTransform>(header, output_stream->header, join, max_block_size, on_totals, default_totals, counter);
    });
}

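/// Recomputes the output stream after the input has been changed.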
void FilledJoinStep::updateOutputStream()
{
    output_stream = createOutputStream(
        input_streams.front(), JoiningTransform::transformHeader(input_streams.front().header, join), getDataStreamTraits());
}

}