remove unneeded fields

This commit is contained in:
Anton Popov 2024-05-28 11:45:12 +00:00
parent f225649332
commit 2f6a86f3a1
6 changed files with 51 additions and 53 deletions

View File

@ -188,6 +188,18 @@ NamesAndTypesList NamesAndTypesList::filter(const Names & names) const
return filter(NameSet(names.begin(), names.end()));
}
/// Builds a new list holding copies of the columns of this list whose names
/// are absent from `names`. The surviving columns keep their relative order.
NamesAndTypesList NamesAndTypesList::eraseNames(const NameSet & names) const
{
    NamesAndTypesList remaining;

    for (auto it = begin(); it != end(); ++it)
    {
        /// Skip every column that was requested to be erased.
        if (names.contains(it->name))
            continue;

        remaining.push_back(*it);
    }

    return remaining;
}
NamesAndTypesList NamesAndTypesList::addTypes(const Names & names) const
{
/// NOTE: It's better to make a map in `IStorage` than to create it here every time again.

View File

@ -111,6 +111,9 @@ public:
/// Leave only the columns whose names are in the `names`. In `names` there can be superfluous columns.
NamesAndTypesList filter(const Names & names) const;
/// Returns a copy of the list without the columns whose names are in `names`.
/// The remaining columns keep their original relative order; the list itself is not modified.
NamesAndTypesList eraseNames(const NameSet & names) const;
/// Unlike `filter`, returns columns in the order in which they go in `names`.
NamesAndTypesList addTypes(const Names & names) const;

View File

@ -19,18 +19,18 @@ public:
size_t sum_index_columns = 0;
size_t sum_ordinary_columns = 0;
ColumnSizeEstimator(ColumnToSize && map_, const Names & key_columns, const Names & ordinary_columns)
ColumnSizeEstimator(ColumnToSize && map_, const NamesAndTypesList & key_columns, const NamesAndTypesList & ordinary_columns)
: map(std::move(map_))
{
for (const auto & name : key_columns)
for (const auto & [name, _] : key_columns)
if (!map.contains(name)) map[name] = 0;
for (const auto & name : ordinary_columns)
for (const auto & [name, _] : ordinary_columns)
if (!map.contains(name)) map[name] = 0;
for (const auto & name : key_columns)
for (const auto & [name, _] : key_columns)
sum_index_columns += map.at(name);
for (const auto & name : ordinary_columns)
for (const auto & [name, _] : ordinary_columns)
sum_ordinary_columns += map.at(name);
sum_total = std::max(static_cast<decltype(sum_index_columns)>(1), sum_index_columns + sum_ordinary_columns);

View File

@ -166,7 +166,6 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::extractMergingAndGatheringColu
if (key_columns.contains(column.name))
{
global_ctx->merging_columns.emplace_back(column);
global_ctx->merging_column_names.emplace_back(column.name);
auto it = global_ctx->skip_indexes_by_column.find(column.name);
if (it != global_ctx->skip_indexes_by_column.end())
@ -180,7 +179,6 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::extractMergingAndGatheringColu
else
{
global_ctx->gathering_columns.emplace_back(column);
global_ctx->gathering_column_names.emplace_back(column.name);
}
}
}
@ -251,8 +249,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
if (!global_ctx->parent_part)
global_ctx->temporary_directory_lock = global_ctx->data->getTemporaryPartDirectoryHolder(local_tmp_part_basename);
global_ctx->all_column_names = global_ctx->metadata_snapshot->getColumns().getNamesOfPhysical();
global_ctx->storage_columns = global_ctx->metadata_snapshot->getColumns().getAllPhysical();
extractMergingAndGatheringColumns();
auto object_columns = MergeTreeData::getConcreteObjectColumns(global_ctx->future_part->parts, global_ctx->metadata_snapshot->getColumns());
@ -272,10 +270,10 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
ctx->force_ttl = false;
if (enabledBlockNumberColumn(global_ctx))
addStorageColumn(global_ctx, BlockNumberColumn::name, BlockNumberColumn::type);
addGatheringColumn(global_ctx, BlockNumberColumn::name, BlockNumberColumn::type);
if (enabledBlockOffsetColumn(global_ctx))
addStorageColumn(global_ctx, BlockOffsetColumn::name, BlockOffsetColumn::type);
addGatheringColumn(global_ctx, BlockOffsetColumn::name, BlockOffsetColumn::type);
SerializationInfo::Settings info_settings =
{
@ -324,6 +322,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
ctx->sum_input_rows_upper_bound = global_ctx->merge_list_element_ptr->total_rows_count;
ctx->sum_compressed_bytes_upper_bound = global_ctx->merge_list_element_ptr->total_size_bytes_compressed;
global_ctx->chosen_merge_algorithm = chooseMergeAlgorithm();
global_ctx->merge_list_element_ptr->merge_algorithm.store(global_ctx->chosen_merge_algorithm, std::memory_order_relaxed);
@ -344,14 +343,13 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
case MergeAlgorithm::Horizontal:
{
global_ctx->merging_columns = global_ctx->storage_columns;
global_ctx->merging_column_names = global_ctx->all_column_names;
global_ctx->merging_skip_indexes = global_ctx->metadata_snapshot->getSecondaryIndices();
global_ctx->gathering_columns.clear();
global_ctx->skip_indexes_by_column.clear();
break;
}
case MergeAlgorithm::Vertical:
{
extractMergingAndGatheringColumns();
ctx->rows_sources_uncompressed_write_buf = ctx->tmp_disk->createRawStream();
ctx->rows_sources_write_buf = std::make_unique<CompressedWriteBuffer>(*ctx->rows_sources_uncompressed_write_buf);
@ -361,8 +359,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
ctx->column_sizes = ColumnSizeEstimator(
std::move(local_merged_column_to_size),
global_ctx->merging_column_names,
global_ctx->gathering_column_names);
global_ctx->merging_columns,
global_ctx->gathering_columns);
break;
}
@ -370,9 +368,6 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
throw Exception(ErrorCodes::LOGICAL_ERROR, "Merge algorithm must be chosen");
}
assert(global_ctx->gathering_columns.size() == global_ctx->gathering_column_names.size());
assert(global_ctx->merging_columns.size() == global_ctx->merging_column_names.size());
/// If merge is vertical we cannot calculate it
ctx->blocks_are_granules_size = (global_ctx->chosen_merge_algorithm == MergeAlgorithm::Vertical);
@ -389,28 +384,25 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
/// resources for this).
if (!ctx->need_remove_expired_values)
{
size_t expired_columns = 0;
auto part_serialization_infos = global_ctx->new_data_part->getSerializationInfos();
NameSet columns_to_remove;
for (auto & [column_name, ttl] : global_ctx->new_data_part->ttl_infos.columns_ttl)
{
if (ttl.finished())
{
global_ctx->new_data_part->expired_columns.insert(column_name);
LOG_TRACE(ctx->log, "Adding expired column {} for part {}", column_name, global_ctx->new_data_part->name);
std::erase(global_ctx->gathering_column_names, column_name);
std::erase(global_ctx->merging_column_names, column_name);
std::erase(global_ctx->all_column_names, column_name);
columns_to_remove.insert(column_name);
part_serialization_infos.erase(column_name);
++expired_columns;
}
}
if (expired_columns)
if (!columns_to_remove.empty())
{
global_ctx->gathering_columns = global_ctx->gathering_columns.filter(global_ctx->gathering_column_names);
global_ctx->merging_columns = global_ctx->merging_columns.filter(global_ctx->merging_column_names);
global_ctx->storage_columns = global_ctx->storage_columns.filter(global_ctx->all_column_names);
global_ctx->gathering_columns = global_ctx->gathering_columns.eraseNames(columns_to_remove);
global_ctx->merging_columns = global_ctx->merging_columns.eraseNames(columns_to_remove);
global_ctx->storage_columns = global_ctx->storage_columns.eraseNames(columns_to_remove);
global_ctx->new_data_part->setColumns(
global_ctx->storage_columns,
@ -448,15 +440,13 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
return false;
}
void MergeTask::addStorageColumn(GlobalRuntimeContextPtr global_ctx, const String & name, const DataTypePtr & type)
void MergeTask::addGatheringColumn(GlobalRuntimeContextPtr global_ctx, const String & name, const DataTypePtr & type)
{
if (global_ctx->storage_columns.contains(name))
return;
global_ctx->storage_columns.emplace_back(name, type);
global_ctx->all_column_names.emplace_back(name);
global_ctx->gathering_columns.emplace_back(name, type);
global_ctx->gathering_column_names.emplace_back(name);
}
@ -470,7 +460,6 @@ MergeTask::StageRuntimeContextPtr MergeTask::ExecuteAndFinalizeHorizontalPart::g
new_ctx->compression_codec = std::move(ctx->compression_codec);
new_ctx->tmp_disk = std::move(ctx->tmp_disk);
new_ctx->it_name_and_type = std::move(ctx->it_name_and_type);
new_ctx->column_num_for_vertical_merge = std::move(ctx->column_num_for_vertical_merge);
new_ctx->read_with_direct_io = std::move(ctx->read_with_direct_io);
new_ctx->need_sync = std::move(ctx->need_sync);
@ -557,7 +546,7 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
size_t sum_input_rows_exact = global_ctx->merge_list_element_ptr->rows_read;
size_t input_rows_filtered = *global_ctx->input_rows_filtered;
global_ctx->merge_list_element_ptr->columns_written = global_ctx->merging_column_names.size();
global_ctx->merge_list_element_ptr->columns_written = global_ctx->merging_columns.size();
global_ctx->merge_list_element_ptr->progress.store(ctx->column_sizes->keyColumnsWeight(), std::memory_order_relaxed);
ctx->rows_sources_write_buf->next();
@ -592,14 +581,12 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
/// Move ownership from std::unique_ptr<ReadBuffer> to std::unique_ptr<ReadBufferFromFile> for CompressedReadBufferFromFile.
/// First, release ownership from unique_ptr to base type.
reread_buf.release(); /// NOLINT(bugprone-unused-return-value,hicpp-ignored-remove-result): we already have the pointer value in `reread_buffer_raw`
/// Then, move ownership to unique_ptr to concrete type.
std::unique_ptr<ReadBufferFromFile> reread_buffer_from_file(reread_buffer_raw);
/// CompressedReadBufferFromFile expects std::unique_ptr<ReadBufferFromFile> as argument.
ctx->rows_sources_read_buf = std::make_unique<CompressedReadBufferFromFile>(std::move(reread_buffer_from_file));
/// For external cycle
global_ctx->gathering_column_names_size = global_ctx->gathering_column_names.size();
ctx->column_num_for_vertical_merge = 0;
ctx->it_name_and_type = global_ctx->gathering_columns.cbegin();
const auto & settings = global_ctx->context->getSettingsRef();
@ -743,8 +730,7 @@ void MergeTask::VerticalMergeStage::finalizeVerticalMergeForOneColumn() const
global_ctx->merge_list_element_ptr->bytes_written_uncompressed += bytes;
global_ctx->merge_list_element_ptr->progress.store(ctx->progress_before + ctx->column_sizes->columnWeight(column_name), std::memory_order_relaxed);
/// This is the external cycle increment.
++ctx->column_num_for_vertical_merge;
/// This is the external loop increment.
++ctx->it_name_and_type;
}
@ -776,9 +762,9 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c
LOG_DEBUG(ctx->log,
"Merge sorted {} rows, containing {} columns ({} merged, {} gathered) in {} sec., {} rows/sec., {}/sec.",
global_ctx->merge_list_element_ptr->rows_read,
global_ctx->all_column_names.size(),
global_ctx->merging_column_names.size(),
global_ctx->gathering_column_names.size(),
global_ctx->storage_columns.size(),
global_ctx->merging_columns.size(),
global_ctx->gathering_columns.size(),
elapsed_seconds,
global_ctx->merge_list_element_ptr->rows_read / elapsed_seconds,
ReadableSize(global_ctx->merge_list_element_ptr->bytes_read_uncompressed / elapsed_seconds));
@ -915,7 +901,7 @@ bool MergeTask::VerticalMergeStage::executeVerticalMergeForAllColumns() const
return false;
/// This is the external cycle condition
if (ctx->column_num_for_vertical_merge >= global_ctx->gathering_column_names_size)
if (ctx->it_name_and_type == global_ctx->gathering_columns.end())
return false;
switch (ctx->vertical_merge_one_column_state)
@ -996,6 +982,8 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
global_ctx->horizontal_stage_progress = std::make_unique<MergeStageProgress>(
ctx->column_sizes ? ctx->column_sizes->keyColumnsWeight() : 1.0);
auto merging_column_names = global_ctx->merging_columns.getNames();
for (const auto & part : global_ctx->future_part->parts)
{
Pipe pipe = createMergeTreeSequentialSource(
@ -1003,7 +991,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
*global_ctx->data,
global_ctx->storage_snapshot,
part,
global_ctx->merging_column_names,
merging_column_names,
/*mark_ranges=*/ {},
/*apply_deleted_mask=*/ true,
ctx->read_with_direct_io,
@ -1143,12 +1131,12 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
/// If deduplicate_by_columns is empty, add all columns except virtuals.
if (global_ctx->deduplicate_by_columns.empty())
{
for (const auto & column_name : global_ctx->merging_column_names)
for (const auto & column : global_ctx->merging_columns)
{
if (virtuals.tryGet(column_name, VirtualsKind::Persistent))
if (virtuals.tryGet(column.name, VirtualsKind::Persistent))
continue;
global_ctx->deduplicate_by_columns.emplace_back(column_name);
global_ctx->deduplicate_by_columns.emplace_back(column.name);
}
}

View File

@ -167,17 +167,13 @@ private:
NamesAndTypesList gathering_columns{};
NamesAndTypesList merging_columns{};
Names gathering_column_names{};
Names merging_column_names{};
NamesAndTypesList storage_columns{};
Names all_column_names{};
MergeTreeData::DataPart::Checksums checksums_gathered_columns{};
IndicesDescription merging_skip_indexes;
std::unordered_map<String, IndicesDescription> skip_indexes_by_column;
MergeAlgorithm chosen_merge_algorithm{MergeAlgorithm::Undecided};
size_t gathering_column_names_size{0};
std::unique_ptr<MergeStageProgress> horizontal_stage_progress{nullptr};
std::unique_ptr<MergeStageProgress> column_progress{nullptr};
@ -238,7 +234,6 @@ private:
/// Dependencies for next stages
std::list<DB::NameAndTypePair>::const_iterator it_name_and_type;
size_t column_num_for_vertical_merge{0};
bool need_sync{false};
};
@ -292,7 +287,6 @@ private:
CompressionCodecPtr compression_codec;
TemporaryDataOnDiskPtr tmp_disk{nullptr};
std::list<DB::NameAndTypePair>::const_iterator it_name_and_type;
size_t column_num_for_vertical_merge{0};
bool read_with_direct_io{false};
bool need_sync{false};
/// End dependencies from previous stages
@ -422,7 +416,7 @@ private:
return global_ctx->data->getSettings()->enable_block_offset_column && global_ctx->metadata_snapshot->getGroupByTTLs().empty();
}
static void addStorageColumn(GlobalRuntimeContextPtr global_ctx, const String & name, const DataTypePtr & type);
static void addGatheringColumn(GlobalRuntimeContextPtr global_ctx, const String & name, const DataTypePtr & type);
};
/// FIXME

View File

@ -176,6 +176,7 @@ MergeTreeDataPartWriterOnDisk::MergeTreeDataPartWriterOnDisk(
if (settings.rewrite_primary_key)
initPrimaryIndex();
initSkipIndices();
initStatistics();
}