diff --git a/dbms/src/Functions/FunctionsArray.cpp b/dbms/src/Functions/FunctionsArray.cpp index f70709b2f07..49c2af2b812 100644 --- a/dbms/src/Functions/FunctionsArray.cpp +++ b/dbms/src/Functions/FunctionsArray.cpp @@ -2536,34 +2536,29 @@ bool FunctionArrayReverse::executeFixedString( const ColumnNullable * nullable_col, ColumnNullable * nullable_res_col) { - auto do_reverse = [](const auto & src_data, const auto & src_offsets, auto & res_data) - { - size_t size = src_offsets.size(); - ColumnArray::Offset_t src_prev_offset = 0; - - for (size_t i = 0; i < size; ++i) - { - const auto * src = &src_data[src_prev_offset]; - const auto * src_end = &src_data[src_offsets[i]]; - - if (src == src_end) - continue; - - auto dst = &res_data[src_offsets[i] - 1]; - - while (src < src_end) - { - *dst = *src; - ++src; - --dst; - } - - src_prev_offset = src_offsets[i]; - } - }; - if (const ColumnFixedString * src_data_concrete = typeid_cast(&src_data)) { + /// XXX The original implementation, i.e. everything but the new code + /// XXX for nullable support is buggy. Example with a table with one column + /// XXX whose type is Array(FixedString(3))): + /// │ ['a\0\0','bc\0','def'] │ + /// │ ['a\0\0','bc\0','def'] │ + /// │ ['a\0\0','bc\0','def'] │ + /// │ ['a\0\0','bc\0','def'] │ + /// │ ['a\0\0','bc\0','def'] │ + /// │ ['a\0\0','bc\0','def'] │ + /// │ ['a\0\0','bc\0','def'] │ + /// + /// XXX SELECT reverse(col) FROM table gives the following output: + /// + /// │ ['def','a\0\0','bc\0'] │ + /// │ ['def','a\0\0','bc\0'] │ + /// │ ['def','a\0\0','bc\0'] │ + /// │ ['def','a\0\0','bc\0'] │ + /// │ ['def','a\0\0','bc\0'] │ + /// │ ['def','a\0\0','bc\0'] │ + /// │ ['def','\0\0\0','\0\0\0'] │ + const size_t n = src_data_concrete->getN(); const ColumnFixedString::Chars_t & src_data = src_data_concrete->getChars(); ColumnFixedString::Chars_t & res_data = typeid_cast(res_data_col).getChars(); @@ -2597,8 +2592,29 @@ bool FunctionArrayReverse::executeFixedString( /// Make a reverted null map. const auto & src_null_map = static_cast(*nullable_col->getNullValuesByteMap()).getData(); auto & res_null_map = static_cast(*nullable_res_col->getNullValuesByteMap()).getData(); - res_null_map.resize(src_data.size()); - do_reverse(src_null_map, src_offsets, res_null_map); + res_null_map.resize(src_null_map.size()); + + ColumnArray::Offset_t src_prev_offset = 0; + + for (size_t i = 0; i < size; ++i) + { + const UInt8 * src = &src_null_map[src_prev_offset]; + const UInt8 * src_end = &src_null_map[src_offsets[i]]; + + if (src == src_end) + continue; + + UInt8 * dst = &res_null_map[src_offsets[i] - 1]; + + while (src < src_end) + { + *dst = *src; + ++src; + --dst; + } + + src_prev_offset = src_offsets[i]; + } } return true;