something works

This commit is contained in:
Alexander Kuzmenkov 2020-12-15 03:36:03 +03:00
parent dea446be2f
commit 6d5b23de67
11 changed files with 258 additions and 89 deletions

View File

@ -56,6 +56,10 @@ void Block::insert(size_t position, const ColumnWithTypeAndName & elem)
index_by_name.emplace(elem.name, position); index_by_name.emplace(elem.name, position);
data.emplace(data.begin() + position, elem); data.emplace(data.begin() + position, elem);
// fmt::print(stderr, "block: {}\n", dumpStructure());
// fmt::print(stderr, "(1) insert column {} at \n{}\n", elem.name,
// StackTrace().toString());
} }
void Block::insert(size_t position, ColumnWithTypeAndName && elem) void Block::insert(size_t position, ColumnWithTypeAndName && elem)
@ -70,17 +74,29 @@ void Block::insert(size_t position, ColumnWithTypeAndName && elem)
index_by_name.emplace(elem.name, position); index_by_name.emplace(elem.name, position);
data.emplace(data.begin() + position, std::move(elem)); data.emplace(data.begin() + position, std::move(elem));
// fmt::print(stderr, "block: {}\n", dumpStructure());
// fmt::print(stderr, "(2) insert column {} at \n{}\n", elem.name,
// StackTrace().toString());
} }
void Block::insert(const ColumnWithTypeAndName & elem) void Block::insert(const ColumnWithTypeAndName & elem)
{ {
// fmt::print(stderr, "block: {}\n", dumpStructure());
// fmt::print(stderr, "(3) insert column {} at \n{}\n", elem.name,
// StackTrace().toString());
index_by_name.emplace(elem.name, data.size()); index_by_name.emplace(elem.name, data.size());
data.emplace_back(elem); data.emplace_back(elem);
} }
void Block::insert(ColumnWithTypeAndName && elem) void Block::insert(ColumnWithTypeAndName && elem)
{ {
// fmt::print(stderr, "block: {}\n", dumpStructure());
// fmt::print(stderr, "(4) insert column {} at \n{}\n", elem.name,
// StackTrace().toString());
index_by_name.emplace(elem.name, data.size()); index_by_name.emplace(elem.name, data.size());
data.emplace_back(std::move(elem)); data.emplace_back(std::move(elem));
} }

View File

