This commit is contained in:
kevinyhzou 2023-12-14 19:33:15 +08:00
parent e29b78d20b
commit 21b86c0650
2 changed files with 52 additions and 46 deletions

View File

@ -522,6 +522,7 @@ class IColumn;
M(Bool, parsedatetime_parse_without_leading_zeros, true, "Formatters '%c', '%l' and '%k' in function 'parseDateTime()' parse months and hours without leading zeros.", 0) \
M(Bool, formatdatetime_format_without_leading_zeros, false, "Formatters '%c', '%l' and '%k' in function 'formatDateTime()' print months and hours without leading zeros.", 0) \
M(Bool, allow_execute_multiif_columnar_by_memcpy, false, "Allow execute multiIf function columnar by memcpy", 0) \
M(Float, skew_threshold_use_memcpy_execute_multiif_columnar, 0.9f, "The condition skew threshold to use memcpy while execute mulitiif columnar", 0) \
M(Bool, formatdatetime_f_prints_single_zero, false, "Formatter '%f' in function 'formatDateTime()' produces a single zero instead of six zeros if the formatted value has no fractional seconds.", 0) \
M(Bool, formatdatetime_parsedatetime_m_is_month_name, true, "Formatter '%M' in functions 'formatDateTime()' and 'parseDateTime()' produces the month name instead of minutes.", 0) \
M(UInt64, max_partitions_per_insert_block, 100, "Limit maximum number of partitions in single INSERTed block. Zero means unlimited. Throw exception if the block contains too many partitions. This setting is a safety threshold, because using large number of partitions is a common misconception.", 0) \

View File

