updated GatherUtils [#CLICKHOUSE-2090]

This commit is contained in:
Nikolai Kochetov 2017-08-17 22:33:02 +03:00
parent c3fe8bb026
commit c1479a5594
5 changed files with 736 additions and 99 deletions

View File

@ -0,0 +1,51 @@
#pragma once
namespace DB
{
template <typename ... Types>
struct TypeList;
template <typename Type, typename ... Types>
struct TypeList<Type, Types...>
{
using Head = Type;
using Tail = TypeList<Types ...>;
};
template<>
struct TypeList<>
{
};
/// Append Type to TypeList
/// Usage:
/// using TypeListWithType = typename AppendToTypeList<Type, ConcreteTypeList>::Type;
template <typename TypeToAppend, typename List, typename ... Types>
struct AppendToTypeList
{
using Type = typename AppendToTypeList<TypeToAppend, typename List::Tail, Types ..., typename List::Head>::Type;
};
template <typename TypeToAppend, typename ... Types>
struct AppendToTypeList<TypeToAppend, TypeList<>, Types ...>
{
using Type = TypeList<TypeToAppend, Types ...>;
};
/// Apply TypeList as variadic template argument of Class.
/// Usage:
/// using ClassWithAppliedTypeList = typename ApplyTypeListForClass<Class, ConcreteTypeList>::Type;
template <template <typename ...> typename Class, typename List, typename ... Types>
struct ApplyTypeListForClass
{
using Type = typename ApplyTypeListForClass<Class, typename List::Tail, Types ..., typename List::Head>::Type;
};
template <template <typename ...> typename Class, typename ... Types>
struct ApplyTypeListForClass<Class, TypeList<>, Types ...>
{
using Type = Class<Types ...>;
};
}

View File

@ -0,0 +1,10 @@
#pragma once
#include <Core/Types.h>
#include <Common/TypeList.h>
namespace DB
{
using TypeListNumber = TypeList<UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64, Float32, Float64>;
}

View File

@ -2892,98 +2892,88 @@ String FunctionArrayConcat::getName() const
DataTypePtr FunctionArrayConcat::getReturnTypeImpl(const DataTypes & arguments) const
{
bool has_nullable = false;
DataTypePtr common_type;
if (arguments.empty())
throw Exception{"Function array requires at least one argument.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH};
DataTypes nested_types;
nested_types.reserve(arguments.size());
for (size_t i : ext::range(0, arguments.size()))
{
const auto & argument = arguments[i];
auto data_type_array = checkAndGetDataType<DataTypeArray>(argument.get());
if (!data_type_array)
auto argument = arguments[i].get();
if (auto data_type_array = checkAndGetDataType<DataTypeArray>(argument))
nested_types.push_back(data_type_array->getNestedType());
else
throw Exception("Argument " + toString(i) + " for function " + getName() + " must be an array but it has type "
+ argument->getName() + ".", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
DataTypePtr nested_type = data_type_array->getNestedType();
if (nested_type->isNullable())
{
has_nullable = true;
auto nullable_type = static_cast<const DataTypeNullable *>(nested_type.get());
nested_type = nullable_type->getNestedType();
}
if (!common_type)
common_type = nested_type;
else if (!common_type->equals(*nested_type.get()))
throw Exception("Arguments for function " + getName() + " has different nested types: "
+ common_type->getName() + " and " + nested_type->getName() + ".", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
if (has_nullable)
common_type = std::make_shared<DataTypeNullable>(common_type);
if (foundNumericType(nested_types))
{
/// Since we have found at least one numeric argument, we infer that all
/// the arguments are numeric up to nullity. Let's determine the least
/// common type.
auto enriched_result_type = Conditional::getArrayType(nested_types);
return std::make_shared<DataTypeArray>(enriched_result_type);
}
else
{
/// Otherwise all the arguments must have the same type up to nullability or nullity.
if (!hasArrayIdenticalTypes(nested_types))
throw Exception{"Arguments for function " + getName() + " must have same type or behave as number.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
return std::make_shared<DataTypeArray>(common_type);
return std::make_shared<DataTypeArray>(getArrayElementType(nested_types));
}
}
void FunctionArrayConcat::executeImpl(Block & block, const ColumnNumbers & arguments, size_t result)
{
std::cerr << block.dumpStructure() << std::endl;
auto & result_column = block.getByPosition(result).column;
const DataTypePtr & return_type = block.getByPosition(result).type;
result_column = return_type->createColumn();
ColumnPtr column_to_clone_for_result = block.getByPosition(arguments.front()).column;
bool is_nullable_result = false;
size_t size = 0;
for (auto argument : arguments)
{
const auto & argument_column = block.safeGetByPosition(argument).column;
auto argument_column_array = typeid_cast<ColumnArray *>(argument_column.get());
if (checkColumn<ColumnNullable>(&argument_column_array->getData()))
{
is_nullable_result = true;
column_to_clone_for_result = argument_column;
break;
}
size = argument_column->size();
}
std::vector<ColumnPtr> nullable_columns_holder;
std::vector<GenericArraySource> sources;
std::vector<std::unique_ptr<IArraySource>> sources;
for (auto argument : arguments)
{
auto & argument_column = block.getByPosition(argument).column;
auto argument_column_array = typeid_cast<ColumnArray *>(argument_column.get());
auto argument_column = block.getByPosition(argument).column.get();
size_t column_size = argument_column->size();
bool is_const = false;
if (is_nullable_result)
if (auto argument_column_const = typeid_cast<ColumnConst *>(argument_column))
{
if (!checkColumn<ColumnNullable>(&argument_column_array->getData()))
{
ColumnPtr null_map = std::make_shared<ColumnUInt8>(argument_column_array->getData().size(), 0);
argument_column = std::make_shared<ColumnArray>(
std::make_shared<ColumnNullable>(argument_column_array->getDataPtr(), null_map),
argument_column_array->getOffsetsColumn()
);
}
nullable_columns_holder.push_back(argument_column);
is_const = true;
argument_column = &argument_column_const->getDataColumn();
}
sources.emplace_back(static_cast<ColumnArray &>(*argument_column.get()));
if (auto argument_column_array = typeid_cast<ColumnArray *>(argument_column))
sources.emplace_back(createArraySource(*argument_column_array, is_const, column_size));
else
throw Exception{"Arguments for function " + getName() + " must be arrays.", ErrorCodes::LOGICAL_ERROR};
}
result_column = column_to_clone_for_result->cloneEmpty();
GenericArraySink sink(typeid_cast<ColumnArray &>(*result_column.get()), size);
auto sink = createArraySink(typeid_cast<ColumnArray &>(*result_column.get()), size);
concat(sources, *sink);
while (!sink.isEnd())
auto array = checkAndGetColumn<ColumnArray>(result_column.get());
if (array)
{
for (auto & source : sources)
{
auto slice = source.getWhole();
std::cerr << slice.size << std::endl;
writeSlice(slice, sink);
source.next();
}
sink.next();
std::cerr << sink.elements.size() << std::endl;
auto & offsets = array->getOffsets();
std::cerr << offsets.size() << " " << array->size()<< std::endl;
for (auto & val : offsets)
std::cerr << val << ' ';
std::cerr << std::endl;
}
std::cerr << block.dumpStructure() << std::endl;
}

View File

@ -1414,8 +1414,8 @@ public:
String getName() const override;
bool isVariadic() const override { return false; }
size_t getNumberOfArguments() const override { return 2; }
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override;

View File

@ -7,13 +7,19 @@
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnNullable.h>
#include <Functions/FunctionHelpers.h>
#include <DataTypes/NumberTraits.h>
#include <Common/typeid_cast.h>
#include <Common/memcpySmall.h>
#include <ext/range.h>
#include <Core/TypeListNumber.h>
#include <DataTypes/DataTypeTraits.h>
#include <iostream>
#include <typeindex>
/** These methods are intended for implementation of functions, that
* copy ranges from one or more columns to another column.
@ -36,6 +42,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
template <typename T>
struct NumericArraySlice
{
@ -44,8 +55,24 @@ struct NumericArraySlice
};
struct IArraySource
{
virtual ~IArraySource() {}
virtual size_t getSizeForReserve() const = 0;
virtual const typename ColumnArray::Offsets_t & getOffsets() const = 0;
virtual size_t getColumnSize() const = 0;
virtual bool isConst() const { return false; }
virtual bool isNullable() const { return false; }
};
struct IArraySink
{
virtual ~IArraySink() {}
};
template <typename T>
struct NumericArraySource
struct NumericArraySource : public IArraySource
{
using Slice = NumericArraySlice<T>;
using Column = ColumnArray;
@ -77,12 +104,22 @@ struct NumericArraySource
return row_num;
}
const typename ColumnArray::Offsets_t & getOffsets() const
{
return offsets;
}
/// Get size for corresponding call or Sink::reserve to reserve memory for elements.
size_t getSizeForReserve() const
size_t getSizeForReserve() const override
{
return elements.size();
}
size_t getColumnSize() const override
{
return offsets.size();
}
Slice getWhole() const
{
return {&elements[prev_offset], offsets[row_num] - prev_offset};
@ -123,7 +160,7 @@ struct NumericArraySource
template <typename Base>
struct ConstSource
struct ConstSource : public Base
{
using Slice = typename Base::Slice;
using Column = ColumnConst;
@ -131,10 +168,21 @@ struct ConstSource
size_t total_rows;
size_t row_num = 0;
Base base;
/// Base base;
ConstSource(const ColumnConst & col)
: total_rows(col.size()), base(static_cast<const typename Base::Column &>(col.getDataColumn()))
/// template <typename ColumnType>
explicit ConstSource(const ColumnConst & col)
: Base(static_cast<const typename Base::Column &>(col.getDataColumn())), total_rows(col.size())
{
}
template <typename ColumnType>
ConstSource(const ColumnType & col, size_t total_rows) : Base(col), total_rows(total_rows)
{
}
template <typename ColumnType>
ConstSource(const ColumnType & col, const ColumnUInt8 & null_map, size_t total_rows) : Base(col, null_map), total_rows(total_rows)
{
}
@ -155,33 +203,43 @@ struct ConstSource
size_t getSizeForReserve() const
{
return total_rows * base.getSizeForReserve();
return total_rows * Base::getSizeForReserve();
}
Slice getWhole() const
size_t getColumnSize() const // overrides for IArraySource
{
return base.getWhole();
return total_rows;
}
Slice getSliceFromLeft(size_t offset) const
bool isConst() const // overrides for IArraySource
{
return base.getSliceFromLeft(offset);
return true;
}
Slice getSliceFromLeft(size_t offset, size_t length) const
{
return base.getSliceFromLeft(offset, length);
}
Slice getSliceFromRight(size_t offset) const
{
return base.getSliceFromRight(offset);
}
Slice getSliceFromRight(size_t offset, size_t length) const
{
return base.getSliceFromRight(offset, length);
}
// Slice getWhole() const
// {
// return base.getWhole();
// }
//
// Slice getSliceFromLeft(size_t offset) const
// {
// return base.getSliceFromLeft(offset);
// }
//
// Slice getSliceFromLeft(size_t offset, size_t length) const
// {
// return base.getSliceFromLeft(offset, length);
// }
//
// Slice getSliceFromRight(size_t offset) const
// {
// return base.getSliceFromRight(offset);
// }
//
// Slice getSliceFromRight(size_t offset, size_t length) const
// {
// return base.getSliceFromRight(offset, length);
// }
};
@ -336,7 +394,7 @@ struct FixedStringSource
template <typename T>
struct NumericArraySink
struct NumericArraySink : public IArraySink
{
typename ColumnVector<T>::Container_t & elements;
typename ColumnArray::Offsets_t & offsets;
@ -499,7 +557,7 @@ struct GenericArraySlice
};
struct GenericArraySource
struct GenericArraySource : public IArraySource
{
using Slice = GenericArraySlice;
using Column = ColumnArray;
@ -531,7 +589,17 @@ struct GenericArraySource
return row_num;
}
size_t getSizeForReserve() const
const typename ColumnArray::Offsets_t & getOffsets() const
{
return offsets;
}
size_t getSizeForReserve() const override
{
return elements.size();
}
size_t getColumnSize() const override
{
return elements.size();
}
@ -574,7 +642,7 @@ struct GenericArraySource
}
};
struct GenericArraySink
struct GenericArraySink : public IArraySink
{
IColumn & elements;
ColumnArray::Offsets_t & offsets;
@ -611,6 +679,200 @@ struct GenericArraySink
};
template <typename Slice>
struct NullableArraySlice : public Slice
{
const UInt8 * null_map;
NullableArraySlice() {}
NullableArraySlice(const Slice & base) : Slice(base) {}
};
template <typename ArraySource>
struct NullableArraySource : public ArraySource
{
using Slice = NullableArraySlice<typename ArraySource::Slice>;
using ArraySource::prev_offset;
using ArraySource::row_num;
using ArraySource::offsets;
const ColumnUInt8::Container_t & null_map;
NullableArraySource(const ColumnArray & arr, const ColumnUInt8 & null_map)
: ArraySource(arr), null_map(null_map.getData())
{
}
Slice getWhole() const
{
Slice slice = ArraySource::getWhole();
slice.null_map = &null_map[prev_offset];
return slice;
}
Slice getSliceFromLeft(size_t offset) const
{
Slice slice = ArraySource::getSliceFromLeft(offset);
if (!slice.size)
slice.null_map = &null_map[prev_offset];
else
slice.null_map = &null_map[prev_offset + offset];
return slice;
}
Slice getSliceFromLeft(size_t offset, size_t length) const
{
Slice slice = ArraySource::getSliceFromLeft(offset, length);
if (!slice.size)
slice.null_map = &null_map[prev_offset];
else
slice.null_map = &null_map[prev_offset + offset];
return slice;
}
Slice getSliceFromRight(size_t offset) const
{
Slice slice = ArraySource::getSliceFromRight(offset);
if (!slice.size)
slice.null_map = &null_map[prev_offset];
else
slice.null_map = &null_map[offsets[row_num] - offset];
return slice;
}
Slice getSliceFromRight(size_t offset, size_t length) const
{
Slice slice = ArraySource::getSliceFromRight(offset, length);
if (!slice.size)
slice.null_map = &null_map[prev_offset];
else
slice.null_map = &null_map[offsets[row_num] - offset];
return slice;
}
bool isNullable() const override
{
return true;
}
};
template <typename ArraySink>
struct NullableArraySink : public ArraySink
{
ColumnUInt8::Container_t & null_map;
NullableArraySink(ColumnArray & arr, ColumnUInt8 & null_map, size_t column_size)
: ArraySink(arr, column_size), null_map(null_map.getData())
{
}
void reserve(size_t num_elements)
{
ArraySink::reserve(num_elements);
null_map.reserve(num_elements);
}
};
template <typename ... Types>
struct ArraySourceCreator;
template <typename Type, typename ... Types>
struct ArraySourceCreator<Type, Types ...>
{
static std::unique_ptr<IArraySource> create(const ColumnArray & col, const ColumnUInt8 * null_map, bool is_const, size_t total_rows)
{
std::cerr << "Numeric " << typeid(Type).name() << std::endl;
if (typeid_cast<const ColumnVector<Type> *>(&col.getData()))
{
std::cerr << "Created" << typeid(NumericArraySource<Type>).name() << std::endl;
if (null_map)
{
if (is_const)
return std::make_unique<ConstSource<NullableArraySource<NumericArraySource<Type>>>>(col, *null_map, total_rows);
return std::make_unique<NullableArraySource<NumericArraySource<Type>>>(col, *null_map);
}
if (is_const)
return std::make_unique<ConstSource<NumericArraySource<Type>>>(col, total_rows);
return std::make_unique<NumericArraySource<Type>>(col);
}
return ArraySourceCreator<Types...>::create(col, null_map, is_const, total_rows);
}
};
template <>
struct ArraySourceCreator<>
{
static std::unique_ptr<IArraySource> create(const ColumnArray & col, const ColumnUInt8 * null_map, bool is_const, size_t total_rows)
{
std::cerr << "Generic " << typeid(GenericArraySource).name() << std::endl;
if (null_map)
{
if (is_const)
return std::make_unique<ConstSource<NullableArraySource<GenericArraySource>>>(col, *null_map, total_rows);
return std::make_unique<NullableArraySource<GenericArraySource>>(col, *null_map);
}
if (is_const)
return std::make_unique<ConstSource<GenericArraySource>>(col, total_rows);
return std::make_unique<GenericArraySource>(col);
}
};
inline std::unique_ptr<IArraySource> createArraySource(const ColumnArray & col, bool is_const, size_t total_rows)
{
using Creator = typename ApplyTypeListForClass<ArraySourceCreator, TypeListNumber>::Type;
if (auto column_nullable = typeid_cast<const ColumnNullable *>(&col.getData()))
{
ColumnArray column(column_nullable->getNestedColumn(), col.getOffsetsColumn());
return Creator::create(column, &column_nullable->getNullMapConcreteColumn(), is_const, total_rows);
}
return Creator::create(col, nullptr, is_const, total_rows);
}
template <typename ... Types>
struct ArraySinkCreator;
template <typename Type, typename ... Types>
struct ArraySinkCreator<Type, Types ...>
{
static std::unique_ptr<IArraySink> create(ColumnArray & col, ColumnUInt8 * null_map, size_t column_size)
{
if (typeid_cast<ColumnVector<Type> *>(&col.getData()))
{
if (null_map)
return std::make_unique<NullableArraySink<NumericArraySink<Type>>>(col, *null_map, column_size);
return std::make_unique<NumericArraySink<Type>>(col, column_size);
}
return ArraySinkCreator<Types ...>::create(col, null_map, column_size);
}
};
template <>
struct ArraySinkCreator<>
{
static std::unique_ptr<IArraySink> create(ColumnArray & col, ColumnUInt8 * null_map, size_t column_size)
{
if (null_map)
return std::make_unique<NullableArraySink<GenericArraySink>>(col, *null_map, column_size);
return std::make_unique<GenericArraySink>(col, column_size);
}
};
inline std::unique_ptr<IArraySink> createArraySink(ColumnArray & col, size_t column_size)
{
using Creator = ApplyTypeListForClass<ArraySinkCreator, TypeListNumber>::Type;
if (auto column_nullable = typeid_cast<ColumnNullable *>(&col.getData()))
{
ColumnArray column(column_nullable->getNestedColumn(), col.getOffsetsColumn());
return Creator::create(column, &column_nullable->getNullMapConcreteColumn(), column_size);
}
return Creator::create(col, nullptr, column_size);
}
template <typename T>
using NumericSlice = const T *;
@ -700,15 +962,17 @@ struct NumericSink
template <typename T>
void writeSlice(const NumericArraySlice<T> & slice, NumericArraySink<T> & sink)
{
sink.elements.resize(sink.current_offset + slice.size);
memcpySmallAllowReadWriteOverflow15(&sink.elements[sink.current_offset], slice.data, slice.size * sizeof(T));
if (sink.elements.size() < sink.current_offset + slice.size)
sink.elements.resize(sink.current_offset + slice.size);
memcpy(&sink.elements[sink.current_offset], slice.data, slice.size * sizeof(T));
sink.current_offset += slice.size;
}
template <typename T, typename U>
void writeSlice(const NumericArraySlice<T> & slice, NumericArraySink<U> & sink)
{
sink.elements.resize(sink.current_offset + slice.size);
if (sink.elements.size() < sink.current_offset + slice.size)
sink.elements.resize(sink.current_offset + slice.size);
for (size_t i = 0; i < slice.size; ++i)
{
sink.elements[sink.current_offset] = slice.data[i];
@ -718,7 +982,8 @@ void writeSlice(const NumericArraySlice<T> & slice, NumericArraySink<U> & sink)
inline ALWAYS_INLINE void writeSlice(const StringSource::Slice & slice, StringSink & sink)
{
sink.elements.resize(sink.current_offset + slice.size);
if (sink.elements.size() < sink.current_offset + slice.size)
sink.elements.resize(sink.current_offset + slice.size);
memcpySmallAllowReadWriteOverflow15(&sink.elements[sink.current_offset], slice.data, slice.size);
sink.current_offset += slice.size;
}
@ -728,24 +993,179 @@ inline ALWAYS_INLINE void writeSlice(const StringSource::Slice & slice, FixedStr
memcpySmallAllowReadWriteOverflow15(&sink.elements[sink.current_offset], slice.data, slice.size);
}
/// Assuming same types of underlying columns for slice and sink.
inline ALWAYS_INLINE void writeSlice(const GenericArraySlice & slice, GenericArraySink & sink)
{
sink.elements.insertRangeFrom(*slice.elements, slice.begin, slice.size);
if (std::type_index(typeid(slice.elements)) == std::type_index(typeid(&sink.elements)))
sink.elements.insertRangeFrom(*slice.elements, slice.begin, slice.size);
else
{
for (size_t i = 0; i < slice.size; ++i)
{
Field field;
slice.elements->get(slice.begin + i, field);
sink.elements.insert(field);
}
}
sink.current_offset += slice.size;
}
template <typename T, typename U>
void ALWAYS_INLINE writeSlice(const NumericSlice<T> & slice, NumericSink<U> & sink)
template <typename T>
inline ALWAYS_INLINE void writeSlice(const GenericArraySlice & slice, NumericArraySink<T> & sink)
{
*sink.pos = *slice;
if (sink.elements.size() < sink.current_offset + slice.size)
sink.elements.resize(sink.current_offset + slice.size);
for (size_t i = 0; i < slice.size; ++i)
{
Field field;
slice.elements->get(slice.begin + i, field);
sink.elements.push_back(field.get<T>());
}
sink.current_offset += slice.size;
}
template <typename T>
inline ALWAYS_INLINE void writeSlice(const NumericArraySlice<T> & slice, GenericArraySink & sink)
{
for (size_t i = 0; i < slice.size; ++i)
{
Field field = static_cast<typename NearestFieldType<T>::Type>(slice.data[i]);
sink.elements.insert(field);
}
sink.current_offset += slice.size;
}
/// Assuming same types of underlying columns for slice and sink if (ArraySlice, ArraySink) is (GenericArraySlice, GenericArraySink).
template <typename ArraySlice, typename ArraySink>
inline ALWAYS_INLINE void writeSlice(const NullableArraySlice<ArraySlice> & slice, NullableArraySink<ArraySink> & sink)
{
if (sink.null_map.size() < sink.current_offset + slice.size)
sink.null_map.resize(sink.current_offset + slice.size);
memcpy(&sink.null_map[sink.current_offset], slice.null_map, slice.size * sizeof(UInt8));
writeSlice(static_cast<const ArraySlice &>(slice), static_cast<ArraySink &>(sink));
}
/// Assuming same types of underlying columns for slice and sink if (ArraySlice, ArraySink) is (GenericArraySlice, GenericArraySink).
template <typename ArraySlice, typename ArraySink>
inline ALWAYS_INLINE void writeSlice(const ArraySlice & slice, NullableArraySink<ArraySink> & sink)
{
if (sink.null_map.size() < sink.current_offset + slice.size)
sink.null_map.resize(sink.current_offset + slice.size);
memset(&sink.null_map[sink.current_offset], 0, slice.size * sizeof(UInt8));
writeSlice(slice, static_cast<ArraySink &>(sink));
}
/// Algorithms
template <typename Source, typename Sink>
static void append(Source & source, Sink & sink)
{
sink.row_num = 0;
while (!source.isEnd())
{
sink.current_offset = sink.offsets[sink.row_num];
writeSlice(source.getWhole(), sink);
sink.next();
source.next();
}
}
template <template <typename ...> typename Base, typename ... Types>
struct ArraySourceSelector;
template <template <typename ...> typename Base, typename Type, typename ... Types>
struct ArraySourceSelector<Base, Type, Types ...>
{
template <typename ... Args>
static void select(IArraySource & source, Args & ... args)
{
if (auto array = typeid_cast<NumericArraySource<Type> *>(&source))
Base<Types ...>::selectImpl(*array, args ...);
else if(auto nullable_array = typeid_cast<NullableArraySource<NumericArraySource<Type>> *>(&source))
Base<Types ...>::selectImpl(*nullable_array, args ...);
else if (auto const_array = typeid_cast<ConstSource<NumericArraySource<Type>> *>(&source))
Base<Types ...>::selectImpl(*const_array, args ...);
else if(auto const_nullable_array = typeid_cast<ConstSource<NullableArraySource<NumericArraySource<Type>>> *>(&source))
Base<Types ...>::selectImpl(*const_nullable_array, args ...);
else
Base<Types ...>::select(source, args ...);
}
};
template <template <typename ...> typename Base>
struct ArraySourceSelector<Base>
{
template <typename ... Args>
static void select(IArraySource & source, Args & ... args)
{
if (auto array = typeid_cast<GenericArraySource *>(&source))
Base<>::selectImpl(*array, args ...);
else if(auto nullable_array = typeid_cast<NullableArraySource<GenericArraySource> *>(&source))
Base<>::selectImpl(*nullable_array, args ...);
else if (auto const_array = typeid_cast<ConstSource<GenericArraySource> *>(&source))
Base<>::selectImpl(*const_array, args ...);
else if(auto const_nullable_array = typeid_cast<ConstSource<NullableArraySource<GenericArraySource>> *>(&source))
Base<>::selectImpl(*const_nullable_array, args ...);
else
throw Exception(std::string("Unknown ArraySource type: ") + typeid(source).name(), ErrorCodes::LOGICAL_ERROR);
}
};
template <template <typename ...> typename Base, typename ... Types>
struct ArraySinkSelector;
template <template <typename ...> typename Base, typename Type, typename ... Types>
struct ArraySinkSelector<Base, Type, Types ...>
{
template <typename ... Args>
static void select(IArraySink & sink, Args & ... args)
{
if (auto nullable_numeric_sink = typeid_cast<NullableArraySink<NumericArraySink<Type>> *>(&sink))
Base<>::selectImpl(*nullable_numeric_sink, args ...);
else if (auto numeric_sink = typeid_cast<NumericArraySink<Type> *>(&sink))
Base<>::selectImpl(*numeric_sink, args ...);
else
Base<Types ...>::select(sink, args ...);
}
};
template <template <typename ...> typename Base>
struct ArraySinkSelector<Base>
{
template <typename ... Args>
static void select(IArraySink & sink, Args & ... args)
{
if (auto nullable_generic_sink = typeid_cast<NullableArraySink<GenericArraySink> *>(&sink))
Base<>::selectImpl(*nullable_generic_sink, args ...);
else if (auto generic_sink = typeid_cast<GenericArraySink *>(&sink))
Base<>::selectImpl(*generic_sink, args ...);
else
throw Exception(std::string("Unknown ArraySink type: ") + typeid(sink).name(), ErrorCodes::LOGICAL_ERROR);
}
};
template <typename ... Types>
struct ArrayAppend : public ArraySourceSelector<ArrayAppend, Types ...>
{
template <typename Source, typename Sink>
static void selectImpl(Source & source, Sink & sink)
{
append<Source, Sink>(source, sink);
}
};
template <typename Sink>
static void append(IArraySource & source, Sink & sink)
{
// using List = typename AppendToTypeList<Sink, TypeListNumber>::Type;
using AppendImpl = typename ApplyTypeListForClass<ArrayAppend, TypeListNumber>::Type;
AppendImpl::select(source, sink);
}
template <typename SourceA, typename SourceB, typename Sink>
void NO_INLINE concat(SourceA && src_a, SourceB && src_b, Sink && sink)
void NO_INLINE concat(SourceA & src_a, SourceB & src_b, Sink & sink)
{
sink.reserve(src_a.getSizeForReserve() + src_b.getSizeForReserve());
@ -760,6 +1180,12 @@ void NO_INLINE concat(SourceA && src_a, SourceB && src_b, Sink && sink)
}
}
template <typename SourceA, typename SourceB, typename Sink>
void NO_INLINE concat(SourceA && src_a, SourceB && src_b, Sink && sink)
{
concat(src_a, src_b, sink);
}
template <typename Sink>
void NO_INLINE concat(StringSources & sources, Sink && sink)
{
@ -774,6 +1200,163 @@ void NO_INLINE concat(StringSources & sources, Sink && sink)
}
}
template <typename SourceType, typename SinkType>
struct ConcatGenericImpl
{
static void concat(GenericArraySource * generic_source, SinkType & sink)
{
auto source = static_cast<SourceType *>(generic_source);
writeSlice(source->getWhole(), sink);
source->next();
}
};
template <typename Sink>
static void NO_INLINE concatGeneric(const std::vector<std::unique_ptr<IArraySource>> & sources, Sink & sink)
{
std::vector<GenericArraySource *> generic_sources;
std::vector<bool> is_nullable;
std::vector<bool> is_const;
generic_sources.reserve(sources.size());
is_nullable.assign(sources.size(), false);
is_const.assign(sources.size(), false);
for (auto i : ext::range(0, sources.size()))
{
const auto &source = sources[i];
if (auto generic_source = typeid_cast<GenericArraySource *>(source.get()))
generic_sources.push_back(static_cast<GenericArraySource *>(generic_source));
else if (auto const_generic_source = typeid_cast<ConstSource<GenericArraySource> *>(source.get()))
{
generic_sources.push_back(static_cast<GenericArraySource *>(const_generic_source));
is_const[i] = true;
}
else if (auto nullable_source = typeid_cast<NullableArraySource<GenericArraySource> *>(source.get()))
{
generic_sources.push_back(static_cast<GenericArraySource *>(nullable_source));
is_nullable[i] = true;
}
else if (auto const_nullable_source = typeid_cast<ConstSource<NullableArraySource<GenericArraySource>> *>(source.get()))
{
generic_sources.push_back(static_cast<GenericArraySource *>(const_nullable_source));
is_nullable[i] = is_const[i] = true;
}
else
throw Exception(std::string("GenericArraySource expected for GenericArraySink, got: ") + typeid(source).name(),
ErrorCodes::LOGICAL_ERROR);
}
while (!sink.isEnd())
{
for (auto i : ext::range(0, sources.size()))
{
auto source = generic_sources[i];
if (is_const[i])
{
if (is_nullable[i])
ConcatGenericImpl<ConstSource<NullableArraySource<GenericArraySource>>, Sink>::concat(source, sink);
else
ConcatGenericImpl<ConstSource<GenericArraySource>, Sink>::concat(source, sink);
}
else
{
if (is_nullable[i])
ConcatGenericImpl<NullableArraySource<GenericArraySource>, Sink>::concat(source, sink);
else
ConcatGenericImpl<GenericArraySource, Sink>::concat(source, sink);
}
}
sink.next();
}
}
template <typename Sink>
void NO_INLINE concat(const std::vector<std::unique_ptr<IArraySource>> & sources, Sink & sink)
{
size_t elements_to_reserve = 0;
bool is_first = true;
/// Prepare offsets column. Offsets should point to starts of result arrays.
for (const auto & source : sources)
{
elements_to_reserve += source->getSizeForReserve();
const auto & offsets = source->getOffsets();
if (is_first)
{
sink.offsets.resize(source->getColumnSize());
memset(&sink.offsets[0], 0, sink.offsets.size() * sizeof(offsets[0]));
is_first = false;
}
std::cerr << "Offsets:";
for (size_t i : ext::range(1, offsets.size()))
{
std::cerr << ' ' << offsets[i];
}
std::cerr << std::endl;
if (source->isConst())
{
for (size_t i : ext::range(1, offsets.size()))
sink.offsets[i] += offsets[0];
}
else
{
for (size_t i : ext::range(1, offsets.size()))
sink.offsets[i] += offsets[i - 1] - (i > 1 ? offsets[i - 2] : 0);
}
}
std::cerr << "Sink offsets:";
for (size_t i : ext::range(1, sink.offsets.size()))
{
sink.offsets[i] += sink.offsets[i - 1];
std::cerr << ' ' << sink.offsets[i];
}
std::cerr << std::endl;
sink.reserve(elements_to_reserve);
for (const auto & source : sources)
{
append<Sink>(*source, sink);
}
}
template <typename ... Types>
struct ArrayConcat;
template <typename ... Types>
struct ArrayConcat : public ArraySinkSelector<ArrayConcat, Types ...>
{
using Sources = std::vector<std::unique_ptr<IArraySource>>;
template <typename Sink>
static void selectImpl(Sink & sink, Sources & sources)
{
concat<Sink>(sources, sink);
}
static void selectImpl(GenericArraySink & sink, Sources & sources)
{
concatGeneric<GenericArraySink>(sources, sink);
}
static void selectImpl(NullableArraySink<GenericArraySink> & sink, Sources & sources)
{
concatGeneric<NullableArraySink<GenericArraySink>>(sources, sink);
}
};
inline void concat(std::vector<std::unique_ptr<IArraySource>> & sources, IArraySink & sink)
{
/// using List = typename AppendToTypeList<std::vector<std::unique_ptr<IArraySource>>, TypeListNumber>::Type;
using ConcatImpl = typename ApplyTypeListForClass<ArrayConcat, TypeListNumber>::Type;
using Sources = std::vector<std::unique_ptr<IArraySource>>;
return ConcatImpl::select(sink, sources);
}
template <typename Source, typename Sink>
void NO_INLINE sliceFromLeftConstantOffsetUnbounded(Source && src, Sink && sink, size_t offset)
@ -870,6 +1453,9 @@ void NO_INLINE sliceDynamicOffsetBounded(Source && src, Sink && sink, IColumn &
}
template <typename SourceA, typename SourceB, typename Sink>
void NO_INLINE conditional(SourceA && src_a, SourceB && src_b, Sink && sink, const PaddedPODArray<UInt8> & condition)
{