combine FINAL selection and WHERE filter

Co-authored-by: Joris Giovannangeli <joris.giovannangeli@ahrefs.com>
Signed-off-by: Duc Canh Le <duccanh.le@ahrefs.com>
This commit is contained in:
Duc Canh Le 2023-10-12 04:58:42 +00:00
parent dfc8793b5c
commit 93a4d69c05
3 changed files with 111 additions and 11 deletions

View File

@ -14,8 +14,12 @@ namespace DB
struct ChunkSelectFinalIndices : public ChunkInfo
{
ColumnPtr select_final_indices;
explicit ChunkSelectFinalIndices(MutableColumnPtr select_final_indices_) : select_final_indices(std::move(select_final_indices_)) {}
ColumnPtr column_holder;
const ColumnUInt32 * select_final_indices;
explicit ChunkSelectFinalIndices(MutableColumnPtr select_final_indices_) : column_holder(std::move(select_final_indices_))
{
select_final_indices = typeid_cast<const ColumnUInt32 *>(column_holder.get());
}
};
/** Merges several sorted inputs into one.

View File

@ -986,7 +986,8 @@ static void addMergingFinal(
MergeTreeData::MergingParams merging_params,
Names partition_key_columns,
size_t max_block_size_rows,
bool use_skipping_final)
bool use_skipping_final,
bool query_has_where)
{
const auto & header = pipe.getHeader();
size_t num_outputs = pipe.numOutputPorts();
@ -1041,9 +1042,14 @@ static void addMergingFinal(
};
pipe.addTransform(get_merging_processor());
if (use_skipping_final && merging_params.mode == MergeTreeData::MergingParams::Replacing)
if (use_skipping_final && !query_has_where)
{
/// Skipping FINAL algorithm will output the original chunk and a column indices of selected rows
/// If query has WHERE, we will merge the indices with the filter in FilterTransform later
/// Otherwise, use SelectByIndicesTransform to select rows
pipe.addSimpleTransform([](const Block & header_)
{ return std::make_shared<SelectByIndicesTransform>(header_); });
}
}
@ -1052,6 +1058,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
{
const auto & settings = context->getSettingsRef();
const auto & data_settings = data.getSettings();
const auto & select_query = query_info.query->as<ASTSelectQuery>();
bool query_has_where = select_query && select_query->where();
PartRangesReadInfo info(parts_with_ranges, settings, *data_settings);
@ -1184,7 +1192,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
data.merging_params,
partition_key_columns,
block_size.max_block_size_rows,
settings.use_skipping_final);
use_skipping_final,
query_has_where);
merging_pipes.emplace_back(Pipe::unitePipes(std::move(pipes)));
}

View File

@ -5,6 +5,7 @@
#include <Core/Field.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h>
namespace DB
{
@ -34,6 +35,87 @@ static void replaceFilterToConstant(Block & block, const String & filter_column_
}
}
static std::shared_ptr<const ChunkSelectFinalIndices> getSelectByFinalIndices(Chunk & chunk)
{
if (auto select_final_indices_info = std::dynamic_pointer_cast<const ChunkSelectFinalIndices>(chunk.getChunkInfo()))
{
chunk.setChunkInfo(nullptr);
return select_final_indices_info;
}
return nullptr;
}
static void
executeSelectByIndices(Columns & columns, std::shared_ptr<const ChunkSelectFinalIndices> & select_final_indices_info, size_t & num_rows)
{
if (select_final_indices_info)
{
const auto & index_column = select_final_indices_info->select_final_indices;
if (index_column && index_column->size() != num_rows)
for (auto & column : columns)
column = column->index(*index_column, 0);
num_rows = index_column->size();
}
}
static std::unique_ptr<IFilterDescription> combineFilterAndIndices(
std::unique_ptr<FilterDescription> description,
std::shared_ptr<const ChunkSelectFinalIndices> & select_final_indices_info,
size_t num_rows)
{
if (select_final_indices_info)
{
const auto * index_column = select_final_indices_info->select_final_indices;
if (description->data && index_column && index_column->size() != num_rows)
{
const auto & selected_by_indices = index_column->getData();
const auto & selected_by_filter = *description->data;
/// At this point we know that the filter is not constant, just create a new filter
auto mutable_holder = ColumnUInt8::create(num_rows, 0);
ColumnUInt8 * concrete_column = typeid_cast<ColumnUInt8 *>(mutable_holder.get());
auto & data = concrete_column->getData();
for (auto idx : selected_by_indices)
data[idx] |= 1;
for (size_t i = 0; i < num_rows; ++i)
data[i] &= selected_by_filter[i];
description->data_holder = std::move(mutable_holder);
description->data = &data;
}
}
return std::move(description);
}
static std::unique_ptr<IFilterDescription> combineFilterAndIndices(
std::unique_ptr<SparseFilterDescription> description,
std::shared_ptr<const ChunkSelectFinalIndices> & select_final_indices_info,
size_t num_rows)
{
if (select_final_indices_info)
{
const auto * index_column = select_final_indices_info->select_final_indices;
if (description->filter_indices && index_column && index_column->size() != num_rows)
{
std::unique_ptr<FilterDescription> res;
const auto & selected_by_indices = index_column->getData();
const auto & selected_by_filter = typeid_cast<const ColumnUInt64 *>(description->filter_indices)->getData();
auto mutable_holder = ColumnUInt8::create(num_rows, 0);
auto & data = mutable_holder->getData();
for (auto idx : selected_by_indices)
data[idx] += 1;
for (auto idx : selected_by_filter)
data[idx] = (data[idx] + 1) << 1;
res->data_holder = std::move(mutable_holder);
res->data = &data;
return res;
}
}
return description;
}
Block FilterTransform::transformHeader(
Block header,
const ActionsDAG * expression,
@ -126,6 +208,7 @@ void FilterTransform::doTransform(Chunk & chunk)
{
size_t num_rows_before_filtration = chunk.getNumRows();
auto columns = chunk.detachColumns();
auto select_final_indices_info = getSelectByFinalIndices(chunk);
{
Block block = getInputPort().getHeader().cloneWithColumns(columns);
@ -139,6 +222,7 @@ void FilterTransform::doTransform(Chunk & chunk)
if (constant_filter_description.always_true || on_totals)
{
executeSelectByIndices(columns, select_final_indices_info, num_rows_before_filtration);
chunk.setColumns(std::move(columns), num_rows_before_filtration);
removeFilterIfNeed(chunk);
return;
@ -159,11 +243,20 @@ void FilterTransform::doTransform(Chunk & chunk)
if (constant_filter_description.always_true)
{
executeSelectByIndices(columns, select_final_indices_info, num_rows_before_filtration);
chunk.setColumns(std::move(columns), num_rows_before_filtration);
removeFilterIfNeed(chunk);
return;
}
std::unique_ptr<IFilterDescription> filter_description;
if (filter_column->isSparse())
filter_description = combineFilterAndIndices(
std::make_unique<SparseFilterDescription>(*filter_column), select_final_indices_info, num_rows_before_filtration);
else
filter_description = combineFilterAndIndices(
std::make_unique<FilterDescription>(*filter_column), select_final_indices_info, num_rows_before_filtration);
/** Let's find out how many rows will be in result.
* To do this, we filter out the first non-constant column
* or calculate number of set bytes in the filter.
@ -178,12 +271,6 @@ void FilterTransform::doTransform(Chunk & chunk)
}
}
std::unique_ptr<IFilterDescription> filter_description;
if (filter_column->isSparse())
filter_description = std::make_unique<SparseFilterDescription>(*filter_column);
else
filter_description = std::make_unique<FilterDescription>(*filter_column);
size_t num_filtered_rows = 0;
if (first_non_constant_column != num_columns)
{