add staleness to sql

This commit is contained in:
Mikhail Artemenko 2024-10-25 17:58:43 +00:00
parent 6f8e953bd9
commit a7b23292f9
12 changed files with 115 additions and 7 deletions

View File

@ -498,6 +498,8 @@ QueryTreeNodePtr QueryTreeBuilder::buildSortList(const ASTPtr & order_by_express
sort_node->getFillTo() = buildExpression(order_by_element.getFillTo(), context);
if (order_by_element.getFillStep())
sort_node->getFillStep() = buildExpression(order_by_element.getFillStep(), context);
if (order_by_element.getFillStaleness())
sort_node->getFillStaleness() = buildExpression(order_by_element.getFillStaleness(), context);
list_node->getNodes().push_back(std::move(sort_node));
}

View File

@ -432,8 +432,13 @@ ProjectionName QueryAnalyzer::calculateWindowProjectionName(const QueryTreeNodeP
return buffer.str();
}
ProjectionName QueryAnalyzer::calculateSortColumnProjectionName(const QueryTreeNodePtr & sort_column_node, const ProjectionName & sort_expression_projection_name,
const ProjectionName & fill_from_expression_projection_name, const ProjectionName & fill_to_expression_projection_name, const ProjectionName & fill_step_expression_projection_name)
ProjectionName QueryAnalyzer::calculateSortColumnProjectionName(
const QueryTreeNodePtr & sort_column_node,
const ProjectionName & sort_expression_projection_name,
const ProjectionName & fill_from_expression_projection_name,
const ProjectionName & fill_to_expression_projection_name,
const ProjectionName & fill_step_expression_projection_name,
const ProjectionName & fill_staleness_expression_projection_name)
{
auto & sort_node_typed = sort_column_node->as<SortNode &>();
@ -463,6 +468,9 @@ ProjectionName QueryAnalyzer::calculateSortColumnProjectionName(const QueryTreeN
if (sort_node_typed.hasFillStep())
sort_column_projection_name_buffer << " STEP " << fill_step_expression_projection_name;
if (sort_node_typed.hasFillStaleness())
sort_column_projection_name_buffer << " STALENESS " << fill_staleness_expression_projection_name;
}
return sort_column_projection_name_buffer.str();
@ -3993,6 +4001,7 @@ ProjectionNames QueryAnalyzer::resolveSortNodeList(QueryTreeNodePtr & sort_node_
ProjectionNames fill_from_expression_projection_names;
ProjectionNames fill_to_expression_projection_names;
ProjectionNames fill_step_expression_projection_names;
ProjectionNames fill_staleness_expression_projection_names;
auto & sort_node_list_typed = sort_node_list->as<ListNode &>();
for (auto & node : sort_node_list_typed.getNodes())
@ -4083,11 +4092,38 @@ ProjectionNames QueryAnalyzer::resolveSortNodeList(QueryTreeNodePtr & sort_node_
fill_step_expression_projection_names_size);
}
if (sort_node.hasFillStaleness())
{
fill_staleness_expression_projection_names = resolveExpressionNode(sort_node.getFillStaleness(), scope, false /*allow_lambda_expression*/, false /*allow_table_expression*/);
const auto * constant_node = sort_node.getFillStaleness()->as<ConstantNode>();
if (!constant_node)
throw Exception(ErrorCodes::INVALID_WITH_FILL_EXPRESSION,
"Sort FILL STALENESS expression must be constant with numeric or interval type. Actual {}. In scope {}",
sort_node.getFillStaleness()->formatASTForErrorMessage(),
scope.scope_node->formatASTForErrorMessage());
bool is_number = isColumnedAsNumber(constant_node->getResultType());
bool is_interval = WhichDataType(constant_node->getResultType()).isInterval();
if (!is_number && !is_interval)
throw Exception(ErrorCodes::INVALID_WITH_FILL_EXPRESSION,
"Sort FILL STALENESS expression must be constant with numeric or interval type. Actual {}. In scope {}",
sort_node.getFillStaleness()->formatASTForErrorMessage(),
scope.scope_node->formatASTForErrorMessage());
size_t fill_staleness_expression_projection_names_size = fill_staleness_expression_projection_names.size();
if (fill_staleness_expression_projection_names_size != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Sort FILL STALENESS expression expected 1 projection name. Actual {}",
fill_staleness_expression_projection_names_size);
}
auto sort_column_projection_name = calculateSortColumnProjectionName(node,
sort_expression_projection_names[0],
fill_from_expression_projection_names.empty() ? "" : fill_from_expression_projection_names.front(),
fill_to_expression_projection_names.empty() ? "" : fill_to_expression_projection_names.front(),
fill_step_expression_projection_names.empty() ? "" : fill_step_expression_projection_names.front());
fill_step_expression_projection_names.empty() ? "" : fill_step_expression_projection_names.front(),
fill_staleness_expression_projection_names.empty() ? "" : fill_staleness_expression_projection_names.front());
result_projection_names.push_back(std::move(sort_column_projection_name));
@ -4095,6 +4131,7 @@ ProjectionNames QueryAnalyzer::resolveSortNodeList(QueryTreeNodePtr & sort_node_
fill_from_expression_projection_names.clear();
fill_to_expression_projection_names.clear();
fill_step_expression_projection_names.clear();
fill_staleness_expression_projection_names.clear();
}
return result_projection_names;

