Step 1: make it correct.

This commit is contained in:
Alexey Milovidov 2019-08-23 06:00:06 +03:00
parent ff9e92eab9
commit 341e2e4587
5 changed files with 42 additions and 276 deletions

View File

@ -46,6 +46,8 @@ public:
bool useDefaultImplementationForNulls() const override { return false; } bool useDefaultImplementationForNulls() const override { return false; }
bool useDefaultImplementationForConstants() const override { return false; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{ {
size_t number_of_arguments = arguments.size(); size_t number_of_arguments = arguments.size();
@ -68,183 +70,58 @@ public:
// check that default value column has supertype with first argument // check that default value column has supertype with first argument
if (number_of_arguments == 3) if (number_of_arguments == 3)
{ return getLeastSupertype({arguments[0], arguments[2]});
DataTypes types = {arguments[0], arguments[2]};
try
{
return getLeastSupertype(types);
}
catch (const Exception &)
{
throw Exception(
"Illegal types of arguments (" + types[0]->getName() + ", " + types[1]->getName()
+ ")"
" of function "
+ getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
}
return arguments[0]; return arguments[0];
} }
static void insertDefaults(const MutableColumnPtr & target, size_t row_count, ColumnPtr & default_values_column, size_t offset)
{
if (row_count == 0)
{
return;
}
if (default_values_column)
{
if (isColumnConst(*default_values_column))
{
const IColumn & constant_content = assert_cast<const ColumnConst &>(*default_values_column).getDataColumn();
for (size_t row = 0; row < row_count; ++row)
{
target->insertFrom(constant_content, 0);
}
}
else
{
target->insertRangeFrom(*default_values_column, offset, row_count);
}
}
else
{
for (size_t row = 0; row < row_count; ++row)
{
target->insertDefault();
}
}
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
{ {
const ColumnWithTypeAndName & source_column_name_and_type = block.getByPosition(arguments[0]);
const DataTypePtr & result_type = block.getByPosition(result).type; const DataTypePtr & result_type = block.getByPosition(result).type;
ColumnPtr source_column = source_column_name_and_type.column; const ColumnWithTypeAndName & source_elem = block.getByPosition(arguments[0]);
const ColumnWithTypeAndName & offset_elem = block.getByPosition(arguments[1]);
bool has_defaults = arguments.size() == 3;
// adjust source and default values columns to resulting data type ColumnPtr source_column_casted = castColumn(source_elem, result_type, context);
if (!source_column_name_and_type.type->equals(*result_type)) ColumnPtr offset_column = offset_elem.column;
ColumnPtr default_column_casted;
if (has_defaults)
{ {
source_column = castColumn(source_column_name_and_type, result_type, context); const ColumnWithTypeAndName & default_elem = block.getByPosition(arguments[2]);
default_column_casted = castColumn(default_elem, result_type, context);
} }
ColumnPtr default_values_column; bool source_is_constant = isColumnConst(*source_column_casted);
bool offset_is_constant = isColumnConst(*offset_column);
/// Has argument with default value: neighbor(source, offset, default) bool default_is_constant = false;
if (arguments.size() == 3) if (has_defaults)
default_is_constant = isColumnConst(*default_column_casted);
if (source_is_constant)
source_column_casted = assert_cast<const ColumnConst &>(*source_column_casted).getDataColumnPtr();
if (offset_is_constant)
offset_column = assert_cast<const ColumnConst &>(*offset_column).getDataColumnPtr();
if (default_is_constant)
default_column_casted = assert_cast<const ColumnConst &>(*default_column_casted).getDataColumnPtr();
auto column = result_type->createColumn();
for (size_t row = 0; row < input_rows_count; ++row)
{ {
default_values_column = block.getByPosition(arguments[2]).column; Int64 src_idx = row + offset_column->getInt(offset_is_constant ? 0 : row);
if (!block.getByPosition(arguments[2]).type->equals(*result_type)) if (src_idx >= 0 && src_idx < Int64(input_rows_count))
default_values_column = castColumn(block.getByPosition(arguments[2]), result_type, context); column->insertFrom(*source_column_casted, source_is_constant ? 0 : src_idx);
} else if (has_defaults)
column->insertFrom(*default_column_casted, default_is_constant ? 0 : row);
const auto & offset_structure = block.getByPosition(arguments[1]);
ColumnPtr offset_column = offset_structure.column;
auto is_constant_offset = isColumnConst(*offset_structure.column);
// since we are working with both signed and unsigned - we'll try to use Int64 for handling all of them
const DataTypePtr desired_type = std::make_shared<DataTypeInt64>();
if (!block.getByPosition(arguments[1]).type->equals(*desired_type))
{
offset_column = castColumn(offset_structure, desired_type, context);
}
if (isColumnConst(*source_column))
{
/// NOTE Inconsistency when default_values are specified.
auto column = result_type->createColumnConst(input_rows_count, (*source_column)[0]);
block.getByPosition(result).column = std::move(column);
}
else
{
auto column = result_type->createColumn();
column->reserve(input_rows_count);
// with constant offset - insertRangeFrom
if (is_constant_offset)
{
Int64 offset_value = offset_column->getInt(0);
auto offset_value_casted = static_cast<size_t>(std::abs(offset_value));
size_t default_value_count = std::min(offset_value_casted, input_rows_count);
if (offset_value > 0)
{
// insert shifted value
if (offset_value_casted <= input_rows_count)
{
column->insertRangeFrom(*source_column, offset_value_casted, input_rows_count - offset_value_casted);
}
insertDefaults(column, default_value_count, default_values_column, input_rows_count - default_value_count);
}
else if (offset_value < 0)
{
// insert defaults up to offset_value
insertDefaults(column, default_value_count, default_values_column, 0);
column->insertRangeFrom(*source_column, 0, input_rows_count - default_value_count);
}
else
{
// populate column with source values, when offset is equal to zero
column->insertRangeFrom(*source_column, 0, input_rows_count);
}
}
else else
{ column->insertDefault();
// with dynamic offset - handle row by row
for (size_t row = 0; row < input_rows_count; ++row)
{
Int64 offset_value = offset_column->getInt(row);
if (offset_value == 0)
{
column->insertFrom(*source_column, row);
}
else if (offset_value > 0)
{
size_t real_offset = row + offset_value;
if (real_offset > input_rows_count)
{
if (default_values_column)
{
column->insertFrom(*default_values_column, row);
}
else
{
column->insertDefault();
}
}
else
{
column->insertFrom(*source_column, real_offset);
}
}
else
{
// out of range
auto offset_value_casted = static_cast<size_t>(std::abs(offset_value));
if (offset_value_casted > row)
{
if (default_values_column)
{
column->insertFrom(*default_values_column, row);
}
else
{
column->insertDefault();
}
}
else
{
column->insertFrom(*source_column, row - offset_value_casted);
}
}
}
}
block.getByPosition(result).column = std::move(column);
} }
block.getByPosition(result).column = std::move(column);
} }
private: private:

View File

@ -64,6 +64,6 @@ Dynamic column and offset, without defaults
4 -4 0 4 -4 0
5 -6 0 5 -6 0
Constant column Constant column
0 1000 0 0
1 1000 1 0
2 1000 2 0

View File

@ -5,9 +5,9 @@ select neighbor(1); -- { serverError 42 }
-- greater than 3 arguments -- greater than 3 arguments
select neighbor(1,2,3,4); -- { serverError 42 } select neighbor(1,2,3,4); -- { serverError 42 }
-- bad default value -- bad default value
select neighbor(dummy, 1, 'hello'); -- { serverError 43 } select neighbor(dummy, 1, 'hello'); -- { serverError 386 }
-- types without common supertype (UInt64 and Int8) -- types without common supertype (UInt64 and Int8)
select number, neighbor(number, 1, -10) from numbers(3); -- { serverError 43 } select number, neighbor(number, 1, -10) from numbers(3); -- { serverError 386 }
-- nullable offset is not allowed -- nullable offset is not allowed
select number, if(number > 1, number, null) as offset, neighbor(number, offset) from numbers(3); -- { serverError 43 } select number, if(number > 1, number, null) as offset, neighbor(number, offset) from numbers(3); -- { serverError 43 }
select 'Zero offset'; select 'Zero offset';

View File

@ -1,69 +0,0 @@
Zero offset
0 0
1 1
2 2
Nullable values
\N 0 \N
\N 1 2
2 2 \N
Result with different type
0 1
1 2
2 -10
Offset > block
0 0
1 0
2 0
Abs(Offset) > block
0 0
1 0
2 0
Positive offset
0 1
1 2
2 0
Negative offset
0 1
1 2
2 0
Positive offset with defaults
0 2
1 3
2 12
3 13
Negative offset with defaults
0 10
1 11
2 0
3 1
Positive offset with const defaults
0 1
1 2
2 1000
Negative offset with const defaults
0 1000
1 0
2 1
Dynamic column and offset, out of bounds
0 0 0
1 2 3
2 4 20
3 6 30
Dynamic column and offset, negative
0 0 0
1 -2 10
2 -4 20
3 -6 30
4 -8 40
5 -10 50
Dynamic column and offset, without defaults
0 4 4
1 2 3
2 0 2
3 -2 1
4 -4 0
5 -6 0
Constant column
0 1000
1 1000
2 1000

View File

@ -1,42 +0,0 @@
-- no arguments
select neighbour(); -- { serverError 42 }
-- single argument
select neighbour(1); -- { serverError 42 }
-- greater than 3 arguments
select neighbour(1,2,3,4); -- { serverError 42 }
-- bad default value
select neighbour(dummy, 1, 'hello'); -- { serverError 43 }
-- types without common supertype (UInt64 and Int8)
select number, neighbour(number, 1, -10) from numbers(3); -- { serverError 43 }
-- nullable offset is not allowed
select number, if(number > 1, number, null) as offset, neighbour(number, offset) from numbers(3); -- { serverError 43 }
select 'Zero offset';
select number, neighbour(number, 0) from numbers(3);
select 'Nullable values';
select if(number > 1, number, null) as value, number as offset, neighbour(value, offset) as neighbour from numbers(3);
select 'Result with different type';
select toInt32(number) as n, neighbour(n, 1, -10) from numbers(3);
select 'Offset > block';
select number, neighbour(number, 10) from numbers(3);
select 'Abs(Offset) > block';
select number, neighbour(number, -10) from numbers(3);
select 'Positive offset';
select number, neighbour(number, 1) from numbers(3);
select 'Negative offset';
select number, neighbour(number, 1) from numbers(3);
select 'Positive offset with defaults';
select number, neighbour(number, 2, number + 10) from numbers(4);
select 'Negative offset with defaults';
select number, neighbour(number, -2, number + 10) from numbers(4);
select 'Positive offset with const defaults';
select number, neighbour(number, 1, 1000) from numbers(3);
select 'Negative offset with const defaults';
select number, neighbour(number, -1, 1000) from numbers(3);
select 'Dynamic column and offset, out of bounds';
select number, number * 2 as offset, neighbour(number, offset, number * 10) from numbers(4);
select 'Dynamic column and offset, negative';
select number, -number * 2 as offset, neighbour(number, offset, number * 10) from numbers(6);
select 'Dynamic column and offset, without defaults';
select number, -(number - 2) * 2 as offset, neighbour(number, offset) from numbers(6);
select 'Constant column';
select number, neighbour(1000, 10) from numbers(3);