Fixed function if of FixedString arguments [#CLICKHOUSE-3202].

This commit is contained in:
Alexey Milovidov 2017-08-05 02:18:51 +03:00
parent b9d12ae234
commit aecafc0931
2 changed files with 89 additions and 85 deletions

View File

@ -17,6 +17,8 @@
#include <Functions/FunctionHelpers.h>
#include <DataTypes/NumberTraits.h>
#include <DataTypes/DataTypeTraits.h>
#include <Functions/GatherUtils.h>
namespace DB
{
@ -1114,92 +1116,68 @@ private:
const ColumnString * col_else = checkAndGetColumn<ColumnString>(col_else_untyped);
const ColumnFixedString * col_then_fixed = checkAndGetColumn<ColumnFixedString>(col_then_untyped);
const ColumnFixedString * col_else_fixed = checkAndGetColumn<ColumnFixedString>(col_else_untyped);
const ColumnConst * col_then_const = checkAndGetColumnConstStringOrFixedString(col_then_untyped);
const ColumnConst * col_else_const = checkAndGetColumnConstStringOrFixedString(col_else_untyped);
const ColumnConst * col_then_const = checkAndGetColumnConst<ColumnString>(col_then_untyped);
const ColumnConst * col_else_const = checkAndGetColumnConst<ColumnString>(col_else_untyped);
const ColumnConst * col_then_const_fixed = checkAndGetColumnConst<ColumnFixedString>(col_then_untyped);
const ColumnConst * col_else_const_fixed = checkAndGetColumnConst<ColumnFixedString>(col_else_untyped);
if ((col_then || col_then_const || col_then_fixed) && (col_else || col_else_const || col_else_fixed))
const PaddedPODArray<UInt8> & cond_data = cond_col->getData();
size_t rows = cond_data.size();
if ((col_then_fixed || col_then_const_fixed)
&& (col_else_fixed || col_else_const_fixed))
{
/// The result is FixedString.
auto col_res_untyped = col_then_untyped->cloneEmpty();
block.getByPosition(result).column = col_res_untyped;
ColumnFixedString * col_res = static_cast<ColumnFixedString *>(col_res_untyped.get());
auto sink = FixedStringSink(*col_res, rows);
if (col_then_fixed && col_else_fixed)
{
/// The result is FixedString.
conditional(FixedStringSource(*col_then_fixed), FixedStringSource(*col_else_fixed), sink, cond_data);
else if (col_then_fixed && col_else_const_fixed)
conditional(FixedStringSource(*col_then_fixed), ConstSource<FixedStringSource>(*col_else_const_fixed), sink, cond_data);
else if (col_then_const_fixed && col_else_fixed)
conditional(ConstSource<FixedStringSource>(*col_then_const_fixed), FixedStringSource(*col_else_fixed), sink, cond_data);
else if (col_then_const_fixed && col_else_const_fixed)
conditional(ConstSource<FixedStringSource>(*col_then_const_fixed), ConstSource<FixedStringSource>(*col_else_const_fixed), sink, cond_data);
if (col_then_fixed->getN() != col_else_fixed->getN())
throw Exception("FixedString columns as 'then' and 'else' arguments of function 'if' has different sizes", ErrorCodes::ILLEGAL_COLUMN);
return true;
}
size_t N = col_then_fixed->getN();
if ((col_then || col_then_const || col_then_fixed || col_then_const_fixed)
&& (col_else || col_else_const || col_else_fixed || col_else_const_fixed))
{
/// The result is String.
std::shared_ptr<ColumnString> col_res = std::make_shared<ColumnString>();
block.getByPosition(result).column = col_res;
auto sink = StringSink(*col_res, rows);
auto col_res = std::make_shared<ColumnFixedString>(N);
block.getByPosition(result).column = col_res;
ColumnFixedString::Chars_t & res_vec = col_res->getChars();
StringIfImpl::vector_fixed_vector_fixed(
cond_col->getData(),
col_then_fixed->getChars(),
col_else_fixed->getChars(),
N,
res_vec);
}
else
{
/// The result is String.
std::shared_ptr<ColumnString> col_res = std::make_shared<ColumnString>();
block.getByPosition(result).column = col_res;
ColumnString::Chars_t & res_vec = col_res->getChars();
ColumnString::Offsets_t & res_offsets = col_res->getOffsets();
if (col_then && col_else)
StringIfImpl::vector_vector(
cond_col->getData(),
col_then->getChars(), col_then->getOffsets(),
col_else->getChars(), col_else->getOffsets(),
res_vec, res_offsets);
else if (col_then && col_else_const)
StringIfImpl::vector_constant(
cond_col->getData(),
col_then->getChars(), col_then->getOffsets(),
col_else_const->getValue<String>(),
res_vec, res_offsets);
else if (col_then_const && col_else)
StringIfImpl::constant_vector(
cond_col->getData(),
col_then_const->getValue<String>(),
col_else->getChars(), col_else->getOffsets(),
res_vec, res_offsets);
else if (col_then_const && col_else_const)
StringIfImpl::constant_constant(
cond_col->getData(),
col_then_const->getValue<String>(),
col_else_const->getValue<String>(),
res_vec, res_offsets);
else if (col_then && col_else_fixed)
StringIfImpl::vector_vector_fixed(
cond_col->getData(),
col_then->getChars(), col_then->getOffsets(),
col_else_fixed->getChars(), col_else_fixed->getN(),
res_vec, res_offsets);
else if (col_then_fixed && col_else)
StringIfImpl::vector_fixed_vector(
cond_col->getData(),
col_then_fixed->getChars(), col_then_fixed->getN(),
col_else->getChars(), col_else->getOffsets(),
res_vec, res_offsets);
else if (col_then_const && col_else_fixed)
StringIfImpl::constant_vector_fixed(
cond_col->getData(),
col_then_const->getValue<String>(),
col_else_fixed->getChars(), col_else_fixed->getN(),
res_vec, res_offsets);
else if (col_then_fixed && col_else_const)
StringIfImpl::vector_fixed_constant(
cond_col->getData(),
col_then_fixed->getChars(), col_then_fixed->getN(),
col_else_const->getValue<String>(),
res_vec, res_offsets);
else
return false;
}
if (col_then && col_else)
conditional(StringSource(*col_then), StringSource(*col_else), sink, cond_data);
else if (col_then && col_else_const)
conditional(StringSource(*col_then), ConstSource<StringSource>(*col_else_const), sink, cond_data);
else if (col_then_const && col_else)
conditional(ConstSource<StringSource>(*col_then_const), StringSource(*col_else), sink, cond_data);
else if (col_then_const && col_else_const)
conditional(ConstSource<StringSource>(*col_then_const), ConstSource<StringSource>(*col_else_const), sink, cond_data);
else if (col_then && col_else_fixed)
conditional(StringSource(*col_then), FixedStringSource(*col_else_fixed), sink, cond_data);
else if (col_then_fixed && col_else)
conditional(FixedStringSource(*col_then_fixed), StringSource(*col_else), sink, cond_data);
else if (col_then_const && col_else_fixed)
conditional(ConstSource<StringSource>(*col_then_const), FixedStringSource(*col_else_fixed), sink, cond_data);
else if (col_then_fixed && col_else_const)
conditional(FixedStringSource(*col_then_fixed), ConstSource<StringSource>(*col_else_const), sink, cond_data);
else if (col_then && col_else_const_fixed)
conditional(StringSource(*col_then), ConstSource<FixedStringSource>(*col_else_const_fixed), sink, cond_data);
else if (col_then_const_fixed && col_else)
conditional(ConstSource<FixedStringSource>(*col_then_const_fixed), StringSource(*col_else), sink, cond_data);
else if (col_then_const && col_else_const_fixed)
conditional(ConstSource<StringSource>(*col_then_const), ConstSource<FixedStringSource>(*col_else_const_fixed), sink, cond_data);
else if (col_then_const_fixed && col_else_const)
conditional(ConstSource<FixedStringSource>(*col_then_const_fixed), ConstSource<StringSource>(*col_else_const), sink, cond_data);
return true;
}
@ -1224,25 +1202,25 @@ private:
if (col_then_elements && col_else_elements)
StringArrayIfImpl::vector_vector(
cond_col->getData(),
cond_data,
col_then_elements->getChars(), col_then_elements->getOffsets(), col_arr_then->getOffsets(),
col_else_elements->getChars(), col_else_elements->getOffsets(), col_arr_else->getOffsets(),
res_chars, res_string_offsets, res_array_offsets);
else if (col_then_elements && col_arr_else_const)
StringArrayIfImpl::vector_constant(
cond_col->getData(),
cond_data,
col_then_elements->getChars(), col_then_elements->getOffsets(), col_arr_then->getOffsets(),
col_arr_else_const->getValue<Array>(),
res_chars, res_string_offsets, res_array_offsets);
else if (col_arr_then_const && col_else_elements)
StringArrayIfImpl::constant_vector(
cond_col->getData(),
cond_data,
col_arr_then_const->getValue<Array>(),
col_else_elements->getChars(), col_else_elements->getOffsets(), col_arr_else->getOffsets(),
res_chars, res_string_offsets, res_array_offsets);
else if (col_arr_then_const && col_arr_else_const)
StringArrayIfImpl::constant_constant(
cond_col->getData(),
cond_data,
col_arr_then_const->getValue<Array>(),
col_arr_else_const->getValue<Array>(),
res_chars, res_string_offsets, res_array_offsets);

View File

@ -390,7 +390,7 @@ struct FixedStringSink
ColumnString::Offset_t current_offset = 0;
FixedStringSink(ColumnFixedString & col, size_t column_size)
: elements(col.getChars()), total_rows(column_size)
: elements(col.getChars()), string_size(col.getN()), total_rows(column_size)
{
elements.resize(column_size * string_size);
}
@ -589,6 +589,11 @@ inline void writeSlice(const StringSource::Slice & slice, StringSink & sink)
sink.current_offset += slice.size;
}
inline void writeSlice(const StringSource::Slice & slice, FixedStringSink & sink)
{
memcpySmallAllowReadWriteOverflow15(&sink.elements[sink.current_offset], slice.data, slice.size);
}
/// Assuming same types of underlying columns for slice and sink.
inline void writeSlice(const GenericArraySlice & slice, GenericArraySink & sink)
{
@ -723,4 +728,25 @@ void sliceDynamicOffsetBounded(Source && src, Sink && sink, IColumn & offset_col
}
}
template <typename SourceA, typename SourceB, typename Sink>
void conditional(SourceA && src_a, SourceB && src_b, Sink && sink, const PaddedPODArray<UInt8> & condition)
{
const UInt8 * cond_pos = &condition[0];
const UInt8 * cond_end = cond_pos + condition.size();
while (cond_pos < cond_end)
{
if (*cond_pos)
writeSlice(src_a.getWhole(), sink);
else
writeSlice(src_b.getWhole(), sink);
++cond_pos;
src_a.next();
src_b.next();
sink.next();
}
}
}