@ -35,13 +35,13 @@ namespace
///
/// Additionally the arguments, conditions or branches, support nullable types
/// and the NULL value, with a NULL condition treated as false.
class FunctionMultiIf final : public FunctionIfBase
class FunctionMultiIf : public FunctionIfBase
{
public:
static constexpr auto name = "multiIf";
static FunctionPtr create(ContextPtr context_) { return std::make_shared<FunctionMultiIf>(context_); }
explicit FunctionMultiIf(ContextPtr context_) : context(context_) { }
explicit FunctionMultiIf(ContextPtr context_) : context(context_) {}
String getName() const override { return name; }
bool isVariadic() const override { return true; }
@ -143,7 +143,6 @@ public:
* depending on values of conditions.
*/
std::vector<Instruction> instructions;
instructions.reserve(arguments.size() / 2 + 1);
@ -256,13 +255,15 @@ public:
MutableColumnPtr res = ColumnVector<TYPE>::create(rows); \
if (!result_type->isNullable()) \
{ \
executeInstructionsColumnar<TYPE, INDEX>(instructions, rows, res, settings.allow_execute_multiif_columnar_by_memcpy); \
executeInstructionsColumnar<TYPE, INDEX>(instructions, rows, res, instruction_use_memory_copy, \
settings.skew_threshold_use_memcpy_execute_multiif_columnar); \
return std::move(res); \
} \
else \
{ \
MutableColumnPtr null_map = ColumnUInt8::create(rows); \
executeInstructionsColumnarForNullable<TYPE, INDEX>(instructions, rows, res, null_map, settings.allow_execute_multiif_columnar_by_memcpy); \
executeInstructionsColumnarForNullable<TYPE, INDEX>(instructions, rows, res, null_map, instruction_use_memory_copy, \
settings.skew_threshold_use_memcpy_execute_multiif_columnar); \
return ColumnNullable::create(std::move(res), std::move(null_map)); \
} \
}
@ -304,6 +305,8 @@ public:
}
private:
mutable std::optional<int> instruction_use_memory_copy;
static void executeInstructions(std::vector<Instruction> & instructions, size_t rows, const MutableColumnPtr & res)
{
for (size_t i = 0; i < rows; ++i)
@ -382,9 +385,27 @@ private:
}
}
template<typename S>
static void calculateWhetherUseMemoryCopy(PaddedPODArray<S> & inserts, size_t instructions_size, std::optional<int> & instruction_use_memory_copy,
double threshold_use_memcpy)
{
if (!instruction_use_memory_copy.has_value())
{
std::vector<UInt64> instruction_insert_sizes(instructions_size);
for (size_t i = 0; i < inserts.size(); ++i)
instruction_insert_sizes[inserts[i]] += 1;
for (size_t i = 0; i < instruction_insert_sizes.size(); ++i)
{
if (instruction_insert_sizes[i] * 1.0 / inserts.size() >= threshold_use_memcpy)
instruction_use_memory_copy.emplace(static_cast<int>(i));
}
}
}
template<typename T, typename S>
static void executeInstructionsColumnarForNullable(std::vector<Instruction> & instructions, size_t rows,
const MutableColumnPtr & res, const MutableColumnPtr & null_map, bool execute_by_memcpy)
const MutableColumnPtr & res, const MutableColumnPtr & null_map, std::optional<int> & instruction_use_memory_copy, double threshold_use_memcpy)
{
PaddedPODArray<S> inserts(rows, static_cast<S>(instructions.size()));
calculateInserts(instructions, rows, inserts);
@ -393,6 +414,8 @@ private:
PaddedPODArray<UInt8> & null_map_data = assert_cast<ColumnUInt8 &>(*null_map).getData();
std::vector<const ColumnVector<T> *> data_cols(instructions.size());
std::vector<const ColumnUInt8 *> null_map_cols(instructions.size());
calculateWhetherUseMemoryCopy(inserts, instructions.size(), instruction_use_memory_copy, threshold_use_memcpy);
for (size_t i = 0; i < instructions.size(); ++i)
{
if (instructions[i].source->isNullable())
@ -410,63 +433,53 @@ private:
}
else
{
null_map_cols[i] = nullptr;
null_map_cols[i] = ColumnUInt8::create(rows).get();
data_cols[i] = assert_cast<const ColumnVector<T> *>(instructions[i].source.get());
}
}
if (!execute_by_memcpy)
if (!instruction_use_memory_copy.has_value())
{
std::cout << "has no value 111" << std::endl;
for (size_t row_i = 0; row_i < rows; ++row_i)
{
auto & instruction = instructions[inserts[row_i]];
size_t index = instruction.source_is_constant ? 0 : row_i;
res_data[row_i] = data_cols[inserts[row_i]]->getData()[index];
null_map_data[row_i] = null_map_cols[inserts[row_i]] ? null_map_cols[inserts[row_i]]->getData()[index] : 0;
null_map_data[row_i] = null_map_cols[inserts[row_i]]->getData()[index];
}
}
else
{
size_t insert_start_pos = 0;
int val = instruction_use_memory_copy.value();
memcpy(res_data.data(), data_cols[val]->getData().data(), sizeof(T) * rows);
memcpy(null_map_data.data(), null_map_cols[val]->getData().data(), sizeof(UInt8) * rows);
for (size_t row_i = 0; row_i < rows; ++row_i)
{
S curr_insert = inserts[row_i];
if (row_i != rows -1 && curr_insert == inserts[row_i + 1])
if (inserts[row_i] == val)
{
continue;
}
else
{
if (instructions[curr_insert].source_is_constant)
{
for (size_t i = insert_start_pos; i <= row_i; ++i)
{
res_data[i] = data_cols[curr_insert]->getData()[0];
null_map_data[i] = null_map_cols[curr_insert]->getData()[0];
}
}
else
{
memcpy(res_data.data() + insert_start_pos,
data_cols[curr_insert]->getData().data() + insert_start_pos, sizeof(T) * (row_i + 1 - insert_start_pos));
memcpy(null_map_data.data() + insert_start_pos,
null_map_cols[curr_insert]->getData().data() + insert_start_pos, sizeof(UInt8) * (row_i + 1 - insert_start_pos));
}
insert_start_pos = row_i + 1;
}
auto & instruction = instructions[inserts[row_i]];
size_t index = instruction.source_is_constant ? 0 : row_i;
res_data[row_i] = data_cols[inserts[row_i]]->getData()[index];
null_map_data[row_i] = null_map_cols[inserts[row_i]]->getData()[index];
}
}
}
template <typename T, typename S>
static void executeInstructionsColumnar(std::vector<Instruction> & instructions, size_t rows, const MutableColumnPtr & res, bool execute_by_memcpy)
static void executeInstructionsColumnar(std::vector<Instruction> & instructions, size_t rows, const MutableColumnPtr & res,
std::optional<int> & instruction_use_memory_copy, double threshold_use_memcpy)
{
PaddedPODArray<S> inserts(rows, static_cast<S>(instructions.size()));
calculateInserts(instructions, rows, inserts);
PaddedPODArray<T> & res_data = assert_cast<ColumnVector<T> &>(*res).getData();
if (!execute_by_memcpy)
calculateWhetherUseMemoryCopy(inserts, instructions.size(), instruction_use_memory_copy, threshold_use_memcpy);
if (!instruction_use_memory_copy.has_value())
{
std::cout << "has no value" << std::endl;
for (size_t row_i = 0; row_i < rows; ++row_i)
{
auto & instruction = instructions[inserts[row_i]];
@ -476,25 +489,17 @@ private:
}
else
{
size_t insert_start_pos = 0;
std::vector<const ColumnVector<T> *> data_cols(instructions.size());
for (size_t i = 0; i < instructions.size(); ++i)
{
data_cols[i] = assert_cast<const ColumnVector<T> *>(instructions[i].source.get());
}
int val = instruction_use_memory_copy.value();
memcpy(res_data.data(), data_cols[val]->getData().data(), sizeof(T) * rows);
for (size_t row_i = 0; row_i < rows; ++row_i)
{
S curr_insert = inserts[row_i];
if (row_i != rows -1 && curr_insert == inserts[row_i + 1])
{
if (inserts[row_i] == val)
continue;
}
else
{
memcpy(res_data.data() + insert_start_pos,
data_cols[curr_insert]->getData().data() + insert_start_pos, sizeof(T) * (row_i + 1 - insert_start_pos));
insert_start_pos = row_i + 1;
}
res_data[row_i] = data_cols[inserts[row_i]]->getData()[row_i];
}
}
}