View File

@ -140,7 +140,8 @@ private:
const ProjectionName & sort_expression_projection_name,
const ProjectionName & fill_from_expression_projection_name,
const ProjectionName & fill_to_expression_projection_name,
const ProjectionName & fill_step_expression_projection_name);
const ProjectionName & fill_step_expression_projection_name,
const ProjectionName & fill_staleness_expression_projection_name);
QueryTreeNodePtr tryGetLambdaFromSQLUserDefinedFunctions(const std::string & function_name, ContextPtr context);

View File

@ -69,6 +69,12 @@ void SortNode::dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state, si
buffer << '\n' << std::string(indent + 2, ' ') << "FILL STEP\n";
getFillStep()->dumpTreeImpl(buffer, format_state, indent + 4);
}
if (hasFillStaleness())
{
buffer << '\n' << std::string(indent + 2, ' ') << "FILL STALENESS\n";
getFillStaleness()->dumpTreeImpl(buffer, format_state, indent + 4);
}
}
bool SortNode::isEqualImpl(const IQueryTreeNode & rhs, CompareOptions) const
@ -132,6 +138,8 @@ ASTPtr SortNode::toASTImpl(const ConvertToASTOptions & options) const
result->setFillTo(getFillTo()->toAST(options));
if (hasFillStep())
result->setFillStep(getFillStep()->toAST(options));
if (hasFillStaleness())
result->setFillStaleness(getFillStaleness()->toAST(options));
return result;
}

View File

@ -105,6 +105,24 @@ public:
return children[fill_step_child_index];
}
/// Returns true if sort node has fill step, false otherwise
bool hasFillStaleness() const
{
return children[fill_staleness_child_index] != nullptr;
}
/// Get fill step
const QueryTreeNodePtr & getFillStaleness() const
{
return children[fill_staleness_child_index];
}
/// Get fill step
QueryTreeNodePtr & getFillStaleness()
{
return children[fill_staleness_child_index];
}
/// Get collator
const std::shared_ptr<Collator> & getCollator() const
{
@ -144,7 +162,8 @@ private:
static constexpr size_t fill_from_child_index = 1;
static constexpr size_t fill_to_child_index = 2;
static constexpr size_t fill_step_child_index = 3;
static constexpr size_t children_size = fill_step_child_index + 1;
static constexpr size_t fill_staleness_child_index = 4;
static constexpr size_t children_size = fill_staleness_child_index + 1;
SortDirection sort_direction = SortDirection::ASCENDING;
std::optional<SortDirection> nulls_sort_direction;

View File

@ -54,6 +54,11 @@ void ASTOrderByElement::formatImpl(const FormatSettings & settings, FormatState
settings.ostr << (settings.hilite ? hilite_keyword : "") << " STEP " << (settings.hilite ? hilite_none : "");
fill_step->formatImpl(settings, state, frame);
}
if (auto fill_staleness = getFillStaleness())
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " STALENESS " << (settings.hilite ? hilite_none : "");
fill_staleness->formatImpl(settings, state, frame);
}
}
}

View File

@ -18,6 +18,7 @@ private:
FILL_FROM,
FILL_TO,
FILL_STEP,
FILL_STALENESS,
};
public:
@ -32,12 +33,14 @@ public:
void setFillFrom(ASTPtr node) { setChild(Child::FILL_FROM, node); }
void setFillTo(ASTPtr node) { setChild(Child::FILL_TO, node); }
void setFillStep(ASTPtr node) { setChild(Child::FILL_STEP, node); }
void setFillStaleness(ASTPtr node) { setChild(Child::FILL_STALENESS, node); }
/** Collation for locale-specific string comparison. If empty, then sorting done by bytes. */
ASTPtr getCollation() const { return getChild(Child::COLLATION); }
ASTPtr getFillFrom() const { return getChild(Child::FILL_FROM); }
ASTPtr getFillTo() const { return getChild(Child::FILL_TO); }
ASTPtr getFillStep() const { return getChild(Child::FILL_STEP); }
ASTPtr getFillStaleness() const { return getChild(Child::FILL_STALENESS); }
String getID(char) const override { return "OrderByElement"; }

View File

