Create sets for joins: wip

This commit is contained in:
vdimir 2022-08-01 13:20:12 +00:00
parent 8f06430ebd
commit 71708d595f
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
11 changed files with 286 additions and 12 deletions

View File

@ -1456,9 +1456,9 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
return set;
};
auto add_filter_by_set = [](QueryPlan & plan, const Names & key_names, auto set, bool is_right)
auto add_filter_by_set = [](QueryPlan & plan, const Names & key_names, auto set, auto ports_state, bool is_right)
{
auto filter_by_set_step = std::make_unique<FilterBySetOnTheFlyStep>(plan.getCurrentDataStream(), key_names, set);
auto filter_by_set_step = std::make_unique<FilterBySetOnTheFlyStep>(plan.getCurrentDataStream(), key_names, set, ports_state);
filter_by_set_step->setStepDescription(fmt::format("Filter {} stream by set", is_right ? "right" : "left"));
plan.addStep(std::move(filter_by_set_step));
};
@ -1472,8 +1472,9 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
auto left_set = add_create_set(query_plan, join_clause.key_names_left, false);
auto right_set = add_create_set(*joined_plan, join_clause.key_names_right, true);
add_filter_by_set(query_plan, join_clause.key_names_left, right_set, false);
add_filter_by_set(*joined_plan, join_clause.key_names_right, left_set, true);
auto ports_state = std::make_shared<FilterBySetOnTheFlyStep::PortsState>();
add_filter_by_set(query_plan, join_clause.key_names_left, right_set, ports_state, false);
add_filter_by_set(*joined_plan, join_clause.key_names_right, left_set, ports_state, true);
}
add_sorting(query_plan, join_clause.key_names_left, false);

View File

@ -170,4 +170,132 @@ IProcessor::Status DelayedPortsProcessor::prepare(const PortNumbers & updated_in
return Status::PortFull;
}
NotifyProcessor::NotifyProcessor(const Block & header, size_t num_ports)
: IProcessor(InputPorts(num_ports, header), OutputPorts(num_ports, header))
, aux_in_port(Block(), this)
, aux_out_port(Block(), this)
{
port_pairs.resize(num_ports);
auto input_it = inputs.begin();
auto output_it = outputs.begin();
for (size_t i = 0; i < num_ports; ++i)
{
port_pairs[i].input_port = &*input_it;
++input_it;
port_pairs[i].output_port = &*output_it;
++output_it;
}
}
void NotifyProcessor::finishPair(PortsPair & pair)
{
if (!pair.is_finished)
{
pair.output_port->finish();
pair.input_port->close();
pair.is_finished = true;
++num_finished_pairs;
}
}
bool NotifyProcessor::processPair(PortsPair & pair)
{
if (pair.output_port->isFinished())
{
finishPair(pair);
return false;
}
if (pair.input_port->isFinished())
{
finishPair(pair);
return false;
}
if (!pair.output_port->canPush())
{
pair.input_port->setNotNeeded();
return false;
}
pair.input_port->setNeeded();
if (pair.input_port->hasData())
{
Chunk chunk = pair.input_port->pull(true);
dataCallback(chunk);
pair.output_port->push(std::move(chunk));
}
return true;
}
IProcessor::Status NotifyProcessor::processRegularPorts(const PortNumbers & updated_inputs, const PortNumbers & updated_outputs)
{
bool need_data = false;
for (const auto & output_number : updated_outputs)
need_data = processPair(port_pairs[output_number]) || need_data;
for (const auto & input_number : updated_inputs)
need_data = processPair(port_pairs[input_number]) || need_data;
if (num_finished_pairs == port_pairs.size())
return Status::Finished;
if (need_data)
return Status::NeedData;
return Status::PortFull;
}
IProcessor::Status NotifyProcessor::prepare(const PortNumbers & updated_inputs, const PortNumbers & updated_outputs)
{
auto status = processRegularPorts(updated_inputs, updated_outputs);
if (status != Status::Ready)
return status;
if (ready == AuxPortState::NotInitialized && isReady())
ready = AuxPortState::Triggered;
if (ready == AuxPortState::Triggered)
{
if (aux_out_port.canPush())
{
aux_out_port.push({});
ready = AuxPortState::Finished;
return Status::Ready;
}
return Status::PortFull;
}
if (waiting == AuxPortState::NotInitialized && isWaiting())
{
aux_in_port.setNeeded();
waiting = AuxPortState::Triggered;
}
if (waiting == AuxPortState::Triggered)
{
if (aux_in_port.hasData())
{
aux_in_port.pull(true);
waiting = AuxPortState::Finished;
return Status::Ready;
}
return Status::PortFull;
}
return Status::Ready;
}
std::pair<InputPort *, OutputPort *> NotifyProcessor::getAuxPorts()
{
return std::make_pair(&aux_in_port, &aux_out_port);
}
}

