Added support for non-constant and negative offset and size for substring function (continued) [#CLICKHOUSE-2].

This commit is contained in:
Alexey Milovidov 2017-07-24 09:32:02 +03:00 committed by alexey-milovidov
parent 14de659c99
commit 30ff4a78a6
5 changed files with 120 additions and 27 deletions

View File

@ -107,6 +107,11 @@ public:
return data->get64(0);
}
Int64 getInt(size_t n) const override
{
return data->getInt(0);
}
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override
{
s += length;

View File

@ -221,6 +221,11 @@ public:
UInt64 get64(size_t n) const override;
Int64 getInt(size_t n) const override
{
return Int64(data[n]);
}
void insert(const Field & x) override
{
data.push_back(DB::get<typename NearestFieldType<T>::Type>(x));

View File

@ -111,6 +111,14 @@ public:
throw Exception("Method get64 is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** If column is numeric, return value of n-th element, casted to Int64.
* Otherwise throw an exception.
*/
virtual Int64 getInt(size_t n) const
{
throw Exception("Method getInt is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/// Removes all elements outside of specified range.
/// Is used in LIMIT operation, for example.
virtual ColumnPtr cut(size_t start, size_t length) const

View File

@ -782,46 +782,119 @@ public:
size_t getNumberOfArguments() const override
{
return 3;
return 0;
}
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (!checkDataType<DataTypeString>(&*arguments[0]) && !checkDataType<DataTypeFixedString>(&*arguments[0]))
throw Exception(
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
size_t number_of_arguments = arguments.size();
if (!arguments[1]->isNumeric() || !arguments[2]->isNumeric())
throw Exception("Illegal type " + (arguments[1]->isNumeric() ? arguments[2]->getName() : arguments[1]->getName())
+ " of argument of function "
if (number_of_arguments < 2 || number_of_arguments > 3)
throw Exception("Number of arguments for function " + getName() + " doesn't match: passed "
+ toString(number_of_arguments) + ", should be 2 or 3",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (!checkDataType<DataTypeString>(&*arguments[0]) && !checkDataType<DataTypeFixedString>(&*arguments[0]))
throw Exception("Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!arguments[1]->isNumeric())
throw Exception("Illegal type " + arguments[1]->getName()
+ " of second argument of function "
+ getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (number_of_arguments == 3 && !arguments[2]->isNumeric())
throw Exception("Illegal type " + arguments[2]->getName()
+ " of second argument of function "
+ getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeString>();
}
template <typename Source>
void executeForSource(
const ColumnPtr & column_start, const ColumnPtr & column_length,
const ColumnConst * column_start_const, const ColumnConst * column_length_const,
Int64 start_value, Int64 length_value,
Block & block, size_t result,
Source && source)
{
std::shared_ptr<ColumnString> col_res = std::make_shared<ColumnString>();
if (!column_length)
{
if (column_start_const)
{
if (start_value > 0)
sliceFromLeftConstantOffsetUnbounded(source, StringSink(*col_res, block.rows()), start_value - 1);
else if (start_value < 0)
sliceFromRightConstantOffsetUnbounded(source, StringSink(*col_res, block.rows()), -start_value);
else
throw Exception("Indices in strings are 1-based", ErrorCodes::ZERO_ARRAY_OR_TUPLE_INDEX);
}
else
sliceDynamicOffsetUnbounded(source, StringSink(*col_res, block.rows()), *column_start);
}
else
{
if (column_start_const && column_length_const)
{
if (start_value > 0)
sliceFromLeftConstantOffsetBounded(source, StringSink(*col_res, block.rows()), start_value - 1, length_value);
else if (start_value < 0)
sliceFromRightConstantOffsetBounded(source, StringSink(*col_res, block.rows()), -start_value, length_value);
else
throw Exception("Indices in strings are 1-based", ErrorCodes::ZERO_ARRAY_OR_TUPLE_INDEX);
}
else
sliceDynamicOffsetBounded(source, StringSink(*col_res, block.rows()), *column_start, *column_length);
}
block.getByPosition(result).column = col_res;
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) override
{
const ColumnPtr column_string = block.getByPosition(arguments[0]).column;
const ColumnPtr column_start = block.getByPosition(arguments[1]).column;
const ColumnPtr column_length = block.getByPosition(arguments[2]).column;
size_t number_of_arguments = arguments.size();
ColumnPtr column_string = block.getByPosition(arguments[0]).column;
ColumnPtr column_start = block.getByPosition(arguments[1]).column;
ColumnPtr column_length;
if (number_of_arguments == 3)
column_length = block.getByPosition(arguments[2]).column;
const ColumnConst * column_start_const = checkAndGetColumn<ColumnConst>(column_start.get());
const ColumnConst * column_length_const = nullptr;
if (number_of_arguments == 3)
column_length_const = checkAndGetColumn<ColumnConst>(column_length.get());
Int64 start_value = 0;
Int64 length_value = 0;
if (column_start_const)
{
start_value = column_start_const->getInt(0);
if (start_value > 0x8000000000000000ULL) /// Larger value could lead to overflow, then bypass bounds checking and read wrong data.
throw Exception("Too large values of second argument provided for function substring.", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
if (column_length_const)
{
length_value = column_length_const->getInt(0);
if (length_value < 0)
throw Exception("Third argument provided for function substring could not be negative.", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
if (length_value > 0x8000000000000000ULL)
throw Exception("Too large values of second argument provided for function substring.", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
if (const ColumnString * col = checkAndGetColumn<ColumnString>(&*column_string))
{
std::shared_ptr<ColumnString> col_res = std::make_shared<ColumnString>();
block.getByPosition(result).column = col_res;
sliceDynamicOffsetBounded(StringSource(*col), StringSink(*col_res, block.rows()), *column_start, *column_length);
}
executeForSource(column_start, column_length, column_start_const, column_length_const, start_value, length_value, block, result, StringSource(*col));
else if (const ColumnFixedString * col = checkAndGetColumn<ColumnFixedString>(&*column_string))
{
std::shared_ptr<ColumnString> col_res = std::make_shared<ColumnString>();
block.getByPosition(result).column = col_res;
sliceDynamicOffsetBounded(FixedStringSource(*col), StringSink(*col_res, block.rows()), *column_start, *column_length);
}
executeForSource(column_start, column_length, column_start_const, column_length_const, start_value, length_value, block, result, FixedStringSource(*col));
else
throw Exception(
"Illegal column " + block.getByPosition(arguments[0]).column->getName() + " of first argument of function " + getName(),

View File

@ -1,5 +1,7 @@
#pragma once
#include <type_traits>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h>
@ -677,11 +679,11 @@ void sliceDynamicOffsetUnbounded(Source && src, Sink && sink, IColumn & offset_c
{
while (!src.isEnd())
{
Int64 offset = offset_column.get64(src.rowNum());
Int64 offset = offset_column.getInt(src.rowNum());
if (offset != 0)
{
typename Source::Slice slice;
typename std::decay<Source>::type::Slice slice;
if (offset > 0)
slice = src.getSliceFromLeft(offset - 1);
@ -702,12 +704,12 @@ void sliceDynamicOffsetBounded(Source && src, Sink && sink, IColumn & offset_col
while (!src.isEnd())
{
size_t row_num = src.rowNum();
Int64 offset = offset_column.get64(row_num);
UInt64 size = length_column.get64(row_num);
Int64 offset = offset_column.getInt(row_num);
UInt64 size = length_column.getInt(row_num);
if (offset != 0 && size < 0x8000000000000000ULL)
{
typename Source::Slice slice;
typename std::decay<Source>::type::Slice slice;
if (offset > 0)
slice = src.getSliceFromLeft(offset - 1, size);