some low level optimization

Signed-off-by: Duc Canh Le <duccanh.le@ahrefs.com>
This commit is contained in:
Duc Canh Le 2023-10-13 07:41:49 +00:00
parent b3f4daf6c8
commit f7830bc13a
5 changed files with 74 additions and 16 deletions

View File

@ -23,9 +23,13 @@ struct ConstantFilterDescription
struct IFilterDescription
{
Int64 has_one = -1;
virtual ColumnPtr filter(const IColumn & column, ssize_t result_size_hint) const = 0;
virtual size_t countBytesInFilter() const = 0;
virtual ~IFilterDescription() = default;
bool hasOne() { return has_one >= 0 ? has_one : hasOneImpl();}
protected:
virtual bool hasOneImpl() = 0;
};
/// Obtain a filter from non constant Column, that may have type: UInt8, Nullable(UInt8).
@ -38,6 +42,8 @@ struct FilterDescription final : public IFilterDescription
ColumnPtr filter(const IColumn & column, ssize_t result_size_hint) const override { return column.filter(*data, result_size_hint); }
size_t countBytesInFilter() const override { return DB::countBytesInFilter(*data); }
protected:
bool hasOneImpl() override { return data ? !memoryIsZero(data->data(), 0, data->size()) : false; }
};
struct SparseFilterDescription final : public IFilterDescription
@ -47,6 +53,8 @@ struct SparseFilterDescription final : public IFilterDescription
ColumnPtr filter(const IColumn & column, ssize_t) const override { return column.index(*filter_indices, 0); }
size_t countBytesInFilter() const override { return filter_indices->size(); }
protected:
bool hasOneImpl() override { return filter_indices && !filter_indices->empty(); }
};
struct ColumnWithTypeAndName;

View File