@ -541,6 +541,7 @@ namespace DB
MR_MACROS(YY, "YY") \
MR_MACROS(YYYY, "YYYY") \
MR_MACROS(ZKPATH, "ZKPATH") \
MR_MACROS(STALENESS, "STALENESS") \
/// The list of keywords where underscore is intentional
#define APPLY_FOR_PARSER_KEYWORDS_WITH_UNDERSCORES(MR_MACROS) \

View File

@ -2178,6 +2178,7 @@ bool ParserOrderByElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
ParserKeyword from(Keyword::FROM);
ParserKeyword to(Keyword::TO);
ParserKeyword step(Keyword::STEP);
ParserKeyword staleness(Keyword::STALENESS);
ParserStringLiteral collate_locale_parser;
ParserExpressionWithOptionalAlias exp_parser(false);
@ -2219,6 +2220,7 @@ bool ParserOrderByElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
ASTPtr fill_from;
ASTPtr fill_to;
ASTPtr fill_step;
ASTPtr fill_staleness;
if (with_fill.ignore(pos, expected))
{
has_with_fill = true;
@ -2230,6 +2232,9 @@ bool ParserOrderByElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
if (step.ignore(pos, expected) && !exp_parser.parse(pos, fill_step, expected))
return false;
if (staleness.ignore(pos, expected) && !exp_parser.parse(pos, fill_staleness, expected))
return false;
}
auto elem = std::make_shared<ASTOrderByElement>();
@ -2244,6 +2249,7 @@ bool ParserOrderByElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
elem->setFillFrom(fill_from);
elem->setFillTo(fill_to);
elem->setFillStep(fill_step);
elem->setFillStaleness(fill_staleness);
node = elem;

View File

@ -847,6 +847,9 @@ void addWithFillStepIfNeeded(QueryPlan & query_plan,
interpolate_description = std::make_shared<InterpolateDescription>(std::move(interpolate_actions_dag), empty_aliases);
}
if (interpolate_description)
LOG_DEBUG(getLogger("addWithFillStepIfNeeded"), "InterpolateDescription: {}", interpolate_description->actions.dumpDAG());
const auto & query_context = planner_context->getQueryContext();
const Settings & settings = query_context->getSettingsRef();
auto filling_step = std::make_unique<FillingStep>(

View File

@ -391,6 +391,9 @@ public:
if (sort_node.hasFillStep())
buffer << " STEP " << calculateActionNodeName(sort_node.getFillStep());
if (sort_node.hasFillStaleness())
buffer << " STALENESS " << calculateActionNodeName(sort_node.getFillStaleness());
}
if (i + 1 != order_by_nodes_size)

View File

@ -43,7 +43,7 @@ std::pair<Field, DataTypePtr> extractWithFillValue(const QueryTreeNodePtr & node
return result;
}
std::pair<Field, std::optional<IntervalKind>> extractWithFillStepValue(const QueryTreeNodePtr & node)
std::pair<Field, std::optional<IntervalKind>> extractWithFillValueWithIntervalKind(const QueryTreeNodePtr & node)
{
const auto & constant_node = node->as<ConstantNode &>();
@ -77,7 +77,7 @@ FillColumnDescription extractWithFillDescription(const SortNode & sort_node)
if (sort_node.hasFillStep())
{
auto extract_result = extractWithFillStepValue(sort_node.getFillStep());
auto extract_result = extractWithFillValueWithIntervalKind(sort_node.getFillStep());
fill_column_description.fill_step = std::move(extract_result.first);
fill_column_description.step_kind = std::move(extract_result.second);
}
@ -87,10 +87,30 @@ FillColumnDescription extractWithFillDescription(const SortNode & sort_node)
fill_column_description.fill_step = Field(direction_value);
}
if (sort_node.getFillStaleness())
{
auto extract_result = extractWithFillValueWithIntervalKind(sort_node.getFillStaleness());
fill_column_description.fill_staleness = std::move(extract_result.first);
fill_column_description.staleness_kind = std::move(extract_result.second);
}
///////////////////////////////////
if (applyVisitor(FieldVisitorAccurateEquals(), fill_column_description.fill_step, Field{0}))
throw Exception(ErrorCodes::INVALID_WITH_FILL_EXPRESSION,
"WITH FILL STEP value cannot be zero");
if (sort_node.hasFillStaleness())
{
if (sort_node.hasFillFrom())
throw Exception(ErrorCodes::INVALID_WITH_FILL_EXPRESSION,
"WITH FILL STALENESS cannot be used together with WITH FILL FROM");
if (applyVisitor(FieldVisitorAccurateLessOrEqual(), fill_column_description.fill_staleness, Field{0}))
throw Exception(ErrorCodes::INVALID_WITH_FILL_EXPRESSION,
"WITH FILL STALENESS value cannot be less or equal zero");
}
if (sort_node.getSortDirection() == SortDirection::ASCENDING)
{
if (applyVisitor(FieldVisitorAccurateLess(), fill_column_description.fill_step, Field{0}))