Merge pull request #67846 from ClickHouse/revert-67392-revert-66099-better-index-calc

Revert "Revert "Slightly better calculation of primary index""
This commit is contained in:
Anton Popov 2024-08-06 10:36:01 +00:00 committed by GitHub
commit 3656b9094a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 80 additions and 46 deletions

View File

@ -1,4 +1,5 @@
#include <Storages/MergeTree/IMergeTreeDataPartWriter.h>
#include <Common/MemoryTrackerBlockerInThread.h>
namespace DB
{
@ -71,9 +72,21 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
Columns IMergeTreeDataPartWriter::releaseIndexColumns()
{
return Columns(
std::make_move_iterator(index_columns.begin()),
std::make_move_iterator(index_columns.end()));
/// The memory for index was allocated without thread memory tracker.
/// We need to deallocate it in shrinkToFit without memory tracker as well.
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
Columns result;
result.reserve(index_columns.size());
for (auto & column : index_columns)
{
column->shrinkToFit();
result.push_back(std::move(column));
}
index_columns.clear();
return result;
}
SerializationPtr IMergeTreeDataPartWriter::getSerialization(const String & column_name) const

View File

@ -255,6 +255,12 @@ void MergeTreeDataPartWriterOnDisk::initPrimaryIndex()
index_compressor_stream = std::make_unique<CompressedWriteBuffer>(*index_file_hashing_stream, primary_key_compression_codec, settings.primary_key_compress_block_size);
index_source_hashing_stream = std::make_unique<HashingWriteBuffer>(*index_compressor_stream);
}
const auto & primary_key_types = metadata_snapshot->getPrimaryKey().data_types;
index_serializations.reserve(primary_key_types.size());
for (const auto & type : primary_key_types)
index_serializations.push_back(type->getDefaultSerialization());
}
}
@ -300,22 +306,30 @@ void MergeTreeDataPartWriterOnDisk::initSkipIndices()
store = std::make_shared<GinIndexStore>(stream_name, data_part_storage, data_part_storage, storage_settings->max_digestion_size_per_segment);
gin_index_stores[stream_name] = store;
}
skip_indices_aggregators.push_back(skip_index->createIndexAggregatorForPart(store, settings));
skip_index_accumulated_marks.push_back(0);
}
}
void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndexRow(const Block & index_block, size_t row)
{
chassert(index_block.columns() == index_serializations.size());
auto & index_stream = compress_primary_key ? *index_source_hashing_stream : *index_file_hashing_stream;
for (size_t i = 0; i < index_block.columns(); ++i)
{
const auto & column = index_block.getByPosition(i).column;
index_columns[i]->insertFrom(*column, row);
index_serializations[i]->serializeBinary(*column, row, index_stream, {});
}
}
void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Block & primary_index_block, const Granules & granules_to_write)
{
size_t primary_columns_num = primary_index_block.columns();
if (index_columns.empty())
{
index_types = primary_index_block.getDataTypes();
index_columns.resize(primary_columns_num);
last_block_index_columns.resize(primary_columns_num);
for (size_t i = 0; i < primary_columns_num; ++i)
index_columns[i] = primary_index_block.getByPosition(i).column->cloneEmpty();
}
if (!metadata_snapshot->hasPrimaryKey())
return;
{
/** While filling index (index_columns), disable memory tracker.
@ -326,25 +340,20 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Bloc
*/
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
if (index_columns.empty())
index_columns = primary_index_block.cloneEmptyColumns();
/// Write index. The index contains Primary Key value for each `index_granularity` row.
for (const auto & granule : granules_to_write)
{
if (metadata_snapshot->hasPrimaryKey() && granule.mark_on_start)
{
for (size_t j = 0; j < primary_columns_num; ++j)
{
const auto & primary_column = primary_index_block.getByPosition(j);
index_columns[j]->insertFrom(*primary_column.column, granule.start_row);
primary_column.type->getDefaultSerialization()->serializeBinary(
*primary_column.column, granule.start_row, compress_primary_key ? *index_source_hashing_stream : *index_file_hashing_stream, {});
}
}
if (granule.mark_on_start)
calculateAndSerializePrimaryIndexRow(primary_index_block, granule.start_row);
}
}
/// store last index row to write final mark at the end of column
for (size_t j = 0; j < primary_columns_num; ++j)
last_block_index_columns[j] = primary_index_block.getByPosition(j).column;
/// Store block with last index row to write final mark at the end of column
if (with_final_mark)
last_index_block = primary_index_block;
}
void MergeTreeDataPartWriterOnDisk::calculateAndSerializeStatistics(const Block & block)
@ -421,19 +430,14 @@ void MergeTreeDataPartWriterOnDisk::fillPrimaryIndexChecksums(MergeTreeData::Dat
if (index_file_hashing_stream)
{
if (write_final_mark)
if (write_final_mark && last_index_block)
{
for (size_t j = 0; j < index_columns.size(); ++j)
{
const auto & column = *last_block_index_columns[j];
size_t last_row_number = column.size() - 1;
index_columns[j]->insertFrom(column, last_row_number);
index_types[j]->getDefaultSerialization()->serializeBinary(
column, last_row_number, compress_primary_key ? *index_source_hashing_stream : *index_file_hashing_stream, {});
}
last_block_index_columns.clear();
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
calculateAndSerializePrimaryIndexRow(last_index_block, last_index_block.rows() - 1);
}
last_index_block.clear();
if (compress_primary_key)
{
index_source_hashing_stream->finalize();

View File

@ -173,10 +173,10 @@ protected:
std::unique_ptr<HashingWriteBuffer> index_source_hashing_stream;
bool compress_primary_key;
DataTypes index_types;
/// Index columns from the last block
/// It's written to index file in the `writeSuffixAndFinalizePart` method
Columns last_block_index_columns;
/// Last block with index columns.
/// It's written to index file in the `writeSuffixAndFinalizePart` method.
Block last_index_block;
Serializations index_serializations;
bool data_written = false;
@ -193,6 +193,7 @@ private:
void initStatistics();
virtual void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) = 0;
void calculateAndSerializePrimaryIndexRow(const Block & index_block, size_t row);
struct ExecutionStatistics
{

View File

@ -1,4 +1,4 @@
100000000 140000000
100000000 100000000
0 0
1
100000000 100000000

View File

@ -1,8 +1,8 @@
100000000 140000000
100000000 140000000
100000000 140000000
100000000 100000000
100000000 100000000
100000000 100000000
0 0
100000000 140000000
100000000 100000000
0 0
0 0
1

View File

@ -1,4 +1,4 @@
100000000 140000000
100000000 140000000
100000000 100000000
100000000 100000000
0 0
0 0

View File

@ -0,0 +1 @@
150000

View File

@ -0,0 +1,15 @@
-- Tags: no-debug, no-tsan, no-msan, no-asan, no-random-settings, no-random-merge-tree-settings
DROP TABLE IF EXISTS t_primary_index_memory;
CREATE TABLE t_primary_index_memory (s String) ENGINE = MergeTree
ORDER BY s SETTINGS index_granularity = 1;
INSERT INTO t_primary_index_memory SELECT repeat('a', 10000) FROM numbers(150000)
SETTINGS
max_block_size = 32,
max_memory_usage = '100M',
max_insert_block_size = 1024,
min_insert_block_size_rows = 1024;
SELECT count() FROM t_primary_index_memory;
DROP TABLE t_primary_index_memory;