View File

@ -1,5 +1,7 @@
#pragma once
#include <Processors/IProcessor.h>
#include <base/unit.h>
#include <Processors/Chunk.h>
namespace DB
{
@ -39,4 +41,76 @@ private:
void finishPair(PortsPair & pair);
};
class NotifyProcessor : public IProcessor
{
public:
NotifyProcessor(const Block & header, size_t num_ports);
String getName() const override { return "NotifyProcessor"; }
Status prepare(const PortNumbers &, const PortNumbers &) override;
std::pair<InputPort *, OutputPort *> getAuxPorts();
virtual bool isReady() const { return true; }
virtual bool isWaiting() const { return false; }
virtual void dataCallback(const Chunk & chunk) { UNUSED(chunk); }
private:
enum class AuxPortState
{
NotInitialized,
Triggered,
Finished,
};
struct PortsPair
{
InputPort * input_port = nullptr;
OutputPort * output_port = nullptr;
bool is_finished = false;
};
bool processPair(PortsPair & pair);
void finishPair(PortsPair & pair);
Status processRegularPorts(const PortNumbers & updated_inputs, const PortNumbers & updated_outputs);
std::vector<PortsPair> port_pairs;
size_t num_finished_pairs = 0;
InputPort aux_in_port;
OutputPort aux_out_port;
AuxPortState ready = AuxPortState::NotInitialized;
AuxPortState waiting = AuxPortState::NotInitialized;
};
class NotifyProcessor2 : public NotifyProcessor
{
public:
using NotifyProcessor::NotifyProcessor;
bool isReady() const override
{
return data_consumed > 10_MiB;
}
bool isWaiting() const override
{
return data_consumed < 10_MiB;
}
void dataCallback(const Chunk & chunk) override
{
data_consumed += chunk.allocatedBytes();
}
private:
size_t data_consumed = 0;
};
}

View File

@ -19,7 +19,7 @@ void connect(OutputPort & output, InputPort & input)
auto out_name = output.getProcessor().getName();
auto in_name = input.getProcessor().getName();
assertCompatibleHeader(output.getHeader(), input.getHeader(), fmt::format(" function connect between {} and {}", out_name, in_name));
assertCompatibleHeader(output.getHeader(), input.getHeader(), fmt::format("function connect between {} and {}", out_name, in_name));
input.output_port = &output;
output.input_port = &input;

View File

@ -7,6 +7,7 @@
#include <Core/ColumnWithTypeAndName.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Processors/IProcessor.h>
#include <Processors/DelayedPortsProcessor.h>
namespace DB
{
@ -92,18 +93,46 @@ void CreatingSetOnTheFlyStep::updateOutputStream()
}
FilterBySetOnTheFlyStep::FilterBySetOnTheFlyStep(const DataStream & input_stream_, const Names & column_names_, SetWithStatePtr set_)
FilterBySetOnTheFlyStep::FilterBySetOnTheFlyStep(const DataStream & input_stream_, const Names & column_names_,
SetWithStatePtr set_, PortsStatePtr ports_state_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits(true))
, column_names(column_names_)
, set(set_)
, ports_state(ports_state_)
{
if (input_streams.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "{} requires exactly one input stream, got {}", getName(), input_streams.size());
}
static void connectAllInputs(OutputPortRawPtrs ports, InputPorts & inputs)
{
auto input_it = inputs.begin();
for (auto & port : ports)
{
connect(*port, *input_it);
input_it++;
}
assert(input_it == inputs.end());
}
void FilterBySetOnTheFlyStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)
{
UNUSED(settings);
Block input_header = pipeline.getHeader();
pipeline.transform([&input_header, this](OutputPortRawPtrs ports)
{
size_t num_streams = ports.size();
auto notifier = std::make_shared<NotifyProcessor>(input_header, num_streams);
connectAllInputs(ports, notifier->getInputs());
ports_state->tryConnectPorts(notifier->getAuxPorts());
return Processors{notifier};
}, /* check_ports= */ false);
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipelineBuilder::StreamType::Main)

