Merge pull request #58097 from ClickHouse/revert-57648-refine-lwd-merge

Revert "Consider lightweight deleted rows when selecting parts to merge"
Alexey Milovidov 2023-12-20 23:46:52 +01:00 committed by GitHub
commit bf7ed7890c
11 changed files with 10 additions and 150 deletions
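
The reverted change (#57648) let the merge selector see a part's "existing" size rather than its raw size: when a part carried a lightweight-delete mask and the setting exclude_deleted_rows_for_part_size_in_merge was enabled, the size fed into merge selection was scaled by the fraction of rows that survived deletion. A minimal standalone sketch of that estimate, with names mirroring the diff below (an illustration, not the ClickHouse API):

    #include <cstdint>

    /// Scale the on-disk size by the share of rows that survived lightweight
    /// deletes; fall back to the raw size when nothing reliable is known.
    /// Mirrors IMergeTreeDataPart::getExistingBytesOnDisk() from the diff below.
    uint64_t estimateExistingBytes(uint64_t bytes_on_disk, uint64_t rows_count, uint64_t existing_rows_count)
    {
        /// existing_rows_count >= rows_count encodes "uninitialized" or "load failed"
        if (rows_count == 0 || existing_rows_count >= rows_count)
            return bytes_on_disk;
        return bytes_on_disk * existing_rows_count / rows_count;
    }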


@@ -593,23 +593,6 @@ UInt64 IMergeTreeDataPart::getMarksCount() const
     return index_granularity.getMarksCount();
 }
 
-UInt64 IMergeTreeDataPart::getExistingBytesOnDisk() const
-{
-    if (!supportLightweightDeleteMutate() || !hasLightweightDelete() || !rows_count
-        || !storage.getSettings()->exclude_deleted_rows_for_part_size_in_merge)
-        return bytes_on_disk;
-
-    /// Uninitialized existing_rows_count
-    /// (if existing_rows_count equals rows_count, it means that previously we failed to read existing_rows_count)
-    if (existing_rows_count > rows_count)
-        readExistingRowsCount();
-
-    if (existing_rows_count < rows_count)
-        return bytes_on_disk * existing_rows_count / rows_count;
-    else /// Load failed
-        return bytes_on_disk;
-}
-
 size_t IMergeTreeDataPart::getFileSizeOrZero(const String & file_name) const
 {
     auto checksum = checksums.files.find(file_name);
@@ -1304,85 +1287,6 @@ void IMergeTreeDataPart::loadRowsCount()
     }
 }
 
-void IMergeTreeDataPart::readExistingRowsCount() const
-{
-    if (!supportLightweightDeleteMutate() || !hasLightweightDelete() || !storage.getSettings()->exclude_deleted_rows_for_part_size_in_merge
-        || existing_rows_count < rows_count || !getMarksCount())
-        return;
-
-    std::lock_guard lock(existing_rows_count_mutex);
-
-    /// Already read by another thread
-    if (existing_rows_count < rows_count)
-        return;
-
-    NamesAndTypesList cols;
-    cols.push_back(LightweightDeleteDescription::FILTER_COLUMN);
-
-    StorageMetadataPtr metadata_ptr = storage.getInMemoryMetadataPtr();
-    StorageSnapshotPtr storage_snapshot_ptr = std::make_shared<StorageSnapshot>(storage, metadata_ptr);
-
-    MergeTreeReaderPtr reader = getReader(
-        cols,
-        storage_snapshot_ptr,
-        MarkRanges{MarkRange(0, getMarksCount())},
-        nullptr,
-        storage.getContext()->getMarkCache().get(),
-        std::make_shared<AlterConversions>(),
-        MergeTreeReaderSettings{},
-        ValueSizeMap{},
-        ReadBufferFromFileBase::ProfileCallback{});
-
-    if (!reader)
-    {
-        LOG_WARNING(storage.log, "Create reader failed while reading existing rows count");
-        existing_rows_count = rows_count;
-        return;
-    }
-
-    size_t current_mark = 0;
-    const size_t total_mark = getMarksCount();
-
-    bool continue_reading = false;
-    size_t current_row = 0;
-    size_t existing_count = 0;
-
-    while (current_row < rows_count)
-    {
-        size_t rows_to_read = index_granularity.getMarkRows(current_mark);
-        continue_reading = (current_mark != 0);
-
-        Columns result;
-        result.resize(1);
-
-        size_t rows_read = reader->readRows(current_mark, total_mark, continue_reading, rows_to_read, result);
-        if (!rows_read)
-        {
-            LOG_WARNING(storage.log, "Part {} has lightweight delete, but _row_exists column not found", name);
-            existing_rows_count = rows_count;
-            return;
-        }
-
-        current_row += rows_read;
-        current_mark += (rows_to_read == rows_read);
-
-        const ColumnUInt8 * row_exists_col = typeid_cast<const ColumnUInt8 *>(result[0].get());
-        if (!row_exists_col)
-        {
-            LOG_WARNING(storage.log, "Part {} _row_exists column type is not UInt8", name);
-            existing_rows_count = rows_count;
-            return;
-        }
-
-        for (UInt8 row_exists : row_exists_col->getData())
-            if (row_exists)
-                existing_count++;
-    }
-
-    existing_rows_count = existing_count;
-    LOG_DEBUG(storage.log, "Part {} existing_rows_count = {}", name, existing_rows_count);
-}
-
 void IMergeTreeDataPart::appendFilesOfRowsCount(Strings & files)
 {
     files.push_back("count.txt");
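
readExistingRowsCount() above counts the surviving rows by reading only the _row_exists filter column, mark range by mark range, and it coordinates concurrent callers with a double-checked pattern around a sentinel: UINT64_MAX means "not computed yet", and a failed read stores rows_count so the scan is never retried. A generic sketch of that pattern, assuming illustrative names:

    #include <cstdint>
    #include <mutex>

    /// Double-checked, sentinel-based lazy count, as in the reverted code:
    /// UINT64_MAX = uninitialized; a failed computation stores the fallback,
    /// which counts as "done" and is never retried.
    struct LazyCount
    {
        mutable uint64_t value = UINT64_MAX;
        mutable std::mutex mutex;

        uint64_t get(uint64_t fallback) const
        {
            if (value != UINT64_MAX)        /// fast path: already computed
                return value;
            std::lock_guard lock(mutex);
            if (value != UINT64_MAX)        /// another thread won the race
                return value;
            value = computeOrFallback(fallback);
            return value;
        }

        /// Stand-in for the real scan over the _row_exists column.
        static uint64_t computeOrFallback(uint64_t fallback) { return fallback; }
    };

One caveat the sketch inherits from the original: the fast-path read happens outside the lock on a non-atomic field, a race the reverted code evidently treated as benign.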


@@ -229,13 +229,6 @@ public:
     size_t rows_count = 0;
 
-    /// Existing rows count (excluding lightweight deleted rows)
-    /// UINT64_MAX -> uninitialized
-    /// 0 -> all rows were deleted
-    /// if reading failed, it will be set to rows_count
-    mutable size_t existing_rows_count = UINT64_MAX;
-    mutable std::mutex existing_rows_count_mutex;
-
     time_t modification_time = 0;
     /// When the part is removed from the working set. Changes once.
     mutable std::atomic<time_t> remove_time { std::numeric_limits<time_t>::max() };
@@ -381,10 +374,6 @@ public:
     void setBytesOnDisk(UInt64 bytes_on_disk_) { bytes_on_disk = bytes_on_disk_; }
     void setBytesUncompressedOnDisk(UInt64 bytes_uncompressed_on_disk_) { bytes_uncompressed_on_disk = bytes_uncompressed_on_disk_; }
 
-    /// Returns estimated size of existing rows if setting exclude_deleted_rows_for_part_size_in_merge is true
-    /// Otherwise returns bytes_on_disk
-    UInt64 getExistingBytesOnDisk() const;
-
     size_t getFileSizeOrZero(const String & file_name) const;
     auto getFilesChecksums() const { return checksums.files; }
@@ -511,9 +500,6 @@ public:
     /// True if here is lightweight deleted mask file in part.
     bool hasLightweightDelete() const { return columns.contains(LightweightDeleteDescription::FILTER_COLUMN.name); }
 
-    /// Read existing rows count from _row_exists column
-    void readExistingRowsCount() const;
-
     void writeChecksums(const MergeTreeDataPartChecksums & checksums_, const WriteSettings & settings);
 
     /// Checks the consistency of this data part.


@@ -160,7 +160,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
     }
 
     /// Start to make the main work
-    size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts, true);
+    size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts);
 
     /// Can throw an exception while reserving space.
     IMergeTreeDataPart::TTLInfos ttl_infos;


@@ -405,7 +405,7 @@ MergeTreeDataMergerMutator::MergeSelectingInfo MergeTreeDataMergerMutator::getPo
         }
 
         IMergeSelector::Part part_info;
-        part_info.size = part->getExistingBytesOnDisk();
+        part_info.size = part->getBytesOnDisk();
         part_info.age = res.current_time - part->modification_time;
         part_info.level = part->info.level;
         part_info.data = &part;
@@ -611,7 +611,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinParti
             return SelectPartsDecision::CANNOT_SELECT;
         }
 
-        sum_bytes += (*it)->getExistingBytesOnDisk();
+        sum_bytes += (*it)->getBytesOnDisk();
 
         prev_it = it;
         ++it;
@@ -791,7 +791,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart
 }
 
-size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts, const bool & is_merge)
+size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts)
 {
     size_t res = 0;
     time_t current_time = std::time(nullptr);
@@ -802,10 +802,7 @@ size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::
         if (part_max_ttl && part_max_ttl <= current_time)
            continue;
 
-        if (is_merge && part->storage.getSettings()->exclude_deleted_rows_for_part_size_in_merge)
-            res += part->getExistingBytesOnDisk();
-        else
-            res += part->getBytesOnDisk();
+        res += part->getBytesOnDisk();
     }
 
     return static_cast<size_t>(res * DISK_USAGE_COEFFICIENT_TO_RESERVE);
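
A note on the reservation math above: estimateNeededDiskSpace() sums the sizes of the source parts, skips parts whose TTL has fully expired (they will be dropped rather than merged), and pads the result with DISK_USAGE_COEFFICIENT_TO_RESERVE for headroom. Assuming a coefficient of 1.1 (its historical value in this file; not visible in the diff), two 10 GiB source parts reserve about 22 GiB, while on the reverted branch the same parts with 90% of their rows lightweight-deleted and the setting enabled would have reserved roughly 2.2 GiB.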


@@ -192,7 +192,7 @@ public:
     /// The approximate amount of disk space needed for merge or mutation. With a surplus.
-    static size_t estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts, const bool & is_merge);
+    static size_t estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts);
 
 private:
 
     /** Select all parts belonging to the same partition.


@@ -78,7 +78,6 @@ struct Settings;
     M(UInt64, number_of_mutations_to_throw, 1000, "If table has at least that many unfinished mutations, throw 'Too many mutations' exception. Disabled if set to 0", 0) \
     M(UInt64, min_delay_to_mutate_ms, 10, "Min delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
     M(UInt64, max_delay_to_mutate_ms, 1000, "Max delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
-    M(Bool, exclude_deleted_rows_for_part_size_in_merge, false, "Use an estimated source part size (excluding lightweight deleted rows) when selecting parts to merge", 0) \
     \
     /** Inserts settings. */ \
     M(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \


@@ -49,7 +49,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
     }
 
     /// TODO - some better heuristic?
-    size_t estimated_space_for_result = MergeTreeDataMergerMutator::estimateNeededDiskSpace({source_part}, false);
+    size_t estimated_space_for_result = MergeTreeDataMergerMutator::estimateNeededDiskSpace({source_part});
 
     if (entry.create_time + storage_settings_ptr->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)
         && estimated_space_for_result >= storage_settings_ptr->prefer_fetch_merged_part_size_threshold)


@@ -1349,7 +1349,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
                 if (auto part_in_memory = asInMemoryPart(part))
                     sum_parts_size_in_bytes += part_in_memory->block.bytes();
                 else
-                    sum_parts_size_in_bytes += part->getExistingBytesOnDisk();
+                    sum_parts_size_in_bytes += part->getBytesOnDisk();
             }
         }


@@ -1085,7 +1085,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
         if (isTTLMergeType(future_part->merge_type))
             getContext()->getMergeList().bookMergeWithTTL();
 
-        merging_tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part->parts, true), *this, metadata_snapshot, false);
+        merging_tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part->parts), *this, metadata_snapshot, false);
         return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(merging_tagger), std::make_shared<MutationCommands>());
     }
@@ -1301,7 +1301,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
             future_part->name = part->getNewName(new_part_info);
             future_part->part_format = part->getFormat();
 
-            tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}, false), *this, metadata_snapshot, true);
+            tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true);
             return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(tagger), commands, txn);
         }
     }
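
A note on the is_merge flag removed here and in the tasks above: on the reverted branch, merge paths passed true (a merge drops lightweight-deleted rows, so the smaller existing size is a fair estimate), while mutation paths passed false, presumably because a mutation rewrites the part together with its deleted rows and _row_exists mask, making the raw on-disk size the honest estimate. With the revert, both paths use raw sizes and the flag disappears from the signature.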


@@ -1,23 +0,0 @@
-DROP TABLE IF EXISTS lwd_merge;
-
-CREATE TABLE lwd_merge (id UInt64 CODEC(NONE))
-    ENGINE = MergeTree ORDER BY id
-    SETTINGS max_bytes_to_merge_at_max_space_in_pool = 80000, exclude_deleted_rows_for_part_size_in_merge = 0;
-
-INSERT INTO lwd_merge SELECT number FROM numbers(10000);
-INSERT INTO lwd_merge SELECT number FROM numbers(10000, 10000);
-
-OPTIMIZE TABLE lwd_merge;
-SELECT count() FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_merge' AND active = 1;
-
-DELETE FROM lwd_merge WHERE id % 10 > 0;
-
-OPTIMIZE TABLE lwd_merge;
-SELECT count() FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_merge' AND active = 1;
-
-ALTER TABLE lwd_merge MODIFY SETTING exclude_deleted_rows_for_part_size_in_merge = 1;
-
-OPTIMIZE TABLE lwd_merge;
-SELECT count() FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_merge' AND active = 1;
-
-DROP TABLE IF EXISTS lwd_merge;
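
The deleted test exercised exactly this behavior: each INSERT produces a part of roughly 80000 bytes (10000 UInt64 values with CODEC(NONE)), so under max_bytes_to_merge_at_max_space_in_pool = 80000 the first two OPTIMIZE statements cannot merge the two parts and both early SELECTs should report 2 active parts. After DELETE removes 90% of the rows and the setting is switched on, the estimated combined size falls to about a tenth of the raw size, fits under the cap, and the final OPTIMIZE should leave a single part, so the expected output is presumably 2, 2, 1 (a .reference file, not shown here, presumably accounts for the eleventh changed file).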