This commit is contained in:
Daniil Ivanik 2024-02-03 19:46:00 +01:00
parent f91feb0dcb
commit 3f0cfbd8c0
3 changed files with 197 additions and 122 deletions

View File

@ -27,10 +27,39 @@ void iota(T * begin, size_t count, T first_value)
return iotaImpl(begin, count, first_value);
}
/// Kernel behind iota_with_step: stores first_value + k * step into begin[k] for every k in [0, count).
/// The arithmetic is carried out in the promoted type and narrowed back to T on store.
/// Wrapped in the multitarget macro; the dispatcher in this file calls the AVX2 and SSE42 flavours by name.
MULTITARGET_FUNCTION_AVX2_SSE42(
    MULTITARGET_FUNCTION_HEADER(template <iota_supported_types T> void NO_INLINE),
    iotaWithStepImpl, MULTITARGET_FUNCTION_BODY((T * begin, size_t count, T first_value, T step) /// NOLINT
    {
        for (size_t idx = 0; idx != count; ++idx)
            begin[idx] = static_cast<T>(first_value + idx * step);
    })
)
/// Fills begin[0 .. count) with the arithmetic progression first_value, first_value + step, ...
/// When multitarget code is compiled in, the widest supported instruction set is picked at run time
/// (AVX2 first, then SSE4.2); otherwise the generic implementation is used.
template <iota_supported_types T>
void iota_with_step(T * begin, size_t count, T first_value, T step)
{
#if USE_MULTITARGET_CODE
    if (isArchSupported(TargetArch::AVX2))
    {
        iotaWithStepImplAVX2(begin, count, first_value, step);
        return;
    }

    if (isArchSupported(TargetArch::SSE42))
    {
        iotaWithStepImplSSE42(begin, count, first_value, step);
        return;
    }
#endif

    /// Generic fallback.
    iotaWithStepImpl(begin, count, first_value, step);
}
/// Explicit instantiation definitions: emit iota<T> for each listed element type in this translation unit.
template void iota(UInt8 * begin, size_t count, UInt8 first_value);
template void iota(UInt32 * begin, size_t count, UInt32 first_value);
template void iota(UInt64 * begin, size_t count, UInt64 first_value);
/// size_t is instantiated separately on Darwin only — presumably because it is a type distinct from UInt64 there; TODO confirm.
#if defined(OS_DARWIN)
template void iota(size_t * begin, size_t count, size_t first_value);
#endif
/// Explicit instantiation definitions: emit iota_with_step<T> for each listed element type in this translation unit.
template void iota_with_step(UInt8 * begin, size_t count, UInt8 first_value, UInt8 step);
template void iota_with_step(UInt32 * begin, size_t count, UInt32 first_value, UInt32 step);
template void iota_with_step(UInt64 * begin, size_t count, UInt64 first_value, UInt64 step);
#if defined(OS_DARWIN)
/// Must be a definition (no `extern`): an `extern template` line only declares the instantiation,
/// so nothing would be emitted here for size_t and users of the header would fail to link.
template void iota_with_step(size_t * begin, size_t count, size_t first_value, size_t step);
#endif
}

View File

@ -31,4 +31,13 @@ extern template void iota(UInt64 * begin, size_t count, UInt64 first_value);
#if defined(OS_DARWIN)
extern template void iota(size_t * begin, size_t count, size_t first_value);
#endif
/// Fills begin[0 .. count) with first_value, first_value + step, first_value + 2 * step, ...
template <iota_supported_types T> void iota_with_step(T * begin, size_t count, T first_value, T step);

/// Explicit instantiation declarations: suppress implicit instantiation in every includer.
extern template void iota_with_step(UInt8 * begin, size_t count, UInt8 first_value, UInt8 step);
extern template void iota_with_step(UInt32 * begin, size_t count, UInt32 first_value, UInt32 step);
extern template void iota_with_step(UInt64 * begin, size_t count, UInt64 first_value, UInt64 step);
#if defined(OS_DARWIN)
/// Declares iota_with_step (not iota): there is no four-parameter iota template to instantiate.
extern template void iota_with_step(size_t * begin, size_t count, size_t first_value, size_t step);
#endif
}

View File