@ -624,7 +624,7 @@ ActionsDAGPtr ActionsDAG::makeConvertingActions(
{ {
auto & input = inputs[res_elem.name]; auto & input = inputs[res_elem.name];
if (input.empty()) if (input.empty())
throw Exception("Cannot find column " + backQuoteIfNeed(res_elem.name) + " in source stream", throw Exception("Cannot find column '" + backQuoteIfNeed(res_elem.name) + "' in source stream",
ErrorCodes::THERE_IS_NO_COLUMN); ErrorCodes::THERE_IS_NO_COLUMN);
src_node = actions_dag->inputs[input.front()]; src_node = actions_dag->inputs[input.front()];

View File

@ -373,6 +373,10 @@ static void executeAction(const ExpressionActions::Action & action, ExecutionCon
res_column.column = action.node->column->cloneResized(num_rows); res_column.column = action.node->column->cloneResized(num_rows);
res_column.type = action.node->result_type; res_column.type = action.node->result_type;
res_column.name = action.node->result_name; res_column.name = action.node->result_name;
fmt::print(stderr, "execute column action {} at\n{}\n",
action.toString(), StackTrace().toString());
break; break;
} }

View File

@ -1024,14 +1024,14 @@ void SelectQueryExpressionAnalyzer::appendWindowFunctionsArguments(
getRootActionsNoMakeSet(f.function_node->arguments, getRootActionsNoMakeSet(f.function_node->arguments,
true /* no subqueries */, step.actions()); true /* no subqueries */, step.actions());
// Add column with window function name and value "1". // Add empty INPUT with window function name.
// It is an aggregate function, so it won't be added by getRootActions. // It is an aggregate function, so it won't be added by getRootActions.
ColumnWithTypeAndName col; ColumnWithTypeAndName col;
col.type = std::make_shared<DataTypeInt64>(); col.type = std::make_shared<DataTypeInt64>();
col.column = col.type->createColumnConst(1 /* size */, UInt64(1) /* field */); col.column = col.type->createColumn();
col.name = f.column_name; col.name = f.column_name;
step.actions()->addColumn(col); step.actions()->addInput(col);
} }
// /* // /*

View File

@ -1788,20 +1788,10 @@ void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan)
+ w.window_name + "'"); + w.window_name + "'");
query_plan.addStep(std::move(merging_sorted)); query_plan.addStep(std::move(merging_sorted));
// Add column with window function name and value "1".
ColumnWithTypeAndName col;
col.type = std::make_shared<DataTypeInt64>();
col.column = col.type->createColumnConst(1 /* size */, UInt64(1) /* field */);
col.name = f.column_name;
ActionsDAGPtr window_dag = std::make_shared<ActionsDAG>();
window_dag->addColumn(col);
fmt::print(stderr, "window dag: {}\n", window_dag->dumpDAG());
auto window_step = std::make_unique<WindowStep>( auto window_step = std::make_unique<WindowStep>(
query_plan.getCurrentDataStream(), query_plan.getCurrentDataStream(),
window_dag); w,
std::vector<WindowFunctionDescription>(1, f));
window_step->setStepDescription("Window step for function '" window_step->setStepDescription("Window step for function '"
+ f.column_name + "'"); + f.column_name + "'");

View File

