#pragma once
#include
#include
#include "Sources.h"
#include "Sinks.h"
#include
#include
#include "GatherUtils.h"
#include "sliceEqualElements.h"
#include "sliceHasImplAnyAll.h"
/// Error codes referenced by this header.
/// They are only declared here (`extern`); their definitions are not part of this file.
namespace DB::ErrorCodes
{
/// Used below when a slice/sink (or source) combination does not have the expected types.
extern const int LOGICAL_ERROR;
/// Not referenced in the visible part of this file; presumably used together with MAX_ARRAY_SIZE further down - confirm.
extern const int TOO_LARGE_ARRAY_SIZE;
}
namespace DB::GatherUtils
{
/// 2^30 (1073741824).
/// NOTE(review): presumably the upper bound on a resulting array size enforced later in this file - confirm at the use sites.
inline constexpr size_t MAX_ARRAY_SIZE = size_t{1} << 30;
/// Methods to copy Slice to Sink, overloaded for various combinations of types.
template
void writeSlice(const NumericArraySlice & slice, NumericArraySink & sink)
{
sink.elements.resize(sink.current_offset + slice.size);
memcpySmallAllowReadWriteOverflow15(&sink.elements[sink.current_offset], slice.data, slice.size * sizeof(T));
sink.current_offset += slice.size;
}
template
void writeSlice(const NumericArraySlice & slice, NumericArraySink & sink)
{
using NativeU = NativeType;
sink.elements.resize(sink.current_offset + slice.size);
for (size_t i = 0; i < slice.size; ++i)
{
const auto & src = slice.data[i];
auto & dst = sink.elements[sink.current_offset];
if constexpr (is_over_big_int || is_over_big_int)
{
if constexpr (is_decimal)
dst = static_cast(src.value);
else
dst = static_cast(src);
}
else
dst = static_cast(src);
++sink.current_offset;
}
}
/// Appends the bytes of a string slice to a StringSink:
/// grows the sink buffer, copies `slice.size` bytes to the current write position
/// and moves the write position forward by the same amount.
inline ALWAYS_INLINE void writeSlice(const StringSource::Slice & slice, StringSink & sink)
{
    const auto bytes_to_copy = slice.size;

    sink.elements.resize(sink.current_offset + bytes_to_copy);

    auto * destination = &sink.elements[sink.current_offset];
    memcpySmallAllowReadWriteOverflow15(destination, slice.data, bytes_to_copy);

    sink.current_offset += bytes_to_copy;
}
/// Copies the bytes of a string slice into a FixedStringSink at its current write position.
/// Unlike the StringSink overload, this neither resizes the buffer nor advances the offset.
/// NOTE(review): presumably FixedStringSink pre-sizes its storage and moves the offset itself
/// (e.g. in next()) - confirm in Sinks.h.
inline ALWAYS_INLINE void writeSlice(const StringSource::Slice & slice, FixedStringSink & sink)
{
    auto * destination = &sink.elements[sink.current_offset];
    memcpySmallAllowReadWriteOverflow15(destination, slice.data, slice.size);
}
/// Appends a generic array slice to a generic array sink.
/// Both sides must be backed by columns of the same structure; this is the expected situation
/// when (ArraySlice, ArraySink) is (GenericArraySlice, GenericArraySink).
/// Throws LOGICAL_ERROR if the column structures differ.
inline ALWAYS_INLINE void writeSlice(const GenericArraySlice & slice, GenericArraySink & sink)
{
    if (!slice.elements->structureEquals(sink.elements))
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Function writeSlice expects same column types for GenericArraySlice and GenericArraySink.");

    sink.elements.insertRangeFrom(*slice.elements, slice.begin, slice.size);
    sink.current_offset += slice.size;
}
template
inline ALWAYS_INLINE void writeSlice(const GenericArraySlice & slice, NumericArraySink & sink)
{
sink.elements.resize(sink.current_offset + slice.size);
for (size_t i = 0; i < slice.size; ++i)
{
Field field;
slice.elements->get(slice.begin + i, field);
sink.elements.push_back(applyVisitor(FieldVisitorConvertToNumber(), field));
}
sink.current_offset += slice.size;
}
template
inline ALWAYS_INLINE void writeSlice(const NumericArraySlice & slice, GenericArraySink & sink)
{
for (size_t i = 0; i < slice.size; ++i)
{
if constexpr (is_decimal)
{
DecimalField field(T(slice.data[i]), 0); /// TODO: Decimal scale
sink.elements.insert(field);
}
else
{
Field field = T(slice.data[i]);
sink.elements.insert(field);
}
}
sink.current_offset += slice.size;
}
template
inline ALWAYS_INLINE void writeSlice(const NullableSlice & slice, NullableArraySink & sink)
{
sink.null_map.resize(sink.current_offset + slice.size);
if (slice.size == 1) /// Always true for ValueSlice.
sink.null_map[sink.current_offset] = *slice.null_map;
else
memcpySmallAllowReadWriteOverflow15(&sink.null_map[sink.current_offset], slice.null_map, slice.size * sizeof(UInt8));
writeSlice(static_cast(slice), static_cast(sink));
}
template
inline ALWAYS_INLINE void writeSlice(const Slice & slice, NullableArraySink & sink)
{
sink.null_map.resize(sink.current_offset + slice.size);
if (slice.size == 1) /// Always true for ValueSlice.
sink.null_map[sink.current_offset] = 0;
else if (slice.size)
memset(&sink.null_map[sink.current_offset], 0, slice.size * sizeof(UInt8));
writeSlice(slice, static_cast(sink));
}
template
void writeSlice(const NumericValueSlice & slice, NumericArraySink & sink)
{
sink.elements.resize(sink.current_offset + 1);
sink.elements[sink.current_offset] = slice.value;
++sink.current_offset;
}
/// Appends a single generic value to a generic array sink.
/// Both sides must be backed by columns of the same structure; this is the expected situation
/// when (ArraySlice, ArraySink) is (GenericValueSlice, GenericArraySink).
/// Throws LOGICAL_ERROR if the column structures differ.
inline ALWAYS_INLINE void writeSlice(const GenericValueSlice & slice, GenericArraySink & sink)
{
    if (!slice.elements->structureEquals(sink.elements))
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Function writeSlice expects same column types for GenericValueSlice and GenericArraySink.");

    sink.elements.insertFrom(*slice.elements, slice.position);
    ++sink.current_offset;
}
template
inline ALWAYS_INLINE void writeSlice(const GenericValueSlice & slice, NumericArraySink & sink)
{
sink.elements.resize(sink.current_offset + 1);
Field field;
slice.elements->get(slice.position, field);
sink.elements.push_back(applyVisitor(FieldVisitorConvertToNumber(), field));
++sink.current_offset;
}
template
inline ALWAYS_INLINE void writeSlice(const NumericValueSlice & slice, GenericArraySink & sink)
{
Field field = T(slice.value);
sink.elements.insert(field);
++sink.current_offset;
}
template
void NO_INLINE concat(SourceA && src_a, SourceB && src_b, Sink && sink)
{
sink.reserve(src_a.getSizeForReserve() + src_b.getSizeForReserve());
while (!src_a.isEnd())
{
writeSlice(src_a.getWhole(), sink);
writeSlice(src_b.getWhole(), sink);
sink.next();
src_a.next();
src_b.next();
}
}
template
void concat(const std::vector> & array_sources, Sink && sink)
{
size_t sources_num = array_sources.size();
std::vector is_const(sources_num);
auto check_and_get_size_to_reserve = [] (auto source, IArraySource * array_source)
{
if (source == nullptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Concat function expected {} or {} but got {}",
demangle(typeid(Source).name()), demangle(typeid(ConstSource