View File

@ -33,10 +33,40 @@ private:
class FilterBySetOnTheFlyStep : public ITransformingStep
{
public:
class PortsState : public boost::noncopyable
{
public:
using PortPair = std::pair<InputPort *, OutputPort *>;
/// Remember ports passed on the first call and connect with ones from second call.
bool tryConnectPorts(PortPair rhs_ports)
{
std::lock_guard<std::mutex> lock(mux);
if (input_port || output_port)
{
assert(input_port && output_port);
connect(*rhs_ports.second, *input_port);
connect(*output_port, *rhs_ports.first);
return true;
}
std::tie(input_port, output_port) = rhs_ports;
return false;
}
private:
std::mutex mux;
InputPort * input_port = nullptr;
OutputPort * output_port = nullptr;
};
using PortsStatePtr = std::shared_ptr<PortsState>;
FilterBySetOnTheFlyStep(
const DataStream & input_stream_,
const Names & column_names_,
SetWithStatePtr set_);
SetWithStatePtr set_,
PortsStatePtr ports_state_);
String getName() const override { return "FilterBySetOnTheFly"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) override;
@ -48,7 +78,9 @@ private:
void updateOutputStream() override;
Names column_names;
SetWithStatePtr set;
PortsStatePtr ports_state;
};
}

View File

@ -85,6 +85,13 @@ public:
{
}
StrictResizeProcessor(InputPorts inputs_, OutputPorts outputs_)
: IProcessor(inputs_, outputs_)
, current_input(inputs.begin())
, current_output(outputs.begin())
{
}
String getName() const override { return "StrictResize"; }
Status prepare(const PortNumbers &, const PortNumbers &) override;

View File

@ -770,7 +770,7 @@ void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
header.clear();
}
void Pipe::transform(const Transformer & transformer)
void Pipe::transform(const Transformer & transformer, bool check_ports)
{
if (output_ports.empty())
throw Exception("Cannot transform empty Pipe", ErrorCodes::LOGICAL_ERROR);
@ -799,6 +799,9 @@ void Pipe::transform(const Transformer & transformer)
{
for (const auto & port : processor->getInputs())
{
if (!check_ports)
break;
if (!port.isConnected())
throw Exception(
ErrorCodes::LOGICAL_ERROR,

View File

@ -91,7 +91,7 @@ public:
using Transformer = std::function<Processors(OutputPortRawPtrs ports)>;
/// Transform Pipe in general way.
void transform(const Transformer & transformer);
void transform(const Transformer & transformer, bool check_ports = true);
/// Unite several pipes together. They should have same header.
static Pipe unitePipes(Pipes pipes);

View File

@ -159,10 +159,10 @@ void QueryPipelineBuilder::addChain(Chain chain)
pipe.addChains(std::move(chains));
}
void QueryPipelineBuilder::transform(const Transformer & transformer)
void QueryPipelineBuilder::transform(const Transformer & transformer, bool check_ports)
{
checkInitializedAndNotCompleted();
pipe.transform(transformer);
pipe.transform(transformer, check_ports);
}
void QueryPipelineBuilder::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)

View File

@ -69,7 +69,7 @@ public:
using Transformer = std::function<Processors(OutputPortRawPtrs ports)>;
/// Transform pipeline in general way.
void transform(const Transformer & transformer);
void transform(const Transformer & transformer, bool check_ports = true);
/// Add TotalsHavingTransform. Resize pipeline to single input. Adds totals port.
void addTotalsHavingTransform(ProcessorPtr transform);