@ -1,6 +1,7 @@
#include <Processors/QueryPlan/WindowStep.h> #include <Processors/QueryPlan/WindowStep.h>
#include <Processors/Transforms/WindowTransform.h> #include <Processors/Transforms/WindowTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/QueryPipeline.h> #include <Processors/QueryPipeline.h>
#include <Interpreters/ExpressionActions.h> #include <Interpreters/ExpressionActions.h>
#include <IO/Operators.h> #include <IO/Operators.h>
@ -8,18 +9,18 @@
namespace DB namespace DB
{ {
static ITransformingStep::Traits getTraits(const ActionsDAGPtr & actions) static ITransformingStep::Traits getTraits()
{ {
return ITransformingStep::Traits return ITransformingStep::Traits
{ {
{ {
.preserves_distinct_columns = !actions->hasArrayJoin(), .preserves_distinct_columns = true,
.returns_single_stream = false, .returns_single_stream = false,
.preserves_number_of_streams = true, .preserves_number_of_streams = true,
.preserves_sorting = !actions->hasArrayJoin(), .preserves_sorting = true,
}, },
{ {
.preserves_number_of_rows = !actions->hasArrayJoin(), .preserves_number_of_rows = true
} }
}; };
} }
@ -40,71 +41,72 @@ static ITransformingStep::Traits getJoinTraits()
}; };
} }
static Block addWindowFunctionColumns(const Block & block,
std::vector<WindowFunctionDescription> window_functions)
{
fmt::print(stderr, "input header: {}\n", block.dumpStructure());
//auto result = block.cloneWithoutColumns();
auto result = block;
fmt::print(stderr, "header after clone: {}\n", result.dumpStructure());
for (const auto & f : window_functions)
{
ColumnWithTypeAndName column_with_type;
column_with_type.name = f.column_name;
column_with_type.type = f.aggregate_function->getReturnType();
column_with_type.column = column_with_type.type->createColumn();
result.insert(column_with_type);
}
fmt::print(stderr, "header after insert: {}\n", result.dumpStructure());
return result;
}
WindowStep::WindowStep(const DataStream & input_stream_, WindowStep::WindowStep(const DataStream & input_stream_,
ActionsDAGPtr actions_dag_) const WindowDescription & window_description_,
const std::vector<WindowFunctionDescription> & window_functions_)
: ITransformingStep( : ITransformingStep(
input_stream_, input_stream_,
Transform::transformHeader(input_stream_.header, addWindowFunctionColumns(input_stream_.header, window_functions_),
std::make_shared<ExpressionActions>(actions_dag_)), getTraits())
getTraits(actions_dag_)) , window_description(window_description_)
, actions_dag(std::move(actions_dag_)) , window_functions(window_functions_)
, input_header(input_stream_.header)
{ {
/// Some columns may be removed by expression. /// Some columns may be removed by expression.
updateDistinctColumns(output_stream->header, output_stream->distinct_columns); updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
} }
void WindowStep::updateInputStream(DataStream input_stream, bool keep_header)
{
Block out_header = keep_header
? std::move(output_stream->header)
: Transform::transformHeader(input_stream.header,
std::make_shared<ExpressionActions>(actions_dag));
output_stream = createOutputStream(
input_stream,
std::move(out_header),
getDataStreamTraits());
input_streams.clear();
input_streams.emplace_back(std::move(input_stream));
}
void WindowStep::transformPipeline(QueryPipeline & pipeline) void WindowStep::transformPipeline(QueryPipeline & pipeline)
{ {
auto expression = std::make_shared<ExpressionActions>(actions_dag); pipeline.addSimpleTransform([&](const Block & /*header*/)
pipeline.addSimpleTransform([&](const Block & header)
{ {
return std::make_shared<Transform>(header, expression); return std::make_shared<Transform>(input_header,
output_stream->header, window_description, window_functions);
}); });
if (!blocksHaveEqualStructure(pipeline.getHeader(), output_stream->header)) assertBlocksHaveEqualStructure(pipeline.getHeader(), output_stream->header,
{ "WindowStep transform for '" + window_description.window_name + "'");
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
pipeline.getHeader().getColumnsWithTypeAndName(),
output_stream->header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
auto convert_actions = std::make_shared<ExpressionActions>(convert_actions_dag);
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, convert_actions);
});
}
} }
void WindowStep::describeActions(FormatSettings & settings) const void WindowStep::describeActions(FormatSettings & settings) const
{ {
String prefix(settings.offset, ' '); String prefix(settings.offset, ' ');
bool first = true; (void) prefix;
//bool first = true;
auto expression = std::make_shared<ExpressionActions>(actions_dag); //auto expression = std::make_shared<ExpressionActions>(actions_dag);
for (const auto & action : expression->getActions()) //for (const auto & action : expression->getActions())
{ //{
settings.out << prefix << (first ? "Actions: " // settings.out << prefix << (first ? "Actions: "
: " "); // : " ");
first = false; // first = false;
settings.out << action.toString() << '\n'; // settings.out << action.toString() << '\n';
} //}
} }
} }

View File

@ -1,6 +1,8 @@
#pragma once #pragma once
#include <Processors/QueryPlan/ITransformingStep.h> #include <Processors/QueryPlan/ITransformingStep.h>
#include <Interpreters/AggregateDescription.h>
namespace DB namespace DB
{ {
@ -14,19 +16,20 @@ class WindowStep : public ITransformingStep
public: public:
using Transform = WindowTransform; using Transform = WindowTransform;
explicit WindowStep(const DataStream & input_stream_, ActionsDAGPtr actions_dag_); explicit WindowStep(const DataStream & input_stream_,
const WindowDescription & window_description_,
const std::vector<WindowFunctionDescription> & window_functions_);
String getName() const override { return "Expression"; } String getName() const override { return "Expression"; }
void transformPipeline(QueryPipeline & pipeline) override; void transformPipeline(QueryPipeline & pipeline) override;
void updateInputStream(DataStream input_stream, bool keep_header);
void describeActions(FormatSettings & settings) const override; void describeActions(FormatSettings & settings) const override;
const ActionsDAGPtr & getExpression() const { return actions_dag; }
private: private:
ActionsDAGPtr actions_dag; WindowDescription window_description;
std::vector<WindowFunctionDescription> window_functions;
Block input_header;
}; };
} }

