Merge pull request #64783 from canhld94/fix_vertical_final2

Remove bad optimization in vertical final implementation and re-enable vertical final
Nikita Taranov 2024-06-14 11:17:33 +00:00 committed by GitHub
commit dc1d710b82
9 changed files with 20 additions and 176 deletions
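For context: vertical FINAL, governed by the enable_vertical_final setting changed below, deduplicates rows for FINAL queries by marking superseded rows as deleted and filtering them out in a later step instead of merging rows while reading. A minimal sketch of the behavior this setting controls, using a hypothetical table:

DROP TABLE IF EXISTS example;
CREATE TABLE example (k UInt64, v UInt64) ENGINE = ReplacingMergeTree ORDER BY k;
INSERT INTO example VALUES (1, 1);
INSERT INTO example VALUES (1, 2);
-- FINAL must return one row per key; with vertical FINAL the duplicate is
-- removed by a row filter (SelectByIndicesTransform) rather than by merging.
SELECT * FROM example FINAL SETTINGS enable_vertical_final = 1;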

@@ -23,15 +23,10 @@ struct ConstantFilterDescription
struct IFilterDescription
{
-    /// has_one can be pre-computed when creating the filter description in some cases
-    Int64 has_one = -1;
    virtual ColumnPtr filter(const IColumn & column, ssize_t result_size_hint) const = 0;
    virtual size_t countBytesInFilter() const = 0;
    virtual ~IFilterDescription() = default;
-    bool hasOne() { return has_one >= 0 ? has_one : hasOneImpl(); }
protected:
-    /// Calculate whether the filter has a non-zero value; may update has_one
-    virtual bool hasOneImpl() = 0;
};

/// Obtain a filter from a non-constant Column that may have type UInt8 or Nullable(UInt8).
@@ -45,7 +40,6 @@ struct FilterDescription final : public IFilterDescription
    ColumnPtr filter(const IColumn & column, ssize_t result_size_hint) const override { return column.filter(*data, result_size_hint); }
    size_t countBytesInFilter() const override { return DB::countBytesInFilter(*data); }
protected:
-    bool hasOneImpl() override { return data ? (has_one = !memoryIsZero(data->data(), 0, data->size())) : false; }
};

struct SparseFilterDescription final : public IFilterDescription
@@ -56,7 +50,6 @@ struct SparseFilterDescription final : public IFilterDescription
    ColumnPtr filter(const IColumn & column, ssize_t) const override { return column.index(*filter_indices, 0); }
    size_t countBytesInFilter() const override { return filter_indices->size(); }
protected:
-    bool hasOneImpl() override { return filter_indices && !filter_indices->empty(); }
};

struct ColumnWithTypeAndName;

@@ -398,7 +398,7 @@ class IColumn;
    M(Bool, allow_experimental_analyzer, true, "Allow experimental analyzer.", 0) \
    M(Bool, analyzer_compatibility_join_using_top_level_identifier, false, "Force to resolve identifier in JOIN USING from projection (for example, in `SELECT a + 1 AS b FROM t1 JOIN t2 USING (b)` the join will be performed by `t1.a + 1 = t2.b` rather than `t1.b = t2.b`).", 0) \
    M(Bool, prefer_global_in_and_join, false, "If enabled, all IN/JOIN operators will be rewritten as GLOBAL IN/JOIN. It's useful when the to-be-joined tables are only available on the initiator and we need to always scatter their data on-the-fly during distributed processing with the GLOBAL keyword. It's also useful to reduce the need to access external sources when joining external tables.", 0) \
-    M(Bool, enable_vertical_final, false, "Not recommended. If enabled, remove duplicated rows during FINAL by marking rows as deleted and filtering them later instead of merging rows", 0) \
+    M(Bool, enable_vertical_final, true, "If enabled, remove duplicated rows during FINAL by marking rows as deleted and filtering them later instead of merging rows", 0) \
    \
    \
    /** Limits during query execution are part of the settings. \
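Since the default flips to true here, a query that needs the old merge-based FINAL must opt out explicitly. The regression test below uses the same per-query override; a minimal sketch against its table t:

SELECT * FROM t FINAL WHERE k1 != 3 SETTINGS enable_vertical_final = 0;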

@@ -103,6 +103,7 @@ static const std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges
            {"allow_experimental_statistic", false, false, "Old setting which popped up here being renamed."},
            {"allow_statistics_optimize", false, false, "The setting was renamed. The previous name is `allow_statistic_optimize`."},
            {"allow_experimental_statistics", false, false, "The setting was renamed. The previous name is `allow_experimental_statistic`."},
+            {"enable_vertical_final", false, true, "Enable vertical final by default again after fixing a bug"},
            {"parallel_replicas_custom_key_range_lower", 0, 0, "Add settings to control the range filter when using parallel replicas with dynamic shards"},
            {"parallel_replicas_custom_key_range_upper", 0, 0, "Add settings to control the range filter when using parallel replicas with dynamic shards. A value of 0 disables the upper limit"},
        }},
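Whether the new default is in effect on a running server can be checked with a standard introspection query (not part of this diff):

SELECT name, value, changed FROM system.settings WHERE name = 'enable_vertical_final';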

@@ -2554,10 +2554,6 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
        query_info.storage_limits = std::make_shared<StorageLimitsList>(storage_limits);
        query_info.settings_limit_offset_done = options.settings_limit_offset_done;
-        /// Possible filters: row-security, additional filter, replica filter (before array join), where (after array join)
-        query_info.has_filters_and_no_array_join_before_filter = row_policy_filter || additional_filter_info
-            || parallel_replicas_custom_filter_info
-            || (analysis_result.hasWhere() && !analysis_result.before_where->hasArrayJoin() && !analysis_result.array_join);
        storage->read(query_plan, required_columns, storage_snapshot, query_info, context, processing_stage, max_block_size, max_streams);

        if (context->hasQueryContext() && !options.is_internal)

@@ -1092,8 +1092,7 @@ static void addMergingFinal(
    MergeTreeData::MergingParams merging_params,
    Names partition_key_columns,
    size_t max_block_size_rows,
-    bool enable_vertical_final,
-    bool can_merge_final_indices_to_next_step_filter)
+    bool enable_vertical_final)
{
    const auto & header = pipe.getHeader();
    size_t num_outputs = pipe.numOutputPorts();
@@ -1135,7 +1134,7 @@ static void addMergingFinal(
    };

    pipe.addTransform(get_merging_processor());

-    if (enable_vertical_final && !can_merge_final_indices_to_next_step_filter)
+    if (enable_vertical_final)
        pipe.addSimpleTransform([](const Block & header_)
                                { return std::make_shared<SelectByIndicesTransform>(header_); });
}
@@ -1323,8 +1322,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
            data.merging_params,
            partition_key_columns,
            block_size.max_block_size_rows,
-            enable_vertical_final,
-            query_info.has_filters_and_no_array_join_before_filter);
+            enable_vertical_final);

        merging_pipes.emplace_back(Pipe::unitePipes(std::move(pipes)));
    }
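For context: the deleted flag let addMergingFinal skip SelectByIndicesTransform whenever a downstream FilterTransform was expected to apply the FINAL selection indices itself. Judging from the linked issue and the regression test at the bottom of this diff, that assumption breaks when the first filter runs on the output of a JOIN rather than on the merged chunks, e.g.:

-- bar is a ReplacingMergeTree with an updated row per key; foo is a plain
-- Memory table. The WHERE is evaluated after the JOIN, so deferring the
-- index filtering to it left bar's stale rows in the result.
SELECT * FROM bar INNER JOIN foo USING id WHERE bar.seq > foo.seq SETTINGS final = 1;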

@@ -14,7 +14,6 @@ namespace DB
namespace ErrorCodes
{
    extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
-    extern const int LOGICAL_ERROR;
}

static void replaceFilterToConstant(Block & block, const String & filter_column_name)

@@ -37,147 +36,6 @@ static void replaceFilterToConstant(Block & block, const String & filter_column_
    }
}
-static std::shared_ptr<const ChunkSelectFinalIndices> getSelectByFinalIndices(Chunk & chunk)
-{
-    if (auto select_final_indices_info = std::dynamic_pointer_cast<const ChunkSelectFinalIndices>(chunk.getChunkInfo()))
-    {
-        const auto & index_column = select_final_indices_info->select_final_indices;
-        chunk.setChunkInfo(nullptr);
-        if (index_column && index_column->size() != chunk.getNumRows())
-            return select_final_indices_info;
-    }
-    return nullptr;
-}
-
-static void
-executeSelectByIndices(Columns & columns, std::shared_ptr<const ChunkSelectFinalIndices> & select_final_indices_info, size_t & num_rows)
-{
-    if (select_final_indices_info)
-    {
-        const auto & index_column = select_final_indices_info->select_final_indices;
-        for (auto & column : columns)
-            column = column->index(*index_column, 0);
-        num_rows = index_column->size();
-    }
-}
-
-static std::unique_ptr<IFilterDescription> combineFilterAndIndices(
-    std::unique_ptr<FilterDescription> description,
-    std::shared_ptr<const ChunkSelectFinalIndices> & select_final_indices_info,
-    size_t num_rows)
-{
-    if (select_final_indices_info)
-    {
-        const auto * index_column = select_final_indices_info->select_final_indices;
-        if (description->hasOne())
-        {
-            const auto & selected_by_indices = index_column->getData();
-            const auto * selected_by_filter = description->data->data();
-            /// We will recompute new has_one
-            description->has_one = 0;
-            /// At this point we know that the filter is not constant, just create a new filter
-            auto mutable_holder = ColumnUInt8::create(num_rows, 0);
-            auto & data = mutable_holder->getData();
-            for (auto idx : selected_by_indices)
-            {
-                if (idx >= num_rows)
-                    throw Exception(ErrorCodes::LOGICAL_ERROR, "Index {} out of range {}", idx, num_rows);
-                data[idx] = 1;
-            }
-
-            /// AND two filters
-            auto * begin = data.data();
-            const auto * end = begin + num_rows;
-#if defined(__AVX2__)
-            while (end - begin >= 32)
-            {
-                _mm256_storeu_si256(
-                    reinterpret_cast<__m256i *>(begin),
-                    _mm256_and_si256(
-                        _mm256_loadu_si256(reinterpret_cast<const __m256i *>(begin)),
-                        _mm256_loadu_si256(reinterpret_cast<const __m256i *>(selected_by_filter))));
-                description->has_one |= !memoryIsZero(begin, 0, 32);
-                begin += 32;
-                selected_by_filter += 32;
-            }
-#elif defined(__SSE2__)
-            while (end - begin >= 16)
-            {
-                _mm_storeu_si128(
-                    reinterpret_cast<__m128i *>(begin),
-                    _mm_and_si128(
-                        _mm_loadu_si128(reinterpret_cast<const __m128i *>(begin)),
-                        _mm_loadu_si128(reinterpret_cast<const __m128i *>(selected_by_filter))));
-                description->has_one |= !memoryIsZero(begin, 0, 16);
-                begin += 16;
-                selected_by_filter += 16;
-            }
-#endif
-            while (end - begin >= 8)
-            {
-                *reinterpret_cast<UInt64 *>(begin) &= *reinterpret_cast<const UInt64 *>(selected_by_filter);
-                description->has_one |= *reinterpret_cast<UInt64 *>(begin);
-                begin += 8;
-                selected_by_filter += 8;
-            }
-
-            while (end - begin > 0)
-            {
-                *begin &= *selected_by_filter;
-                description->has_one |= *begin;
-                begin++;
-                selected_by_filter++;
-            }
-
-            description->data_holder = std::move(mutable_holder);
-            description->data = &data;
-        }
-    }
-    return std::move(description);
-}
-
-static std::unique_ptr<IFilterDescription> combineFilterAndIndices(
-    std::unique_ptr<SparseFilterDescription> description,
-    std::shared_ptr<const ChunkSelectFinalIndices> & select_final_indices_info,
-    size_t num_rows)
-{
-    /// Iterator interface to decorate data from output of std::set_intersection
-    struct Iterator
-    {
-        UInt8 * data;
-        Int64 & pop_cnt;
-        explicit Iterator(UInt8 * data_, Int64 & pop_cnt_) : data(data_), pop_cnt(pop_cnt_) {}
-        Iterator & operator = (UInt64 index) { data[index] = 1; ++pop_cnt; return *this; }
-        Iterator & operator ++ () { return *this; }
-        Iterator & operator * () { return *this; }
-    };
-
-    if (select_final_indices_info)
-    {
-        const auto * index_column = select_final_indices_info->select_final_indices;
-        if (description->hasOne())
-        {
-            std::unique_ptr<FilterDescription> res;
-            res->has_one = 0;
-            const auto & selected_by_indices = index_column->getData();
-            const auto & selected_by_filter = description->filter_indices->getData();
-            auto mutable_holder = ColumnUInt8::create(num_rows, 0);
-            auto & data = mutable_holder->getData();
-            Iterator decorator(data.data(), res->has_one);
-            std::set_intersection(selected_by_indices.begin(), selected_by_indices.end(), selected_by_filter.begin(), selected_by_filter.end(), decorator);
-            res->data_holder = std::move(mutable_holder);
-            res->data = &data;
-            return res;
-        }
-    }
-    return std::move(description);
-}
Block FilterTransform::transformHeader(
    const Block & header, const ActionsDAG * expression, const String & filter_column_name, bool remove_filter_column)
{
@@ -267,7 +125,6 @@ void FilterTransform::doTransform(Chunk & chunk)
    size_t num_rows_before_filtration = chunk.getNumRows();
    auto columns = chunk.detachColumns();
    DataTypes types;
-    auto select_final_indices_info = getSelectByFinalIndices(chunk);

    {
        Block block = getInputPort().getHeader().cloneWithColumns(columns);
@@ -282,7 +139,6 @@ void FilterTransform::doTransform(Chunk & chunk)
    if (constant_filter_description.always_true || on_totals)
    {
-        executeSelectByIndices(columns, select_final_indices_info, num_rows_before_filtration);
        chunk.setColumns(std::move(columns), num_rows_before_filtration);
        removeFilterIfNeed(chunk);
        return;
@@ -303,7 +159,6 @@ void FilterTransform::doTransform(Chunk & chunk)
    if (constant_filter_description.always_true)
    {
-        executeSelectByIndices(columns, select_final_indices_info, num_rows_before_filtration);
        chunk.setColumns(std::move(columns), num_rows_before_filtration);
        removeFilterIfNeed(chunk);
        return;
@@ -311,15 +166,9 @@ void FilterTransform::doTransform(Chunk & chunk)
    std::unique_ptr<IFilterDescription> filter_description;
    if (filter_column->isSparse())
-        filter_description = combineFilterAndIndices(
-            std::make_unique<SparseFilterDescription>(*filter_column), select_final_indices_info, num_rows_before_filtration);
+        filter_description = std::make_unique<SparseFilterDescription>(*filter_column);
    else
-        filter_description = combineFilterAndIndices(
-            std::make_unique<FilterDescription>(*filter_column), select_final_indices_info, num_rows_before_filtration);
-
-    if (!filter_description->has_one)
-        return;
+        filter_description = std::make_unique<FilterDescription>(*filter_column);

    /** Let's find out how many rows will be in result.
      * To do this, we filter out the first non-constant column

@@ -211,12 +211,6 @@ struct SelectQueryInfo
    /// If query has aggregate functions
    bool has_aggregates = false;

-    /// If the query has any filter and no arrayJoin before the filter. Used by skipping FINAL:
-    /// the skipping FINAL algorithm outputs the original chunk plus a column of indices of
-    /// selected rows. If the query has a filter and no array join before any filter,
-    /// we can merge the indices into the first filter in FilterTransform later.
-    bool has_filters_and_no_array_join_before_filter = false;
-
    ClusterPtr getCluster() const { return !optimized_cluster ? cluster : optimized_cluster; }

    bool settings_limit_offset_done = false;
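The removed comment documents the contract being dropped: skipping FINAL emits the original chunk plus a column of selected-row indices, and the first FilterTransform merges those indices into its own filter. Note that the guard only excluded arrayJoin before the filter; the echoed query in the reference file below exercises that shape (shown here without the setting override):

SELECT arrayJoin([(k1, v), (k2, v)]) AS row, row.1 AS k FROM t FINAL WHERE k1 != 3 AND k = 1 ORDER BY row;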

@@ -1,3 +1,4 @@
+1 2 b 1
-- { echo ON }
SELECT arrayJoin([(k1, v), (k2, v)]) AS row, row.1 as k FROM t FINAL WHERE k1 != 3 AND k = 1 ORDER BY row SETTINGS enable_vertical_final = 0;
(1,4) 1

@@ -1,3 +1,15 @@
+-- https://github.com/ClickHouse/ClickHouse/issues/64543
+DROP TABLE IF EXISTS foo;
+DROP TABLE IF EXISTS bar;
+CREATE TABLE foo (id UInt64, seq UInt64) ENGINE = Memory;
+CREATE TABLE bar (id UInt64, seq UInt64, name String) ENGINE = ReplacingMergeTree ORDER BY id;
+INSERT INTO foo VALUES (1, 1);
+INSERT INTO bar VALUES (1, 1, 'a') (2, 2, 'b');
+INSERT INTO bar VALUES (1, 2, 'b') (2, 3, 'c');
+SELECT * FROM bar INNER JOIN foo USING id WHERE bar.seq > foo.seq SETTINGS final = 1;
+
+-- The same problem can possibly happen with array join
+DROP TABLE IF EXISTS t;
CREATE TABLE t (k1 UInt64, k2 UInt64, v UInt64) ENGINE = ReplacingMergeTree() ORDER BY (k1, k2);
SET optimize_on_insert = 0;
INSERT INTO t VALUES (1, 2, 3) (1, 2, 4) (2, 3, 4), (2, 3, 5);