dbms: Server: Adding nullable type support to functions manipulating arrays. [#METR-19266]

This commit is contained in:
Alexey Arno 2016-09-20 10:27:45 +03:00
parent 58fec71f0f
commit a38fef4c60

View File

@ -2462,24 +2462,20 @@ bool FunctionArrayReverse::executeNumber(
const ColumnNullable * nullable_col, const ColumnNullable * nullable_col,
ColumnNullable * nullable_res_col) ColumnNullable * nullable_res_col)
{ {
if (const ColumnVector<T> * src_data_concrete = typeid_cast<const ColumnVector<T> *>(&src_data)) auto do_reverse = [](const auto & src_data, const auto & src_offsets, auto & res_data)
{ {
const PaddedPODArray<T> & src_data = src_data_concrete->getData();
PaddedPODArray<T> & res_data = typeid_cast<ColumnVector<T> &>(res_data_col).getData();
size_t size = src_offsets.size(); size_t size = src_offsets.size();
res_data.resize(src_data.size());
ColumnArray::Offset_t src_prev_offset = 0; ColumnArray::Offset_t src_prev_offset = 0;
for (size_t i = 0; i < size; ++i) for (size_t i = 0; i < size; ++i)
{ {
const T * src = &src_data[src_prev_offset]; const auto * src = &src_data[src_prev_offset];
const T * src_end = &src_data[src_offsets[i]]; const auto * src_end = &src_data[src_offsets[i]];
if (src == src_end) if (src == src_end)
continue; continue;
T * dst = &res_data[src_offsets[i] - 1]; auto dst = &res_data[src_offsets[i] - 1];
while (src < src_end) while (src < src_end)
{ {
@ -2490,35 +2486,21 @@ bool FunctionArrayReverse::executeNumber(
src_prev_offset = src_offsets[i]; src_prev_offset = src_offsets[i];
} }
};
if (const ColumnVector<T> * src_data_concrete = typeid_cast<const ColumnVector<T> *>(&src_data))
{
const PaddedPODArray<T> & src_data = src_data_concrete->getData();
PaddedPODArray<T> & res_data = typeid_cast<ColumnVector<T> &>(res_data_col).getData();
res_data.resize(src_data.size());
do_reverse(src_data, src_offsets, res_data);
/// XXX Refactor the code below.
if ((nullable_col != nullptr) && (nullable_res_col != nullptr)) if ((nullable_col != nullptr) && (nullable_res_col != nullptr))
{ {
const auto & src_null_map = static_cast<const ColumnUInt8 &>(*nullable_col->getNullValuesByteMap()).getData(); const auto & src_null_map = static_cast<const ColumnUInt8 &>(*nullable_col->getNullValuesByteMap()).getData();
auto & res_null_map = static_cast<ColumnUInt8 &>(*nullable_res_col->getNullValuesByteMap()).getData(); auto & res_null_map = static_cast<ColumnUInt8 &>(*nullable_res_col->getNullValuesByteMap()).getData();
res_null_map.resize(src_data.size()); res_null_map.resize(src_data.size());
do_reverse(src_null_map, src_offsets, res_null_map);
ColumnArray::Offset_t src_prev_offset = 0;
for (size_t i = 0; i < size; ++i)
{
const UInt8 * src = &src_null_map[src_prev_offset];
const UInt8 * src_end = &src_null_map[src_offsets[i]];
if (src == src_end)
continue;
UInt8 * dst = &res_null_map[src_offsets[i] - 1];
while (src < src_end)
{
*dst = *src;
++src;
--dst;
}
src_prev_offset = src_offsets[i];
}
} }
return true; return true;