From 5c558d0be9003174512dfb709c3c92d3d43537ca Mon Sep 17 00:00:00 2001
From: zvonand
Date: Fri, 27 May 2022 15:07:22 +0300
Subject: [PATCH] old work upload

---
 src/Common/IntervalKind.cpp                   |  14 ++
 src/Common/IntervalKind.h                     |   5 +
 src/Processors/Transforms/WindowTransform.cpp | 157 ++++++++----------
 3 files changed, 86 insertions(+), 90 deletions(-)

diff --git a/src/Common/IntervalKind.cpp b/src/Common/IntervalKind.cpp
index 1478b832282..5a32341365c 100644
--- a/src/Common/IntervalKind.cpp
+++ b/src/Common/IntervalKind.cpp
@@ -7,6 +7,7 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int SYNTAX_ERROR;
+    extern const int NOT_IMPLEMENTED;
 }
 
 Int32 IntervalKind::toAvgSeconds() const
@@ -28,6 +29,19 @@ Int32 IntervalKind::toAvgSeconds() const
     __builtin_unreachable();
 }
 
+UInt64 IntervalKind::toAvgNanoseconds() const
+{
+    switch (kind)
+    {
+        case IntervalKind::Nanosecond: return 1;
+        case IntervalKind::Microsecond: return 1000;
+        case IntervalKind::Millisecond: return 1000000;
+        case IntervalKind::Second: return 1000000000;
+        default:
+            throw Exception("Only Second and sub-second intervals can be converted to nanoseconds", ErrorCodes::NOT_IMPLEMENTED);
+    }
+}
+
 IntervalKind IntervalKind::fromAvgSeconds(Int64 num_seconds)
 {
     if (num_seconds)
diff --git a/src/Common/IntervalKind.h b/src/Common/IntervalKind.h
index d5f2b5672cd..41a7e318e82 100644
--- a/src/Common/IntervalKind.h
+++ b/src/Common/IntervalKind.h
@@ -32,6 +32,11 @@ struct IntervalKind
     /// Returns number of seconds in one interval.
     /// For `Month`, `Quarter` and `Year` the function returns an average number of seconds.
     Int32 toAvgSeconds() const;
+
+    /// Returns number of nanoseconds in one interval.
+    /// Defined only for `Nanosecond`, `Microsecond`, `Millisecond` and `Second`;
+    /// any other kind throws NOT_IMPLEMENTED.
+    UInt64 toAvgNanoseconds() const;
 
     /// Chooses an interval kind based on number of seconds.
     /// For example, `IntervalKind::fromAvgSeconds(3600)` returns `IntervalKind::Hour`.
diff --git a/src/Processors/Transforms/WindowTransform.cpp b/src/Processors/Transforms/WindowTransform.cpp
index 994a9067cb8..0d73868369d 100644
--- a/src/Processors/Transforms/WindowTransform.cpp
+++ b/src/Processors/Transforms/WindowTransform.cpp
@@ -11,6 +11,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 
@@ -2086,64 +2087,43 @@ struct WindowFunctionLagLeadInFrame final : public WindowFunction
 };
 
 
-struct WindowFunctionNonNegativeDerivative final : public WindowFunction
+// nonNegativeDerivative(metric_column, timestamp_column[, INTERVAL 1 SECOND])
+struct WindowFunctionNonNegativeDerivative final : public RecurrentWindowFunction<0>
 {
+    static constexpr size_t ARGUMENT_METRIC = 0;
+    static constexpr size_t ARGUMENT_TIMESTAMP = 1;
+    static constexpr size_t ARGUMENT_INTERVAL = 2;
+
     WindowFunctionNonNegativeDerivative(const std::string & name_,
-            const DataTypes & argument_types_, const Array & parameters_)
-        : WindowFunction(name_, argument_types_, parameters_)
+                                        const DataTypes & argument_types_, const Array & parameters_)
+        : RecurrentWindowFunction(name_, argument_types_, parameters_)
     {
-        if (!parameters.empty())
+        if (argument_types.size() != 2 && argument_types.size() != 3)
         {
             throw Exception(ErrorCodes::BAD_ARGUMENTS,
-                "Function {} cannot be parameterized", name_);
+                "Function {} takes 2 or 3 arguments", name_);
         }
 
-        if (argument_types.empty())
+        if (!isNumber(argument_types[ARGUMENT_METRIC]))
         {
             throw Exception(ErrorCodes::BAD_ARGUMENTS,
-                "Function {} takes at least one argument", name_);
+                "Argument {} must be a number, '{}' given",
+                ARGUMENT_METRIC,
+                argument_types[ARGUMENT_METRIC]->getName());
         }
 
-        if (argument_types.size() == 1)
-        {
-            return;
-        }
-
-        if (!isInt64OrUInt64FieldType(argument_types[1]->getDefault().getType()))
+        if (!isNumber(argument_types[ARGUMENT_TIMESTAMP]) && !isDateTime(argument_types[ARGUMENT_TIMESTAMP]) && !isDateTime64(argument_types[ARGUMENT_TIMESTAMP]))
         {
             throw Exception(ErrorCodes::BAD_ARGUMENTS,
-                "Offset must be an integer, '{}' given",
-                argument_types[1]->getName());
+                "Argument {} must be DateTime, DateTime64 or a number, '{}' given",
+                ARGUMENT_TIMESTAMP,
+                argument_types[ARGUMENT_TIMESTAMP]->getName());
         }
+        /// The interval length is optional: without a parameter keep the default of 1
+        /// instead of reading parameters_[0] out of bounds.
+        if (!parameters_.empty())
+            interval_length = applyVisitor(FieldVisitorConvertToNumber(), parameters_[0].get());
 
