Merge pull request #39418 from vdimir/join_and_sets

Filter joined streams for `full_sorting_join` by each other before sorting
This commit is contained in:
Vladimir C 2022-09-02 13:57:06 +02:00 committed by GitHub
commit 963c0111bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 1155 additions and 53 deletions

View File

@ -366,6 +366,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, partial_merge_join_left_table_buffer_bytes, 0, "If not 0 group left table blocks in bigger ones for left-side table in partial merge join. It uses up to 2x of specified memory per joining thread.", 0) \ M(UInt64, partial_merge_join_left_table_buffer_bytes, 0, "If not 0 group left table blocks in bigger ones for left-side table in partial merge join. It uses up to 2x of specified memory per joining thread.", 0) \
M(UInt64, partial_merge_join_rows_in_right_blocks, 65536, "Split right-hand joining data in blocks of specified size. It's a portion of data indexed by min-max values and possibly unloaded on disk.", 0) \ M(UInt64, partial_merge_join_rows_in_right_blocks, 65536, "Split right-hand joining data in blocks of specified size. It's a portion of data indexed by min-max values and possibly unloaded on disk.", 0) \
M(UInt64, join_on_disk_max_files_to_merge, 64, "For MergeJoin on disk set how much files it's allowed to sort simultaneously. Then this value bigger then more memory used and then less disk I/O needed. Minimum is 2.", 0) \ M(UInt64, join_on_disk_max_files_to_merge, 64, "For MergeJoin on disk set how much files it's allowed to sort simultaneously. Then this value bigger then more memory used and then less disk I/O needed. Minimum is 2.", 0) \
M(UInt64, max_rows_in_set_to_optimize_join, 100'000, "Maximal size of the set to filter joined tables by each other row sets before joining. 0 - disable.", 0) \
\
M(Bool, compatibility_ignore_collation_in_create_table, true, "Compatibility ignore collation in create table", 0) \ M(Bool, compatibility_ignore_collation_in_create_table, true, "Compatibility ignore collation in create table", 0) \
\ \
M(String, temporary_files_codec, "LZ4", "Set compression codec for temporary files (sort and join on disk). I.e. LZ4, NONE.", 0) \ M(String, temporary_files_codec, "LZ4", "Set compression codec for temporary files (sort and join on disk). I.e. LZ4, NONE.", 0) \

View File

@ -39,6 +39,7 @@
#include <QueryPipeline/Pipe.h> #include <QueryPipeline/Pipe.h>
#include <Processors/QueryPlan/AggregatingStep.h> #include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ArrayJoinStep.h> #include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/CreateSetAndFilterOnTheFlyStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h> #include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/CubeStep.h> #include <Processors/QueryPlan/CubeStep.h>
#include <Processors/QueryPlan/DistinctStep.h> #include <Processors/QueryPlan/DistinctStep.h>
@ -1436,7 +1437,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
if (!joined_plan) if (!joined_plan)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no joined plan for query"); throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no joined plan for query");
auto add_sorting = [&settings, this] (QueryPlan & plan, const Names & key_names, bool is_right) auto add_sorting = [&settings, this] (QueryPlan & plan, const Names & key_names, JoinTableSide join_pos)
{ {
SortDescription order_descr; SortDescription order_descr;
order_descr.reserve(key_names.size()); order_descr.reserve(key_names.size());
@ -1455,15 +1456,43 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
this->context->getTemporaryVolume(), this->context->getTemporaryVolume(),
settings.min_free_disk_space_for_temporary_data, settings.min_free_disk_space_for_temporary_data,
settings.optimize_sorting_by_input_stream_properties); settings.optimize_sorting_by_input_stream_properties);
sorting_step->setStepDescription(fmt::format("Sort {} before JOIN", is_right ? "right" : "left")); sorting_step->setStepDescription(fmt::format("Sort {} before JOIN", join_pos));
plan.addStep(std::move(sorting_step)); plan.addStep(std::move(sorting_step));
}; };
auto crosswise_connection = CreateSetAndFilterOnTheFlyStep::createCrossConnection();
auto add_create_set = [&settings, crosswise_connection](QueryPlan & plan, const Names & key_names, JoinTableSide join_pos)
{
auto creating_set_step = std::make_unique<CreateSetAndFilterOnTheFlyStep>(
plan.getCurrentDataStream(), key_names, settings.max_rows_in_set_to_optimize_join, crosswise_connection, join_pos);
creating_set_step->setStepDescription(fmt::format("Create set and filter {} joined stream", join_pos));
auto * step_raw_ptr = creating_set_step.get();
plan.addStep(std::move(creating_set_step));
return step_raw_ptr;
};
if (expressions.join->pipelineType() == JoinPipelineType::YShaped) if (expressions.join->pipelineType() == JoinPipelineType::YShaped)
{ {
const auto & join_clause = expressions.join->getTableJoin().getOnlyClause(); const auto & table_join = expressions.join->getTableJoin();
add_sorting(query_plan, join_clause.key_names_left, false); const auto & join_clause = table_join.getOnlyClause();
add_sorting(*joined_plan, join_clause.key_names_right, true);
auto join_kind = table_join.kind();
bool kind_allows_filtering = isInner(join_kind) || isLeft(join_kind) || isRight(join_kind);
if (settings.max_rows_in_set_to_optimize_join > 0 && kind_allows_filtering)
{
auto * left_set = add_create_set(query_plan, join_clause.key_names_left, JoinTableSide::Left);
auto * right_set = add_create_set(*joined_plan, join_clause.key_names_right, JoinTableSide::Right);
if (isInnerOrLeft(join_kind))
right_set->setFiltering(left_set->getSet());
if (isInnerOrRight(join_kind))
left_set->setFiltering(right_set->getSet());
}
add_sorting(query_plan, join_clause.key_names_left, JoinTableSide::Left);
add_sorting(*joined_plan, join_clause.key_names_right, JoinTableSide::Right);
} }
QueryPlanStepPtr join_step = std::make_unique<JoinStep>( QueryPlanStepPtr join_step = std::make_unique<JoinStep>(

View File

@ -22,6 +22,8 @@
#include <Interpreters/castColumn.h> #include <Interpreters/castColumn.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Processors/Chunk.h>
#include <Storages/MergeTree/KeyCondition.h> #include <Storages/MergeTree/KeyCondition.h>
#include <base/range.h> #include <base/range.h>
@ -162,8 +164,16 @@ void Set::setHeader(const ColumnsWithTypeAndName & header)
data.init(data.chooseMethod(key_columns, key_sizes)); data.init(data.chooseMethod(key_columns, key_sizes));
} }
bool Set::insertFromBlock(const ColumnsWithTypeAndName & columns) bool Set::insertFromBlock(const ColumnsWithTypeAndName & columns)
{
Columns cols;
cols.reserve(columns.size());
for (const auto & column : columns)
cols.emplace_back(column.column);
return insertFromBlock(cols);
}
bool Set::insertFromBlock(const Columns & columns)
{ {
std::lock_guard<std::shared_mutex> lock(rwlock); std::lock_guard<std::shared_mutex> lock(rwlock);
@ -179,11 +189,11 @@ bool Set::insertFromBlock(const ColumnsWithTypeAndName & columns)
/// Remember the columns we will work with /// Remember the columns we will work with
for (size_t i = 0; i < keys_size; ++i) for (size_t i = 0; i < keys_size; ++i)
{ {
materialized_columns.emplace_back(columns.at(i).column->convertToFullIfNeeded()); materialized_columns.emplace_back(columns.at(i)->convertToFullIfNeeded());
key_columns.emplace_back(materialized_columns.back().get()); key_columns.emplace_back(materialized_columns.back().get());
} }
size_t rows = columns.at(0).column->size(); size_t rows = columns.at(0)->size();
/// We will insert to the Set only keys, where all components are not NULL. /// We will insert to the Set only keys, where all components are not NULL.
ConstNullMapPtr null_map{}; ConstNullMapPtr null_map{};

View File

@ -20,6 +20,7 @@ class Context;
class IFunctionBase; class IFunctionBase;
using FunctionBasePtr = std::shared_ptr<IFunctionBase>; using FunctionBasePtr = std::shared_ptr<IFunctionBase>;
class Chunk;
/** Data structure for implementation of IN expression. /** Data structure for implementation of IN expression.
*/ */
@ -45,11 +46,14 @@ public:
void setHeader(const ColumnsWithTypeAndName & header); void setHeader(const ColumnsWithTypeAndName & header);
/// Returns false, if some limit was exceeded and no need to insert more data. /// Returns false, if some limit was exceeded and no need to insert more data.
bool insertFromBlock(const Columns & columns);
bool insertFromBlock(const ColumnsWithTypeAndName & columns); bool insertFromBlock(const ColumnsWithTypeAndName & columns);
/// Call after all blocks were inserted. To get the information that set is already created. /// Call after all blocks were inserted. To get the information that set is already created.
void finishInsert() { is_created = true; } void finishInsert() { is_created = true; }
bool isCreated() const { return is_created; } /// finishInsert and isCreated are thread-safe
bool isCreated() const { return is_created.load(); }
/** For columns of 'block', check belonging of corresponding rows to the set. /** For columns of 'block', check belonging of corresponding rows to the set.
* Return UInt8 column with the result. * Return UInt8 column with the result.
@ -111,7 +115,7 @@ private:
bool transform_null_in; bool transform_null_in;
/// Check if set contains all the data. /// Check if set contains all the data.
bool is_created = false; std::atomic<bool> is_created = false;
/// If in the left part columns contains the same types as the elements of the set. /// If in the left part columns contains the same types as the elements of the set.
void executeOrdinary( void executeOrdinary(

View File

@ -73,16 +73,32 @@ public:
return key_names_right.size(); return key_names_right.size();
} }
String formatDebug() const String formatDebug(bool short_format = false) const
{ {
return fmt::format("Left keys: [{}] Right keys [{}] Condition columns: '{}', '{}'", const auto & [left_cond, right_cond] = condColumnNames();
fmt::join(key_names_left, ", "), fmt::join(key_names_right, ", "),
condColumnNames().first, condColumnNames().second); if (short_format)
{
return fmt::format("({}) = ({}){}{}", fmt::join(key_names_left, ", "), fmt::join(key_names_right, ", "),
!left_cond.empty() ? " AND " + left_cond : "", !right_cond.empty() ? " AND " + right_cond : "");
}
return fmt::format(
"Left keys: [{}] Right keys [{}] Condition columns: '{}', '{}'",
fmt::join(key_names_left, ", "), fmt::join(key_names_right, ", "), left_cond, right_cond);
} }
}; };
using Clauses = std::vector<JoinOnClause>; using Clauses = std::vector<JoinOnClause>;
static std::string formatClauses(const Clauses & clauses, bool short_format = false)
{
std::vector<std::string> res;
for (const auto & clause : clauses)
res.push_back("[" + clause.formatDebug(short_format) + "]");
return fmt::format("{}", fmt::join(res, "; "));
}
private: private:
/** Query of the form `SELECT expr(x) AS k FROM t1 ANY LEFT JOIN (SELECT expr(x) AS k FROM t2) USING k` /** Query of the form `SELECT expr(x) AS k FROM t1 ANY LEFT JOIN (SELECT expr(x) AS k FROM t2) USING k`
* The join is made by column k. * The join is made by column k.

View File

@ -0,0 +1,198 @@
#include <Processors/PingPongProcessor.h>
namespace DB
{
/// Create list with `num_ports` of regular ports and 1 auxiliary port with empty header.
template <typename T> requires std::is_same_v<T, InputPorts> || std::is_same_v<T, OutputPorts>
static T createPortsWithSpecial(const Block & header, size_t num_ports)
{
T res(num_ports, header);
res.emplace_back(Block());
return res;
}
PingPongProcessor::PingPongProcessor(const Block & header, size_t num_ports, Order order_)
: IProcessor(createPortsWithSpecial<InputPorts>(header, num_ports),
createPortsWithSpecial<OutputPorts>(header, num_ports))
, aux_in_port(inputs.back())
, aux_out_port(outputs.back())
, order(order_)
{
assert(order == First || order == Second);
port_pairs.resize(num_ports);
auto input_it = inputs.begin();
auto output_it = outputs.begin();
for (size_t i = 0; i < num_ports; ++i)
{
port_pairs[i].input_port = &*input_it;
++input_it;
port_pairs[i].output_port = &*output_it;
++output_it;
}
}
void PingPongProcessor::finishPair(PortsPair & pair)
{
if (!pair.is_finished)
{
pair.output_port->finish();
pair.input_port->close();
pair.is_finished = true;
++num_finished_pairs;
}
}
bool PingPongProcessor::processPair(PortsPair & pair)
{
if (pair.output_port->isFinished())
{
finishPair(pair);
return false;
}
if (pair.input_port->isFinished())
{
finishPair(pair);
return false;
}
if (!pair.output_port->canPush())
{
pair.input_port->setNotNeeded();
return false;
}
pair.input_port->setNeeded();
if (pair.input_port->hasData())
{
Chunk chunk = pair.input_port->pull(true);
ready_to_send |= consume(chunk);
pair.output_port->push(std::move(chunk));
}
return true;
}
bool PingPongProcessor::isPairsFinished() const
{
return num_finished_pairs == port_pairs.size();
}
IProcessor::Status PingPongProcessor::processRegularPorts()
{
if (isPairsFinished())
return Status::Finished;
bool need_data = false;
for (auto & pair : port_pairs)
need_data = processPair(pair) || need_data;
if (isPairsFinished())
return Status::Finished;
if (need_data)
return Status::NeedData;
return Status::PortFull;
}
bool PingPongProcessor::sendPing()
{
if (aux_out_port.canPush())
{
Chunk chunk(aux_out_port.getHeader().cloneEmpty().getColumns(), 0);
aux_out_port.push(std::move(chunk));
is_send = true;
aux_out_port.finish();
return true;
}
return false;
}
bool PingPongProcessor::recievePing()
{
if (aux_in_port.hasData())
{
aux_in_port.pull();
is_received = true;
aux_in_port.close();
return true;
}
return false;
}
bool PingPongProcessor::canSend() const
{
return !is_send && (ready_to_send || isPairsFinished());
}
IProcessor::Status PingPongProcessor::prepare()
{
if (!set_needed_once && !is_received && !aux_in_port.isFinished())
{
set_needed_once = true;
aux_in_port.setNeeded();
}
if (order == First || is_send)
{
if (!is_received)
{
bool received = recievePing();
if (!received)
{
return Status::NeedData;
}
}
}
if (order == Second || is_received)
{
if (!is_send && canSend())
{
bool sent = sendPing();
if (!sent)
return Status::PortFull;
}
}
auto status = processRegularPorts();
if (status == Status::Finished)
{
if (order == First || is_send)
{
if (!is_received)
{
bool received = recievePing();
if (!received)
{
return Status::NeedData;
}
}
}
if (order == Second || is_received)
{
if (!is_send && canSend())
{
bool sent = sendPing();
if (!sent)
return Status::PortFull;
}
}
}
return status;
}
std::pair<InputPort *, OutputPort *> PingPongProcessor::getAuxPorts()
{
return std::make_pair(&aux_in_port, &aux_out_port);
}
}

View File

@ -0,0 +1,105 @@
#pragma once
#include <Processors/IProcessor.h>
#include <base/unit.h>
#include <Processors/Chunk.h>
#include <Common/logger_useful.h>
namespace DB
{
/*
* Processor with N inputs and N outputs. Moves data from i-th input to i-th output as is.
* It has a pair of auxiliary ports to notify another instance by sending empty chunk after some condition holds.
* You should use this processor in pair of instances and connect auxiliary ports crosswise.
*
*
* aux
* PingPongProcessor PingPongProcessor
* aux
*
*
* One of the processors starts processing data, and another waits for notification.
* When `consume` returns true, the first stops processing, sends a ping to another and waits for notification.
* After that, the second one also processes data until `consume`, then send a notification back to the first one.
* After this roundtrip, processors bypass data from regular inputs to outputs.
*/
class PingPongProcessor : public IProcessor
{
public:
enum class Order : uint8_t
{
/// Processor that starts processing data.
First,
/// Processor that waits for notification.
Second,
};
using enum Order;
PingPongProcessor(const Block & header, size_t num_ports, Order order_);
Status prepare() override;
std::pair<InputPort *, OutputPort *> getAuxPorts();
/// Returns `true` when enough data consumed
virtual bool consume(const Chunk & chunk) = 0;
protected:
struct PortsPair
{
InputPort * input_port = nullptr;
OutputPort * output_port = nullptr;
bool is_finished = false;
};
bool sendPing();
bool recievePing();
bool canSend() const;
bool isPairsFinished() const;
bool processPair(PortsPair & pair);
void finishPair(PortsPair & pair);
Status processRegularPorts();
std::vector<PortsPair> port_pairs;
size_t num_finished_pairs = 0;
InputPort & aux_in_port;
OutputPort & aux_out_port;
bool is_send = false;
bool is_received = false;
bool ready_to_send = false;
/// Used to set 'needed' flag once for auxiliary input at first `prepare` call.
bool set_needed_once = false;
Order order;
};
/// Reads first N rows from two streams evenly.
class ReadHeadBalancedProcessor : public PingPongProcessor
{
public:
ReadHeadBalancedProcessor(const Block & header, size_t num_ports, size_t size_to_wait_, Order order_)
: PingPongProcessor(header, num_ports, order_) , data_consumed(0) , size_to_wait(size_to_wait_)
{
}
String getName() const override { return "ReadHeadBalancedProcessor"; }
bool consume(const Chunk & chunk) override
{
data_consumed += chunk.getNumRows();
return data_consumed > size_to_wait;
}
private:
size_t data_consumed;
size_t size_to_wait;
};
}

View File

@ -8,18 +8,18 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
void connect(OutputPort & output, InputPort & input) void connect(OutputPort & output, InputPort & input, bool reconnect)
{ {
if (input.state) if (!reconnect && input.state)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Port is already connected, (header: [{}])", input.header.dumpStructure()); throw Exception(ErrorCodes::LOGICAL_ERROR, "Port is already connected, (header: [{}])", input.header.dumpStructure());
if (output.state) if (!reconnect && output.state)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Port is already connected, (header: [{}])", output.header.dumpStructure()); throw Exception(ErrorCodes::LOGICAL_ERROR, "Port is already connected, (header: [{}])", output.header.dumpStructure());
auto out_name = output.getProcessor().getName(); auto out_name = output.processor ? output.getProcessor().getName() : "null";
auto in_name = input.getProcessor().getName(); auto in_name = input.processor ? input.getProcessor().getName() : "null";
assertCompatibleHeader(output.getHeader(), input.getHeader(), fmt::format(" function connect between {} and {}", out_name, in_name)); assertCompatibleHeader(output.getHeader(), input.getHeader(), fmt::format("function connect between {} and {}", out_name, in_name));
input.output_port = &output; input.output_port = &output;
output.input_port = &input; output.input_port = &input;

View File

@ -25,7 +25,7 @@ namespace ErrorCodes
class Port class Port
{ {
friend void connect(OutputPort &, InputPort &); friend void connect(OutputPort &, InputPort &, bool);
friend class IProcessor; friend class IProcessor;
public: public:
@ -267,7 +267,7 @@ protected:
/// * You can pull only if port hasData(). /// * You can pull only if port hasData().
class InputPort : public Port class InputPort : public Port
{ {
friend void connect(OutputPort &, InputPort &); friend void connect(OutputPort &, InputPort &, bool);
private: private:
OutputPort * output_port = nullptr; OutputPort * output_port = nullptr;
@ -390,7 +390,7 @@ public:
/// * You can push only if port doesn't hasData(). /// * You can push only if port doesn't hasData().
class OutputPort : public Port class OutputPort : public Port
{ {
friend void connect(OutputPort &, InputPort &); friend void connect(OutputPort &, InputPort &, bool);
private: private:
InputPort * input_port = nullptr; InputPort * input_port = nullptr;
@ -483,6 +483,6 @@ using InputPorts = std::list<InputPort>;
using OutputPorts = std::list<OutputPort>; using OutputPorts = std::list<OutputPort>;
void connect(OutputPort & output, InputPort & input); void connect(OutputPort & output, InputPort & input, bool reconnect = false);
} }

View File

@ -0,0 +1,205 @@
#include <Processors/QueryPlan/CreateSetAndFilterOnTheFlyStep.h>
#include <Processors/Transforms/CreateSetAndFilterOnTheFlyTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <IO/Operators.h>
#include <Common/JSONBuilder.h>
#include <Core/ColumnWithTypeAndName.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Processors/IProcessor.h>
#include <Processors/PingPongProcessor.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static void connectAllInputs(OutputPortRawPtrs ports, InputPorts & inputs, size_t num_ports)
{
auto input_it = inputs.begin();
for (size_t i = 0; i < num_ports; ++i)
{
connect(*ports[i], *input_it);
input_it++;
}
}
static ColumnsWithTypeAndName getColumnSubset(const Block & block, const Names & column_names)
{
ColumnsWithTypeAndName result;
for (const auto & name : column_names)
result.emplace_back(block.getByName(name));
return result;
}
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = false,
}
};
}
class CreateSetAndFilterOnTheFlyStep::CrosswiseConnection : public boost::noncopyable
{
public:
using PortPair = std::pair<InputPort *, OutputPort *>;
/// Remember ports passed on the first call and connect with ones from second call.
/// Thread-safe.
void connectPorts(PortPair rhs_ports, IProcessor * proc)
{
assert(!rhs_ports.first->isConnected() && !rhs_ports.second->isConnected());
std::lock_guard<std::mutex> lock(mux);
if (input_port || output_port)
{
assert(input_port && output_port);
assert(!input_port->isConnected());
connect(*rhs_ports.second, *input_port);
connect(*output_port, *rhs_ports.first, /* reconnect= */ true);
}
else
{
std::tie(input_port, output_port) = rhs_ports;
assert(input_port && output_port);
assert(!input_port->isConnected() && !output_port->isConnected());
dummy_input_port = std::make_unique<InputPort>(output_port->getHeader(), proc);
connect(*output_port, *dummy_input_port);
}
}
private:
std::mutex mux;
InputPort * input_port = nullptr;
OutputPort * output_port = nullptr;
/// Output ports should always be connected, and we can't add a step to the pipeline without them.
/// So, connect the port from the first processor to this dummy port and then reconnect to the second processor.
std::unique_ptr<InputPort> dummy_input_port;
};
CreateSetAndFilterOnTheFlyStep::CrosswiseConnectionPtr CreateSetAndFilterOnTheFlyStep::createCrossConnection()
{
return std::make_shared<CreateSetAndFilterOnTheFlyStep::CrosswiseConnection>();
}
CreateSetAndFilterOnTheFlyStep::CreateSetAndFilterOnTheFlyStep(
const DataStream & input_stream_,
const Names & column_names_,
size_t max_rows_in_set_,
CrosswiseConnectionPtr crosswise_connection_,
JoinTableSide position_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, column_names(column_names_)
, max_rows_in_set(max_rows_in_set_)
, own_set(std::make_shared<SetWithState>(SizeLimits(max_rows_in_set, 0, OverflowMode::BREAK), false, true))
, filtering_set(nullptr)
, crosswise_connection(crosswise_connection_)
, position(position_)
{
if (crosswise_connection == nullptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Crosswise connection is not initialized");
if (input_streams.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Step requires exactly one input stream, got {}", input_streams.size());
own_set->setHeader(getColumnSubset(input_streams[0].header, column_names));
}
void CreateSetAndFilterOnTheFlyStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
size_t num_streams = pipeline.getNumStreams();
pipeline.addSimpleTransform([this, num_streams](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipelineBuilder::StreamType::Main)
return nullptr;
auto res = std::make_shared<CreatingSetsOnTheFlyTransform>(header, column_names, num_streams, own_set);
res->setDescription(this->getStepDescription());
return res;
});
Block input_header = pipeline.getHeader();
auto pipeline_transform = [&input_header, this](OutputPortRawPtrs ports)
{
Processors result_transforms;
size_t num_ports = ports.size();
/// Add balancing transform
auto idx = position == JoinTableSide::Left ? PingPongProcessor::First : PingPongProcessor::Second;
auto stream_balancer = std::make_shared<ReadHeadBalancedProcessor>(input_header, num_ports, max_rows_in_set, idx);
stream_balancer->setDescription(getStepDescription());
/// Regular inputs just bypass data for respective ports
connectAllInputs(ports, stream_balancer->getInputs(), num_ports);
/// Connect auxiliary ports
crosswise_connection->connectPorts(stream_balancer->getAuxPorts(), stream_balancer.get());
if (!filtering_set)
{
LOG_DEBUG(log, "Skip filtering {} stream", position);
result_transforms.emplace_back(std::move(stream_balancer));
return result_transforms;
}
/// Add filtering transform, ports just connected respectively
auto & outputs = stream_balancer->getOutputs();
auto output_it = outputs.begin();
for (size_t i = 0; i < outputs.size() - 1; ++i)
{
auto & port = *output_it++;
auto transform = std::make_shared<FilterBySetOnTheFlyTransform>(port.getHeader(), column_names, filtering_set);
transform->setDescription(this->getStepDescription());
connect(port, transform->getInputPort());
result_transforms.emplace_back(std::move(transform));
}
assert(output_it == std::prev(outputs.end()));
result_transforms.emplace_back(std::move(stream_balancer));
return result_transforms;
};
/// Auxiliary port stream_balancer can be connected later (by crosswise_connection).
/// So, use unsafe `transform` with `check_ports = false` to avoid assertions
pipeline.transform(std::move(pipeline_transform), /* check_ports= */ false);
}
void CreateSetAndFilterOnTheFlyStep::describeActions(JSONBuilder::JSONMap & map) const
{
map.add(getName(), true);
}
void CreateSetAndFilterOnTheFlyStep::describeActions(FormatSettings & settings) const
{
String prefix(settings.offset, ' ');
settings.out << prefix << getName();
settings.out << '\n';
}
void CreateSetAndFilterOnTheFlyStep::updateOutputStream()
{
if (input_streams.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "{} requires exactly one input stream, got {}", getName(), input_streams.size());
own_set->setHeader(getColumnSubset(input_streams[0].header, column_names));
output_stream = input_streams[0];
}
}

View File

@ -0,0 +1,59 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/Transforms/CreateSetAndFilterOnTheFlyTransform.h>
#include <Processors/DelayedPortsProcessor.h>
namespace DB
{
/*
* Used to optimize JOIN when joining a small table over a large table.
* Currently applied only for the full sorting join.
* It tries to build a set for each stream.
* Once one stream is finished, it starts to filter another stream with this set.
*/
class CreateSetAndFilterOnTheFlyStep : public ITransformingStep
{
public:
/// Two instances of step need some shared state to connect processors crosswise
class CrosswiseConnection;
using CrosswiseConnectionPtr = std::shared_ptr<CrosswiseConnection>;
static CrosswiseConnectionPtr createCrossConnection();
CreateSetAndFilterOnTheFlyStep(
const DataStream & input_stream_,
const Names & column_names_,
size_t max_rows_in_set_,
CrosswiseConnectionPtr crosswise_connection_,
JoinTableSide position_);
String getName() const override { return "CreateSetAndFilterOnTheFlyStep"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) override;
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeActions(FormatSettings & settings) const override;
SetWithStatePtr getSet() const { return own_set; }
/// Set for another stream.
void setFiltering(SetWithStatePtr filtering_set_) { filtering_set = filtering_set_; }
private:
void updateOutputStream() override;
Names column_names;
size_t max_rows_in_set;
SetWithStatePtr own_set;
SetWithStatePtr filtering_set;
CrosswiseConnectionPtr crosswise_connection;
JoinTableSide position;
Poco::Logger * log = &Poco::Logger::get("CreateSetAndFilterOnTheFlyStep");
};
}

View File

@ -34,8 +34,12 @@ QueryPipelineBuilderPtr JoinStep::updatePipeline(QueryPipelineBuilders pipelines
throw Exception(ErrorCodes::LOGICAL_ERROR, "JoinStep expect two input steps"); throw Exception(ErrorCodes::LOGICAL_ERROR, "JoinStep expect two input steps");
if (join->pipelineType() == JoinPipelineType::YShaped) if (join->pipelineType() == JoinPipelineType::YShaped)
return QueryPipelineBuilder::joinPipelinesYShaped( {
auto joined_pipeline = QueryPipelineBuilder::joinPipelinesYShaped(
std::move(pipelines[0]), std::move(pipelines[1]), join, output_stream->header, max_block_size, &processors); std::move(pipelines[0]), std::move(pipelines[1]), join, output_stream->header, max_block_size, &processors);
joined_pipeline->resize(max_streams);
return joined_pipeline;
}
return QueryPipelineBuilder::joinPipelinesRightLeft( return QueryPipelineBuilder::joinPipelinesRightLeft(
std::move(pipelines[0]), std::move(pipelines[0]),

View File

@ -8,6 +8,7 @@
#include <Processors/QueryPlan/Optimizations/Optimizations.h> #include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/ITransformingStep.h> #include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/QueryPlan/FilterStep.h> #include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/CreateSetAndFilterOnTheFlyStep.h>
#include <Processors/QueryPlan/AggregatingStep.h> #include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h> #include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/JoinStep.h> #include <Processors/QueryPlan/JoinStep.h>
@ -22,6 +23,7 @@
#include <Interpreters/ActionsDAG.h> #include <Interpreters/ActionsDAG.h>
#include <Interpreters/ArrayJoinAction.h> #include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/TableJoin.h> #include <Interpreters/TableJoin.h>
#include <fmt/format.h>
namespace DB::ErrorCodes namespace DB::ErrorCodes
{ {
@ -134,10 +136,24 @@ tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, con
static size_t static size_t
tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, const Names & allowed_inputs, tryAddNewFilterStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, const Names & allowed_inputs,
bool can_remove_filter = true) bool can_remove_filter = true, size_t child_idx = 0)
{ {
if (auto split_filter = splitFilter(parent_node, allowed_inputs, 0)) if (auto split_filter = splitFilter(parent_node, allowed_inputs, child_idx))
return tryAddNewFilterStep(parent_node, nodes, split_filter, can_remove_filter, 0); return tryAddNewFilterStep(parent_node, nodes, split_filter, can_remove_filter, child_idx);
return 0;
}
/// Push down filter through specified type of step
template <typename Step>
static size_t simplePushDownOverStep(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes, QueryPlanStepPtr & child)
{
if (typeid_cast<Step *>(child.get()))
{
Names allowed_inputs = child->getOutputStream().header.getNames();
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_inputs))
return updated_steps;
}
return 0; return 0;
} }
@ -234,12 +250,8 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
return updated_steps; return updated_steps;
} }
if (auto * distinct = typeid_cast<DistinctStep *>(child.get())) if (auto updated_steps = simplePushDownOverStep<DistinctStep>(parent_node, nodes, child))
{ return updated_steps;
Names allowed_inputs = distinct->getOutputStream().header.getNames();
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_inputs))
return updated_steps;
}
if (auto * join = typeid_cast<JoinStep *>(child.get())) if (auto * join = typeid_cast<JoinStep *>(child.get()))
{ {
@ -290,7 +302,7 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
const size_t updated_steps = tryAddNewFilterStep(parent_node, nodes, split_filter, can_remove_filter, child_idx); const size_t updated_steps = tryAddNewFilterStep(parent_node, nodes, split_filter, can_remove_filter, child_idx);
if (updated_steps > 0) if (updated_steps > 0)
{ {
LOG_DEBUG(&Poco::Logger::get("QueryPlanOptimizations"), "Pushed down filter to {} side of join", kind); LOG_DEBUG(&Poco::Logger::get("QueryPlanOptimizations"), "Pushed down filter {} to the {} side of join", split_filter_column_name, kind);
} }
return updated_steps; return updated_steps;
}; };
@ -321,12 +333,11 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
// { // {
// } // }
if (typeid_cast<SortingStep *>(child.get())) if (auto updated_steps = simplePushDownOverStep<SortingStep>(parent_node, nodes, child))
{ return updated_steps;
Names allowed_inputs = child->getOutputStream().header.getNames();
if (auto updated_steps = tryAddNewFilterStep(parent_node, nodes, allowed_inputs)) if (auto updated_steps = simplePushDownOverStep<CreateSetAndFilterOnTheFlyStep>(parent_node, nodes, child))
return updated_steps; return updated_steps;
}
if (auto * union_step = typeid_cast<UnionStep *>(child.get())) if (auto * union_step = typeid_cast<UnionStep *>(child.get()))
{ {

View File

@ -85,6 +85,13 @@ public:
{ {
} }
StrictResizeProcessor(InputPorts inputs_, OutputPorts outputs_)
: IProcessor(inputs_, outputs_)
, current_input(inputs.begin())
, current_output(outputs.begin())
{
}
String getName() const override { return "StrictResize"; } String getName() const override { return "StrictResize"; }
Status prepare(const PortNumbers &, const PortNumbers &) override; Status prepare(const PortNumbers &, const PortNumbers &) override;

View File

@ -0,0 +1,195 @@
#include <Processors/Transforms/CreateSetAndFilterOnTheFlyTransform.h>
#include <cstddef>
#include <mutex>
#include <Interpreters/Set.h>
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/logger_useful.h>
#include <Columns/IColumn.h>
#include <Core/ColumnWithTypeAndName.h>
#include <base/types.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace
{
std::vector<size_t> getColumnIndices(const Block & block, const Names & column_names)
{
std::vector<size_t> indices;
for (const auto & name : column_names)
indices.push_back(block.getPositionByName(name));
return indices;
}
Columns getColumnsByIndices(const Chunk & chunk, const std::vector<size_t> & indices)
{
Columns columns;
const Columns & all_cols = chunk.getColumns();
for (const auto & index : indices)
columns.push_back(all_cols.at(index));
return columns;
}
ColumnsWithTypeAndName getColumnsByIndices(const Block & sample_block, const Chunk & chunk, const std::vector<size_t> & indices)
{
Block block = sample_block.cloneEmpty();
block.setColumns(getColumnsByIndices(chunk, indices));
return block.getColumnsWithTypeAndName();
}
}
CreatingSetsOnTheFlyTransform::CreatingSetsOnTheFlyTransform(
const Block & header_, const Names & column_names_, size_t num_streams_, SetWithStatePtr set_)
: ISimpleTransform(header_, header_, true)
, column_names(column_names_)
, key_column_indices(getColumnIndices(inputs.front().getHeader(), column_names))
, num_streams(num_streams_)
, set(set_)
{
}
IProcessor::Status CreatingSetsOnTheFlyTransform::prepare()
{
IProcessor::Status status = ISimpleTransform::prepare();
if (!set || status != Status::Finished)
/// Nothing to do with set
return status;
/// Finalize set
if (set->state == SetWithState::State::Creating)
{
if (input.isFinished())
{
set->finished_count++;
if (set->finished_count != num_streams)
/// Not all instances of processor are finished
return status;
set->finishInsert();
set->state = SetWithState::State::Finished;
LOG_DEBUG(log, "{}: finish building set for [{}] with {} rows, set size is {}",
getDescription(), fmt::join(column_names, ", "), set->getTotalRowCount(),
formatReadableSizeWithBinarySuffix(set->getTotalByteCount()));
set.reset();
}
else
{
/// Should not happen because processor inserted before join that reads all the data
throw Exception(ErrorCodes::LOGICAL_ERROR, "Processor finished, but not all input was read");
}
}
return status;
}
void CreatingSetsOnTheFlyTransform::transform(Chunk & chunk)
{
if (!set || set->state != SetWithState::State::Creating)
{
/// If set building suspended by another processor, release pointer
if (set != nullptr)
set.reset();
return;
}
if (chunk.getNumRows())
{
Columns key_columns = getColumnsByIndices(chunk, key_column_indices);
bool limit_exceeded = !set->insertFromBlock(key_columns);
if (limit_exceeded)
{
auto prev_state = set->state.exchange(SetWithState::State::Suspended);
/// Print log only after first state switch
if (prev_state == SetWithState::State::Creating)
{
LOG_DEBUG(log, "{}: set limit exceeded, give up building set, after reading {} rows and using {}",
getDescription(), set->getTotalRowCount(), formatReadableSizeWithBinarySuffix(set->getTotalByteCount()));
}
/// Probaply we need to clear set here, because it's unneeded anymore
/// But now `Set` doesn't have such method, so reset pointer in all processors and then it should be freed
set.reset();
}
}
}
FilterBySetOnTheFlyTransform::FilterBySetOnTheFlyTransform(const Block & header_, const Names & column_names_, SetWithStatePtr set_)
: ISimpleTransform(header_, header_, true)
, column_names(column_names_)
, key_column_indices(getColumnIndices(inputs.front().getHeader(), column_names))
, set(set_)
{
const auto & header = inputs.front().getHeader();
for (size_t idx : key_column_indices)
key_sample_block.insert(header.getByPosition(idx));
}
IProcessor::Status FilterBySetOnTheFlyTransform::prepare()
{
auto status = ISimpleTransform::prepare();
if (set && set->state == SetWithState::State::Suspended)
set.reset();
if (status == Status::Finished)
{
bool has_filter = set && set->state == SetWithState::State::Finished;
if (has_filter)
{
LOG_DEBUG(log, "Finished {} by [{}]: consumed {} rows in total, {} rows bypassed, result {} rows, {:.2f}% filtered",
Poco::toLower(getDescription()), fmt::join(column_names, ", "),
stat.consumed_rows, stat.consumed_rows_before_set, stat.result_rows,
100 - 100.0 * stat.result_rows / stat.consumed_rows);
}
else
{
LOG_DEBUG(log, "Finished {}: bypass {} rows", Poco::toLower(getDescription()), stat.consumed_rows);
}
/// Release set to free memory
set = nullptr;
}
return status;
}
void FilterBySetOnTheFlyTransform::transform(Chunk & chunk)
{
stat.consumed_rows += chunk.getNumRows();
stat.result_rows += chunk.getNumRows();
bool can_filter = set && set->state == SetWithState::State::Finished;
if (!can_filter)
stat.consumed_rows_before_set += chunk.getNumRows();
if (can_filter && chunk.getNumRows())
{
auto key_columns = getColumnsByIndices(key_sample_block, chunk, key_column_indices);
ColumnPtr mask_col = set->execute(key_columns, false);
const auto & mask = assert_cast<const ColumnUInt8 *>(mask_col.get())->getData();
stat.result_rows -= chunk.getNumRows();
Columns columns = chunk.detachColumns();
size_t result_num_rows = 0;
for (auto & col : columns)
{
col = col->filter(mask, /* negative */ false);
result_num_rows = col->size();
}
stat.result_rows += result_num_rows;
chunk.setColumns(std::move(columns), result_num_rows);
}
}
}

View File

@ -0,0 +1,114 @@
#pragma once
#include <atomic>
#include <mutex>
#include <vector>
#include <Processors/ISimpleTransform.h>
#include <Poco/Logger.h>
#include <Interpreters/Set.h>
namespace DB
{
struct SetWithState : public Set
{
using Set::Set;
/// Flow: Creating -> Finished or Suspended
enum class State
{
/// Set is not yet created,
/// Creating processor continues to build set.
/// Filtering bypasses data.
Creating,
/// Set is finished.
/// Creating processor is finished.
/// Filtering filter stream with this set.
Finished,
/// Set building is canceled (due to limit exceeded).
/// Creating and filtering processors bypass data.
Suspended,
};
std::atomic<State> state = State::Creating;
/// Track number of processors that are currently working on this set.
/// Last one finalizes set.
std::atomic_size_t finished_count = 0;
};
using SetWithStatePtr = std::shared_ptr<SetWithState>;
/*
* Create a set on the fly for incoming stream.
* The set is created from the key columns of the input block.
* Data is not changed and returned as is.
* Can be executed in parallel, but blocks on operations with set.
*/
class CreatingSetsOnTheFlyTransform : public ISimpleTransform
{
public:
CreatingSetsOnTheFlyTransform(const Block & header_, const Names & column_names_, size_t num_streams_, SetWithStatePtr set_);
String getName() const override { return "CreatingSetsOnTheFlyTransform"; }
Status prepare() override;
void transform(Chunk & chunk) override;
private:
Names column_names;
std::vector<size_t> key_column_indices;
size_t num_streams;
/// Set to fill
SetWithStatePtr set;
Poco::Logger * log = &Poco::Logger::get("CreatingSetsOnTheFlyTransform");
};
/*
* Filter the input chunk by the set.
* When set building is not completed, just return the source data.
*/
class FilterBySetOnTheFlyTransform : public ISimpleTransform
{
public:
FilterBySetOnTheFlyTransform(const Block & header_, const Names & column_names_, SetWithStatePtr set_);
String getName() const override { return "FilterBySetOnTheFlyTransform"; }
Status prepare() override;
void transform(Chunk & chunk) override;
private:
/// Set::execute requires ColumnsWithTypesAndNames, so we need to convert Chunk to that format
Block key_sample_block;
Names column_names;
std::vector<size_t> key_column_indices;
/// Filter by this set when it's created
SetWithStatePtr set;
/// Statistics to log
struct Stat
{
/// Total number of rows
size_t consumed_rows = 0;
/// Number of bypassed rows (processed before set is created)
size_t consumed_rows_before_set = 0;
/// Number of rows that passed the filter
size_t result_rows = 0;
} stat;
Poco::Logger * log = &Poco::Logger::get("FilterBySetOnTheFlyTransform");
};
}

View File

@ -770,7 +770,7 @@ void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
header.clear(); header.clear();
} }
void Pipe::transform(const Transformer & transformer) void Pipe::transform(const Transformer & transformer, bool check_ports)
{ {
if (output_ports.empty()) if (output_ports.empty())
throw Exception("Cannot transform empty Pipe", ErrorCodes::LOGICAL_ERROR); throw Exception("Cannot transform empty Pipe", ErrorCodes::LOGICAL_ERROR);
@ -784,6 +784,9 @@ void Pipe::transform(const Transformer & transformer)
for (const auto & port : output_ports) for (const auto & port : output_ports)
{ {
if (!check_ports)
break;
if (!port->isConnected()) if (!port->isConnected())
throw Exception( throw Exception(
ErrorCodes::LOGICAL_ERROR, ErrorCodes::LOGICAL_ERROR,
@ -799,6 +802,9 @@ void Pipe::transform(const Transformer & transformer)
{ {
for (const auto & port : processor->getInputs()) for (const auto & port : processor->getInputs())
{ {
if (!check_ports)
break;
if (!port.isConnected()) if (!port.isConnected())
throw Exception( throw Exception(
ErrorCodes::LOGICAL_ERROR, ErrorCodes::LOGICAL_ERROR,
@ -806,7 +812,7 @@ void Pipe::transform(const Transformer & transformer)
processor->getName()); processor->getName());
const auto * connected_processor = &port.getOutputPort().getProcessor(); const auto * connected_processor = &port.getOutputPort().getProcessor();
if (!set.contains(connected_processor)) if (check_ports && !set.contains(connected_processor))
throw Exception( throw Exception(
ErrorCodes::LOGICAL_ERROR, ErrorCodes::LOGICAL_ERROR,
"Transformation of Pipe is not valid because processor {} has input port which is connected with unknown processor {}", "Transformation of Pipe is not valid because processor {} has input port which is connected with unknown processor {}",
@ -823,7 +829,7 @@ void Pipe::transform(const Transformer & transformer)
} }
const auto * connected_processor = &port.getInputPort().getProcessor(); const auto * connected_processor = &port.getInputPort().getProcessor();
if (!set.contains(connected_processor)) if (check_ports && !set.contains(connected_processor))
throw Exception( throw Exception(
ErrorCodes::LOGICAL_ERROR, ErrorCodes::LOGICAL_ERROR,
"Transformation of Pipe is not valid because processor {} has output port which is connected with unknown processor {}", "Transformation of Pipe is not valid because processor {} has output port which is connected with unknown processor {}",

View File

@ -85,13 +85,13 @@ public:
/// Add chain to every output port. /// Add chain to every output port.
void addChains(std::vector<Chain> chains); void addChains(std::vector<Chain> chains);
/// Changes the number of output ports if needed. Adds ResizeTransform. /// Changes the number of output ports if needed. Adds (Strict)ResizeProcessor.
void resize(size_t num_streams, bool force = false, bool strict = false); void resize(size_t num_streams, bool force = false, bool strict = false);
using Transformer = std::function<Processors(OutputPortRawPtrs ports)>; using Transformer = std::function<Processors(OutputPortRawPtrs ports)>;
/// Transform Pipe in general way. /// Transform Pipe in general way.
void transform(const Transformer & transformer); void transform(const Transformer & transformer, bool check_ports = true);
/// Unite several pipes together. They should have same header. /// Unite several pipes together. They should have same header.
static Pipe unitePipes(Pipes pipes); static Pipe unitePipes(Pipes pipes);

View File

@ -159,10 +159,10 @@ void QueryPipelineBuilder::addChain(Chain chain)
pipe.addChains(std::move(chains)); pipe.addChains(std::move(chains));
} }
void QueryPipelineBuilder::transform(const Transformer & transformer) void QueryPipelineBuilder::transform(const Transformer & transformer, bool check_ports)
{ {
checkInitializedAndNotCompleted(); checkInitializedAndNotCompleted();
pipe.transform(transformer); pipe.transform(transformer, check_ports);
} }
void QueryPipelineBuilder::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter) void QueryPipelineBuilder::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
@ -348,8 +348,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesYShaped
left->pipe.dropExtremes(); left->pipe.dropExtremes();
right->pipe.dropExtremes(); right->pipe.dropExtremes();
if (left->getNumStreams() != 1 || right->getNumStreams() != 1)
if (left->pipe.output_ports.size() != 1 || right->pipe.output_ports.size() != 1)
throw Exception("Join is supported only for pipelines with one output port", ErrorCodes::LOGICAL_ERROR); throw Exception("Join is supported only for pipelines with one output port", ErrorCodes::LOGICAL_ERROR);
if (left->hasTotals() || right->hasTotals()) if (left->hasTotals() || right->hasTotals())
@ -359,8 +358,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesYShaped
auto joining = std::make_shared<MergeJoinTransform>(join, inputs, out_header, max_block_size); auto joining = std::make_shared<MergeJoinTransform>(join, inputs, out_header, max_block_size);
auto result = mergePipelines(std::move(left), std::move(right), std::move(joining), collected_processors); return mergePipelines(std::move(left), std::move(right), std::move(joining), collected_processors);
return result;
} }
std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLeft( std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLeft(

View File

@ -69,7 +69,7 @@ public:
using Transformer = std::function<Processors(OutputPortRawPtrs ports)>; using Transformer = std::function<Processors(OutputPortRawPtrs ports)>;
/// Transform pipeline in general way. /// Transform pipeline in general way.
void transform(const Transformer & transformer); void transform(const Transformer & transformer, bool check_ports = true);
/// Add TotalsHavingTransform. Resize pipeline to single input. Adds totals port. /// Add TotalsHavingTransform. Resize pipeline to single input. Adds totals port.
void addTotalsHavingTransform(ProcessorPtr transform); void addTotalsHavingTransform(ProcessorPtr transform);

View File

@ -0,0 +1,45 @@
<test>
<substitutions>
<substitution>
<name>table_size</name>
<values>
<value>100000000</value>
</values>
</substitution>
</substitutions>
<settings>
<join_algorithm>full_sorting_merge</join_algorithm>
</settings>
<create_query>
CREATE TABLE t1 (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY y
AS SELECT
sipHash64(number, 't1_x') % {table_size} AS x,
sipHash64(number, 't1_y') % {table_size} AS y
FROM numbers({table_size})
</create_query>
<create_query>
CREATE TABLE t2 (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY y
AS SELECT
sipHash64(number, 't2_x') % {table_size} AS x,
sipHash64(number, 't2_y') % {table_size} AS y
FROM numbers({table_size})
</create_query>
<query>SELECT * FROM t1 JOIN t2 ON t1.x = t2.x WHERE less(t1.y, 10000)</query>
<query>SELECT * FROM t2 JOIN t1 ON t1.x = t2.x WHERE less(t1.y, 10000)</query>
<query>SELECT * FROM t1 JOIN t2 ON t1.x = t2.x WHERE greater(t1.y, {table_size} - 10000)</query>
<query>SELECT * FROM t2 JOIN t1 ON t1.x = t2.x WHERE greater(t1.y, {table_size} - 10000)</query>
<query>SELECT * FROM t1 JOIN t2 ON t1.x = t2.x WHERE t1.y % 100 = 0</query>
<query>SELECT * FROM t2 JOIN t1 ON t1.x = t2.x WHERE t1.y % 100 = 0</query>
<query>SELECT * FROM t1 JOIN t2 ON t1.x = t2.x WHERE t1.y % 1000 = 0</query>
<query>SELECT * FROM t2 JOIN t1 ON t1.x = t2.x WHERE t1.y % 1000 = 0</query>
<drop_query>DROP TABLE IF EXISTS t1</drop_query>
<drop_query>DROP TABLE IF EXISTS t2</drop_query>
</test>

View File

@ -7,6 +7,8 @@ USING (key);
SET join_algorithm = 'full_sorting_merge'; SET join_algorithm = 'full_sorting_merge';
SET max_rows_in_set_to_optimize_join = 0;
EXPLAIN actions=0, description=0, header=1 EXPLAIN actions=0, description=0, header=1
SELECT * FROM ( SELECT 'key2' AS key ) AS s1 SELECT * FROM ( SELECT 'key2' AS key ) AS s1
JOIN ( SELECT 'key1' AS key, '1' AS value UNION ALL SELECT 'key2' AS key, '1' AS value ) AS s2 JOIN ( SELECT 'key1' AS key, '1' AS value UNION ALL SELECT 'key2' AS key, '1' AS value ) AS s2

View File

@ -0,0 +1,7 @@
106
46
42
51
42
24
10

View File

@ -0,0 +1,20 @@
DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;
CREATE TABLE t1 (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY y
AS SELECT sipHash64(number, 't1_x') % 100 AS x, sipHash64(number, 't1_y') % 100 AS y FROM numbers(100);
CREATE TABLE t2 (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY y
AS SELECT sipHash64(number, 't2_x') % 100 AS x, sipHash64(number, 't2_y') % 100 AS y FROM numbers(100);
SET max_rows_in_set_to_optimize_join = 1000;
SET join_algorithm = 'full_sorting_merge';
-- different combinations of conditions on key/attribute columns for the left/right tables
SELECT count() FROM t1 JOIN t2 ON t1.x = t2.x;
SELECT count() FROM t1 JOIN t2 ON t1.x = t2.x WHERE t1.y % 2 == 0;
SELECT count() FROM t1 JOIN t2 ON t1.x = t2.x WHERE t1.x % 2 == 0;
SELECT count() FROM t1 JOIN t2 ON t1.x = t2.x WHERE t2.y % 2 == 0;
SELECT count() FROM t1 JOIN t2 ON t1.x = t2.x WHERE t2.x % 2 == 0;
SELECT count() FROM t1 JOIN t2 ON t1.x = t2.x WHERE t1.y % 2 == 0 AND t2.y % 2 == 0;
SELECT count() FROM t1 JOIN t2 ON t1.x = t2.x WHERE t1.x % 2 == 0 AND t2.x % 2 == 0 AND t1.y % 2 == 0 AND t2.y % 2 == 0;

View File

@ -0,0 +1,10 @@
Ok
Ok
Ok
Ok
Ok
Ok
Ok
Ok
Ok
Ok

View File

@ -0,0 +1,55 @@
#!/usr/bin/env bash
# Tags: no-asan,no-msan,no-tsan,no-ubsan
#
# Test doesn't run complex queries, just test the logic of setting, so no need to run with different builds.
# Also, we run similar queries in 02382_join_and_filtering_set.sql which is enabled for these builds.
#
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -mn -q """
CREATE TABLE t1 (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY y
AS SELECT sipHash64(number, 't1_x') % 100 AS x, sipHash64(number, 't1_y') % 100 AS y FROM numbers(100);
CREATE TABLE t2 (x UInt64, y UInt64) ENGINE = MergeTree ORDER BY y
AS SELECT sipHash64(number, 't2_x') % 100 AS x, sipHash64(number, 't2_y') % 100 AS y FROM numbers(100);
"""
# Arguments:
# - value of max_rows_in_set_to_optimize_join
# - join kind
# - expected number of steps in plan
# - expected number of steps in pipeline
function test() {
PARAM_VALUE=$1
JOIN_KIND=${2:-}
EXPECTED_PLAN_STEPS=$3
RES=$(
$CLICKHOUSE_CLIENT --max_rows_in_set_to_optimize_join=${PARAM_VALUE} --join_algorithm='full_sorting_merge' \
-q "EXPLAIN PLAN SELECT count() FROM t1 ${JOIN_KIND} JOIN t2 ON t1.x = t2.x" | grep -o 'CreateSetAndFilterOnTheFlyStep' | wc -l
)
[ "$RES" -eq "$EXPECTED_PLAN_STEPS" ] && echo "Ok" || echo "Fail: $RES != $EXPECTED_PLAN_STEPS"
EXPECTED_PIPELINE_STEPS=$4
RES=$(
$CLICKHOUSE_CLIENT --max_rows_in_set_to_optimize_join=${PARAM_VALUE} --join_algorithm='full_sorting_merge' \
-q "EXPLAIN PIPELINE SELECT count() FROM t1 ${JOIN_KIND} JOIN t2 ON t1.x = t2.x" \
| grep -o -e ReadHeadBalancedProcessor -e FilterBySetOnTheFlyTransform -e CreatingSetsOnTheFlyTransform | wc -l
)
[ "$RES" -eq "$EXPECTED_PIPELINE_STEPS" ] && echo "Ok" || echo "Fail: $RES != $EXPECTED_PIPELINE_STEPS"
}
test 1000 '' 2 6
# no filtering for left/right side
test 1000 'LEFT' 2 5
test 1000 'RIGHT' 2 5
# when disabled no extra steps should be created
test 1000 'FULL' 0 0
test 0 '' 0 0