View File

@ -21,7 +21,6 @@ public:
String getName() const override String getName() const override
{ {
fmt::print(stderr, "et getname at \n{}\n", StackTrace().toString());
return "ExpressionTransform"; return "ExpressionTransform";
} }

View File

@ -2,30 +2,157 @@
#include <Interpreters/ExpressionActions.h> #include <Interpreters/ExpressionActions.h>
#include <Common/Arena.h>
namespace DB namespace DB
{ {
Block WindowTransform::transformHeader(Block header, const ExpressionActionsPtr & expression) WindowTransform::WindowTransform(const Block & input_header_,
const Block & output_header_,
const WindowDescription & window_description_,
const std::vector<WindowFunctionDescription> & window_function_descriptions
)
// FIXME this is where the output column is added
: ISimpleTransform(input_header_, output_header_,
false /* skip_empty_chunks */)
, input_header(input_header_)
, window_description(window_description_)
{ {
size_t num_rows = header.rows(); fmt::print(stderr, "input header {}\n", input_header.dumpStructure());
expression->execute(header, num_rows, true);
return header; workspaces.reserve(window_function_descriptions.size());
for (size_t i = 0; i < window_function_descriptions.size(); ++i)
{
WindowFunctionWorkspace workspace;
workspace.window_function = window_function_descriptions[i];
const auto & aggregate_function
= workspace.window_function.aggregate_function;
if (!arena && aggregate_function->allocatesMemoryInArena())
{
arena = std::make_unique<Arena>();
} }
workspace.argument_column_indices.reserve(
WindowTransform::WindowTransform(const Block & header_, workspace.window_function.argument_names.size());
ExpressionActionsPtr expression_) workspace.argument_columns.reserve(
: ISimpleTransform(header_, transformHeader(header_, expression_), false) workspace.window_function.argument_names.size());
, expression(std::move(expression_)) for (const auto & argument_name : workspace.window_function.argument_names)
{ {
workspace.argument_column_indices.push_back(
input_header.getPositionByName(argument_name));
fmt::print(stderr,
"window function '{}' argument column '{}' at '{}'\n",
workspace.window_function.column_name,
argument_name,
workspace.argument_column_indices.back());
}
workspace.aggregate_function_state.reset(aggregate_function->sizeOfData(),
aggregate_function->alignOfData());
aggregate_function->create(workspace.aggregate_function_state.data());
workspaces.push_back(std::move(workspace));
}
partition_by_indices.reserve(window_description.partition_by.size());
for (const auto & column : window_description.partition_by)
{
partition_by_indices.push_back(
input_header.getPositionByName(column.column_name));
}
}
WindowTransform::~WindowTransform()
{
// Some states may be not created yet if the creation failed.
for (size_t i = 0; i < workspaces.size(); i++)
{
workspaces[i].window_function.aggregate_function->destroy(
workspaces[i].aggregate_function_state.data());
}
} }
void WindowTransform::transform(Chunk & chunk) void WindowTransform::transform(Chunk & chunk)
{ {
size_t num_rows = chunk.getNumRows(); const size_t num_rows = chunk.getNumRows();
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()); auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
expression->execute(block, num_rows); fmt::print(stderr, "block before window transform:\n{}\n", block.dumpStructure());
for (auto & workspace : workspaces)
{
workspace.argument_columns.clear();
for (const auto column_index : workspace.argument_column_indices)
{
fmt::print(stderr, "argument column index '{}'\n", column_index);
workspace.argument_columns.push_back(
block.getColumns()[column_index].get());
}
}
for (auto & ws : workspaces)
{
const auto & f = ws.window_function;
const auto * a = f.aggregate_function.get();
//*
ColumnWithTypeAndName column_with_type;
column_with_type.name = f.column_name;
column_with_type.type = a->getReturnType();
auto c = column_with_type.type->createColumn();
column_with_type.column.reset(c.get());
size_t partition_start = 0;
for (size_t row = 0; row < num_rows; row++)
{
// Check whether the new partition has started and reinitialize the
// aggregate function states.
if (row > 0)
{
for (const size_t column_index : partition_by_indices)
{
const auto * column = block.getColumns()[column_index].get();
if (column->compareAt(row, row - 1, *column,
1 /* nan_direction_hint */) != 0)
{
ws.window_function.aggregate_function->destroy(
ws.aggregate_function_state.data());
ws.window_function.aggregate_function->create(
ws.aggregate_function_state.data());
break;
}
}
}
// Update the aggregate function state and save the result.
a->add(ws.aggregate_function_state.data(),
ws.argument_columns.data(),
row,
arena.get());
a->insertResultInto(ws.aggregate_function_state.data(),
*c,
arena.get());
}
block.insert(column_with_type);
/*/
auto & column_with_type = block.getByName(f.column_name);
auto c = IColumn::mutate(std::move(column_with_type.column));
for (size_t i = 0; i < num_rows; i++)
{
c->insert(UInt64(i));
}
column_with_type.column.reset(c.get());
//*/
}
fmt::print(stderr, "block after window transform:\n{}\n", block.dumpStructure());
chunk.setColumns(block.getColumns(), num_rows); chunk.setColumns(block.getColumns(), num_rows);
} }

View File

@ -1,12 +1,27 @@
#pragma once #pragma once
#include <Processors/ISimpleTransform.h> #include <Processors/ISimpleTransform.h>
#include <Interpreters/AggregateDescription.h>
#include <Common/AlignedBuffer.h>
namespace DB namespace DB
{ {
class ExpressionActions; class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>; using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class Arena;
struct WindowFunctionWorkspace
{
WindowFunctionDescription window_function;
AlignedBuffer aggregate_function_state;
std::vector<size_t> argument_column_indices;
// Be careful, this is per-chunk.
std::vector<const IColumn *> argument_columns;
};
/** Executes a certain expression over the block. /** Executes a certain expression over the block.
* The expression consists of column identifiers from the block, constants, common functions. * The expression consists of column identifiers from the block, constants, common functions.
* For example: hits * 2 + 3, url LIKE '%yandex%' * For example: hits * 2 + 3, url LIKE '%yandex%'
@ -16,8 +31,12 @@ class WindowTransform : public ISimpleTransform
{ {
public: public:
WindowTransform( WindowTransform(
const Block & header_, const Block & input_header,
ExpressionActionsPtr expression_); const Block & output_header,
const WindowDescription & window_description,
const std::vector<WindowFunctionDescription> & window_functions);
~WindowTransform() override;
String getName() const override String getName() const override
{ {
@ -26,11 +45,18 @@ public:
static Block transformHeader(Block header, const ExpressionActionsPtr & expression); static Block transformHeader(Block header, const ExpressionActionsPtr & expression);
protected:
void transform(Chunk & chunk) override; void transform(Chunk & chunk) override;
private: public:
ExpressionActionsPtr expression; Block input_header;
WindowDescription window_description;
std::vector<size_t> partition_by_indices;
std::vector<WindowFunctionWorkspace> workspaces;
std::unique_ptr<Arena> arena;
std::vector<AlignedBuffer> aggregate_function_data;
}; };
} }

View File

@ -3,6 +3,8 @@
#include <Storages/RocksDB/EmbeddedRocksDBBlockInputStream.h> #include <Storages/RocksDB/EmbeddedRocksDBBlockInputStream.h>
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageFactory.h> #include <Storages/StorageFactory.h>
#include <Parsers/ASTSelectQuery.h> #include <Parsers/ASTSelectQuery.h>