old work upload

This commit is contained in:
zvonand 2022-05-27 15:07:22 +03:00
parent eaca4f2625
commit 5c558d0be9
3 changed files with 82 additions and 90 deletions

View File

@ -7,6 +7,7 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int SYNTAX_ERROR; extern const int SYNTAX_ERROR;
extern const int NOT_IMPLEMENTED;
} }
Int32 IntervalKind::toAvgSeconds() const Int32 IntervalKind::toAvgSeconds() const
@ -28,6 +29,19 @@ Int32 IntervalKind::toAvgSeconds() const
__builtin_unreachable(); __builtin_unreachable();
} }
UInt64 IntervalKind::toAvgNanoseconds() const
{
switch (kind)
{
case IntervalKind::Nanosecond: return 1;
case IntervalKind::Microsecond: return 1000;
case IntervalKind::Millisecond: return 1000000;
case IntervalKind::Second: return 1 * 1000000000;
default:
throw Exception("Only sub-second interval can be converted to nanoseconds", ErrorCodes::NOT_IMPLEMENTED);
}
}
IntervalKind IntervalKind::fromAvgSeconds(Int64 num_seconds) IntervalKind IntervalKind::fromAvgSeconds(Int64 num_seconds)
{ {
if (num_seconds) if (num_seconds)

View File

@ -32,6 +32,7 @@ struct IntervalKind
/// Returns number of seconds in one interval. /// Returns number of seconds in one interval.
/// For `Month`, `Quarter` and `Year` the function returns an average number of seconds. /// For `Month`, `Quarter` and `Year` the function returns an average number of seconds.
Int32 toAvgSeconds() const; Int32 toAvgSeconds() const;
UInt64 toAvgNanoseconds() const;
/// Chooses an interval kind based on number of seconds. /// Chooses an interval kind based on number of seconds.
/// For example, `IntervalKind::fromAvgSeconds(3600)` returns `IntervalKind::Hour`. /// For example, `IntervalKind::fromAvgSeconds(3600)` returns `IntervalKind::Hour`.

View File

@ -11,6 +11,7 @@
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <DataTypes/getLeastSupertype.h> #include <DataTypes/getLeastSupertype.h>
#include <DataTypes/DataTypeLowCardinality.h> #include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeInterval.h>
#include <Interpreters/ExpressionActions.h> #include <Interpreters/ExpressionActions.h>
#include <Interpreters/convertFieldToType.h> #include <Interpreters/convertFieldToType.h>
@ -2086,64 +2087,40 @@ struct WindowFunctionLagLeadInFrame final : public WindowFunction
}; };
struct WindowFunctionNonNegativeDerivative final : public WindowFunction // nonNegativeDerivative(metric_column, timestamp_column[, INTERVAL 1 SECOND])
struct WindowFunctionNonNegativeDerivative final : public RecurrentWindowFunction<0>
{ {
static constexpr size_t ARGUMENT_METRIC = 0;
static constexpr size_t ARGUMENT_TIMESTAMP = 1;
static constexpr size_t ARGUMENT_INTERVAL = 2;
WindowFunctionNonNegativeDerivative(const std::string & name_, WindowFunctionNonNegativeDerivative(const std::string & name_,
const DataTypes & argument_types_, const Array & parameters_) const DataTypes & argument_types_, const Array & parameters_)
: WindowFunction(name_, argument_types_, parameters_) : RecurrentWindowFunction(name_, argument_types_, parameters_)
{ {
if (!parameters.empty()) if (argument_types.size() != 2 && argument_types.size() != 3)
{ {
throw Exception(ErrorCodes::BAD_ARGUMENTS, throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Function {} cannot be parameterized", name_); "Function {} takes 2 or 3 arguments", name_);
} }
if (argument_types.empty()) if (!isNumber(argument_types[ARGUMENT_METRIC]))
{ {
throw Exception(ErrorCodes::BAD_ARGUMENTS, throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Function {} takes at least one argument", name_); "Argument {} must be a number, '{}' given",
ARGUMENT_METRIC,
argument_types[ARGUMENT_METRIC]->getName());
} }
if (argument_types.size() == 1) if (!isNumber(argument_types[ARGUMENT_TIMESTAMP]) && !isDateTime(argument_types[ARGUMENT_TIMESTAMP]) && !isDateTime64(argument_types[ARGUMENT_TIMESTAMP]))
{
return;
}
if (!isInt64OrUInt64FieldType(argument_types[1]->getDefault().getType()))
{ {
throw Exception(ErrorCodes::BAD_ARGUMENTS, throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Offset must be an integer, '{}' given", "Argument {} must be DateTime, DateTime64 or a number, '{}' given",
argument_types[1]->getName()); ARGUMENT_TIMESTAMP,
argument_types[ARGUMENT_TIMESTAMP]->getName());
} }
interval_length = applyVisitor(FieldVisitorConvertToNumber<Float64>(), parameters_[0].get<Float64>());
if (argument_types.size() == 2)
{
return;
}
const auto supertype = getLeastSupertype(DataTypes{argument_types[0], argument_types[2]});
if (!supertype)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"There is no supertype for the argument type '{}' and the default value type '{}'",
argument_types[0]->getName(),
argument_types[2]->getName());
}
if (!argument_types[0]->equals(*supertype))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"The supertype '{}' for the argument type '{}' and the default value type '{}' is not the same as the argument type",
supertype->getName(),
argument_types[0]->getName(),
argument_types[2]->getName());
}
if (argument_types.size() > 3)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Function '{}' accepts at most 3 arguments, {} given",
name, argument_types.size());
}
} }
DataTypePtr getReturnType() const override { return argument_types[0]; } DataTypePtr getReturnType() const override { return argument_types[0]; }
@ -2153,58 +2130,58 @@ struct WindowFunctionNonNegativeDerivative final : public WindowFunction
void windowInsertResultInto(const WindowTransform * transform, void windowInsertResultInto(const WindowTransform * transform,
size_t function_index) override size_t function_index) override
{ {
const auto & current_block = transform->blockAt(transform->current_row); // const auto & current_block = transform->blockAt(transform->current_row);
IColumn & to = *current_block.output_columns[function_index]; // IColumn & to = *current_block.output_columns[function_index];
const auto & workspace = transform->workspaces[function_index]; // const auto & workspace = transform->workspaces[function_index];
//
// const auto * interval_type = checkAndGetDataType<DataTypeInterval>(interval_column.type.get());
int64_t offset = 1; // if (argument_types.size() > 2)
if (argument_types.size() > 1) // {
{ // interval_kind = (*current_block.input_columns[
offset = (*current_block.input_columns[ // workspace.argument_column_indices[2]])[
workspace.argument_column_indices[1]])[ // transform->current_row.row].get<IntervalKind>();
transform->current_row.row].get<Int64>(); // }
//
//
// //const DataTypeInterval interval_type = (*current_block.input_columns[workspace.argument_column_indices[ARGUMENT_TIMESTAMP]]).getDataType();
// const auto * interval_type = checkAndGetDataType<DataTypeInterval>(interval_column.type.get());
//
// if (!interval_type)
// throw Exception(
// "Illegal column for second argument of function " + getName() + ", must be an interval of time.",
// ErrorCodes::ILLEGAL_COLUMN);
//
// const auto * interval_column_const_int64 = checkAndGetColumnConst<ColumnInt64>(interval_column.column.get());
// if (!interval_column_const_int64)
// throw Exception(
// "Illegal column for second argument of function " + getName() + ", must be a const interval of time.", ErrorCodes::ILLEGAL_COLUMN);
//
// Int64 num_units = interval_column_const_int64->getValue<Int64>();
// if (num_units <= 0)
// throw Exception("Value for second argument of function " + getName() + " must be positive.", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
/// Either overflow or really negative value, both is not acceptable. // const IColumn & default_column = *current_block.input_columns[workspace.argument_column_indices[2]].get();
if (offset < 0) // const auto ts_scale = *current_block.input_columns[workspace.argument_column_indices[2]].get
{ //
throw Exception(ErrorCodes::BAD_ARGUMENTS, // if (interval_kind == IntervalKind::Second || interval_kind == IntervalKind::Millisecond || interval_kind == IntervalKind::Microsecond || interval_kind == IntervalKind::Nanosecond){
"The offset for function {} must be in (0, {}], {} given", // auto nanosecs_in_interval = interval_kind.toAvgNanoseconds();
getName(), INT64_MAX, offset); // }
}
}
const auto [target_row, offset_left] = transform->moveRowNumber( Float64 last_metric = getLastValueFromInputColumn<Float64>(transform, function_index, ARGUMENT_METRIC);
transform->current_row, offset * (is_lead ? 1 : -1)); Float64 last_timestamp = getLastValueFromInputColumn<Float64>(transform, function_index, ARGUMENT_TIMESTAMP);
if (offset_left != 0 Float64 curr_metric = getCurrentValueFromInputColumn<Float64>(transform, function_index, ARGUMENT_METRIC);
|| target_row < transform->frame_start Float64 curr_timestamp = getCurrentValueFromInputColumn<Float64>(transform, function_index, ARGUMENT_TIMESTAMP);
|| transform->frame_end <= target_row)
{ Float64 time_elapsed = last_timestamp - curr_timestamp;
// Offset is outside the frame. Float64 metric_diff = last_metric - curr_metric;
if (argument_types.size() > 2) Float64 result = metric_diff / time_elapsed * interval_length;
{
// Column with default values is specified. setValueToOutputColumn(transform, function_index, result);
// The conversion through Field is inefficient, but we accept
// subtypes of the argument type as a default value (for convenience),
// and it's a pain to write conversion that respects ColumnNothing
// and ColumnConst and so on.
const IColumn & default_column = *current_block.input_columns[
workspace.argument_column_indices[2]].get();
to.insert(default_column[transform->current_row.row]);
}
else
{
to.insertDefault();
}
}
else
{
// Offset is inside the frame.
to.insertFrom(*transform->blockAt(target_row).input_columns[
workspace.argument_column_indices[0]],
target_row.row);
}
} }
private:
Float64 interval_length = 1;
}; };