Fixed function if of FixedString arguments [#CLICKHOUSE-3202].

This commit is contained in:
Alexey Milovidov 2017-08-05 02:18:51 +03:00 committed by alexey-milovidov
parent b9d12ae234
commit 767d025fb3
2 changed files with 89 additions and 85 deletions

View File

@ -17,6 +17,8 @@
#include <Functions/FunctionHelpers.h> #include <Functions/FunctionHelpers.h>
#include <DataTypes/NumberTraits.h> #include <DataTypes/NumberTraits.h>
#include <DataTypes/DataTypeTraits.h> #include <DataTypes/DataTypeTraits.h>
#include <Functions/GatherUtils.h>
namespace DB namespace DB
{ {
@ -1114,92 +1116,68 @@ private:
const ColumnString * col_else = checkAndGetColumn<ColumnString>(col_else_untyped); const ColumnString * col_else = checkAndGetColumn<ColumnString>(col_else_untyped);
const ColumnFixedString * col_then_fixed = checkAndGetColumn<ColumnFixedString>(col_then_untyped); const ColumnFixedString * col_then_fixed = checkAndGetColumn<ColumnFixedString>(col_then_untyped);
const ColumnFixedString * col_else_fixed = checkAndGetColumn<ColumnFixedString>(col_else_untyped); const ColumnFixedString * col_else_fixed = checkAndGetColumn<ColumnFixedString>(col_else_untyped);
const ColumnConst * col_then_const = checkAndGetColumnConstStringOrFixedString(col_then_untyped); const ColumnConst * col_then_const = checkAndGetColumnConst<ColumnString>(col_then_untyped);
const ColumnConst * col_else_const = checkAndGetColumnConstStringOrFixedString(col_else_untyped); const ColumnConst * col_else_const = checkAndGetColumnConst<ColumnString>(col_else_untyped);
const ColumnConst * col_then_const_fixed = checkAndGetColumnConst<ColumnFixedString>(col_then_untyped);
const ColumnConst * col_else_const_fixed = checkAndGetColumnConst<ColumnFixedString>(col_else_untyped);
if ((col_then || col_then_const || col_then_fixed) && (col_else || col_else_const || col_else_fixed)) const PaddedPODArray<UInt8> & cond_data = cond_col->getData();
{ size_t rows = cond_data.size();
if (col_then_fixed && col_else_fixed)
if ((col_then_fixed || col_then_const_fixed)
&& (col_else_fixed || col_else_const_fixed))
{ {
/// The result is FixedString. /// The result is FixedString.
if (col_then_fixed->getN() != col_else_fixed->getN()) auto col_res_untyped = col_then_untyped->cloneEmpty();
throw Exception("FixedString columns as 'then' and 'else' arguments of function 'if' has different sizes", ErrorCodes::ILLEGAL_COLUMN); block.getByPosition(result).column = col_res_untyped;
ColumnFixedString * col_res = static_cast<ColumnFixedString *>(col_res_untyped.get());
auto sink = FixedStringSink(*col_res, rows);
size_t N = col_then_fixed->getN(); if (col_then_fixed && col_else_fixed)
conditional(FixedStringSource(*col_then_fixed), FixedStringSource(*col_else_fixed), sink, cond_data);
else if (col_then_fixed && col_else_const_fixed)
conditional(FixedStringSource(*col_then_fixed), ConstSource<FixedStringSource>(*col_else_const_fixed), sink, cond_data);
else if (col_then_const_fixed && col_else_fixed)
conditional(ConstSource<FixedStringSource>(*col_then_const_fixed), FixedStringSource(*col_else_fixed), sink, cond_data);
else if (col_then_const_fixed && col_else_const_fixed)
conditional(ConstSource<FixedStringSource>(*col_then_const_fixed), ConstSource<FixedStringSource>(*col_else_const_fixed), sink, cond_data);
auto col_res = std::make_shared<ColumnFixedString>(N); return true;
block.getByPosition(result).column = col_res;
ColumnFixedString::Chars_t & res_vec = col_res->getChars();
StringIfImpl::vector_fixed_vector_fixed(
cond_col->getData(),
col_then_fixed->getChars(),
col_else_fixed->getChars(),
N,
res_vec);
} }
else
if ((col_then || col_then_const || col_then_fixed || col_then_const_fixed)
&& (col_else || col_else_const || col_else_fixed || col_else_const_fixed))
{ {
/// The result is String. /// The result is String.
std::shared_ptr<ColumnString> col_res = std::make_shared<ColumnString>(); std::shared_ptr<ColumnString> col_res = std::make_shared<ColumnString>();
block.getByPosition(result).column = col_res; block.getByPosition(result).column = col_res;
auto sink = StringSink(*col_res, rows);
ColumnString::Chars_t & res_vec = col_res->getChars();
ColumnString::Offsets_t & res_offsets = col_res->getOffsets();
if (col_then && col_else) if (col_then && col_else)
StringIfImpl::vector_vector( conditional(StringSource(*col_then), StringSource(*col_else), sink, cond_data);
cond_col->getData(),
col_then->getChars(), col_then->getOffsets(),
col_else->getChars(), col_else->getOffsets(),
res_vec, res_offsets);
else if (col_then && col_else_const) else if (col_then && col_else_const)
StringIfImpl::vector_constant( conditional(StringSource(*col_then), ConstSource<StringSource>(*col_else_const), sink, cond_data);
cond_col->getData(),
col_then->getChars(), col_then->getOffsets(),
col_else_const->getValue<String>(),
res_vec, res_offsets);
else if (col_then_const && col_else) else if (col_then_const && col_else)
StringIfImpl::constant_vector( conditional(ConstSource<StringSource>(*col_then_const), StringSource(*col_else), sink, cond_data);
cond_col->getData(),
col_then_const->getValue<String>(),
col_else->getChars(), col_else->getOffsets(),
res_vec, res_offsets);
else if (col_then_const && col_else_const) else if (col_then_const && col_else_const)
StringIfImpl::constant_constant( conditional(ConstSource<StringSource>(*col_then_const), ConstSource<StringSource>(*col_else_const), sink, cond_data);
cond_col->getData(),
col_then_const->getValue<String>(),
col_else_const->getValue<String>(),
res_vec, res_offsets);
else if (col_then && col_else_fixed) else if (col_then && col_else_fixed)
StringIfImpl::vector_vector_fixed( conditional(StringSource(*col_then), FixedStringSource(*col_else_fixed), sink, cond_data);
cond_col->getData(),
col_then->getChars(), col_then->getOffsets(),
col_else_fixed->getChars(), col_else_fixed->getN(),
res_vec, res_offsets);
else if (col_then_fixed && col_else) else if (col_then_fixed && col_else)
StringIfImpl::vector_fixed_vector( conditional(FixedStringSource(*col_then_fixed), StringSource(*col_else), sink, cond_data);
cond_col->getData(),
col_then_fixed->getChars(), col_then_fixed->getN(),
col_else->getChars(), col_else->getOffsets(),
res_vec, res_offsets);
else if (col_then_const && col_else_fixed) else if (col_then_const && col_else_fixed)
StringIfImpl::constant_vector_fixed( conditional(ConstSource<StringSource>(*col_then_const), FixedStringSource(*col_else_fixed), sink, cond_data);
cond_col->getData(),
col_then_const->getValue<String>(),
col_else_fixed->getChars(), col_else_fixed->getN(),
res_vec, res_offsets);
else if (col_then_fixed && col_else_const) else if (col_then_fixed && col_else_const)
StringIfImpl::vector_fixed_constant( conditional(FixedStringSource(*col_then_fixed), ConstSource<StringSource>(*col_else_const), sink, cond_data);
cond_col->getData(), else if (col_then && col_else_const_fixed)
col_then_fixed->getChars(), col_then_fixed->getN(), conditional(StringSource(*col_then), ConstSource<FixedStringSource>(*col_else_const_fixed), sink, cond_data);
col_else_const->getValue<String>(), else if (col_then_const_fixed && col_else)
res_vec, res_offsets); conditional(ConstSource<FixedStringSource>(*col_then_const_fixed), StringSource(*col_else), sink, cond_data);
else else if (col_then_const && col_else_const_fixed)
return false; conditional(ConstSource<StringSource>(*col_then_const), ConstSource<FixedStringSource>(*col_else_const_fixed), sink, cond_data);
} else if (col_then_const_fixed && col_else_const)
conditional(ConstSource<FixedStringSource>(*col_then_const_fixed), ConstSource<StringSource>(*col_else_const), sink, cond_data);
return true; return true;
} }
@ -1224,25 +1202,25 @@ private:
if (col_then_elements && col_else_elements) if (col_then_elements && col_else_elements)
StringArrayIfImpl::vector_vector( StringArrayIfImpl::vector_vector(
cond_col->getData(), cond_data,
col_then_elements->getChars(), col_then_elements->getOffsets(), col_arr_then->getOffsets(), col_then_elements->getChars(), col_then_elements->getOffsets(), col_arr_then->getOffsets(),
col_else_elements->getChars(), col_else_elements->getOffsets(), col_arr_else->getOffsets(), col_else_elements->getChars(), col_else_elements->getOffsets(), col_arr_else->getOffsets(),
res_chars, res_string_offsets, res_array_offsets); res_chars, res_string_offsets, res_array_offsets);
else if (col_then_elements && col_arr_else_const) else if (col_then_elements && col_arr_else_const)
StringArrayIfImpl::vector_constant( StringArrayIfImpl::vector_constant(
cond_col->getData(), cond_data,
col_then_elements->getChars(), col_then_elements->getOffsets(), col_arr_then->getOffsets(), col_then_elements->getChars(), col_then_elements->getOffsets(), col_arr_then->getOffsets(),
col_arr_else_const->getValue<Array>(), col_arr_else_const->getValue<Array>(),
res_chars, res_string_offsets, res_array_offsets); res_chars, res_string_offsets, res_array_offsets);
else if (col_arr_then_const && col_else_elements) else if (col_arr_then_const && col_else_elements)
StringArrayIfImpl::constant_vector( StringArrayIfImpl::constant_vector(
cond_col->getData(), cond_data,
col_arr_then_const->getValue<Array>(), col_arr_then_const->getValue<Array>(),
col_else_elements->getChars(), col_else_elements->getOffsets(), col_arr_else->getOffsets(), col_else_elements->getChars(), col_else_elements->getOffsets(), col_arr_else->getOffsets(),
res_chars, res_string_offsets, res_array_offsets); res_chars, res_string_offsets, res_array_offsets);
else if (col_arr_then_const && col_arr_else_const) else if (col_arr_then_const && col_arr_else_const)
StringArrayIfImpl::constant_constant( StringArrayIfImpl::constant_constant(
cond_col->getData(), cond_data,
col_arr_then_const->getValue<Array>(), col_arr_then_const->getValue<Array>(),
col_arr_else_const->getValue<Array>(), col_arr_else_const->getValue<Array>(),
res_chars, res_string_offsets, res_array_offsets); res_chars, res_string_offsets, res_array_offsets);

View File

@ -390,7 +390,7 @@ struct FixedStringSink
ColumnString::Offset_t current_offset = 0; ColumnString::Offset_t current_offset = 0;
FixedStringSink(ColumnFixedString & col, size_t column_size) FixedStringSink(ColumnFixedString & col, size_t column_size)
: elements(col.getChars()), total_rows(column_size) : elements(col.getChars()), string_size(col.getN()), total_rows(column_size)
{ {
elements.resize(column_size * string_size); elements.resize(column_size * string_size);
} }
@ -589,6 +589,11 @@ inline void writeSlice(const StringSource::Slice & slice, StringSink & sink)
sink.current_offset += slice.size; sink.current_offset += slice.size;
} }
inline void writeSlice(const StringSource::Slice & slice, FixedStringSink & sink)
{
memcpySmallAllowReadWriteOverflow15(&sink.elements[sink.current_offset], slice.data, slice.size);
}
/// Assuming same types of underlying columns for slice and sink. /// Assuming same types of underlying columns for slice and sink.
inline void writeSlice(const GenericArraySlice & slice, GenericArraySink & sink) inline void writeSlice(const GenericArraySlice & slice, GenericArraySink & sink)
{ {
@ -723,4 +728,25 @@ void sliceDynamicOffsetBounded(Source && src, Sink && sink, IColumn & offset_col
} }
} }
template <typename SourceA, typename SourceB, typename Sink>
void conditional(SourceA && src_a, SourceB && src_b, Sink && sink, const PaddedPODArray<UInt8> & condition)
{
const UInt8 * cond_pos = &condition[0];
const UInt8 * cond_end = cond_pos + condition.size();
while (cond_pos < cond_end)
{
if (*cond_pos)
writeSlice(src_a.getWhole(), sink);
else
writeSlice(src_b.getWhole(), sink);
++cond_pos;
src_a.next();
src_b.next();
sink.next();
}
}
} }