Update slice for arrays.

This commit is contained in:
Nikolai Kochetov 2020-09-16 13:57:23 +03:00
parent 9e00fb44b5
commit 85698e04fa
10 changed files with 88 additions and 60 deletions

View File

@ -43,14 +43,14 @@ std::unique_ptr<IArraySink> createArraySink(ColumnArray & col, size_t column_siz
void concat(const std::vector<std::unique_ptr<IArraySource>> & sources, IArraySink & sink);
void sliceFromLeftConstantOffsetUnbounded(IArraySource & src, IArraySink & sink, size_t offset);
void sliceFromLeftConstantOffsetBounded(IArraySource & src, IArraySink & sink, size_t offset, ssize_t length);
ColumnArray::MutablePtr sliceFromLeftConstantOffsetUnbounded(IArraySource & src, size_t offset);
ColumnArray::MutablePtr sliceFromLeftConstantOffsetBounded(IArraySource & src, size_t offset, ssize_t length);
void sliceFromRightConstantOffsetUnbounded(IArraySource & src, IArraySink & sink, size_t offset);
void sliceFromRightConstantOffsetBounded(IArraySource & src, IArraySink & sink, size_t offset, ssize_t length);
ColumnArray::MutablePtr sliceFromRightConstantOffsetUnbounded(IArraySource & src, size_t offset);
ColumnArray::MutablePtr sliceFromRightConstantOffsetBounded(IArraySource & src, size_t offset, ssize_t length);
void sliceDynamicOffsetUnbounded(IArraySource & src, IArraySink & sink, const IColumn & offset_column);
void sliceDynamicOffsetBounded(IArraySource & src, IArraySink & sink, const IColumn & offset_column, const IColumn & length_column);
ColumnArray::MutablePtr sliceDynamicOffsetUnbounded(IArraySource & src, const IColumn & offset_column);
ColumnArray::MutablePtr sliceDynamicOffsetBounded(IArraySource & src, const IColumn & offset_column, const IColumn & length_column)
void sliceHas(IArraySource & first, IArraySource & second, ArraySearchType search_type, ColumnUInt8 & result);

View File

@ -45,11 +45,6 @@ struct NumericArraySink : public ArraySinkImpl<NumericArraySink<T>>
size_t row_num = 0;
ColumnArray::Offset current_offset = 0;
MutableColumnPtr createValuesColumn()
{
return ColumnVector<T>::create();
}
NumericArraySink(IColumn & elements_, ColumnArray::Offsets & offsets_, size_t column_size)
: elements(elements_), offsets(offsets_)
{
@ -204,11 +199,6 @@ struct NullableArraySink : public ArraySink
NullMap & null_map;
MutableColumnPtr createValuesColumn()
{
return ColumnNullable::create(ArraySink::createValuesColumn(), ColumnUInt8::create());
}
NullableArraySink(IColumn & elements_, ColumnArray::Offsets & offsets_, size_t column_size)
: ArraySink(assert_cast<ColumnNullable &>(elements_).getNestedColumn(), offsets_, column_size)
, null_map(assert_cast<ColumnNullable &>(elements_).getNullMapData())

View File

@ -33,7 +33,7 @@ template <typename T> struct NumericArraySink;
struct StringSink;
struct FixedStringSink;
struct GenericArraySink;
template <typename ArraySink> struct NullableArraySink
template <typename ArraySink> struct NullableArraySink;
template <typename T>
struct NumericArraySource : public ArraySourceImpl<NumericArraySource<T>>
@ -50,6 +50,11 @@ struct NumericArraySource : public ArraySourceImpl<NumericArraySource<T>>
size_t row_num = 0;
ColumnArray::Offset prev_offset = 0;
MutableColumnPtr createValuesColumn()
{
return ColumnVector<T>::create();
}
explicit NumericArraySource(const ColumnArray & arr)
: elements(typeid_cast<const ColVecType &>(arr.getData()).getData()), offsets(arr.getOffsets())
{
@ -533,6 +538,11 @@ struct GenericArraySource : public ArraySourceImpl<GenericArraySource>
size_t row_num = 0;
ColumnArray::Offset prev_offset = 0;
MutableColumnPtr createValuesColumn()
{
return elements.cloneEmpty();
}
explicit GenericArraySource(const ColumnArray & arr)
: elements(arr.getData()), offsets(arr.getOffsets())
{

View File

@ -6,18 +6,23 @@
namespace DB::GatherUtils
{
struct SliceDynamicOffsetBoundedSelectArraySource : public ArraySinkSourceSelector<SliceDynamicOffsetBoundedSelectArraySource>
struct SliceDynamicOffsetBoundedSelectArraySource : public ArraySourceSelector<SliceDynamicOffsetBoundedSelectArraySource>
{
template <typename Source, typename Sink>
static void selectSourceSink(Source && source, Sink && sink, const IColumn & offset_column, const IColumn & length_column)
template <typename Source>
static void selectSourceSink(Source && source, const IColumn & offset_column, const IColumn & length_column, ColumnArray::MutablePtr & result)
{
using Sink = typename Source::SinkType;
result = ColumnArray::create(source.createValuesColumn());
Sink sink(result->getData(), result->getOffsets(), source.getColumnSize());
sliceDynamicOffsetBounded(source, sink, offset_column, length_column);
}
};
void sliceDynamicOffsetBounded(IArraySource & src, IArraySink & sink, const IColumn & offset_column, const IColumn & length_column)
ColumnArray::MutablePtr sliceDynamicOffsetBounded(IArraySource & src, const IColumn & offset_column, const IColumn & length_column)
{
SliceDynamicOffsetBoundedSelectArraySource::select(src, sink, offset_column, length_column);
ColumnArray::MutablePtr res;
SliceDynamicOffsetBoundedSelectArraySource::select(src, offset_column, length_column, res);
return res;
}
}

View File

@ -6,19 +6,24 @@
namespace DB::GatherUtils
{
struct SliceDynamicOffsetUnboundedSelectArraySource : public ArraySinkSourceSelector<SliceDynamicOffsetUnboundedSelectArraySource>
struct SliceDynamicOffsetUnboundedSelectArraySource : public ArraySourceSelector<SliceDynamicOffsetUnboundedSelectArraySource>
{
template <typename Source, typename Sink>
static void selectSourceSink(Source && source, Sink && sink, const IColumn & offset_column)
template <typename Source>
static void selectSourceSink(Source && source, const IColumn & offset_column, ColumnArray::MutablePtr & result)
{
using Sink = typename Source::SinkType;
result = ColumnArray::create(source.createValuesColumn());
Sink sink(result->getData(), result->getOffsets(), source.getColumnSize());
sliceDynamicOffsetUnbounded(source, sink, offset_column);
}
};
void sliceDynamicOffsetUnbounded(IArraySource & src, IArraySink & sink, const IColumn & offset_column)
ColumnArray::MutablePtr sliceDynamicOffsetUnbounded(IArraySource & src, const IColumn & offset_column)
{
SliceDynamicOffsetUnboundedSelectArraySource::select(src, sink, offset_column);
ColumnArray::MutablePtr res;
SliceDynamicOffsetUnboundedSelectArraySource::select(src, sink, offset_column, res);
return res;
}
}

View File

@ -7,18 +7,23 @@
namespace DB::GatherUtils
{
struct SliceFromLeftConstantOffsetBoundedSelectArraySource
: public ArraySinkSourceSelector<SliceFromLeftConstantOffsetBoundedSelectArraySource>
: public ArraySourceSelector<SliceFromLeftConstantOffsetBoundedSelectArraySource>
{
template <typename Source, typename Sink>
static void selectSourceSink(Source && source, Sink && sink, size_t & offset, ssize_t & length)
template <typename Source>
static void selectSourceSink(Source && source, size_t & offset, ssize_t & length, ColumnArray::MutablePtr & result)
{
using Sink = typename Source::SinkType;
result = ColumnArray::create(source.createValuesColumn());
Sink sink(result->getData(), result->getOffsets(), source.getColumnSize());
sliceFromLeftConstantOffsetBounded(source, sink, offset, length);
}
};
void sliceFromLeftConstantOffsetBounded(IArraySource & src, IArraySink & sink, size_t offset, ssize_t length)
ColumnArray::MutablePtr sliceFromLeftConstantOffsetBounded(IArraySource & src, size_t offset, ssize_t length)
{
SliceFromLeftConstantOffsetBoundedSelectArraySource::select(src, sink, offset, length);
ColumnArray::MutablePtr res;
SliceFromLeftConstantOffsetBoundedSelectArraySource::select(src, offset, length, res);
return res;
}
}

View File

@ -7,18 +7,23 @@
namespace DB::GatherUtils
{
struct SliceFromLeftConstantOffsetUnboundedSelectArraySource
: public ArraySinkSourceSelector<SliceFromLeftConstantOffsetUnboundedSelectArraySource>
: public ArraySourceSelector<SliceFromLeftConstantOffsetUnboundedSelectArraySource>
{
template <typename Source, typename Sink>
static void selectSourceSink(Source && source, Sink && sink, size_t & offset)
template <typename Source>
static void selectSourceSink(Source && source, size_t & offset, ColumnArray::MutablePtr & result)
{
using Sink = typename Source::SinkType;
result = ColumnArray::create(source.createValuesColumn());
Sink sink(result->getData(), result->getOffsets(), source.getColumnSize());
sliceFromLeftConstantOffsetUnbounded(source, sink, offset);
}
};
void sliceFromLeftConstantOffsetUnbounded(IArraySource & src, IArraySink & sink, size_t offset)
ColumnArray::MutablePtr sliceFromLeftConstantOffsetUnbounded(IArraySource & src, size_t offset)
{
SliceFromLeftConstantOffsetUnboundedSelectArraySource::select(src, sink, offset);
ColumnArray::MutablePtr res;
SliceFromLeftConstantOffsetUnboundedSelectArraySource::select(src, offset, res);
return res;
}
}

View File

@ -7,18 +7,23 @@
namespace DB::GatherUtils
{
struct SliceFromRightConstantOffsetBoundedSelectArraySource
: public ArraySinkSourceSelector<SliceFromRightConstantOffsetBoundedSelectArraySource>
: public ArraySourceSelector<SliceFromRightConstantOffsetBoundedSelectArraySource>
{
template <typename Source, typename Sink>
static void selectSourceSink(Source && source, Sink && sink, size_t & offset, ssize_t & length)
template <typename Source>
static void selectSourceSink(Source && source, size_t & offset, ssize_t & length, ColumnArray::MutablePtr & result)
{
using Sink = typename Source::SinkType;
result = ColumnArray::create(source.createValuesColumn());
Sink sink(result->getData(), result->getOffsets(), source.getColumnSize());
sliceFromRightConstantOffsetBounded(source, sink, offset, length);
}
};
void sliceFromRightConstantOffsetBounded(IArraySource & src, IArraySink & sink, size_t offset, ssize_t length)
ColumnArray::MutablePtr sliceFromRightConstantOffsetBounded(IArraySource & src, size_t offset, ssize_t length)
{
SliceFromRightConstantOffsetBoundedSelectArraySource::select(src, sink, offset, length);
ColumnArray::MutablePtr res;
SliceFromRightConstantOffsetBoundedSelectArraySource::select(src, offset, length, res);
return res;
}
}

View File

@ -7,18 +7,23 @@
namespace DB::GatherUtils
{
struct SliceFromRightConstantOffsetUnboundedSelectArraySource
: public ArraySinkSourceSelector<SliceFromRightConstantOffsetUnboundedSelectArraySource>
: public ArraySourceSelector<SliceFromRightConstantOffsetUnboundedSelectArraySource>
{
template <typename Source, typename Sink>
static void selectSourceSink(Source && source, Sink && sink, size_t & offset)
template <typename Source>
static void selectSourceSink(Source && source, size_t & offset, ColumnArray::MutablePtr & result)
{
using Sink = typename Source::SinkType;
result = ColumnArray::create(source.createValuesColumn());
Sink sink(result->getData(), result->getOffsets(), source.getColumnSize());
sliceFromRightConstantOffsetUnbounded(source, sink, offset);
}
};
void sliceFromRightConstantOffsetUnbounded(IArraySource & src, IArraySink & sink, size_t offset)
ColumnArray::MutablePtr sliceFromRightConstantOffsetUnbounded(IArraySource & src, size_t offset)
{
SliceFromRightConstantOffsetUnboundedSelectArraySource::select(src, sink, offset);
ColumnArray::MutablePtr res;
SliceFromRightConstantOffsetUnboundedSelectArraySource::select(src, offset, res);
return res;
}
}

View File

@ -79,8 +79,6 @@ public:
return;
}
auto result_column = return_type->createColumn();
auto & array_column = block.getByPosition(arguments[0]).column;
const auto & offset_column = block.getByPosition(arguments[1]).column;
const auto & length_column = arguments.size() > 2 ? block.getByPosition(arguments[2]).column : nullptr;
@ -101,7 +99,7 @@ public:
else
throw Exception{"First arguments for function " + getName() + " must be array.", ErrorCodes::LOGICAL_ERROR};
auto sink = GatherUtils::createArraySink(typeid_cast<ColumnArray &>(*result_column), size);
ColumnArray::MutablePtr sink;
if (offset_column->onlyNull())
{
@ -111,11 +109,11 @@ public:
return;
}
else if (isColumnConst(*length_column))
GatherUtils::sliceFromLeftConstantOffsetBounded(*source, *sink, 0, length_column->getInt(0));
sink = GatherUtils::sliceFromLeftConstantOffsetBounded(*source, 0, length_column->getInt(0));
else
{
auto const_offset_column = ColumnConst::create(ColumnInt8::create(1, 1), size);
GatherUtils::sliceDynamicOffsetBounded(*source, *sink, *const_offset_column, *length_column);
sink = GatherUtils::sliceDynamicOffsetBounded(*source, *const_offset_column, *length_column);
}
}
else if (isColumnConst(*offset_column))
@ -125,30 +123,30 @@ public:
if (!length_column || length_column->onlyNull())
{
if (offset > 0)
GatherUtils::sliceFromLeftConstantOffsetUnbounded(*source, *sink, static_cast<size_t>(offset - 1));
sink = GatherUtils::sliceFromLeftConstantOffsetUnbounded(*source, static_cast<size_t>(offset - 1));
else
GatherUtils::sliceFromRightConstantOffsetUnbounded(*source, *sink, static_cast<size_t>(-offset));
sink = GatherUtils::sliceFromRightConstantOffsetUnbounded(*source, static_cast<size_t>(-offset));
}
else if (isColumnConst(*length_column))
{
ssize_t length = length_column->getInt(0);
if (offset > 0)
GatherUtils::sliceFromLeftConstantOffsetBounded(*source, *sink, static_cast<size_t>(offset - 1), length);
sink = GatherUtils::sliceFromLeftConstantOffsetBounded(*source, static_cast<size_t>(offset - 1), length);
else
GatherUtils::sliceFromRightConstantOffsetBounded(*source, *sink, static_cast<size_t>(-offset), length);
sink = GatherUtils::sliceFromRightConstantOffsetBounded(*source, static_cast<size_t>(-offset), length);
}
else
GatherUtils::sliceDynamicOffsetBounded(*source, *sink, *offset_column, *length_column);
sink = GatherUtils::sliceDynamicOffsetBounded(*source, *offset_column, *length_column);
}
else
{
if (!length_column || length_column->onlyNull())
GatherUtils::sliceDynamicOffsetUnbounded(*source, *sink, *offset_column);
sink = GatherUtils::sliceDynamicOffsetUnbounded(*source, *offset_column);
else
GatherUtils::sliceDynamicOffsetBounded(*source, *sink, *offset_column, *length_column);
sink = GatherUtils::sliceDynamicOffsetBounded(*source, *offset_column, *length_column);
}
block.getByPosition(result).column = std::move(result_column);
block.getByPosition(result).column = std::move(sink);
}
bool useDefaultImplementationForConstants() const override { return true; }