dbms: Server: Adding nullable type support to functions manipulating arrays. [#METR-19266]

This commit is contained in:
Alexey Arno 2016-09-20 18:59:47 +03:00
parent 1ade2397e4
commit 9ceac9ac31

View File

@ -192,6 +192,8 @@ struct IndexCount
template <typename T, typename U, typename IndexConv>
struct ArrayIndexNumImpl
{
private:
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wsign-compare"
@ -349,6 +351,7 @@ struct ArrayIndexNumImpl
}
}
public:
template <typename ScalarOrVector>
static void vector(
const PaddedPODArray<T> & data, const ColumnArray::Offsets_t & offsets,
@ -557,16 +560,15 @@ struct ArrayIndexStringImpl
}
};
/** Catch-all implementation for arrays of arbitary type.
*/
/// Catch-all implementation for arrays of arbitary type.
/// To compare with constant value, create non-constant column with single element,
/// and pass is_value_has_single_element_to_compare = true.
template <typename IndexConv, bool is_value_has_single_element_to_compare>
struct ArrayIndexGenericImpl
{
/** To compare with constant value, create non-constant column with single element,
* and pass is_value_has_single_element_to_compare = true.
*/
static void vector(
private:
/// Both function arguments are ordinary.
static void vectorCase1(
const IColumn & data, const ColumnArray::Offsets_t & offsets,
const IColumn & value,
PaddedPODArray<typename IndexConv::ResultType> & result)
@ -581,9 +583,173 @@ struct ArrayIndexGenericImpl
typename IndexConv::ResultType current = 0;
for (size_t j = 0; j < array_size; ++j)
{
if (0 == data.compareAt(current_offset + j, is_value_has_single_element_to_compare ? 0 : i, value, 1))
{
if (!IndexConv::apply(j, current))
break;
}
}
result[i] = current;
current_offset = offsets[i];
}
}
/// The 2nd function argument is nullable.
static void vectorCase2(
const IColumn & data, const ColumnArray::Offsets_t & offsets,
const IColumn & value,
PaddedPODArray<typename IndexConv::ResultType> & result,
const PaddedPODArray<UInt8> & null_map_item)
{
size_t size = offsets.size();
result.resize(size);
ColumnArray::Offset_t current_offset = 0;
for (size_t i = 0; i < size; ++i)
{
size_t array_size = offsets[i] - current_offset;
typename IndexConv::ResultType current = 0;
for (size_t j = 0; j < array_size; ++j)
{
if ((null_map_item[i] == 0) &&
(0 == data.compareAt(current_offset + j, is_value_has_single_element_to_compare ? 0 : i, value, 1)))
{
if (!IndexConv::apply(j, current))
break;
}
}
result[i] = current;
current_offset = offsets[i];
}
}
/// The 1st function argument is a non-constant array of nullable values.
static void vectorCase3(
const IColumn & data, const ColumnArray::Offsets_t & offsets,
const IColumn & value,
PaddedPODArray<typename IndexConv::ResultType> & result,
const PaddedPODArray<UInt8> & null_map_data)
{
size_t size = offsets.size();
result.resize(size);
ColumnArray::Offset_t current_offset = 0;
for (size_t i = 0; i < size; ++i)
{
size_t array_size = offsets[i] - current_offset;
typename IndexConv::ResultType current = 0;
for (size_t j = 0; j < array_size; ++j)
{
if (null_map_data[current_offset + j] == 1)
{
}
else if (0 == data.compareAt(current_offset + j, is_value_has_single_element_to_compare ? 0 : i, value, 1))
{
if (!IndexConv::apply(j, current))
break;
}
}
result[i] = current;
current_offset = offsets[i];
}
}
/// The 1st function argument is a non-constant array of nullable values.
/// The 2nd function argument is nullable.
static void vectorCase4(
const IColumn & data, const ColumnArray::Offsets_t & offsets,
const IColumn & value,
PaddedPODArray<typename IndexConv::ResultType> & result,
const PaddedPODArray<UInt8> & null_map_data,
const PaddedPODArray<UInt8> & null_map_item)
{
size_t size = offsets.size();
result.resize(size);
ColumnArray::Offset_t current_offset = 0;
for (size_t i = 0; i < size; ++i)
{
size_t array_size = offsets[i] - current_offset;
typename IndexConv::ResultType current = 0;
for (size_t j = 0; j < array_size; ++j)
{
bool hit = false;
if (null_map_data[current_offset + j] == 1)
{
if (null_map_item[i] == 1)
hit = true;
}
else if (0 == data.compareAt(current_offset + j, is_value_has_single_element_to_compare ? 0 : i, value, 1))
hit = true;
if (hit)
{
if (!IndexConv::apply(j, current))
break;
}
}
}
}
public:
static void vector(
const IColumn & data, const ColumnArray::Offsets_t & offsets,
const IColumn & value,
PaddedPODArray<typename IndexConv::ResultType> & result,
const PaddedPODArray<UInt8> * null_map_data,
const PaddedPODArray<UInt8> * null_map_item)
{
/// Processing is split into 4 cases.
if ((null_map_data == nullptr) && (null_map_item == nullptr))
vectorCase1(data, offsets, value, result);
else if ((null_map_data == nullptr) && (null_map_item != nullptr))
vectorCase2(data, offsets, value, result, *null_map_item);
else if ((null_map_data != nullptr) && (null_map_item == nullptr))
vectorCase3(data, offsets, value, result, *null_map_data);
else
vectorCase4(data, offsets, value, result, *null_map_data, *null_map_item);
}
};
/// Catch-all implementation for arrays of arbitary type
/// when the 2nd function argument is a NULL value.
template <typename IndexConv>
struct ArrayIndexGenericNullImpl
{
static void vector(
const IColumn & data, const ColumnArray::Offsets_t & offsets,
PaddedPODArray<typename IndexConv::ResultType> & result,
const PaddedPODArray<UInt8> * null_map_data)
{
size_t size = offsets.size();
result.resize(size);
if (null_map_data == nullptr)
return;
const auto & null_map_ref = *null_map_data;
ColumnArray::Offset_t current_offset = 0;
for (size_t i = 0; i < size; ++i)
{
size_t array_size = offsets[i] - current_offset;
typename IndexConv::ResultType current = 0;
for (size_t j = 0; j < array_size; ++j)
{
if (null_map_ref[current_offset + j] == 1)
{
if (!IndexConv::apply(j, current))
break;
}
}
result[i] = current;
current_offset = offsets[i];
@ -591,7 +757,6 @@ struct ArrayIndexGenericImpl
}
};
template <typename IndexConv, typename Name>
class FunctionArrayIndex : public IFunction
{
@ -631,6 +796,9 @@ private:
if (!col_nested)
return false;
const auto col_res = std::make_shared<ResultColumnType>();
block.getByPosition(result).column = col_res;
/// Null maps of the 1st and second function arguments,
/// if it applies.
const PaddedPODArray<UInt8> * null_map_data = nullptr;
@ -650,33 +818,14 @@ private:
const auto item_arg = block.getByPosition(arguments[1]).column.get();
if (item_arg->isNull())
{
const auto col_res = std::make_shared<ResultColumnType>();
block.getByPosition(result).column = col_res;
ArrayIndexNumNullImpl<T, IndexConv>::vector(col_nested->getData(), col_array->getOffsets(),
col_res->getData(), null_map_data);
}
else if (const auto item_arg_const = typeid_cast<const ColumnConst<U> *>(item_arg))
{
const auto col_res = std::make_shared<ResultColumnType>();
block.getByPosition(result).column = col_res;
if (item_arg->isNull())
ArrayIndexNumNullImpl<T, IndexConv>::vector(col_nested->getData(), col_array->getOffsets(),
col_res->getData(), null_map_data);
else
ArrayIndexNumImpl<T, U, IndexConv>::vector(col_nested->getData(), col_array->getOffsets(),
item_arg_const->getData(), col_res->getData(), null_map_data, nullptr);
}
ArrayIndexNumImpl<T, U, IndexConv>::vector(col_nested->getData(), col_array->getOffsets(),
item_arg_const->getData(), col_res->getData(), null_map_data, nullptr);
else if (const auto item_arg_vector = typeid_cast<const ColumnVector<U> *>(item_arg))
{
const auto col_res = std::make_shared<ResultColumnType>();
block.getByPosition(result).column = col_res;
ArrayIndexNumImpl<T, U, IndexConv>::vector(col_nested->getData(), col_array->getOffsets(),
item_arg_vector->getData(), col_res->getData(), null_map_data, null_map_item);
}
else
return false;
@ -695,6 +844,9 @@ private:
if (!col_nested)
return false;
const auto col_res = std::make_shared<ResultColumnType>();
block.getByPosition(result).column = col_res;
/// Null maps of the 1st and second function arguments,
/// if it applies.
const PaddedPODArray<UInt8> * null_map_data = nullptr;
@ -714,31 +866,16 @@ private:
const auto item_arg = block.getByPosition(arguments[1]).column.get();
if (item_arg->isNull())
{
const auto col_res = std::make_shared<ResultColumnType>();
block.getByPosition(result).column = col_res;
ArrayIndexStringNullImpl<IndexConv>::vector_const(col_nested->getChars(), col_array->getOffsets(),
col_nested->getOffsets(), col_res->getData(), null_map_data);
}
else if (const auto item_arg_const = typeid_cast<const ColumnConst<String> *>(item_arg))
{
const auto col_res = std::make_shared<ResultColumnType>();
block.getByPosition(result).column = col_res;
ArrayIndexStringImpl<IndexConv>::vector_const(col_nested->getChars(), col_array->getOffsets(),
col_nested->getOffsets(), item_arg_const->getData(), col_res->getData(),
null_map_data);
}
else if (const auto item_arg_vector = typeid_cast<const ColumnString *>(item_arg))
{
const auto col_res = std::make_shared<ResultColumnType>();
block.getByPosition(result).column = col_res;
ArrayIndexStringImpl<IndexConv>::vector_vector(col_nested->getChars(), col_array->getOffsets(),
col_nested->getOffsets(), item_arg_vector->getChars(), item_arg_vector->getOffsets(),
col_res->getData(), null_map_data, null_map_item);
}
else
return false;
@ -833,22 +970,40 @@ private:
const auto col_res = std::make_shared<ResultColumnType>();
block.getByPosition(result).column = col_res;
if (item_arg.isConst())
/// Null maps of the 1st and second function arguments,
/// if it applies.
const PaddedPODArray<UInt8> * null_map_data = nullptr;
const PaddedPODArray<UInt8> * null_map_item = nullptr;
if (arguments.size() > 2)
{
ArrayIndexGenericImpl<IndexConv, true>::vector(col_nested, col_array->getOffsets(),
*item_arg.cut(0, 1)->convertToFullColumnIfConst(), col_res->getData());
const auto & null_map1 = block.getByPosition(arguments[2]).column;
if (null_map1)
null_map_data = &static_cast<const ColumnUInt8 &>(*null_map1).getData();
const auto & null_map2 = block.getByPosition(arguments[3]).column;
if (null_map2)
null_map_item = &static_cast<const ColumnUInt8 &>(*null_map2).getData();
}
if (item_arg.isNull())
ArrayIndexGenericNullImpl<IndexConv>::vector(col_nested, col_array->getOffsets(),
col_res->getData(), null_map_data);
else if (item_arg.isConst())
ArrayIndexGenericImpl<IndexConv, true>::vector(col_nested, col_array->getOffsets(),
*item_arg.cut(0, 1)->convertToFullColumnIfConst(), col_res->getData(),
null_map_data, nullptr);
else
{
/// If item_arg is tuple and have constants.
if (auto materialized_tuple = item_arg.convertToFullColumnIfConst())
{
ArrayIndexGenericImpl<IndexConv, false>::vector(
col_nested, col_array->getOffsets(), *materialized_tuple, col_res->getData());
}
col_nested, col_array->getOffsets(), *materialized_tuple, col_res->getData(),
null_map_data, null_map_item);
else
ArrayIndexGenericImpl<IndexConv, false>::vector(
col_nested, col_array->getOffsets(), item_arg, col_res->getData());
col_nested, col_array->getOffsets(), item_arg, col_res->getData(),
null_map_data, null_map_item);
}
return true;