Merge pull request #64098 from ClickHouse/speed-up-index-set-a-little

Speed up Set index a little
This commit is contained in:
Alexey Milovidov 2024-05-20 10:41:21 +00:00 committed by GitHub
commit deb5b47829
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 45 additions and 58 deletions

View File

@@ -1296,8 +1296,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
size_t last_index_mark = 0; size_t last_index_mark = 0;
PostingsCacheForStore cache_in_store; PostingsCacheForStore cache_in_store;
if (dynamic_cast<const MergeTreeIndexFullText *>(index_helper.get()))
if (dynamic_cast<const MergeTreeIndexFullText *>(&*index_helper) != nullptr)
cache_in_store.store = GinIndexStoreFactory::instance().get(index_helper->getFileName(), part->getDataPartStoragePtr()); cache_in_store.store = GinIndexStoreFactory::instance().get(index_helper->getFileName(), part->getDataPartStoragePtr());
for (size_t i = 0; i < ranges.size(); ++i) for (size_t i = 0; i < ranges.size(); ++i)
@@ -1315,12 +1314,12 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
auto ann_condition = std::dynamic_pointer_cast<IMergeTreeIndexConditionApproximateNearestNeighbor>(condition); auto ann_condition = std::dynamic_pointer_cast<IMergeTreeIndexConditionApproximateNearestNeighbor>(condition);
if (ann_condition != nullptr) if (ann_condition != nullptr)
{ {
// vector of indexes of useful ranges /// An array of indices of useful ranges.
auto result = ann_condition->getUsefulRanges(granule); auto result = ann_condition->getUsefulRanges(granule);
for (auto range : result) for (auto range : result)
{ {
// range for corresponding index /// The range for the corresponding index.
MarkRange data_range( MarkRange data_range(
std::max(ranges[i].begin, index_mark * index_granularity + range), std::max(ranges[i].begin, index_mark * index_granularity + range),
std::min(ranges[i].end, index_mark * index_granularity + range + 1)); std::min(ranges[i].end, index_mark * index_granularity + range + 1));
@@ -1344,8 +1343,8 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
continue; continue;
MarkRange data_range( MarkRange data_range(
std::max(ranges[i].begin, index_mark * index_granularity), std::max(ranges[i].begin, index_mark * index_granularity),
std::min(ranges[i].end, (index_mark + 1) * index_granularity)); std::min(ranges[i].end, (index_mark + 1) * index_granularity));
if (res.empty() || data_range.begin - res.back().end > min_marks_for_seek) if (res.empty() || data_range.begin - res.back().end > min_marks_for_seek)
res.push_back(data_range); res.push_back(data_range);

View File

@@ -35,8 +35,7 @@ MergeTreeIndexGranuleSet::MergeTreeIndexGranuleSet(
size_t max_rows_) size_t max_rows_)
: index_name(index_name_) : index_name(index_name_)
, max_rows(max_rows_) , max_rows(max_rows_)
, index_sample_block(index_sample_block_) , block(index_sample_block_.cloneEmpty())
, block(index_sample_block)
{ {
} }
@@ -47,8 +46,7 @@ MergeTreeIndexGranuleSet::MergeTreeIndexGranuleSet(
MutableColumns && mutable_columns_) MutableColumns && mutable_columns_)
: index_name(index_name_) : index_name(index_name_)
, max_rows(max_rows_) , max_rows(max_rows_)
, index_sample_block(index_sample_block_) , block(index_sample_block_.cloneWithColumns(std::move(mutable_columns_)))
, block(index_sample_block.cloneWithColumns(std::move(mutable_columns_)))
{ {
} }
@@ -67,10 +65,11 @@ void MergeTreeIndexGranuleSet::serializeBinary(WriteBuffer & ostr) const
} }
size_serialization->serializeBinary(size(), ostr, {}); size_serialization->serializeBinary(size(), ostr, {});
size_t num_columns = block.columns();
for (size_t i = 0; i < index_sample_block.columns(); ++i) for (size_t i = 0; i < num_columns; ++i)
{ {
const auto & type = index_sample_block.getByPosition(i).type; const auto & type = block.getByPosition(i).type;
ISerialization::SerializeBinaryBulkSettings settings; ISerialization::SerializeBinaryBulkSettings settings;
settings.getter = [&ostr](ISerialization::SubstreamPath) -> WriteBuffer * { return &ostr; }; settings.getter = [&ostr](ISerialization::SubstreamPath) -> WriteBuffer * { return &ostr; };
@@ -92,8 +91,6 @@ void MergeTreeIndexGranuleSet::deserializeBinary(ReadBuffer & istr, MergeTreeInd
if (version != 1) if (version != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown index version {}.", version); throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown index version {}.", version);
block.clear();
Field field_rows; Field field_rows;
const auto & size_type = DataTypePtr(std::make_shared<DataTypeUInt64>()); const auto & size_type = DataTypePtr(std::make_shared<DataTypeUInt64>());
size_type->getDefaultSerialization()->deserializeBinary(field_rows, istr, {}); size_type->getDefaultSerialization()->deserializeBinary(field_rows, istr, {});
@@ -102,24 +99,22 @@ void MergeTreeIndexGranuleSet::deserializeBinary(ReadBuffer & istr, MergeTreeInd
if (rows_to_read == 0) if (rows_to_read == 0)
return; return;
for (size_t i = 0; i < index_sample_block.columns(); ++i) size_t num_columns = block.columns();
ISerialization::DeserializeBinaryBulkSettings settings;
settings.getter = [&](ISerialization::SubstreamPath) -> ReadBuffer * { return &istr; };
settings.position_independent_encoding = false;
for (size_t i = 0; i < num_columns; ++i)
{ {
const auto & column = index_sample_block.getByPosition(i); auto & elem = block.getByPosition(i);
const auto & type = column.type; elem.column = elem.column->cloneEmpty();
ColumnPtr new_column = type->createColumn();
ISerialization::DeserializeBinaryBulkSettings settings;
settings.getter = [&](ISerialization::SubstreamPath) -> ReadBuffer * { return &istr; };
settings.position_independent_encoding = false;
ISerialization::DeserializeBinaryBulkStatePtr state; ISerialization::DeserializeBinaryBulkStatePtr state;
auto serialization = type->getDefaultSerialization(); auto serialization = elem.type->getDefaultSerialization();
serialization->deserializeBinaryBulkStatePrefix(settings, state); serialization->deserializeBinaryBulkStatePrefix(settings, state);
serialization->deserializeBinaryBulkWithMultipleStreams(new_column, rows_to_read, settings, state, nullptr); serialization->deserializeBinaryBulkWithMultipleStreams(elem.column, rows_to_read, settings, state, nullptr);
block.insert(ColumnWithTypeAndName(new_column, type, column.name));
} }
} }
@@ -272,6 +267,8 @@ MergeTreeIndexConditionSet::MergeTreeIndexConditionSet(
filter_actions_dag->removeUnusedActions(); filter_actions_dag->removeUnusedActions();
actions = std::make_shared<ExpressionActions>(filter_actions_dag); actions = std::make_shared<ExpressionActions>(filter_actions_dag);
actions_output_column_name = filter_actions_dag->getOutputs().at(0)->result_name;
} }
bool MergeTreeIndexConditionSet::alwaysUnknownOrTrue() const bool MergeTreeIndexConditionSet::alwaysUnknownOrTrue() const
@@ -284,42 +281,19 @@ bool MergeTreeIndexConditionSet::mayBeTrueOnGranule(MergeTreeIndexGranulePtr idx
if (isUseless()) if (isUseless())
return true; return true;
auto granule = std::dynamic_pointer_cast<MergeTreeIndexGranuleSet>(idx_granule); const MergeTreeIndexGranuleSet & granule = assert_cast<const MergeTreeIndexGranuleSet &>(*idx_granule);
if (!granule)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Set index condition got a granule with the wrong type");
if (isUseless() || granule->empty() || (max_rows != 0 && granule->size() > max_rows)) size_t size = granule.size();
if (size == 0 || (max_rows != 0 && size > max_rows))
return true; return true;
Block result = granule->block; Block result = granule.block;
actions->execute(result); actions->execute(result);
const auto & filter_node_name = actions->getActionsDAG().getOutputs().at(0)->result_name; const auto & column = result.getByName(actions_output_column_name).column;
auto column = result.getByName(filter_node_name).column->convertToFullColumnIfConst()->convertToFullColumnIfLowCardinality();
if (column->onlyNull()) for (size_t i = 0; i < size; ++i)
return false; if (!column->isNullAt(i) && (column->get64(i) & 1))
const auto * col_uint8 = typeid_cast<const ColumnUInt8 *>(column.get());
const NullMap * null_map = nullptr;
if (const auto * col_nullable = checkAndGetColumn<ColumnNullable>(&*column))
{
col_uint8 = typeid_cast<const ColumnUInt8 *>(&col_nullable->getNestedColumn());
null_map = &col_nullable->getNullMapData();
}
if (!col_uint8)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"ColumnUInt8 expected as Set index condition result");
const auto & condition = col_uint8->getData();
size_t column_size = column->size();
for (size_t i = 0; i < column_size; ++i)
if ((!null_map || (*null_map)[i] == 0) && condition[i] & 1)
return true; return true;
return false; return false;

View File

@@ -34,7 +34,6 @@ struct MergeTreeIndexGranuleSet final : public IMergeTreeIndexGranule
const String index_name; const String index_name;
const size_t max_rows; const size_t max_rows;
const Block index_sample_block;
Block block; Block block;
}; };
@@ -127,6 +126,7 @@ private:
std::unordered_set<String> key_columns; std::unordered_set<String> key_columns;
ExpressionActionsPtr actions; ExpressionActionsPtr actions;
String actions_output_column_name;
}; };

View File

@@ -0,0 +1,14 @@
<test>
<create_query>
CREATE TABLE test_set (k UInt32, x UInt32, INDEX idx (x) TYPE set(10) GRANULARITY 1) ENGINE = MergeTree ORDER BY k SETTINGS index_granularity = 111;
</create_query>
<fill_query>SYSTEM STOP MERGES</fill_query>
<fill_query>INSERT INTO test_set SELECT number, number DIV 100 + rand() % 7 FROM numbers(3000000) SETTINGS max_insert_threads = 4;</fill_query>
<query>
SELECT count() FROM test_set WHERE x = 1234 SETTINGS max_threads = 8;
</query>
<drop_query>SYSTEM START MERGES</drop_query>
<drop_query>DROP TABLE IF EXISTS test_set</drop_query>
</test>