@ -12,6 +12,8 @@
#include <Common/iota.h>
#include <Common/typeid_cast.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -30,9 +32,9 @@ public:
: ISource(createHeader(column_name)), block_size(block_size_), next(offset_), step(step_), inner_step(inner_step_), inner_remainder(offset_ % inner_step_)
{
}
String getName() const override { return "Numbers"; }
static Block createHeader(const std::string& column_name) { return {ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), column_name)}; }
protected:
@ -57,10 +59,7 @@ protected:
ColumnUInt64::Container & vec = column->getData();
UInt64 * pos = vec.data(); /// This also accelerates the code.
UInt64 * end = &vec[filtered_block_size];
iota(pos, static_cast<size_t>(end - pos), UInt64{0});
for (UInt64 p = 0; p < filtered_block_size; p += 1) {
vec[p] = vec[p] * inner_step + first_element;
}
iota_with_step(pos, static_cast<size_t>(end - pos), first_element, inner_step);
next += step;
@ -77,28 +76,46 @@ private:
UInt64 inner_remainder;
};
[[maybe_unused]] UInt128 sizeOfRange(const Range & r)
{
UInt128 size;
if (r.right.isPositiveInfinity())
return static_cast<UInt128>(std::numeric_limits<uint64_t>::max()) - r.left.get<UInt64>() + r.left_included;
size = static_cast<UInt128>(r.right.get<UInt64>()) - r.left.get<UInt64>() + 1;
if (!r.left_included)
size--;
if (!r.right_included)
size--;
assert(size >= 0);
return size;
/// A value range together with the stride at which values are taken from it.
struct RangeWithStep
{
    /// Bounds (and their inclusion flags) of the values to produce.
    Range range;
    /// Increment between two consecutive produced values.
    UInt64 step;
};
[[maybe_unused]] auto sizeOfRanges(const Ranges & rs)
/// Sequence of stepped ranges.
using RangesWithStep = std::vector<RangeWithStep>;
std::optional<RangeWithStep> stepped_range_from_range(const Range& r, UInt64 step, UInt64 remainder) {
UInt64 begin = (r.left.get<UInt64>() / step) * step;
if (begin > std::numeric_limits<UInt64>::max() - remainder) {
return std::nullopt;
}
begin += remainder;
while (begin <= r.left.get<UInt128>() - r.left_included) {
if (std::numeric_limits<UInt64>::max() - step < begin) {
return std::nullopt;
}
begin += step;
}
LOG_DEBUG(&Poco::Logger::get("stepped_range_from_range"), "Begin: {}", begin);
UInt128 right_edge = (r.right.get<UInt128>() + r.right_included);
if (begin >= right_edge) {
return std::nullopt;
}
return std::optional{RangeWithStep{Range(begin, true, static_cast<UInt64>(right_edge - 1), true), step}};
}
/// Number of values produced by a stepped range: the first value inside the range plus every
/// further value reachable by adding `step` without leaving the range.
///
/// Honours the inclusion flags of both bounds; a positive-infinity right bound is treated as
/// "up to and including UInt64 max". Returns 0 for an empty range.
/// NOTE(review): assumes r.step != 0 — confirm against callers.
[[maybe_unused]] UInt128 sizeOfRange(const RangeWithStep & r)
{
    /// First value inside the range; widened so that an excluded UInt64 max does not wrap.
    const UInt128 first = static_cast<UInt128>(r.range.left.get<UInt64>()) + (r.range.left_included ? 0 : 1);

    /// One past the last value inside the range.
    const UInt128 end = r.range.right.isPositiveInfinity()
        ? static_cast<UInt128>(std::numeric_limits<UInt64>::max()) + 1
        : static_cast<UInt128>(r.range.right.get<UInt64>()) + (r.range.right_included ? 1 : 0);

    if (first >= end)
        return 0;

    return (end - 1 - first) / r.step + 1;
}
[[maybe_unused]] auto sizeOfRanges(const RangesWithStep & rs)
{
UInt128 total_size{};
for (const Range & r : rs)
for (const RangeWithStep & r : rs)
{
/// total_size will never overflow
total_size += sizeOfRange(r);
@ -127,7 +144,7 @@ public:
using RangesStatePtr = std::shared_ptr<RangesState>;
[[maybe_unused]] NumbersRangedSource(const Ranges & ranges_, RangesStatePtr & ranges_state_, UInt64 base_block_size_, const std::string& column_name)
[[maybe_unused]] NumbersRangedSource(const RangesWithStep & ranges_, RangesStatePtr & ranges_state_, UInt64 base_block_size_, const std::string& column_name)
: ISource(NumbersSource::createHeader(column_name)), ranges(ranges_), ranges_state(ranges_state_), base_block_size(base_block_size_)
{
}
@ -187,9 +204,9 @@ protected:
if (ranges.empty())
return {};
auto first_value = [](const Range & r) { return r.left.get<UInt64>() + (r.left_included ? 0 : 1); };
auto first_value = [](const RangeWithStep & r) { return r.range.left.get<UInt64>() + (r.range.left_included ? 0 : 1); };
auto last_value = [](const Range & r) { return r.right.get<UInt64>() - (r.right_included ? 0 : 1); };
auto last_value = [](const RangeWithStep & r) { return r.range.right.get<UInt64>() - (r.range.right_included ? 0 : 1); };
/// Find the data range.
/// If data left is small, shrink block size.
@ -215,31 +232,33 @@ protected:
UInt128 can_provide = cursor.offset_in_ranges == end.offset_in_ranges
? end.offset_in_range - cursor.offset_in_range
: static_cast<UInt128>(last_value(range)) - first_value(range) + 1 - cursor.offset_in_range;
: static_cast<UInt128>(last_value(range) - first_value(range)) / range.step + 1 - cursor.offset_in_range;
/// set value to block
auto set_value = [&pos](UInt128 & start_value, UInt128 & end_value)
auto set_value = [&pos, this](UInt128 & start_value, UInt128 & end_value)
{
if (end_value > std::numeric_limits<UInt64>::max())
{
while (start_value < end_value)
*(pos++) = start_value++;
while (start_value < end_value) {
*(pos++) = start_value;
start_value += this->step;
}
}
else
{
auto start_value_64 = static_cast<UInt64>(start_value);
auto end_value_64 = static_cast<UInt64>(end_value);
auto size = end_value_64 - start_value_64;
iota(pos, static_cast<size_t>(size), start_value_64);
iota_with_step(pos, static_cast<size_t>(size), start_value_64, step);
pos += size;
}
};
if (can_provide > need)
{
UInt64 start_value = first_value(range) + cursor.offset_in_range;
UInt64 start_value = first_value(range) + cursor.offset_in_range * step;
/// end_value will never overflow
iota(pos, static_cast<size_t>(need), start_value);
iota_with_step(pos, static_cast<size_t>(need), start_value, step);
pos += need;
provided += need;
@ -248,8 +267,8 @@ protected:
else if (can_provide == need)
{
/// to avoid UInt64 overflow
UInt128 start_value = static_cast<UInt128>(first_value(range)) + cursor.offset_in_range;
UInt128 end_value = start_value + need;
UInt128 start_value = static_cast<UInt128>(first_value(range)) + cursor.offset_in_range * step;
UInt128 end_value = start_value + need * step;
set_value(start_value, end_value);
provided += need;
@ -259,8 +278,8 @@ protected:
else
{
/// to avoid UInt64 overflow
UInt128 start_value = static_cast<UInt128>(first_value(range)) + cursor.offset_in_range;
UInt128 end_value = start_value + can_provide;
UInt128 start_value = static_cast<UInt128>(first_value(range)) + cursor.offset_in_range * step;
UInt128 end_value = start_value + can_provide * step;
set_value(start_value, end_value);
provided += static_cast<UInt64>(can_provide);
@ -277,13 +296,15 @@ protected:
private:
/// The ranges is shared between all streams.
Ranges ranges;
RangesWithStep ranges;
/// Ranges state shared between all streams, actually is the start of the ranges.
RangesStatePtr ranges_state;
/// Base block size, will shrink when data left is not enough.
UInt64 base_block_size;
UInt64 step;
};
}
@ -304,7 +325,7 @@ namespace
/// Shrink ranges to size.
/// For example: ranges: [1, 5], [8, 100]; size: 7, we will get [1, 5], [8, 9]
[[maybe_unused]] void shrinkRanges(Ranges & ranges, size_t size)
[[maybe_unused]] void shrinkRanges(RangesWithStep & ranges, size_t size)
{
size_t last_range_idx = 0;
for (size_t i = 0; i < ranges.size(); i++)
@ -323,9 +344,9 @@ namespace
else
{
auto & range = ranges[i];
UInt64 right = range.left.get<UInt64>() + static_cast<UInt64>(size);
range.right = Field(right);
range.right_included = !range.left_included;
UInt64 right = range.range.left.get<UInt64>() + static_cast<UInt64>(size);
range.range.right = Field(right);
range.range.right_included = !range.range.left_included;
last_range_idx = i;
break;
}
@ -393,101 +414,117 @@ Pipe ReadFromSystemNumbersStep::makePipe()
num_streams = 1;
/// Build rpn of query filters
// KeyCondition condition(buildFilterDAG(), context, column_names, key_expression);
KeyCondition condition(buildFilterDAG(), context, column_names, key_expression);
Pipe pipe;
Ranges ranges;
// if (condition.extractPlainRanges(ranges))
// {
// /// Intersect ranges with table range
// std::optional<Range> table_range;
// std::optional<Range> overflowed_table_range;
if (condition.extractPlainRanges(ranges))
{
LOG_DEBUG(&Poco::Logger::get("My logger"), "Use optimization");
/// Intersect ranges with table range
std::optional<Range> table_range;
std::optional<Range> overflowed_table_range;
// if (numbers_storage.limit.has_value())
// {
// if (std::numeric_limits<UInt64>::max() - numbers_storage.offset >= *(numbers_storage.limit))
// {
// table_range.emplace(FieldRef(numbers_storage.offset), true, FieldRef(numbers_storage.offset + *(numbers_storage.limit)), false);
// }
// /// UInt64 overflow, for example: SELECT number FROM numbers(18446744073709551614, 5)
// else
// {
// table_range.emplace(FieldRef(numbers_storage.offset), true, std::numeric_limits<UInt64>::max(), true);
// auto overflow_end = UInt128(numbers_storage.offset) + UInt128(*numbers_storage.limit);
// overflowed_table_range.emplace(
// FieldRef(UInt64(0)), true, FieldRef(UInt64(overflow_end - std::numeric_limits<UInt64>::max() - 1)), false);
// }
// }
// else
// {
// table_range.emplace(FieldRef(numbers_storage.offset), true, FieldRef(std::numeric_limits<UInt64>::max()), true);
// }
if (numbers_storage.limit.has_value())
{
if (std::numeric_limits<UInt64>::max() - numbers_storage.offset >= *(numbers_storage.limit))
{
table_range.emplace(FieldRef(numbers_storage.offset), true, FieldRef(numbers_storage.offset + *(numbers_storage.limit)), false);
}
/// UInt64 overflow, for example: SELECT number FROM numbers(18446744073709551614, 5)
else
{
table_range.emplace(FieldRef(numbers_storage.offset), true, std::numeric_limits<UInt64>::max(), true);
auto overflow_end = UInt128(numbers_storage.offset) + UInt128(*numbers_storage.limit);
overflowed_table_range.emplace(
FieldRef(UInt64(0)), true, FieldRef(UInt64(overflow_end - std::numeric_limits<UInt64>::max() - 1)), false);
}
}
else
{
table_range.emplace(FieldRef(numbers_storage.offset), true, FieldRef(std::numeric_limits<UInt64>::max()), true);
}
LOG_DEBUG(&Poco::Logger::get("My logger"), "Found table ranges");
// Ranges intersected_ranges;
// for (auto & r : ranges)
// {
// auto intersected_range = table_range->intersectWith(r);
// if (intersected_range)
// intersected_ranges.push_back(*intersected_range);
// }
// /// intersection with overflowed_table_range goes back.
// if (overflowed_table_range.has_value())
// {
// for (auto & r : ranges)
// {
// auto intersected_range = overflowed_table_range->intersectWith(r);
// if (intersected_range)
// intersected_ranges.push_back(*overflowed_table_range);
// }
// }
RangesWithStep intersected_ranges;
for (auto & r : ranges)
{
auto intersected_range = table_range->intersectWith(r);
if (intersected_range.has_value()) {
auto range_with_step = stepped_range_from_range(intersected_range.value(), numbers_storage.step, numbers_storage.offset % numbers_storage.step);
if (range_with_step.has_value()) {
intersected_ranges.push_back(*range_with_step);
}
}
}
// /// ranges is blank, return a source who has no data
// if (intersected_ranges.empty())
// {
// pipe.addSource(std::make_shared<NullSource>(NumbersSource::createHeader(numbers_storage.column_name)));
// return pipe;
// }
// const auto & limit_length = limit_length_and_offset.first;
// const auto & limit_offset = limit_length_and_offset.second;
// /// If intersected ranges is limited or we can pushdown limit.
// if (!intersected_ranges.rbegin()->right.isPositiveInfinity() || should_pushdown_limit)
// {
// UInt128 total_size = sizeOfRanges(intersected_ranges);
// UInt128 query_limit = limit_length + limit_offset;
for (const auto& range : intersected_ranges) {
LOG_DEBUG(&Poco::Logger::get("Ranges"), "Left: {}; Right {}, LI: {}, RI: {}, Step: {}", range.range.left.get<UInt64>(), range.range.right.get<UInt64>(), range.range.left_included, range.range.right_included, range.step);
// std::cout <<
}
/// intersection with overflowed_table_range goes back.
if (overflowed_table_range.has_value())
{
for (auto & r : ranges)
{
auto intersected_range = overflowed_table_range->intersectWith(r);
if (intersected_range) {
auto range_with_step = stepped_range_from_range(intersected_range.value(), numbers_storage.step, static_cast<UInt64>((static_cast<UInt128>(numbers_storage.offset) + std::numeric_limits<UInt64>::max() + 1) % numbers_storage.step));
if (range_with_step) {
intersected_ranges.push_back(*range_with_step);
}
}
}
}
// /// limit total_size by query_limit
// if (should_pushdown_limit && query_limit < total_size)
// {
// total_size = query_limit;
// /// We should shrink intersected_ranges for case:
// /// intersected_ranges: [1, 4], [7, 100]; query_limit: 2
// shrinkRanges(intersected_ranges, total_size);
// }
/// ranges is blank, return a source who has no data
if (intersected_ranges.empty())
{
pipe.addSource(std::make_shared<NullSource>(NumbersSource::createHeader(numbers_storage.column_name)));
return pipe;
}
const auto & limit_length = limit_length_and_offset.first;
const auto & limit_offset = limit_length_and_offset.second;
// checkLimits(size_t(total_size));
/// If intersected ranges is limited or we can pushdown limit.
if (!intersected_ranges.rbegin()->range.right.isPositiveInfinity() || should_pushdown_limit)
{
UInt128 total_size = sizeOfRanges(intersected_ranges);
UInt128 query_limit = limit_length + limit_offset;
// if (total_size / max_block_size < num_streams)
// num_streams = static_cast<size_t>(total_size / max_block_size);
/// limit total_size by query_limit
if (should_pushdown_limit && query_limit < total_size)
{
total_size = query_limit;
/// We should shrink intersected_ranges for case:
/// intersected_ranges: [1, 4], [7, 100]; query_limit: 2
shrinkRanges(intersected_ranges, total_size);
}
// if (num_streams == 0)
// num_streams = 1;
checkLimits(size_t(total_size));
// /// Ranges state, all streams will share the state.
// auto ranges_state = std::make_shared<NumbersRangedSource::RangesState>();
// for (size_t i = 0; i < num_streams; ++i)
// {
// auto source = std::make_shared<NumbersRangedSource>(intersected_ranges, ranges_state, max_block_size, numbers_storage.column_name);
if (total_size / max_block_size < num_streams)
num_streams = static_cast<size_t>(total_size / max_block_size);
// if (i == 0)
// source->addTotalRowsApprox(total_size);
if (num_streams == 0)
num_streams = 1;
// pipe.addSource(std::move(source));
// }
// return pipe;
// }
// }
/// Ranges state, all streams will share the state.
auto ranges_state = std::make_shared<NumbersRangedSource::RangesState>();
for (size_t i = 0; i < num_streams; ++i)
{
auto source = std::make_shared<NumbersRangedSource>(intersected_ranges, ranges_state, max_block_size, numbers_storage.column_name);
if (i == 0)
source->addTotalRowsApprox(total_size);
pipe.addSource(std::move(source));
}
return pipe;
}
}
/// Fall back to NumbersSource
for (size_t i = 0; i < num_streams; ++i)