@ -798,8 +798,6 @@ InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & n
if (auto * reading = typeid_cast<ReadFromMergeTree *>(reading_node->step.get()))
{
if (reading->useSkippingFinal())
return nullptr;
auto order_info = buildInputOrderInfo(
reading,
fixed_columns,

View File

@ -315,7 +315,7 @@ ReadFromMergeTree::ReadFromMergeTree(
/// Add explicit description.
setStepDescription(data.getStorageID().getFullNameNotQuoted());
use_skipping_final = context->getSettingsRef().use_skipping_final && data.merging_params.mode == MergeTreeData::MergingParams::Replacing;
use_skipping_final = query_info.isFinal() && context->getSettingsRef().use_skipping_final && data.merging_params.mode == MergeTreeData::MergingParams::Replacing;
updateSortDescriptionForOutputStream(
*output_stream,
@ -1666,6 +1666,10 @@ bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction,
if (direction != 1 && query_info.isFinal())
return false;
/// All *InOrder optimization rely on an assumption that output stream is sorted, but skipping FINAL breaks this rule
if (use_skipping_final)
return false;
auto order_info = std::make_shared<InputOrderInfo>(SortDescription{}, prefix_size, direction, limit);
if (query_info.projection)
query_info.projection->input_order_info = order_info;
@ -1697,7 +1701,7 @@ bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction,
if (sort_description.size() > used_prefix_of_sorting_key_size)
sort_description.resize(used_prefix_of_sorting_key_size);
output_stream->sort_description = std::move(sort_description);
output_stream->sort_scope = useSkippingFinal() ? DataStream::SortScope::Chunk : DataStream::SortScope::Stream;
output_stream->sort_scope = DataStream::SortScope::Stream;
}
return true;

View File

@ -70,17 +70,62 @@ static std::unique_ptr<IFilterDescription> combineFilterAndIndices(
{
const auto * index_column = select_final_indices_info->select_final_indices;
if (description->data && index_column && index_column->size() != num_rows)
if (index_column && index_column->size() != num_rows && description->hasOne())
{
const auto & selected_by_indices = index_column->getData();
const auto & selected_by_filter = *description->data;
const auto * selected_by_filter = description->data->data();
// description->has_one = 0;
/// At this point we know that the filter is not constant, just create a new filter
auto mutable_holder = ColumnUInt8::create(num_rows, 0);
auto & data = mutable_holder->getData();
for (auto idx : selected_by_indices)
data[idx] |= 1;
for (size_t i = 0; i < num_rows; ++i)
data[i] &= selected_by_filter[i];
data[idx] = 1;
/// AND two filters
auto * begin = data.data();
const auto * end = begin + num_rows;
#if defined(__AVX2__)
while (end - begin >= 32)
{
_mm256_storeu_si256(
reinterpret_cast<__m256i *>(begin),
_mm256_and_si256(
_mm256_loadu_si256(reinterpret_cast<const __m256i *>(begin)),
_mm256_loadu_si256(reinterpret_cast<const __m256i *>(selected_by_filter))));
description->has_one |= !memoryIsZero(begin, 0, 32);
begin += 32;
selected_by_filter += 32;
}
#elif defined(__SSE2__)
while (end - begin >= 16)
{
_mm_storeu_si128(
reinterpret_cast<__m128i *>(begin),
_mm_and_si128(
_mm_loadu_si128(reinterpret_cast<const __m128i *>(begin)),
_mm_loadu_si128(reinterpret_cast<const __m128i *>(selected_by_filter))));
description->has_one |= !memoryIsZero(begin, 0, 16);
begin += 16;
selected_by_filter += 16;
}
#endif
while (end - begin >= 8)
{
*reinterpret_cast<UInt64 *>(begin) &= *reinterpret_cast<const UInt64 *>(selected_by_filter);
description->has_one |= *reinterpret_cast<UInt64 *>(begin);
begin += 8;
selected_by_filter += 8;
}
while (end - begin > 0)
{
*begin &= *selected_by_filter;
description->has_one |= *begin;
begin++;
selected_by_filter++;
}
description->data_holder = std::move(mutable_holder);
description->data = &data;
}
@ -97,8 +142,9 @@ static std::unique_ptr<IFilterDescription> combineFilterAndIndices(
struct Iterator
{
UInt8 * data;
explicit Iterator(UInt8 * data_) : data(data_) {}
Iterator & operator = (UInt64 index) { data[index] |= 1; return *this; }
Int64 & pop_cnt;
explicit Iterator(UInt8 * data_, Int64 & pop_cnt_) : data(data_), pop_cnt(pop_cnt_) {}
Iterator & operator = (UInt64 index) { data[index] = 1; ++pop_cnt; return *this; }
Iterator & operator ++ () { return *this; }
Iterator & operator * () { return *this; }
};
@ -107,14 +153,15 @@ static std::unique_ptr<IFilterDescription> combineFilterAndIndices(
{
const auto * index_column = select_final_indices_info->select_final_indices;
if (description->filter_indices && index_column && index_column->size() != num_rows)
if (index_column && index_column->size() != num_rows && description->hasOne())
{
std::unique_ptr<FilterDescription> res;
res->has_one = 0;
const auto & selected_by_indices = index_column->getData();
const auto & selected_by_filter = description->filter_indices->getData();
auto mutable_holder = ColumnUInt8::create(num_rows, 0);
auto & data = mutable_holder->getData();
Iterator decorator(data.data());
Iterator decorator(data.data(), res->has_one);
std::set_intersection(selected_by_indices.begin(), selected_by_indices.end(), selected_by_filter.begin(), selected_by_filter.end(), decorator);
res->data_holder = std::move(mutable_holder);
res->data = &data;
@ -265,6 +312,10 @@ void FilterTransform::doTransform(Chunk & chunk)
filter_description = combineFilterAndIndices(
std::make_unique<FilterDescription>(*filter_column), select_final_indices_info, num_rows_before_filtration);
if (!filter_description->has_one)
return;
/** Let's find out how many rows will be in result.
* To do this, we filter out the first non-constant column
* or calculate number of set bytes in the filter.

View File

@ -36,9 +36,6 @@ public:
}
chunk.setChunkInfo(nullptr);
}
private:
String index_column_name;
};
}