fixed slice for nullable index [#CLICKHOUSE-2090]

This commit is contained in:
Nikolai Kochetov 2017-08-24 21:59:28 +03:00
parent d8529e1846
commit d213efa974

View File

@ -1052,14 +1052,15 @@ template <typename Source, typename Sink>
void NO_INLINE sliceDynamicOffsetUnbounded(Source & src, Sink & sink, IColumn & offset_column) void NO_INLINE sliceDynamicOffsetUnbounded(Source & src, Sink & sink, IColumn & offset_column)
{ {
const bool is_null = offset_column.isNull(); const bool is_null = offset_column.isNull();
const bool is_nullable = offset_column.isNullable(); auto * nullable = typeid_cast<ColumnNullable *>(&offset_column);
auto null_map = is_nullable ? &static_cast<ColumnNullable &>(offset_column).getNullMapConcreteColumn().getData() : nullptr; ColumnUInt8::Container_t * null_map = nullable ? &nullable->getNullMapConcreteColumn().getData() : nullptr;
IColumn * nested_column = nullable ? nullable->getNestedColumn().get() : &offset_column;
while (!src.isEnd()) while (!src.isEnd())
{ {
auto row_num = src.rowNum(); auto row_num = src.rowNum();
bool has_offset = !is_null && !(is_nullable && (*null_map)[row_num]); bool has_offset = !is_null && !(null_map && (*null_map)[row_num]);
Int64 offset = has_offset ? offset_column.getInt(row_num) : 1; Int64 offset = has_offset ? nested_column->getInt(row_num) : 1;
if (offset != 0) if (offset != 0)
{ {
@ -1082,20 +1083,22 @@ template <typename Source, typename Sink>
void NO_INLINE sliceDynamicOffsetBounded(Source & src, Sink & sink, IColumn & offset_column, IColumn & length_column) void NO_INLINE sliceDynamicOffsetBounded(Source & src, Sink & sink, IColumn & offset_column, IColumn & length_column)
{ {
const bool is_offset_null = offset_column.isNull(); const bool is_offset_null = offset_column.isNull();
const bool is_offset_nullable = offset_column.isNullable(); auto * offset_nullable = typeid_cast<ColumnNullable *>(&offset_column);
auto offset_null_map = is_offset_nullable ? &static_cast<ColumnNullable &>(offset_column).getNullMapConcreteColumn().getData() : nullptr; ColumnUInt8::Container_t * offset_null_map = offset_nullable ? &offset_nullable->getNullMapConcreteColumn().getData() : nullptr;
IColumn * offset_nested_column = offset_nullable ? offset_nullable->getNestedColumn().get() : &offset_column;
const bool is_length_null = length_column.isNull(); const bool is_length_null = length_column.isNull();
const bool is_length_nullable = length_column.isNullable(); auto * length_nullable = typeid_cast<ColumnNullable *>(&length_column);
auto length_null_map = is_length_nullable ? &static_cast<ColumnNullable &>(length_column).getNullMapConcreteColumn().getData() : nullptr; ColumnUInt8::Container_t * length_null_map = length_nullable ? &length_nullable->getNullMapConcreteColumn().getData() : nullptr;
IColumn * length_nested_column = length_nullable ? length_nullable->getNestedColumn().get() : &length_column;
while (!src.isEnd()) while (!src.isEnd())
{ {
size_t row_num = src.rowNum(); size_t row_num = src.rowNum();
bool has_offset = !is_offset_null && !(is_offset_nullable && (*offset_null_map)[row_num]); bool has_offset = !is_offset_null && !(offset_null_map && (*offset_null_map)[row_num]);
bool has_length = !is_length_null && !(is_length_nullable && (*length_null_map)[row_num]); bool has_length = !is_length_null && !(length_null_map && (*length_null_map)[row_num]);
Int64 offset = has_offset ? offset_column.getInt(row_num) : 1; Int64 offset = has_offset ? offset_nested_column->getInt(row_num) : 1;
Int64 size = has_length ? length_column.getInt(row_num) : static_cast<Int64>(src.getElementSize()); Int64 size = has_length ? length_nested_column->getInt(row_num) : static_cast<Int64>(src.getElementSize());
if (size < 0) if (size < 0)
size += offset > 0 ? static_cast<Int64>(src.getElementSize()) - (offset - 1) : -offset; size += offset > 0 ? static_cast<Int64>(src.getElementSize()) - (offset - 1) : -offset;