From 6d5b23de672f8ddc3e0857ab13f82d8bc0c96f46 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov Date: Tue, 15 Dec 2020 03:36:03 +0300 Subject: [PATCH] something works --- src/Core/Block.cpp | 16 ++ src/Interpreters/ActionsDAG.cpp | 2 +- src/Interpreters/ExpressionActions.cpp | 4 + src/Interpreters/ExpressionAnalyzer.cpp | 6 +- src/Interpreters/InterpreterSelectQuery.cpp | 14 +- src/Processors/QueryPlan/WindowStep.cpp | 102 ++++++------ src/Processors/QueryPlan/WindowStep.h | 15 +- .../Transforms/ExpressionTransform.h | 1 - src/Processors/Transforms/WindowTransform.cpp | 149 ++++++++++++++++-- src/Processors/Transforms/WindowTransform.h | 36 ++++- .../RocksDB/StorageEmbeddedRocksDB.cpp | 2 + 11 files changed, 258 insertions(+), 89 deletions(-) diff --git a/src/Core/Block.cpp b/src/Core/Block.cpp index cd2855739e2..014fa001588 100644 --- a/src/Core/Block.cpp +++ b/src/Core/Block.cpp @@ -56,6 +56,10 @@ void Block::insert(size_t position, const ColumnWithTypeAndName & elem) index_by_name.emplace(elem.name, position); data.emplace(data.begin() + position, elem); + +// fmt::print(stderr, "block: {}\n", dumpStructure()); +// fmt::print(stderr, "(1) insert column {} at \n{}\n", elem.name, +// StackTrace().toString()); } void Block::insert(size_t position, ColumnWithTypeAndName && elem) @@ -70,17 +74,29 @@ void Block::insert(size_t position, ColumnWithTypeAndName && elem) index_by_name.emplace(elem.name, position); data.emplace(data.begin() + position, std::move(elem)); + +// fmt::print(stderr, "block: {}\n", dumpStructure()); +// fmt::print(stderr, "(2) insert column {} at \n{}\n", elem.name, +// StackTrace().toString()); } void Block::insert(const ColumnWithTypeAndName & elem) { +// fmt::print(stderr, "block: {}\n", dumpStructure()); +// fmt::print(stderr, "(3) insert column {} at \n{}\n", elem.name, +// StackTrace().toString()); + index_by_name.emplace(elem.name, data.size()); data.emplace_back(elem); } void Block::insert(ColumnWithTypeAndName && elem) { +// fmt::print(stderr, "block: {}\n", dumpStructure()); +// fmt::print(stderr, "(4) insert column {} at \n{}\n", elem.name, +// StackTrace().toString()); + index_by_name.emplace(elem.name, data.size()); data.emplace_back(std::move(elem)); } diff --git a/src/Interpreters/ActionsDAG.cpp b/src/Interpreters/ActionsDAG.cpp index 75915bdad1a..cd6fab40aaf 100644 --- a/src/Interpreters/ActionsDAG.cpp +++ b/src/Interpreters/ActionsDAG.cpp @@ -624,7 +624,7 @@ ActionsDAGPtr ActionsDAG::makeConvertingActions( { auto & input = inputs[res_elem.name]; if (input.empty()) - throw Exception("Cannot find column " + backQuoteIfNeed(res_elem.name) + " in source stream", + throw Exception("Cannot find column '" + backQuoteIfNeed(res_elem.name) + "' in source stream", ErrorCodes::THERE_IS_NO_COLUMN); src_node = actions_dag->inputs[input.front()]; diff --git a/src/Interpreters/ExpressionActions.cpp b/src/Interpreters/ExpressionActions.cpp index f0dcf830c82..4c2ec0e0dc8 100644 --- a/src/Interpreters/ExpressionActions.cpp +++ b/src/Interpreters/ExpressionActions.cpp @@ -373,6 +373,10 @@ static void executeAction(const ExpressionActions::Action & action, ExecutionCon res_column.column = action.node->column->cloneResized(num_rows); res_column.type = action.node->result_type; res_column.name = action.node->result_name; + + + fmt::print(stderr, "execute column action {} at\n{}\n", + action.toString(), StackTrace().toString()); break; } diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index 0d54f75c588..802004b7ad6 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -1024,14 +1024,14 @@ void SelectQueryExpressionAnalyzer::appendWindowFunctionsArguments( getRootActionsNoMakeSet(f.function_node->arguments, true /* no subqueries */, step.actions()); - // Add column with window function name and value "1". + // Add empty INPUT with window function name. // It is an aggregate function, so it won't be added by getRootActions. ColumnWithTypeAndName col; col.type = std::make_shared(); - col.column = col.type->createColumnConst(1 /* size */, UInt64(1) /* field */); + col.column = col.type->createColumn(); col.name = f.column_name; - step.actions()->addColumn(col); + step.actions()->addInput(col); } // /* diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 2d6cb64bb7e..446072602e0 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -1788,20 +1788,10 @@ void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan) + w.window_name + "'"); query_plan.addStep(std::move(merging_sorted)); - // Add column with window function name and value "1". - ColumnWithTypeAndName col; - col.type = std::make_shared(); - col.column = col.type->createColumnConst(1 /* size */, UInt64(1) /* field */); - col.name = f.column_name; - - ActionsDAGPtr window_dag = std::make_shared(); - window_dag->addColumn(col); - - fmt::print(stderr, "window dag: {}\n", window_dag->dumpDAG()); - auto window_step = std::make_unique( query_plan.getCurrentDataStream(), - window_dag); + w, + std::vector(1, f)); window_step->setStepDescription("Window step for function '" + f.column_name + "'"); diff --git a/src/Processors/QueryPlan/WindowStep.cpp b/src/Processors/QueryPlan/WindowStep.cpp index 2db6bd3d416..495513c1a2a 100644 --- a/src/Processors/QueryPlan/WindowStep.cpp +++ b/src/Processors/QueryPlan/WindowStep.cpp @@ -1,6 +1,7 @@ #include #include +#include #include #include #include @@ -8,18 +9,18 @@ namespace DB { -static ITransformingStep::Traits getTraits(const ActionsDAGPtr & actions) +static ITransformingStep::Traits getTraits() { return ITransformingStep::Traits { { - .preserves_distinct_columns = !actions->hasArrayJoin(), + .preserves_distinct_columns = true, .returns_single_stream = false, .preserves_number_of_streams = true, - .preserves_sorting = !actions->hasArrayJoin(), + .preserves_sorting = true, }, { - .preserves_number_of_rows = !actions->hasArrayJoin(), + .preserves_number_of_rows = true } }; } @@ -40,71 +41,72 @@ static ITransformingStep::Traits getJoinTraits() }; } +static Block addWindowFunctionColumns(const Block & block, + std::vector window_functions) +{ + fmt::print(stderr, "input header: {}\n", block.dumpStructure()); + + //auto result = block.cloneWithoutColumns(); + auto result = block; + + fmt::print(stderr, "header after clone: {}\n", result.dumpStructure()); + + for (const auto & f : window_functions) + { + ColumnWithTypeAndName column_with_type; + column_with_type.name = f.column_name; + column_with_type.type = f.aggregate_function->getReturnType(); + column_with_type.column = column_with_type.type->createColumn(); + + result.insert(column_with_type); + } + + fmt::print(stderr, "header after insert: {}\n", result.dumpStructure()); + + return result; +} + WindowStep::WindowStep(const DataStream & input_stream_, - ActionsDAGPtr actions_dag_) + const WindowDescription & window_description_, + const std::vector & window_functions_) : ITransformingStep( input_stream_, - Transform::transformHeader(input_stream_.header, - std::make_shared(actions_dag_)), - getTraits(actions_dag_)) - , actions_dag(std::move(actions_dag_)) + addWindowFunctionColumns(input_stream_.header, window_functions_), + getTraits()) + , window_description(window_description_) + , window_functions(window_functions_) + , input_header(input_stream_.header) { /// Some columns may be removed by expression. updateDistinctColumns(output_stream->header, output_stream->distinct_columns); } -void WindowStep::updateInputStream(DataStream input_stream, bool keep_header) -{ - Block out_header = keep_header - ? std::move(output_stream->header) - : Transform::transformHeader(input_stream.header, - std::make_shared(actions_dag)); - - output_stream = createOutputStream( - input_stream, - std::move(out_header), - getDataStreamTraits()); - - input_streams.clear(); - input_streams.emplace_back(std::move(input_stream)); -} - void WindowStep::transformPipeline(QueryPipeline & pipeline) { - auto expression = std::make_shared(actions_dag); - pipeline.addSimpleTransform([&](const Block & header) + pipeline.addSimpleTransform([&](const Block & /*header*/) { - return std::make_shared(header, expression); + return std::make_shared(input_header, + output_stream->header, window_description, window_functions); }); - if (!blocksHaveEqualStructure(pipeline.getHeader(), output_stream->header)) - { - auto convert_actions_dag = ActionsDAG::makeConvertingActions( - pipeline.getHeader().getColumnsWithTypeAndName(), - output_stream->header.getColumnsWithTypeAndName(), - ActionsDAG::MatchColumnsMode::Name); - auto convert_actions = std::make_shared(convert_actions_dag); - - pipeline.addSimpleTransform([&](const Block & header) - { - return std::make_shared(header, convert_actions); - }); - } + assertBlocksHaveEqualStructure(pipeline.getHeader(), output_stream->header, + "WindowStep transform for '" + window_description.window_name + "'"); } void WindowStep::describeActions(FormatSettings & settings) const { String prefix(settings.offset, ' '); - bool first = true; + (void) prefix; + //bool first = true; - auto expression = std::make_shared(actions_dag); - for (const auto & action : expression->getActions()) - { - settings.out << prefix << (first ? "Actions: " - : " "); - first = false; - settings.out << action.toString() << '\n'; - } + //auto expression = std::make_shared(actions_dag); + //for (const auto & action : expression->getActions()) + //{ + // settings.out << prefix << (first ? "Actions: " + // : " "); + // first = false; + // settings.out << action.toString() << '\n'; + //} } } diff --git a/src/Processors/QueryPlan/WindowStep.h b/src/Processors/QueryPlan/WindowStep.h index 4b67e9563db..5bdb14ae5fe 100644 --- a/src/Processors/QueryPlan/WindowStep.h +++ b/src/Processors/QueryPlan/WindowStep.h @@ -1,6 +1,8 @@ #pragma once #include +#include + namespace DB { @@ -14,19 +16,20 @@ class WindowStep : public ITransformingStep public: using Transform = WindowTransform; - explicit WindowStep(const DataStream & input_stream_, ActionsDAGPtr actions_dag_); + explicit WindowStep(const DataStream & input_stream_, + const WindowDescription & window_description_, + const std::vector & window_functions_); + String getName() const override { return "Expression"; } void transformPipeline(QueryPipeline & pipeline) override; - void updateInputStream(DataStream input_stream, bool keep_header); - void describeActions(FormatSettings & settings) const override; - const ActionsDAGPtr & getExpression() const { return actions_dag; } - private: - ActionsDAGPtr actions_dag; + WindowDescription window_description; + std::vector window_functions; + Block input_header; }; } diff --git a/src/Processors/Transforms/ExpressionTransform.h b/src/Processors/Transforms/ExpressionTransform.h index eed8bf089cd..c0c0a63a6a8 100644 --- a/src/Processors/Transforms/ExpressionTransform.h +++ b/src/Processors/Transforms/ExpressionTransform.h @@ -21,7 +21,6 @@ public: String getName() const override { - fmt::print(stderr, "et getname at \n{}\n", StackTrace().toString()); return "ExpressionTransform"; } diff --git a/src/Processors/Transforms/WindowTransform.cpp b/src/Processors/Transforms/WindowTransform.cpp index f4bc660b940..1e166fe1aff 100644 --- a/src/Processors/Transforms/WindowTransform.cpp +++ b/src/Processors/Transforms/WindowTransform.cpp @@ -2,30 +2,157 @@ #include +#include + namespace DB { -Block WindowTransform::transformHeader(Block header, const ExpressionActionsPtr & expression) +WindowTransform::WindowTransform(const Block & input_header_, + const Block & output_header_, + const WindowDescription & window_description_, + const std::vector & window_function_descriptions + ) + // FIXME this is where the output column is added + : ISimpleTransform(input_header_, output_header_, + false /* skip_empty_chunks */) + , input_header(input_header_) + , window_description(window_description_) { - size_t num_rows = header.rows(); - expression->execute(header, num_rows, true); - return header; + fmt::print(stderr, "input header {}\n", input_header.dumpStructure()); + + workspaces.reserve(window_function_descriptions.size()); + for (size_t i = 0; i < window_function_descriptions.size(); ++i) + { + WindowFunctionWorkspace workspace; + workspace.window_function = window_function_descriptions[i]; + + const auto & aggregate_function + = workspace.window_function.aggregate_function; + if (!arena && aggregate_function->allocatesMemoryInArena()) + { + arena = std::make_unique(); + } + + workspace.argument_column_indices.reserve( + workspace.window_function.argument_names.size()); + workspace.argument_columns.reserve( + workspace.window_function.argument_names.size()); + for (const auto & argument_name : workspace.window_function.argument_names) + { + workspace.argument_column_indices.push_back( + input_header.getPositionByName(argument_name)); + + fmt::print(stderr, + "window function '{}' argument column '{}' at '{}'\n", + workspace.window_function.column_name, + argument_name, + workspace.argument_column_indices.back()); + + } + + workspace.aggregate_function_state.reset(aggregate_function->sizeOfData(), + aggregate_function->alignOfData()); + aggregate_function->create(workspace.aggregate_function_state.data()); + + workspaces.push_back(std::move(workspace)); + } + + partition_by_indices.reserve(window_description.partition_by.size()); + for (const auto & column : window_description.partition_by) + { + partition_by_indices.push_back( + input_header.getPositionByName(column.column_name)); + } } - -WindowTransform::WindowTransform(const Block & header_, - ExpressionActionsPtr expression_) - : ISimpleTransform(header_, transformHeader(header_, expression_), false) - , expression(std::move(expression_)) +WindowTransform::~WindowTransform() { + // Some states may be not created yet if the creation failed. + for (size_t i = 0; i < workspaces.size(); i++) + { + workspaces[i].window_function.aggregate_function->destroy( + workspaces[i].aggregate_function_state.data()); + } } void WindowTransform::transform(Chunk & chunk) { - size_t num_rows = chunk.getNumRows(); + const size_t num_rows = chunk.getNumRows(); auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()); - expression->execute(block, num_rows); + fmt::print(stderr, "block before window transform:\n{}\n", block.dumpStructure()); + + for (auto & workspace : workspaces) + { + workspace.argument_columns.clear(); + for (const auto column_index : workspace.argument_column_indices) + { + fmt::print(stderr, "argument column index '{}'\n", column_index); + workspace.argument_columns.push_back( + block.getColumns()[column_index].get()); + } + } + + for (auto & ws : workspaces) + { + const auto & f = ws.window_function; + const auto * a = f.aggregate_function.get(); + + //* + ColumnWithTypeAndName column_with_type; + column_with_type.name = f.column_name; + column_with_type.type = a->getReturnType(); + auto c = column_with_type.type->createColumn(); + column_with_type.column.reset(c.get()); + + size_t partition_start = 0; + for (size_t row = 0; row < num_rows; row++) + { + // Check whether the new partition has started and reinitialize the + // aggregate function states. + if (row > 0) + { + for (const size_t column_index : partition_by_indices) + { + const auto * column = block.getColumns()[column_index].get(); + if (column->compareAt(row, row - 1, *column, + 1 /* nan_direction_hint */) != 0) + { + ws.window_function.aggregate_function->destroy( + ws.aggregate_function_state.data()); + ws.window_function.aggregate_function->create( + ws.aggregate_function_state.data()); + break; + } + } + } + + // Update the aggregate function state and save the result. + a->add(ws.aggregate_function_state.data(), + ws.argument_columns.data(), + row, + arena.get()); + + a->insertResultInto(ws.aggregate_function_state.data(), + *c, + arena.get()); + } + + block.insert(column_with_type); + /*/ + auto & column_with_type = block.getByName(f.column_name); + auto c = IColumn::mutate(std::move(column_with_type.column)); + + for (size_t i = 0; i < num_rows; i++) + { + c->insert(UInt64(i)); + } + + column_with_type.column.reset(c.get()); + //*/ + } + + fmt::print(stderr, "block after window transform:\n{}\n", block.dumpStructure()); chunk.setColumns(block.getColumns(), num_rows); } diff --git a/src/Processors/Transforms/WindowTransform.h b/src/Processors/Transforms/WindowTransform.h index 08648905eda..58b1ed13ae0 100644 --- a/src/Processors/Transforms/WindowTransform.h +++ b/src/Processors/Transforms/WindowTransform.h @@ -1,12 +1,27 @@ #pragma once #include +#include + +#include + namespace DB { class ExpressionActions; using ExpressionActionsPtr = std::shared_ptr; +class Arena; + +struct WindowFunctionWorkspace +{ + WindowFunctionDescription window_function; + AlignedBuffer aggregate_function_state; + std::vector argument_column_indices; + // Be careful, this is per-chunk. + std::vector argument_columns; +}; + /** Executes a certain expression over the block. * The expression consists of column identifiers from the block, constants, common functions. * For example: hits * 2 + 3, url LIKE '%yandex%' @@ -16,8 +31,12 @@ class WindowTransform : public ISimpleTransform { public: WindowTransform( - const Block & header_, - ExpressionActionsPtr expression_); + const Block & input_header, + const Block & output_header, + const WindowDescription & window_description, + const std::vector & window_functions); + + ~WindowTransform() override; String getName() const override { @@ -26,11 +45,18 @@ public: static Block transformHeader(Block header, const ExpressionActionsPtr & expression); -protected: void transform(Chunk & chunk) override; -private: - ExpressionActionsPtr expression; +public: + Block input_header; + + WindowDescription window_description; + std::vector partition_by_indices; + + std::vector workspaces; + + std::unique_ptr arena; + std::vector aggregate_function_data; }; } diff --git a/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp b/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp index ae28205d07b..80b25793806 100644 --- a/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp +++ b/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp @@ -3,6 +3,8 @@ #include #include + +#include #include #include