-        if (argument_types.size() == 2)
-        {
-            return;
-        }
-
-        const auto supertype = getLeastSupertype(DataTypes{argument_types[0], argument_types[2]});
-        if (!supertype)
-        {
-            throw Exception(ErrorCodes::BAD_ARGUMENTS,
-                "There is no supertype for the argument type '{}' and the default value type '{}'",
-                argument_types[0]->getName(),
-                argument_types[2]->getName());
-        }
-        if (!argument_types[0]->equals(*supertype))
-        {
-            throw Exception(ErrorCodes::BAD_ARGUMENTS,
-                "The supertype '{}' for the argument type '{}' and the default value type '{}' is not the same as the argument type",
-                supertype->getName(),
-                argument_types[0]->getName(),
-                argument_types[2]->getName());
-        }
-
-        if (argument_types.size() > 3)
-        {
-            throw Exception(ErrorCodes::BAD_ARGUMENTS,
-                "Function '{}' accepts at most 3 arguments, {} given",
-                name, argument_types.size());
-        }
     }
 
     DataTypePtr getReturnType() const override { return argument_types[0]; }
@@ -2153,58 +2133,55 @@ struct WindowFunctionNonNegativeDerivative final : public WindowFunction
     void windowInsertResultInto(const WindowTransform * transform,
         size_t function_index) override
     {
-        const auto & current_block = transform->blockAt(transform->current_row);
-        IColumn & to = *current_block.output_columns[function_index];
-        const auto & workspace = transform->workspaces[function_index];
+//        const auto & current_block = transform->blockAt(transform->current_row);
+//        IColumn & to = *current_block.output_columns[function_index];
+//        const auto & workspace = transform->workspaces[function_index];
 
-        int64_t offset = 1;
-        if (argument_types.size() > 1)
-        {
-            offset = (*current_block.input_columns[
-                    workspace.argument_column_indices[1]])[
-                        transform->current_row.row].get();
+//        if (argument_types.size() > 2)
+//        {
+//            interval_kind = (*current_block.input_columns[
+//                    workspace.argument_column_indices[2]])[
+//                    transform->current_row.row].get();
+//        }
+//
+//        const auto * interval_type = checkAndGetDataType(interval_column.type.get());
+//
+//        if (!interval_type)
+//            throw Exception(
+//                "Illegal column for second argument of function " + getName() + ", must be an interval of time.",
+//                ErrorCodes::ILLEGAL_COLUMN);
+//
+//        const auto * interval_column_const_int64 = checkAndGetColumnConst(interval_column.column.get());
+//        if (!interval_column_const_int64)
+//            throw Exception(
+//                "Illegal column for second argument of function " + getName() + ", must be a const interval of time.", ErrorCodes::ILLEGAL_COLUMN);
+//
+//        Int64 num_units = interval_column_const_int64->getValue();
+//        if (num_units <= 0)
+//            throw Exception("Value for second argument of function " + getName() + " must be positive.", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
 
-            /// Either overflow or really negative value, both is not acceptable.
-            if (offset < 0)
-            {
-                throw Exception(ErrorCodes::BAD_ARGUMENTS,
-                    "The offset for function {} must be in (0, {}], {} given",
-                    getName(), INT64_MAX, offset);
-            }
-        }
+//
+//        if (interval_kind == IntervalKind::Second || interval_kind == IntervalKind::Millisecond || interval_kind == IntervalKind::Microsecond || interval_kind == IntervalKind::Nanosecond){
+//            auto nanosecs_in_interval = interval_kind.toAvgNanoseconds();
+//        }
 
-        const auto [target_row, offset_left] = transform->moveRowNumber(
-            transform->current_row, offset * (is_lead ? 1 : -1));
+        Float64 last_metric = getLastValueFromInputColumn(transform, function_index, ARGUMENT_METRIC);
+        Float64 last_timestamp = getLastValueFromInputColumn(transform, function_index, ARGUMENT_TIMESTAMP);
 
-        if (offset_left != 0
-            || target_row < transform->frame_start
-            || transform->frame_end <= target_row)
-        {
-            // Offset is outside the frame.
-            if (argument_types.size() > 2)
-            {
-                // Column with default values is specified.
-                // The conversion through Field is inefficient, but we accept
-                // subtypes of the argument type as a default value (for convenience),
-                // and it's a pain to write conversion that respects ColumnNothing
-                // and ColumnConst and so on.
-                const IColumn & default_column = *current_block.input_columns[
-                    workspace.argument_column_indices[2]].get();
-                to.insert(default_column[transform->current_row.row]);
-            }
-            else
-            {
-                to.insertDefault();
-            }
-        }
-        else
-        {
-            // Offset is inside the frame.
-            to.insertFrom(*transform->blockAt(target_row).input_columns[
-                    workspace.argument_column_indices[0]],
-                target_row.row);
-        }
+        Float64 curr_metric = getCurrentValueFromInputColumn(transform, function_index, ARGUMENT_METRIC);
+        Float64 curr_timestamp = getCurrentValueFromInputColumn(transform, function_index, ARGUMENT_TIMESTAMP);
+
+        Float64 time_elapsed = last_timestamp - curr_timestamp;
+        Float64 metric_diff = last_metric - curr_metric;
+        /// Equal timestamps would divide by zero (inf/NaN), so emit 0 for them.
+        Float64 result = (time_elapsed != 0) ? (metric_diff / time_elapsed * interval_length) : 0;
+
+        /// A negative rate (or NaN) is replaced by 0, as the function name promises.
+        /// NOTE(review): the value is Float64 while getReturnType() returns argument_types[0] - confirm.
+        setValueToOutputColumn(transform, function_index, result >= 0 ? result : 0);
     }
+private:
+    Float64 interval_length = 1;
 };
 
 