mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-17 21:24:28 +00:00
cleanup
This commit is contained in:
parent
a23a9fd947
commit
fabb37d3c9
@ -58,9 +58,6 @@ static DataTypes convertLowCardinalityTypesToNested(const DataTypes & types)
|
|||||||
AggregateFunctionPtr AggregateFunctionFactory::get(
|
AggregateFunctionPtr AggregateFunctionFactory::get(
|
||||||
const String & name, const DataTypes & argument_types, const Array & parameters, AggregateFunctionProperties & out_properties) const
|
const String & name, const DataTypes & argument_types, const Array & parameters, AggregateFunctionProperties & out_properties) const
|
||||||
{
|
{
|
||||||
fmt::print(stderr, "get aggregate function {} at \n{}\n",
|
|
||||||
name, StackTrace().toString());
|
|
||||||
|
|
||||||
auto type_without_low_cardinality = convertLowCardinalityTypesToNested(argument_types);
|
auto type_without_low_cardinality = convertLowCardinalityTypesToNested(argument_types);
|
||||||
|
|
||||||
/// If one of the types is Nullable, we apply aggregate function combinator "Null".
|
/// If one of the types is Nullable, we apply aggregate function combinator "Null".
|
||||||
|
@ -52,7 +52,6 @@ template <typename T> using AggregateFunctionSumKahan =
|
|||||||
template <template <typename> class Function>
|
template <template <typename> class Function>
|
||||||
AggregateFunctionPtr createAggregateFunctionSum(const std::string & name, const DataTypes & argument_types, const Array & parameters)
|
AggregateFunctionPtr createAggregateFunctionSum(const std::string & name, const DataTypes & argument_types, const Array & parameters)
|
||||||
{
|
{
|
||||||
fmt::print(stderr, "create sum at \n{}\n", StackTrace().toString());
|
|
||||||
assertNoParameters(name, parameters);
|
assertNoParameters(name, parameters);
|
||||||
assertUnary(name, argument_types);
|
assertUnary(name, argument_types);
|
||||||
|
|
||||||
|
@ -56,10 +56,6 @@ void Block::insert(size_t position, const ColumnWithTypeAndName & elem)
|
|||||||
|
|
||||||
index_by_name.emplace(elem.name, position);
|
index_by_name.emplace(elem.name, position);
|
||||||
data.emplace(data.begin() + position, elem);
|
data.emplace(data.begin() + position, elem);
|
||||||
|
|
||||||
// fmt::print(stderr, "block: {}\n", dumpStructure());
|
|
||||||
// fmt::print(stderr, "(1) insert column {} at \n{}\n", elem.name,
|
|
||||||
// StackTrace().toString());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void Block::insert(size_t position, ColumnWithTypeAndName && elem)
|
void Block::insert(size_t position, ColumnWithTypeAndName && elem)
|
||||||
@ -74,29 +70,17 @@ void Block::insert(size_t position, ColumnWithTypeAndName && elem)
|
|||||||
|
|
||||||
index_by_name.emplace(elem.name, position);
|
index_by_name.emplace(elem.name, position);
|
||||||
data.emplace(data.begin() + position, std::move(elem));
|
data.emplace(data.begin() + position, std::move(elem));
|
||||||
|
|
||||||
// fmt::print(stderr, "block: {}\n", dumpStructure());
|
|
||||||
// fmt::print(stderr, "(2) insert column {} at \n{}\n", elem.name,
|
|
||||||
// StackTrace().toString());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void Block::insert(const ColumnWithTypeAndName & elem)
|
void Block::insert(const ColumnWithTypeAndName & elem)
|
||||||
{
|
{
|
||||||
// fmt::print(stderr, "block: {}\n", dumpStructure());
|
|
||||||
// fmt::print(stderr, "(3) insert column {} at \n{}\n", elem.name,
|
|
||||||
// StackTrace().toString());
|
|
||||||
|
|
||||||
index_by_name.emplace(elem.name, data.size());
|
index_by_name.emplace(elem.name, data.size());
|
||||||
data.emplace_back(elem);
|
data.emplace_back(elem);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Block::insert(ColumnWithTypeAndName && elem)
|
void Block::insert(ColumnWithTypeAndName && elem)
|
||||||
{
|
{
|
||||||
// fmt::print(stderr, "block: {}\n", dumpStructure());
|
|
||||||
// fmt::print(stderr, "(4) insert column {} at \n{}\n", elem.name,
|
|
||||||
// StackTrace().toString());
|
|
||||||
|
|
||||||
index_by_name.emplace(elem.name, data.size());
|
index_by_name.emplace(elem.name, data.size());
|
||||||
data.emplace_back(std::move(elem));
|
data.emplace_back(std::move(elem));
|
||||||
}
|
}
|
||||||
|
@ -43,7 +43,6 @@ void registerFunctionBuildId(FunctionFactory &);
|
|||||||
void registerFunctionUptime(FunctionFactory &);
|
void registerFunctionUptime(FunctionFactory &);
|
||||||
void registerFunctionTimeZone(FunctionFactory &);
|
void registerFunctionTimeZone(FunctionFactory &);
|
||||||
void registerFunctionRunningAccumulate(FunctionFactory &);
|
void registerFunctionRunningAccumulate(FunctionFactory &);
|
||||||
void registerFunctionWindow(FunctionFactory &);
|
|
||||||
void registerFunctionRunningDifference(FunctionFactory &);
|
void registerFunctionRunningDifference(FunctionFactory &);
|
||||||
void registerFunctionRunningDifferenceStartingWithFirstValue(FunctionFactory &);
|
void registerFunctionRunningDifferenceStartingWithFirstValue(FunctionFactory &);
|
||||||
void registerFunctionFinalizeAggregation(FunctionFactory &);
|
void registerFunctionFinalizeAggregation(FunctionFactory &);
|
||||||
@ -110,7 +109,6 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
|
|||||||
registerFunctionUptime(factory);
|
registerFunctionUptime(factory);
|
||||||
registerFunctionTimeZone(factory);
|
registerFunctionTimeZone(factory);
|
||||||
registerFunctionRunningAccumulate(factory);
|
registerFunctionRunningAccumulate(factory);
|
||||||
registerFunctionWindow(factory);
|
|
||||||
registerFunctionRunningDifference(factory);
|
registerFunctionRunningDifference(factory);
|
||||||
registerFunctionRunningDifferenceStartingWithFirstValue(factory);
|
registerFunctionRunningDifferenceStartingWithFirstValue(factory);
|
||||||
registerFunctionFinalizeAggregation(factory);
|
registerFunctionFinalizeAggregation(factory);
|
||||||
|
@ -75,8 +75,6 @@ public:
|
|||||||
|
|
||||||
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
|
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
|
||||||
{
|
{
|
||||||
fmt::print(stderr, "executeImpl at \n{}\n", StackTrace().toString());
|
|
||||||
|
|
||||||
const ColumnAggregateFunction * column_with_states
|
const ColumnAggregateFunction * column_with_states
|
||||||
= typeid_cast<const ColumnAggregateFunction *>(&*arguments.at(0).column);
|
= typeid_cast<const ColumnAggregateFunction *>(&*arguments.at(0).column);
|
||||||
|
|
||||||
@ -138,136 +136,6 @@ public:
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
namespace
|
|
||||||
{
|
|
||||||
|
|
||||||
class FunctionWindow : public IFunction
|
|
||||||
{
|
|
||||||
mutable ColumnsWithTypeAndName prev_columns = {};
|
|
||||||
mutable int prev_index = -1;
|
|
||||||
|
|
||||||
mutable AlignedBuffer place;
|
|
||||||
mutable std::unique_ptr<Arena> arena;
|
|
||||||
|
|
||||||
public:
|
|
||||||
static constexpr auto name = "window";
|
|
||||||
static FunctionPtr create(const Context &)
|
|
||||||
{
|
|
||||||
return std::make_shared<FunctionWindow>();
|
|
||||||
}
|
|
||||||
|
|
||||||
String getName() const override
|
|
||||||
{
|
|
||||||
return name;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool isStateful() const override
|
|
||||||
{
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool isVariadic() const override { return true; }
|
|
||||||
|
|
||||||
size_t getNumberOfArguments() const override { return 0; }
|
|
||||||
|
|
||||||
bool isDeterministic() const override { return false; }
|
|
||||||
|
|
||||||
bool isDeterministicInScopeOfQuery() const override
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
|
|
||||||
{
|
|
||||||
fmt::print(stderr, "getReturnType for {} at \n{}\n",
|
|
||||||
static_cast<const void *>(this), StackTrace().toString());
|
|
||||||
|
|
||||||
if (arguments.empty() || arguments.size() > 3)
|
|
||||||
throw Exception("Incorrect number of arguments of function " + getName() + ". Must be up to 3 (function, PARTITION BY, ORDER BY).",
|
|
||||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
|
||||||
|
|
||||||
const DataTypeAggregateFunction * type = checkAndGetDataType<DataTypeAggregateFunction>(arguments[0].get());
|
|
||||||
if (!type)
|
|
||||||
throw Exception("Argument for function " + getName() + " must have type AggregateFunction - state of aggregate function.",
|
|
||||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
|
||||||
|
|
||||||
const auto & agg_func = *type->getFunction();
|
|
||||||
place.reset(agg_func.sizeOfData(), agg_func.alignOfData());
|
|
||||||
|
|
||||||
/// Will pass empty arena if agg_func does not allocate memory in arena
|
|
||||||
arena = agg_func.allocatesMemoryInArena() ? std::make_unique<Arena>() : nullptr;
|
|
||||||
|
|
||||||
|
|
||||||
return type->getReturnType();
|
|
||||||
}
|
|
||||||
|
|
||||||
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
|
|
||||||
{
|
|
||||||
fmt::print(stderr, "executeImpl for {} at \n{}\n",
|
|
||||||
static_cast<const void *>(this), StackTrace().toString());
|
|
||||||
|
|
||||||
const ColumnAggregateFunction * column_with_states
|
|
||||||
= typeid_cast<const ColumnAggregateFunction *>(&*arguments.at(0).column);
|
|
||||||
|
|
||||||
if (!column_with_states)
|
|
||||||
throw Exception("Illegal column " + arguments.at(0).column->getName()
|
|
||||||
+ " of first argument of function "
|
|
||||||
+ getName(),
|
|
||||||
ErrorCodes::ILLEGAL_COLUMN);
|
|
||||||
|
|
||||||
ColumnPtr column_with_groups;
|
|
||||||
|
|
||||||
if (arguments.size() >= 2)
|
|
||||||
column_with_groups = arguments[1].column;
|
|
||||||
|
|
||||||
AggregateFunctionPtr aggregate_function_ptr = column_with_states->getAggregateFunction();
|
|
||||||
const IAggregateFunction & agg_func = *aggregate_function_ptr;
|
|
||||||
|
|
||||||
|
|
||||||
auto result_column_ptr = agg_func.getReturnType()->createColumn();
|
|
||||||
IColumn & result_column = *result_column_ptr;
|
|
||||||
result_column.reserve(column_with_states->size());
|
|
||||||
|
|
||||||
const auto & states = column_with_states->getData();
|
|
||||||
|
|
||||||
bool state_created = false;
|
|
||||||
SCOPE_EXIT({
|
|
||||||
if (state_created)
|
|
||||||
agg_func.destroy(place.data());
|
|
||||||
});
|
|
||||||
|
|
||||||
size_t row_number = 0;
|
|
||||||
for (const auto & state_to_add : states)
|
|
||||||
{
|
|
||||||
if (row_number == 0 || (column_with_groups && column_with_groups->compareAt(row_number, row_number - 1, *column_with_groups, 1) != 0))
|
|
||||||
{
|
|
||||||
if (state_created)
|
|
||||||
{
|
|
||||||
agg_func.destroy(place.data());
|
|
||||||
state_created = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
agg_func.create(place.data());
|
|
||||||
state_created = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
agg_func.merge(place.data(), state_to_add, arena.get());
|
|
||||||
agg_func.insertResultInto(place.data(), result_column, arena.get());
|
|
||||||
|
|
||||||
++row_number;
|
|
||||||
}
|
|
||||||
|
|
||||||
return result_column_ptr;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
void registerFunctionWindow(FunctionFactory & factory)
|
|
||||||
{
|
|
||||||
factory.registerFunction<FunctionWindow>();
|
|
||||||
}
|
|
||||||
|
|
||||||
void registerFunctionRunningAccumulate(FunctionFactory & factory)
|
void registerFunctionRunningAccumulate(FunctionFactory & factory)
|
||||||
{
|
{
|
||||||
factory.registerFunction<FunctionRunningAccumulate>();
|
factory.registerFunction<FunctionRunningAccumulate>();
|
||||||
|
@ -624,7 +624,7 @@ ActionsDAGPtr ActionsDAG::makeConvertingActions(
|
|||||||
{
|
{
|
||||||
auto & input = inputs[res_elem.name];
|
auto & input = inputs[res_elem.name];
|
||||||
if (input.empty())
|
if (input.empty())
|
||||||
throw Exception("Cannot find column '" + backQuoteIfNeed(res_elem.name) + "' in source stream",
|
throw Exception("Cannot find column '" + backQuote(res_elem.name) + "' in source stream",
|
||||||
ErrorCodes::THERE_IS_NO_COLUMN);
|
ErrorCodes::THERE_IS_NO_COLUMN);
|
||||||
|
|
||||||
src_node = actions_dag->inputs[input.front()];
|
src_node = actions_dag->inputs[input.front()];
|
||||||
@ -641,12 +641,12 @@ ActionsDAGPtr ActionsDAG::makeConvertingActions(
|
|||||||
if (ignore_constant_values)
|
if (ignore_constant_values)
|
||||||
src_node = const_cast<Node *>(&actions_dag->addColumn(res_elem, true));
|
src_node = const_cast<Node *>(&actions_dag->addColumn(res_elem, true));
|
||||||
else if (res_const->getField() != src_const->getField())
|
else if (res_const->getField() != src_const->getField())
|
||||||
throw Exception("Cannot convert column " + backQuoteIfNeed(res_elem.name) + " because "
|
throw Exception("Cannot convert column " + backQuote(res_elem.name) + " because "
|
||||||
"it is constant but values of constants are different in source and result",
|
"it is constant but values of constants are different in source and result",
|
||||||
ErrorCodes::ILLEGAL_COLUMN);
|
ErrorCodes::ILLEGAL_COLUMN);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
throw Exception("Cannot convert column " + backQuoteIfNeed(res_elem.name) + " because "
|
throw Exception("Cannot convert column " + backQuote(res_elem.name) + " because "
|
||||||
"it is non constant in source stream but must be constant in result",
|
"it is non constant in source stream but must be constant in result",
|
||||||
ErrorCodes::ILLEGAL_COLUMN);
|
ErrorCodes::ILLEGAL_COLUMN);
|
||||||
}
|
}
|
||||||
|
@ -373,10 +373,6 @@ static void executeAction(const ExpressionActions::Action & action, ExecutionCon
|
|||||||
res_column.column = action.node->column->cloneResized(num_rows);
|
res_column.column = action.node->column->cloneResized(num_rows);
|
||||||
res_column.type = action.node->result_type;
|
res_column.type = action.node->result_type;
|
||||||
res_column.name = action.node->result_name;
|
res_column.name = action.node->result_name;
|
||||||
|
|
||||||
|
|
||||||
fmt::print(stderr, "execute column action {} at\n{}\n",
|
|
||||||
action.toString(), StackTrace().toString());
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1004,16 +1004,11 @@ void SelectQueryExpressionAnalyzer::appendAggregateFunctionsArguments(Expression
|
|||||||
if (node->arguments)
|
if (node->arguments)
|
||||||
for (auto & argument : node->arguments->children)
|
for (auto & argument : node->arguments->children)
|
||||||
getRootActions(argument, only_types, step.actions());
|
getRootActions(argument, only_types, step.actions());
|
||||||
|
|
||||||
|
|
||||||
fmt::print(stderr, "actions after appendAggregateFunctionsArguments: \n{} at \n{}\n", chain.dumpChain(), StackTrace().toString());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void SelectQueryExpressionAnalyzer::appendWindowFunctionsArguments(
|
void SelectQueryExpressionAnalyzer::appendWindowFunctionsArguments(
|
||||||
ExpressionActionsChain & chain, bool /* only_types */)
|
ExpressionActionsChain & chain, bool /* only_types */)
|
||||||
{
|
{
|
||||||
fmt::print(stderr, "actions before window: {}\n", chain.dumpChain());
|
|
||||||
|
|
||||||
ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns);
|
ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns);
|
||||||
|
|
||||||
for (const auto & f : window_functions)
|
for (const auto & f : window_functions)
|
||||||
@ -1039,28 +1034,6 @@ void SelectQueryExpressionAnalyzer::appendWindowFunctionsArguments(
|
|||||||
step.actions()->addInput(col);
|
step.actions()->addInput(col);
|
||||||
}
|
}
|
||||||
|
|
||||||
// /*
|
|
||||||
// for (const auto & desc : aggregate_descriptions)
|
|
||||||
// for (const auto & name : desc.argument_names)
|
|
||||||
// step.required_output.emplace_back(name);
|
|
||||||
// */
|
|
||||||
|
|
||||||
// const auto * select_query = getSelectQuery();
|
|
||||||
// /// Collect aggregates removing duplicates by node.getColumnName()
|
|
||||||
// /// It's not clear why we recollect aggregates (for query parts) while we're able to use previously collected ones (for entire query)
|
|
||||||
// /// @note The original recollection logic didn't remove duplicates.
|
|
||||||
// GetAggregatesVisitor::Data data;
|
|
||||||
// GetAggregatesVisitor(data).visit(select_query->select());
|
|
||||||
|
|
||||||
// // 1) just add everything there is in the AST,
|
|
||||||
// for (const ASTFunction * node : data.window_functions)
|
|
||||||
// {
|
|
||||||
// for (auto & argument : node->arguments->children)
|
|
||||||
// {
|
|
||||||
// getRootActions(argument, only_types, step.actions());
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// 2) mark the columns that are really required:
|
// 2) mark the columns that are really required:
|
||||||
for (const auto & f : window_functions)
|
for (const auto & f : window_functions)
|
||||||
{
|
{
|
||||||
@ -1081,8 +1054,6 @@ void SelectQueryExpressionAnalyzer::appendWindowFunctionsArguments(
|
|||||||
step.required_output.push_back(c.column_name);
|
step.required_output.push_back(c.column_name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt::print(stderr, "actions after window: {}\n", chain.dumpChain());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool SelectQueryExpressionAnalyzer::appendHaving(ExpressionActionsChain & chain, bool only_types)
|
bool SelectQueryExpressionAnalyzer::appendHaving(ExpressionActionsChain & chain, bool only_types)
|
||||||
@ -1092,8 +1063,6 @@ bool SelectQueryExpressionAnalyzer::appendHaving(ExpressionActionsChain & chain,
|
|||||||
if (!select_query->having())
|
if (!select_query->having())
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
fmt::print(stderr, "has having\n");
|
|
||||||
|
|
||||||
ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns);
|
ExpressionActionsChain::Step & step = chain.lastStep(aggregated_columns);
|
||||||
|
|
||||||
getRootActionsForHaving(select_query->having(), only_types, step.actions());
|
getRootActionsForHaving(select_query->having(), only_types, step.actions());
|
||||||
@ -1267,8 +1236,6 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendProjectResult(ExpressionActio
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt::print(stderr, "chain before last projection: {}\n",
|
|
||||||
chain.dumpChain());
|
|
||||||
auto actions = chain.getLastActions();
|
auto actions = chain.getLastActions();
|
||||||
actions->project(result_columns);
|
actions->project(result_columns);
|
||||||
return actions;
|
return actions;
|
||||||
@ -1473,8 +1440,6 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
|
|||||||
chain.addStep();
|
chain.addStep();
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt::print(stderr, "chain before aggregate: {}\n", chain.dumpChain());
|
|
||||||
|
|
||||||
if (need_aggregate)
|
if (need_aggregate)
|
||||||
{
|
{
|
||||||
/// TODO correct conditions
|
/// TODO correct conditions
|
||||||
@ -1483,29 +1448,18 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
|
|||||||
&& storage && query.groupBy();
|
&& storage && query.groupBy();
|
||||||
|
|
||||||
query_analyzer.appendGroupBy(chain, only_types || !first_stage, optimize_aggregation_in_order, group_by_elements_actions);
|
query_analyzer.appendGroupBy(chain, only_types || !first_stage, optimize_aggregation_in_order, group_by_elements_actions);
|
||||||
|
|
||||||
fmt::print(stderr, "chain after appendGroupBy: {}\n", chain.dumpChain());
|
|
||||||
|
|
||||||
query_analyzer.appendAggregateFunctionsArguments(chain, only_types || !first_stage);
|
query_analyzer.appendAggregateFunctionsArguments(chain, only_types || !first_stage);
|
||||||
|
|
||||||
fmt::print(stderr, "chain after appendAggregateFunctionsArguments: {}\n", chain.dumpChain());
|
|
||||||
|
|
||||||
before_aggregation = chain.getLastActions();
|
before_aggregation = chain.getLastActions();
|
||||||
|
|
||||||
finalize_chain(chain);
|
finalize_chain(chain);
|
||||||
|
|
||||||
fmt::print(stderr, "chain after finalize_chain: {}\n", chain.dumpChain());
|
|
||||||
|
|
||||||
if (query_analyzer.appendHaving(chain, only_types || !second_stage))
|
if (query_analyzer.appendHaving(chain, only_types || !second_stage))
|
||||||
{
|
{
|
||||||
fmt::print(stderr, "chain after appendHaving: {}\n", chain.dumpChain());
|
|
||||||
before_having = chain.getLastActions();
|
before_having = chain.getLastActions();
|
||||||
chain.addStep();
|
chain.addStep();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt::print(stderr, "chain after aggregate: {}\n", chain.dumpChain());
|
|
||||||
|
|
||||||
bool join_allow_read_in_order = true;
|
bool join_allow_read_in_order = true;
|
||||||
if (hasJoin())
|
if (hasJoin())
|
||||||
{
|
{
|
||||||
@ -1521,22 +1475,10 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
|
|||||||
&& !query.final()
|
&& !query.final()
|
||||||
&& join_allow_read_in_order;
|
&& join_allow_read_in_order;
|
||||||
|
|
||||||
/*
|
|
||||||
if (has_window)
|
|
||||||
{
|
|
||||||
query_analyzer.appendWindowFunctionsArguments(chain, only_types || !first_stage);
|
|
||||||
before_window = chain.getLastActions();
|
|
||||||
|
|
||||||
finalize_chain(chain);
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
/// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
|
/// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
|
||||||
query_analyzer.appendSelect(chain, only_types || (need_aggregate ? !second_stage : !first_stage));
|
query_analyzer.appendSelect(chain, only_types || (need_aggregate ? !second_stage : !first_stage));
|
||||||
fmt::print(stderr, "chain after select: {}\n", chain.dumpChain());
|
|
||||||
|
|
||||||
query_analyzer.appendWindowFunctionsArguments(chain, only_types || !first_stage);
|
query_analyzer.appendWindowFunctionsArguments(chain, only_types || !first_stage);
|
||||||
fmt::print(stderr, "chain after window: {}\n", chain.dumpChain());
|
|
||||||
|
|
||||||
selected_columns = chain.getLastStep().required_output;
|
selected_columns = chain.getLastStep().required_output;
|
||||||
has_order_by = query.orderBy() != nullptr;
|
has_order_by = query.orderBy() != nullptr;
|
||||||
@ -1546,10 +1488,6 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
|
|||||||
optimize_read_in_order,
|
optimize_read_in_order,
|
||||||
order_by_elements_actions);
|
order_by_elements_actions);
|
||||||
|
|
||||||
fmt::print(stderr, "chain after order by: {}\n", chain.dumpChain());
|
|
||||||
|
|
||||||
//if (h
|
|
||||||
|
|
||||||
if (query_analyzer.appendLimitBy(chain, only_types || !second_stage))
|
if (query_analyzer.appendLimitBy(chain, only_types || !second_stage))
|
||||||
{
|
{
|
||||||
before_limit_by = chain.getLastActions();
|
before_limit_by = chain.getLastActions();
|
||||||
@ -1565,10 +1503,6 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
|
|||||||
removeExtraColumns();
|
removeExtraColumns();
|
||||||
|
|
||||||
checkActions();
|
checkActions();
|
||||||
|
|
||||||
fmt::print(stderr, "ExpressionAnalysisResult created at \n{}\n",
|
|
||||||
StackTrace().toString());
|
|
||||||
fmt::print(stderr, "ExpressionAnalysisResult: \n{}\n", dump());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void ExpressionAnalysisResult::finalize(const ExpressionActionsChain & chain, size_t where_step_num)
|
void ExpressionAnalysisResult::finalize(const ExpressionActionsChain & chain, size_t where_step_num)
|
||||||
|
@ -958,7 +958,6 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/// FIXME calculate windows here
|
|
||||||
executeExpression(query_plan, expressions.before_order_and_select, "Before ORDER BY and SELECT");
|
executeExpression(query_plan, expressions.before_order_and_select, "Before ORDER BY and SELECT");
|
||||||
executeWindow(query_plan);
|
executeWindow(query_plan);
|
||||||
executeDistinct(query_plan, true, expressions.selected_columns, true);
|
executeDistinct(query_plan, true, expressions.selected_columns, true);
|
||||||
@ -1006,7 +1005,6 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
|
|||||||
else if (expressions.hasHaving())
|
else if (expressions.hasHaving())
|
||||||
executeHaving(query_plan, expressions.before_having);
|
executeHaving(query_plan, expressions.before_having);
|
||||||
|
|
||||||
/// FIXME calculate windows here
|
|
||||||
executeExpression(query_plan, expressions.before_order_and_select, "Before ORDER BY and SELECT");
|
executeExpression(query_plan, expressions.before_order_and_select, "Before ORDER BY and SELECT");
|
||||||
executeWindow(query_plan);
|
executeWindow(query_plan);
|
||||||
executeDistinct(query_plan, true, expressions.selected_columns, true);
|
executeDistinct(query_plan, true, expressions.selected_columns, true);
|
||||||
@ -1748,10 +1746,6 @@ void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan)
|
|||||||
{
|
{
|
||||||
const auto & w = query_analyzer->window_descriptions[f.window_name];
|
const auto & w = query_analyzer->window_descriptions[f.window_name];
|
||||||
|
|
||||||
fmt::print(stderr, "window function '{}' over window '{}'\n",
|
|
||||||
f.column_name, f.window_name);
|
|
||||||
fmt::print(stderr, "{}\n{}\n", f.dump(), w.dump());
|
|
||||||
|
|
||||||
const Settings & settings = context->getSettingsRef();
|
const Settings & settings = context->getSettingsRef();
|
||||||
|
|
||||||
auto partial_sorting = std::make_unique<PartialSortingStep>(
|
auto partial_sorting = std::make_unique<PartialSortingStep>(
|
||||||
|
@ -443,7 +443,7 @@ std::vector<const ASTFunction *> getAggregates(ASTPtr & query, const ASTSelectQu
|
|||||||
|
|
||||||
std::vector<const ASTFunction *> getWindowFunctions(ASTPtr & query, const ASTSelectQuery & select_query)
|
std::vector<const ASTFunction *> getWindowFunctions(ASTPtr & query, const ASTSelectQuery & select_query)
|
||||||
{
|
{
|
||||||
/// There can not be aggregate functions inside the WHERE and PREWHERE.
|
/// There can not be window functions inside the WHERE and PREWHERE.
|
||||||
if (select_query.where())
|
if (select_query.where())
|
||||||
assertNoWindows(select_query.where(), "in WHERE");
|
assertNoWindows(select_query.where(), "in WHERE");
|
||||||
if (select_query.prewhere())
|
if (select_query.prewhere())
|
||||||
@ -452,7 +452,7 @@ std::vector<const ASTFunction *> getWindowFunctions(ASTPtr & query, const ASTSel
|
|||||||
GetAggregatesVisitor::Data data;
|
GetAggregatesVisitor::Data data;
|
||||||
GetAggregatesVisitor(data).visit(query);
|
GetAggregatesVisitor(data).visit(query);
|
||||||
|
|
||||||
/// There can not be other aggregate functions within the aggregate functions.
|
/// There can not be other window functions within the aggregate functions.
|
||||||
for (const ASTFunction * node : data.window_functions)
|
for (const ASTFunction * node : data.window_functions)
|
||||||
{
|
{
|
||||||
if (node->arguments)
|
if (node->arguments)
|
||||||
@ -465,10 +465,6 @@ std::vector<const ASTFunction *> getWindowFunctions(ASTPtr & query, const ASTSel
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt::print(stderr, "getWindowFunctions ({}) for \n{}\n at \n{}\n",
|
|
||||||
data.window_functions.size(), query->formatForErrorMessage(),
|
|
||||||
StackTrace().toString());
|
|
||||||
|
|
||||||
return data.window_functions;
|
return data.window_functions;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -485,8 +481,6 @@ TreeRewriterResult::TreeRewriterResult(
|
|||||||
{
|
{
|
||||||
collectSourceColumns(add_special);
|
collectSourceColumns(add_special);
|
||||||
is_remote_storage = storage && storage->isRemote();
|
is_remote_storage = storage && storage->isRemote();
|
||||||
|
|
||||||
fmt::print(stderr, "TreeRewriterResult created at \n{}\n", StackTrace().toString());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add columns from storage to source_columns list. Deduplicate resulted list.
|
/// Add columns from storage to source_columns list. Deduplicate resulted list.
|
||||||
|
@ -44,13 +44,8 @@ static ITransformingStep::Traits getJoinTraits()
|
|||||||
static Block addWindowFunctionColumns(const Block & block,
|
static Block addWindowFunctionColumns(const Block & block,
|
||||||
std::vector<WindowFunctionDescription> window_functions)
|
std::vector<WindowFunctionDescription> window_functions)
|
||||||
{
|
{
|
||||||
fmt::print(stderr, "input header: {}\n", block.dumpStructure());
|
|
||||||
|
|
||||||
//auto result = block.cloneWithoutColumns();
|
|
||||||
auto result = block;
|
auto result = block;
|
||||||
|
|
||||||
fmt::print(stderr, "header after clone: {}\n", result.dumpStructure());
|
|
||||||
|
|
||||||
for (const auto & f : window_functions)
|
for (const auto & f : window_functions)
|
||||||
{
|
{
|
||||||
ColumnWithTypeAndName column_with_type;
|
ColumnWithTypeAndName column_with_type;
|
||||||
@ -61,8 +56,6 @@ static Block addWindowFunctionColumns(const Block & block,
|
|||||||
result.insert(column_with_type);
|
result.insert(column_with_type);
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt::print(stderr, "header after insert: {}\n", result.dumpStructure());
|
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -97,16 +90,7 @@ void WindowStep::describeActions(FormatSettings & settings) const
|
|||||||
{
|
{
|
||||||
String prefix(settings.offset, ' ');
|
String prefix(settings.offset, ' ');
|
||||||
(void) prefix;
|
(void) prefix;
|
||||||
//bool first = true;
|
/// FIXME add some printing
|
||||||
|
|
||||||
//auto expression = std::make_shared<ExpressionActions>(actions_dag);
|
|
||||||
//for (const auto & action : expression->getActions())
|
|
||||||
//{
|
|
||||||
// settings.out << prefix << (first ? "Actions: "
|
|
||||||
// : " ");
|
|
||||||
// first = false;
|
|
||||||
// settings.out << action.toString() << '\n';
|
|
||||||
//}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -63,9 +63,6 @@ FilterTransform::FilterTransform(
|
|||||||
auto & column = transformed_header.getByPosition(filter_column_position).column;
|
auto & column = transformed_header.getByPosition(filter_column_position).column;
|
||||||
if (column)
|
if (column)
|
||||||
constant_filter_description = ConstantFilterDescription(*column);
|
constant_filter_description = ConstantFilterDescription(*column);
|
||||||
|
|
||||||
fmt::print(stderr, "FilterTransform created at \n{}\n",
|
|
||||||
StackTrace().toString());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
IProcessor::Status FilterTransform::prepare()
|
IProcessor::Status FilterTransform::prepare()
|
||||||
|
@ -12,14 +12,11 @@ WindowTransform::WindowTransform(const Block & input_header_,
|
|||||||
const WindowDescription & window_description_,
|
const WindowDescription & window_description_,
|
||||||
const std::vector<WindowFunctionDescription> & window_function_descriptions
|
const std::vector<WindowFunctionDescription> & window_function_descriptions
|
||||||
)
|
)
|
||||||
// FIXME this is where the output column is added
|
|
||||||
: ISimpleTransform(input_header_, output_header_,
|
: ISimpleTransform(input_header_, output_header_,
|
||||||
false /* skip_empty_chunks */)
|
false /* skip_empty_chunks */)
|
||||||
, input_header(input_header_)
|
, input_header(input_header_)
|
||||||
, window_description(window_description_)
|
, window_description(window_description_)
|
||||||
{
|
{
|
||||||
fmt::print(stderr, "input header {}\n", input_header.dumpStructure());
|
|
||||||
|
|
||||||
workspaces.reserve(window_function_descriptions.size());
|
workspaces.reserve(window_function_descriptions.size());
|
||||||
for (size_t i = 0; i < window_function_descriptions.size(); ++i)
|
for (size_t i = 0; i < window_function_descriptions.size(); ++i)
|
||||||
{
|
{
|
||||||
@ -42,12 +39,6 @@ WindowTransform::WindowTransform(const Block & input_header_,
|
|||||||
workspace.argument_column_indices.push_back(
|
workspace.argument_column_indices.push_back(
|
||||||
input_header.getPositionByName(argument_name));
|
input_header.getPositionByName(argument_name));
|
||||||
|
|
||||||
fmt::print(stderr,
|
|
||||||
"window function '{}' argument column '{}' at '{}'\n",
|
|
||||||
workspace.window_function.column_name,
|
|
||||||
argument_name,
|
|
||||||
workspace.argument_column_indices.back());
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
workspace.aggregate_function_state.reset(aggregate_function->sizeOfData(),
|
workspace.aggregate_function_state.reset(aggregate_function->sizeOfData(),
|
||||||
@ -80,14 +71,11 @@ void WindowTransform::transform(Chunk & chunk)
|
|||||||
const size_t num_rows = chunk.getNumRows();
|
const size_t num_rows = chunk.getNumRows();
|
||||||
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
|
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
|
||||||
|
|
||||||
fmt::print(stderr, "block before window transform:\n{}\n", block.dumpStructure());
|
|
||||||
|
|
||||||
for (auto & workspace : workspaces)
|
for (auto & workspace : workspaces)
|
||||||
{
|
{
|
||||||
workspace.argument_columns.clear();
|
workspace.argument_columns.clear();
|
||||||
for (const auto column_index : workspace.argument_column_indices)
|
for (const auto column_index : workspace.argument_column_indices)
|
||||||
{
|
{
|
||||||
fmt::print(stderr, "argument column index '{}'\n", column_index);
|
|
||||||
workspace.argument_columns.push_back(
|
workspace.argument_columns.push_back(
|
||||||
block.getColumns()[column_index].get());
|
block.getColumns()[column_index].get());
|
||||||
}
|
}
|
||||||
@ -105,7 +93,6 @@ void WindowTransform::transform(Chunk & chunk)
|
|||||||
auto c = column_with_type.type->createColumn();
|
auto c = column_with_type.type->createColumn();
|
||||||
column_with_type.column.reset(c.get());
|
column_with_type.column.reset(c.get());
|
||||||
|
|
||||||
size_t partition_start = 0;
|
|
||||||
for (size_t row = 0; row < num_rows; row++)
|
for (size_t row = 0; row < num_rows; row++)
|
||||||
{
|
{
|
||||||
// Check whether the new partition has started and reinitialize the
|
// Check whether the new partition has started and reinitialize the
|
||||||
@ -152,8 +139,6 @@ void WindowTransform::transform(Chunk & chunk)
|
|||||||
//*/
|
//*/
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt::print(stderr, "block after window transform:\n{}\n", block.dumpStructure());
|
|
||||||
|
|
||||||
chunk.setColumns(block.getColumns(), num_rows);
|
chunk.setColumns(block.getColumns(), num_rows);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user