forgot new files

This commit is contained in:
Alexander Kuzmenkov 2020-12-11 01:16:58 +03:00
parent 88e7bc5b60
commit 6f991725c3
4 changed files with 211 additions and 0 deletions

View File

@ -0,0 +1,110 @@
#include <Processors/QueryPlan/WindowStep.h>
#include <Processors/Transforms/WindowTransform.h>
#include <Processors/QueryPipeline.h>
#include <Interpreters/ExpressionActions.h>
#include <IO/Operators.h>
namespace DB
{
static ITransformingStep::Traits getTraits(const ActionsDAGPtr & actions)
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = !actions->hasArrayJoin(),
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = !actions->hasArrayJoin(),
},
{
.preserves_number_of_rows = !actions->hasArrayJoin(),
}
};
}
static ITransformingStep::Traits getJoinTraits()
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = false,
}
};
}
WindowStep::WindowStep(const DataStream & input_stream_,
ActionsDAGPtr actions_dag_)
: ITransformingStep(
input_stream_,
Transform::transformHeader(input_stream_.header,
std::make_shared<ExpressionActions>(actions_dag_)),
getTraits(actions_dag_))
, actions_dag(std::move(actions_dag_))
{
/// Some columns may be removed by expression.
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
}
void WindowStep::updateInputStream(DataStream input_stream, bool keep_header)
{
Block out_header = keep_header
? std::move(output_stream->header)
: Transform::transformHeader(input_stream.header,
std::make_shared<ExpressionActions>(actions_dag));
output_stream = createOutputStream(
input_stream,
std::move(out_header),
getDataStreamTraits());
input_streams.clear();
input_streams.emplace_back(std::move(input_stream));
}
void WindowStep::transformPipeline(QueryPipeline & pipeline)
{
auto expression = std::make_shared<ExpressionActions>(actions_dag);
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<Transform>(header, expression);
});
if (!blocksHaveEqualStructure(pipeline.getHeader(), output_stream->header))
{
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
pipeline.getHeader().getColumnsWithTypeAndName(),
output_stream->header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto convert_actions = std::make_shared<ExpressionActions>(convert_actions_dag);
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, convert_actions);
});
}
}
void WindowStep::describeActions(FormatSettings & settings) const
{
String prefix(settings.offset, ' ');
bool first = true;
auto expression = std::make_shared<ExpressionActions>(actions_dag);
for (const auto & action : expression->getActions())
{
settings.out << prefix << (first ? "Actions: "
: " ");
first = false;
settings.out << action.toString() << '\n';
}
}
}

View File

@ -0,0 +1,32 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
namespace DB
{
class ActionsDAG;
using ActionsDAGPtr = std::shared_ptr<ActionsDAG>;
class WindowTransform;
class WindowStep : public ITransformingStep
{
public:
using Transform = WindowTransform;
explicit WindowStep(const DataStream & input_stream_, ActionsDAGPtr actions_dag_);
String getName() const override { return "Expression"; }
void transformPipeline(QueryPipeline & pipeline) override;
void updateInputStream(DataStream input_stream, bool keep_header);
void describeActions(FormatSettings & settings) const override;
const ActionsDAGPtr & getExpression() const { return actions_dag; }
private:
ActionsDAGPtr actions_dag;
};
}

View File

@ -0,0 +1,33 @@
#include <Processors/Transforms/WindowTransform.h>
#include <Interpreters/ExpressionActions.h>
namespace DB
{
Block WindowTransform::transformHeader(Block header, const ExpressionActionsPtr & expression)
{
size_t num_rows = header.rows();
expression->execute(header, num_rows, true);
return header;
}
WindowTransform::WindowTransform(const Block & header_,
ExpressionActionsPtr expression_)
: ISimpleTransform(header_, transformHeader(header_, expression_), false)
, expression(std::move(expression_))
{
}
void WindowTransform::transform(Chunk & chunk)
{
size_t num_rows = chunk.getNumRows();
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
expression->execute(block, num_rows);
chunk.setColumns(block.getColumns(), num_rows);
}
}

View File

@ -0,0 +1,36 @@
#pragma once
#include <Processors/ISimpleTransform.h>
namespace DB
{
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/** Executes a certain expression over the block.
* The expression consists of column identifiers from the block, constants, common functions.
* For example: hits * 2 + 3, url LIKE '%yandex%'
* The expression processes each row independently of the others.
*/
class WindowTransform : public ISimpleTransform
{
public:
WindowTransform(
const Block & header_,
ExpressionActionsPtr expression_);
String getName() const override
{
return "WindowTransform";
}
static Block transformHeader(Block header, const ExpressionActionsPtr & expression);
protected:
void transform(Chunk & chunk) override;
private:
ExpressionActionsPtr expression;
};
}