From 3f0cfbd8c0816b007ff85b1a3997696ce5ed3214 Mon Sep 17 00:00:00 2001 From: Daniil Ivanik Date: Sat, 3 Feb 2024 19:46:00 +0100 Subject: [PATCH] Kek --- src/Common/iota.cpp | 29 ++ src/Common/iota.h | 9 + .../QueryPlan/ReadFromSystemNumbersStep.cpp | 281 ++++++++++-------- 3 files changed, 197 insertions(+), 122 deletions(-) diff --git a/src/Common/iota.cpp b/src/Common/iota.cpp index 98f18eb195b..532c4bde76d 100644 --- a/src/Common/iota.cpp +++ b/src/Common/iota.cpp @@ -27,10 +27,39 @@ void iota(T * begin, size_t count, T first_value) return iotaImpl(begin, count, first_value); } +MULTITARGET_FUNCTION_AVX2_SSE42( + MULTITARGET_FUNCTION_HEADER(template void NO_INLINE), + iotaWithStepImpl, MULTITARGET_FUNCTION_BODY((T * begin, size_t count, T first_value, T step) /// NOLINT + { + for (size_t i = 0; i < count; i++) + *(begin + i) = static_cast(first_value + i * step); + }) +) + +template +void iota_with_step(T * begin, size_t count, T first_value, T step) +{ +#if USE_MULTITARGET_CODE + if (isArchSupported(TargetArch::AVX2)) + return iotaWithStepImplAVX2(begin, count, first_value, step); + + if (isArchSupported(TargetArch::SSE42)) + return iotaWithStepImplSSE42(begin, count, first_value, step); +#endif + return iotaWithStepImpl(begin, count, first_value, step); +} + template void iota(UInt8 * begin, size_t count, UInt8 first_value); template void iota(UInt32 * begin, size_t count, UInt32 first_value); template void iota(UInt64 * begin, size_t count, UInt64 first_value); #if defined(OS_DARWIN) template void iota(size_t * begin, size_t count, size_t first_value); #endif + +template void iota_with_step(UInt8 * begin, size_t count, UInt8 first_value, UInt8 step); +template void iota_with_step(UInt32 * begin, size_t count, UInt32 first_value, UInt32 step); +template void iota_with_step(UInt64 * begin, size_t count, UInt64 first_value, UInt64 step); +#if defined(OS_DARWIN) +extern template void iota_with_step(size_t * begin, size_t count, size_t first_value, size_t step); +#endif } diff --git a/src/Common/iota.h b/src/Common/iota.h index 7910274d15d..f40cde9d5db 100644 --- a/src/Common/iota.h +++ b/src/Common/iota.h @@ -31,4 +31,13 @@ extern template void iota(UInt64 * begin, size_t count, UInt64 first_value); #if defined(OS_DARWIN) extern template void iota(size_t * begin, size_t count, size_t first_value); #endif + +template void iota_with_step(T * begin, size_t count, T first_value, T step); + +extern template void iota_with_step(UInt8 * begin, size_t count, UInt8 first_value, UInt8 step); +extern template void iota_with_step(UInt32 * begin, size_t count, UInt32 first_value, UInt32 step); +extern template void iota_with_step(UInt64 * begin, size_t count, UInt64 first_value, UInt64 step); +#if defined(OS_DARWIN) +extern template void iota(size_t * begin, size_t count, size_t first_value, size_t step); +#endif } diff --git a/src/Processors/QueryPlan/ReadFromSystemNumbersStep.cpp b/src/Processors/QueryPlan/ReadFromSystemNumbersStep.cpp index 13a14ffb917..f85473e43c3 100644 --- a/src/Processors/QueryPlan/ReadFromSystemNumbersStep.cpp +++ b/src/Processors/QueryPlan/ReadFromSystemNumbersStep.cpp @@ -12,6 +12,8 @@ #include #include +#include + namespace DB { @@ -30,9 +32,9 @@ public: : ISource(createHeader(column_name)), block_size(block_size_), next(offset_), step(step_), inner_step(inner_step_), inner_remainder(offset_ % inner_step_) { } - String getName() const override { return "Numbers"; } + static Block createHeader(const std::string& column_name) { return {ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared(), column_name)}; } protected: @@ -57,10 +59,7 @@ protected: ColumnUInt64::Container & vec = column->getData(); UInt64 * pos = vec.data(); /// This also accelerates the code. UInt64 * end = &vec[filtered_block_size]; - iota(pos, static_cast(end - pos), UInt64{0}); - for (UInt64 p = 0; p < filtered_block_size; p += 1) { - vec[p] = vec[p] * inner_step + first_element; - } + iota_with_step(pos, static_cast(end - pos), first_element, inner_step); next += step; @@ -77,28 +76,46 @@ private: UInt64 inner_remainder; }; - -[[maybe_unused]] UInt128 sizeOfRange(const Range & r) -{ - UInt128 size; - if (r.right.isPositiveInfinity()) - return static_cast(std::numeric_limits::max()) - r.left.get() + r.left_included; - - size = static_cast(r.right.get()) - r.left.get() + 1; - - if (!r.left_included) - size--; - - if (!r.right_included) - size--; - assert(size >= 0); - return size; +struct RangeWithStep { + Range range; + UInt64 step; }; -[[maybe_unused]] auto sizeOfRanges(const Ranges & rs) +using RangesWithStep = std::vector; + +std::optional stepped_range_from_range(const Range& r, UInt64 step, UInt64 remainder) { + UInt64 begin = (r.left.get() / step) * step; + if (begin > std::numeric_limits::max() - remainder) { + return std::nullopt; + } + begin += remainder; + while (begin <= r.left.get() - r.left_included) { + if (std::numeric_limits::max() - step < begin) { + return std::nullopt; + } + begin += step; + } + + LOG_DEBUG(&Poco::Logger::get("stepped_range_from_range"), "Begin: {}", begin); + UInt128 right_edge = (r.right.get() + r.right_included); + if (begin >= right_edge) { + return std::nullopt; + } + return std::optional{RangeWithStep{Range(begin, true, static_cast(right_edge - 1), true), step}}; +} + +[[maybe_unused]] UInt128 sizeOfRange(const RangeWithStep & r) +{ + if (r.range.right.isPositiveInfinity()) + return static_cast(std::numeric_limits::max() - r.range.left.get()) / r.step + r.range.left_included; + + return static_cast(r.range.right.get() - r.range.left.get()) / r.step + 1; +}; + +[[maybe_unused]] auto sizeOfRanges(const RangesWithStep & rs) { UInt128 total_size{}; - for (const Range & r : rs) + for (const RangeWithStep & r : rs) { /// total_size will never overflow total_size += sizeOfRange(r); @@ -127,7 +144,7 @@ public: using RangesStatePtr = std::shared_ptr; - [[maybe_unused]] NumbersRangedSource(const Ranges & ranges_, RangesStatePtr & ranges_state_, UInt64 base_block_size_, const std::string& column_name) + [[maybe_unused]] NumbersRangedSource(const RangesWithStep & ranges_, RangesStatePtr & ranges_state_, UInt64 base_block_size_, const std::string& column_name) : ISource(NumbersSource::createHeader(column_name)), ranges(ranges_), ranges_state(ranges_state_), base_block_size(base_block_size_) { } @@ -187,9 +204,9 @@ protected: if (ranges.empty()) return {}; - auto first_value = [](const Range & r) { return r.left.get() + (r.left_included ? 0 : 1); }; + auto first_value = [](const RangeWithStep & r) { return r.range.left.get() + (r.range.left_included ? 0 : 1); }; - auto last_value = [](const Range & r) { return r.right.get() - (r.right_included ? 0 : 1); }; + auto last_value = [](const RangeWithStep & r) { return r.range.right.get() - (r.range.right_included ? 0 : 1); }; /// Find the data range. /// If data left is small, shrink block size. @@ -215,31 +232,33 @@ protected: UInt128 can_provide = cursor.offset_in_ranges == end.offset_in_ranges ? end.offset_in_range - cursor.offset_in_range - : static_cast(last_value(range)) - first_value(range) + 1 - cursor.offset_in_range; + : static_cast(last_value(range) - first_value(range)) / range.step + 1 - cursor.offset_in_range; /// set value to block - auto set_value = [&pos](UInt128 & start_value, UInt128 & end_value) + auto set_value = [&pos, this](UInt128 & start_value, UInt128 & end_value) { if (end_value > std::numeric_limits::max()) { - while (start_value < end_value) - *(pos++) = start_value++; + while (start_value < end_value) { + *(pos++) = start_value; + start_value += this->step; + } } else { auto start_value_64 = static_cast(start_value); auto end_value_64 = static_cast(end_value); auto size = end_value_64 - start_value_64; - iota(pos, static_cast(size), start_value_64); + iota_with_step(pos, static_cast(size), start_value_64, step); pos += size; } }; if (can_provide > need) { - UInt64 start_value = first_value(range) + cursor.offset_in_range; + UInt64 start_value = first_value(range) + cursor.offset_in_range * step; /// end_value will never overflow - iota(pos, static_cast(need), start_value); + iota_with_step(pos, static_cast(need), start_value, step); pos += need; provided += need; @@ -248,8 +267,8 @@ protected: else if (can_provide == need) { /// to avoid UInt64 overflow - UInt128 start_value = static_cast(first_value(range)) + cursor.offset_in_range; - UInt128 end_value = start_value + need; + UInt128 start_value = static_cast(first_value(range)) + cursor.offset_in_range * step; + UInt128 end_value = start_value + need * step; set_value(start_value, end_value); provided += need; @@ -259,8 +278,8 @@ protected: else { /// to avoid UInt64 overflow - UInt128 start_value = static_cast(first_value(range)) + cursor.offset_in_range; - UInt128 end_value = start_value + can_provide; + UInt128 start_value = static_cast(first_value(range)) + cursor.offset_in_range * step; + UInt128 end_value = start_value + can_provide * step; set_value(start_value, end_value); provided += static_cast(can_provide); @@ -277,13 +296,15 @@ protected: private: /// The ranges is shared between all streams. - Ranges ranges; + RangesWithStep ranges; /// Ranges state shared between all streams, actually is the start of the ranges. RangesStatePtr ranges_state; /// Base block size, will shrink when data left is not enough. UInt64 base_block_size; + + UInt64 step; }; } @@ -304,7 +325,7 @@ namespace /// Shrink ranges to size. /// For example: ranges: [1, 5], [8, 100]; size: 7, we will get [1, 5], [8, 9] -[[maybe_unused]] void shrinkRanges(Ranges & ranges, size_t size) +[[maybe_unused]] void shrinkRanges(RangesWithStep & ranges, size_t size) { size_t last_range_idx = 0; for (size_t i = 0; i < ranges.size(); i++) @@ -323,9 +344,9 @@ namespace else { auto & range = ranges[i]; - UInt64 right = range.left.get() + static_cast(size); - range.right = Field(right); - range.right_included = !range.left_included; + UInt64 right = range.range.left.get() + static_cast(size); + range.range.right = Field(right); + range.range.right_included = !range.range.left_included; last_range_idx = i; break; } @@ -393,101 +414,117 @@ Pipe ReadFromSystemNumbersStep::makePipe() num_streams = 1; /// Build rpn of query filters - // KeyCondition condition(buildFilterDAG(), context, column_names, key_expression); + KeyCondition condition(buildFilterDAG(), context, column_names, key_expression); Pipe pipe; Ranges ranges; - // if (condition.extractPlainRanges(ranges)) - // { - // /// Intersect ranges with table range - // std::optional table_range; - // std::optional overflowed_table_range; + if (condition.extractPlainRanges(ranges)) + { + LOG_DEBUG(&Poco::Logger::get("My logger"), "Use optimization"); + /// Intersect ranges with table range + std::optional table_range; + std::optional overflowed_table_range; - // if (numbers_storage.limit.has_value()) - // { - // if (std::numeric_limits::max() - numbers_storage.offset >= *(numbers_storage.limit)) - // { - // table_range.emplace(FieldRef(numbers_storage.offset), true, FieldRef(numbers_storage.offset + *(numbers_storage.limit)), false); - // } - // /// UInt64 overflow, for example: SELECT number FROM numbers(18446744073709551614, 5) - // else - // { - // table_range.emplace(FieldRef(numbers_storage.offset), true, std::numeric_limits::max(), true); - // auto overflow_end = UInt128(numbers_storage.offset) + UInt128(*numbers_storage.limit); - // overflowed_table_range.emplace( - // FieldRef(UInt64(0)), true, FieldRef(UInt64(overflow_end - std::numeric_limits::max() - 1)), false); - // } - // } - // else - // { - // table_range.emplace(FieldRef(numbers_storage.offset), true, FieldRef(std::numeric_limits::max()), true); - // } + if (numbers_storage.limit.has_value()) + { + if (std::numeric_limits::max() - numbers_storage.offset >= *(numbers_storage.limit)) + { + table_range.emplace(FieldRef(numbers_storage.offset), true, FieldRef(numbers_storage.offset + *(numbers_storage.limit)), false); + } + /// UInt64 overflow, for example: SELECT number FROM numbers(18446744073709551614, 5) + else + { + table_range.emplace(FieldRef(numbers_storage.offset), true, std::numeric_limits::max(), true); + auto overflow_end = UInt128(numbers_storage.offset) + UInt128(*numbers_storage.limit); + overflowed_table_range.emplace( + FieldRef(UInt64(0)), true, FieldRef(UInt64(overflow_end - std::numeric_limits::max() - 1)), false); + } + } + else + { + table_range.emplace(FieldRef(numbers_storage.offset), true, FieldRef(std::numeric_limits::max()), true); + } + LOG_DEBUG(&Poco::Logger::get("My logger"), "Found table ranges"); - // Ranges intersected_ranges; - // for (auto & r : ranges) - // { - // auto intersected_range = table_range->intersectWith(r); - // if (intersected_range) - // intersected_ranges.push_back(*intersected_range); - // } - // /// intersection with overflowed_table_range goes back. - // if (overflowed_table_range.has_value()) - // { - // for (auto & r : ranges) - // { - // auto intersected_range = overflowed_table_range->intersectWith(r); - // if (intersected_range) - // intersected_ranges.push_back(*overflowed_table_range); - // } - // } + RangesWithStep intersected_ranges; + for (auto & r : ranges) + { + auto intersected_range = table_range->intersectWith(r); + if (intersected_range.has_value()) { + auto range_with_step = stepped_range_from_range(intersected_range.value(), numbers_storage.step, numbers_storage.offset % numbers_storage.step); + if (range_with_step.has_value()) { + intersected_ranges.push_back(*range_with_step); + } + } + } - // /// ranges is blank, return a source who has no data - // if (intersected_ranges.empty()) - // { - // pipe.addSource(std::make_shared(NumbersSource::createHeader(numbers_storage.column_name))); - // return pipe; - // } - // const auto & limit_length = limit_length_and_offset.first; - // const auto & limit_offset = limit_length_and_offset.second; - // /// If intersected ranges is limited or we can pushdown limit. - // if (!intersected_ranges.rbegin()->right.isPositiveInfinity() || should_pushdown_limit) - // { - // UInt128 total_size = sizeOfRanges(intersected_ranges); - // UInt128 query_limit = limit_length + limit_offset; + for (const auto& range : intersected_ranges) { + LOG_DEBUG(&Poco::Logger::get("Ranges"), "Left: {}; Right {}, LI: {}, RI: {}, Step: {}", range.range.left.get(), range.range.right.get(), range.range.left_included, range.range.right_included, range.step); + // std::cout << + } + /// intersection with overflowed_table_range goes back. + if (overflowed_table_range.has_value()) + { + for (auto & r : ranges) + { + auto intersected_range = overflowed_table_range->intersectWith(r); + if (intersected_range) { + auto range_with_step = stepped_range_from_range(intersected_range.value(), numbers_storage.step, static_cast((static_cast(numbers_storage.offset) + std::numeric_limits::max() + 1) % numbers_storage.step)); + if (range_with_step) { + intersected_ranges.push_back(*range_with_step); + } + } + } + } - // /// limit total_size by query_limit - // if (should_pushdown_limit && query_limit < total_size) - // { - // total_size = query_limit; - // /// We should shrink intersected_ranges for case: - // /// intersected_ranges: [1, 4], [7, 100]; query_limit: 2 - // shrinkRanges(intersected_ranges, total_size); - // } + /// ranges is blank, return a source who has no data + if (intersected_ranges.empty()) + { + pipe.addSource(std::make_shared(NumbersSource::createHeader(numbers_storage.column_name))); + return pipe; + } + const auto & limit_length = limit_length_and_offset.first; + const auto & limit_offset = limit_length_and_offset.second; - // checkLimits(size_t(total_size)); + /// If intersected ranges is limited or we can pushdown limit. + if (!intersected_ranges.rbegin()->range.right.isPositiveInfinity() || should_pushdown_limit) + { + UInt128 total_size = sizeOfRanges(intersected_ranges); + UInt128 query_limit = limit_length + limit_offset; - // if (total_size / max_block_size < num_streams) - // num_streams = static_cast(total_size / max_block_size); + /// limit total_size by query_limit + if (should_pushdown_limit && query_limit < total_size) + { + total_size = query_limit; + /// We should shrink intersected_ranges for case: + /// intersected_ranges: [1, 4], [7, 100]; query_limit: 2 + shrinkRanges(intersected_ranges, total_size); + } - // if (num_streams == 0) - // num_streams = 1; + checkLimits(size_t(total_size)); - // /// Ranges state, all streams will share the state. - // auto ranges_state = std::make_shared(); - // for (size_t i = 0; i < num_streams; ++i) - // { - // auto source = std::make_shared(intersected_ranges, ranges_state, max_block_size, numbers_storage.column_name); + if (total_size / max_block_size < num_streams) + num_streams = static_cast(total_size / max_block_size); - // if (i == 0) - // source->addTotalRowsApprox(total_size); + if (num_streams == 0) + num_streams = 1; - // pipe.addSource(std::move(source)); - // } - // return pipe; - // } - // } + /// Ranges state, all streams will share the state. + auto ranges_state = std::make_shared(); + for (size_t i = 0; i < num_streams; ++i) + { + auto source = std::make_shared(intersected_ranges, ranges_state, max_block_size, numbers_storage.column_name); + + if (i == 0) + source->addTotalRowsApprox(total_size); + + pipe.addSource(std::move(source)); + } + return pipe; + } + } /// Fall back to NumbersSource for (size_t i = 0; i < num_streams; ++i)