diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 29189d20a93..87f23b0da2a 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -593,23 +593,6 @@ UInt64 IMergeTreeDataPart::getMarksCount() const return index_granularity.getMarksCount(); } -UInt64 IMergeTreeDataPart::getExistingBytesOnDisk() const -{ - if (!supportLightweightDeleteMutate() || !hasLightweightDelete() || !rows_count - || !storage.getSettings()->exclude_deleted_rows_for_part_size_in_merge) - return bytes_on_disk; - - /// Uninitialized existing_rows_count - /// (if existing_rows_count equals rows_count, it means that previously we failed to read existing_rows_count) - if (existing_rows_count > rows_count) - readExistingRowsCount(); - - if (existing_rows_count < rows_count) - return bytes_on_disk * existing_rows_count / rows_count; - else /// Load failed - return bytes_on_disk; -} - size_t IMergeTreeDataPart::getFileSizeOrZero(const String & file_name) const { auto checksum = checksums.files.find(file_name); @@ -1304,85 +1287,6 @@ void IMergeTreeDataPart::loadRowsCount() } } -void IMergeTreeDataPart::readExistingRowsCount() const -{ - if (!supportLightweightDeleteMutate() || !hasLightweightDelete() || !storage.getSettings()->exclude_deleted_rows_for_part_size_in_merge - || existing_rows_count < rows_count || !getMarksCount()) - return; - - std::lock_guard lock(existing_rows_count_mutex); - - /// Already read by another thread - if (existing_rows_count < rows_count) - return; - - NamesAndTypesList cols; - cols.push_back(LightweightDeleteDescription::FILTER_COLUMN); - - StorageMetadataPtr metadata_ptr = storage.getInMemoryMetadataPtr(); - StorageSnapshotPtr storage_snapshot_ptr = std::make_shared(storage, metadata_ptr); - - MergeTreeReaderPtr reader = getReader( - cols, - storage_snapshot_ptr, - MarkRanges{MarkRange(0, getMarksCount())}, - nullptr, - storage.getContext()->getMarkCache().get(), - std::make_shared(), - MergeTreeReaderSettings{}, - ValueSizeMap{}, - ReadBufferFromFileBase::ProfileCallback{}); - - if (!reader) - { - LOG_WARNING(storage.log, "Create reader failed while reading existing rows count"); - existing_rows_count = rows_count; - return; - } - - size_t current_mark = 0; - const size_t total_mark = getMarksCount(); - - bool continue_reading = false; - size_t current_row = 0; - size_t existing_count = 0; - - while (current_row < rows_count) - { - size_t rows_to_read = index_granularity.getMarkRows(current_mark); - continue_reading = (current_mark != 0); - - Columns result; - result.resize(1); - - size_t rows_read = reader->readRows(current_mark, total_mark, continue_reading, rows_to_read, result); - if (!rows_read) - { - LOG_WARNING(storage.log, "Part {} has lightweight delete, but _row_exists column not found", name); - existing_rows_count = rows_count; - return; - } - - current_row += rows_read; - current_mark += (rows_to_read == rows_read); - - const ColumnUInt8 * row_exists_col = typeid_cast(result[0].get()); - if (!row_exists_col) - { - LOG_WARNING(storage.log, "Part {} _row_exists column type is not UInt8", name); - existing_rows_count = rows_count; - return; - } - - for (UInt8 row_exists : row_exists_col->getData()) - if (row_exists) - existing_count++; - } - - existing_rows_count = existing_count; - LOG_DEBUG(storage.log, "Part {} existing_rows_count = {}", name, existing_rows_count); -} - void IMergeTreeDataPart::appendFilesOfRowsCount(Strings & files) { files.push_back("count.txt"); diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index efc42788ebb..640a1f1d0a3 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -229,13 +229,6 @@ public: size_t rows_count = 0; - /// Existing rows count (excluding lightweight deleted rows) - /// UINT64_MAX -> uninitialized - /// 0 -> all rows were deleted - /// if reading failed, it will be set to rows_count - mutable size_t existing_rows_count = UINT64_MAX; - mutable std::mutex existing_rows_count_mutex; - time_t modification_time = 0; /// When the part is removed from the working set. Changes once. mutable std::atomic remove_time { std::numeric_limits::max() }; @@ -381,10 +374,6 @@ public: void setBytesOnDisk(UInt64 bytes_on_disk_) { bytes_on_disk = bytes_on_disk_; } void setBytesUncompressedOnDisk(UInt64 bytes_uncompressed_on_disk_) { bytes_uncompressed_on_disk = bytes_uncompressed_on_disk_; } - /// Returns estimated size of existing rows if setting exclude_deleted_rows_for_part_size_in_merge is true - /// Otherwise returns bytes_on_disk - UInt64 getExistingBytesOnDisk() const; - size_t getFileSizeOrZero(const String & file_name) const; auto getFilesChecksums() const { return checksums.files; } @@ -511,9 +500,6 @@ public: /// True if here is lightweight deleted mask file in part. bool hasLightweightDelete() const { return columns.contains(LightweightDeleteDescription::FILTER_COLUMN.name); } - /// Read existing rows count from _row_exists column - void readExistingRowsCount() const; - void writeChecksums(const MergeTreeDataPartChecksums & checksums_, const WriteSettings & settings); /// Checks the consistency of this data part. diff --git a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp index 8cdbdac7821..9be31859a19 100644 --- a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp +++ b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp @@ -160,7 +160,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare() } /// Start to make the main work - size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts, true); + size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts); /// Can throw an exception while reserving space. IMergeTreeDataPart::TTLInfos ttl_infos; diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 722f00c75d5..42f480ed18a 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -405,7 +405,7 @@ MergeTreeDataMergerMutator::MergeSelectingInfo MergeTreeDataMergerMutator::getPo } IMergeSelector::Part part_info; - part_info.size = part->getExistingBytesOnDisk(); + part_info.size = part->getBytesOnDisk(); part_info.age = res.current_time - part->modification_time; part_info.level = part->info.level; part_info.data = ∂ @@ -611,7 +611,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinParti return SelectPartsDecision::CANNOT_SELECT; } - sum_bytes += (*it)->getExistingBytesOnDisk(); + sum_bytes += (*it)->getBytesOnDisk(); prev_it = it; ++it; @@ -791,7 +791,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart } -size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts, const bool & is_merge) +size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts) { size_t res = 0; time_t current_time = std::time(nullptr); @@ -802,10 +802,7 @@ size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData:: if (part_max_ttl && part_max_ttl <= current_time) continue; - if (is_merge && part->storage.getSettings()->exclude_deleted_rows_for_part_size_in_merge) - res += part->getExistingBytesOnDisk(); - else - res += part->getBytesOnDisk(); + res += part->getBytesOnDisk(); } return static_cast(res * DISK_USAGE_COEFFICIENT_TO_RESERVE); diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index 8ff70d2776c..5e8a89c94a4 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -192,7 +192,7 @@ public: /// The approximate amount of disk space needed for merge or mutation. With a surplus. - static size_t estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts, const bool & is_merge); + static size_t estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts); private: /** Select all parts belonging to the same partition. diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index aad1e684daa..a80ef5f81ad 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -78,7 +78,6 @@ struct Settings; M(UInt64, number_of_mutations_to_throw, 1000, "If table has at least that many unfinished mutations, throw 'Too many mutations' exception. Disabled if set to 0", 0) \ M(UInt64, min_delay_to_mutate_ms, 10, "Min delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \ M(UInt64, max_delay_to_mutate_ms, 1000, "Max delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \ - M(Bool, exclude_deleted_rows_for_part_size_in_merge, false, "Use an estimated source part size (excluding lightweight deleted rows) when selecting parts to merge", 0) \ \ /** Inserts settings. */ \ M(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \ diff --git a/src/Storages/MergeTree/MutateFromLogEntryTask.cpp b/src/Storages/MergeTree/MutateFromLogEntryTask.cpp index 620b0e34c6a..a9ff687fe4d 100644 --- a/src/Storages/MergeTree/MutateFromLogEntryTask.cpp +++ b/src/Storages/MergeTree/MutateFromLogEntryTask.cpp @@ -49,7 +49,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare() } /// TODO - some better heuristic? - size_t estimated_space_for_result = MergeTreeDataMergerMutator::estimateNeededDiskSpace({source_part}, false); + size_t estimated_space_for_result = MergeTreeDataMergerMutator::estimateNeededDiskSpace({source_part}); if (entry.create_time + storage_settings_ptr->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr) && estimated_space_for_result >= storage_settings_ptr->prefer_fetch_merged_part_size_threshold) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 632d14d22cb..2d0617e5826 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1349,7 +1349,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( if (auto part_in_memory = asInMemoryPart(part)) sum_parts_size_in_bytes += part_in_memory->block.bytes(); else - sum_parts_size_in_bytes += part->getExistingBytesOnDisk(); + sum_parts_size_in_bytes += part->getBytesOnDisk(); } } diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index b43a176d851..9378aaa1f6a 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -1085,7 +1085,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge( if (isTTLMergeType(future_part->merge_type)) getContext()->getMergeList().bookMergeWithTTL(); - merging_tagger = std::make_unique(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part->parts, true), *this, metadata_snapshot, false); + merging_tagger = std::make_unique(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part->parts), *this, metadata_snapshot, false); return std::make_shared(future_part, std::move(merging_tagger), std::make_shared()); } @@ -1301,7 +1301,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate( future_part->name = part->getNewName(new_part_info); future_part->part_format = part->getFormat(); - tagger = std::make_unique(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}, false), *this, metadata_snapshot, true); + tagger = std::make_unique(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true); return std::make_shared(future_part, std::move(tagger), commands, txn); } } diff --git a/tests/queries/0_stateless/02942_consider_lwd_when_merge.reference b/tests/queries/0_stateless/02942_consider_lwd_when_merge.reference deleted file mode 100644 index 19920de3d3c..00000000000 --- a/tests/queries/0_stateless/02942_consider_lwd_when_merge.reference +++ /dev/null @@ -1,3 +0,0 @@ -2 -2 -1 diff --git a/tests/queries/0_stateless/02942_consider_lwd_when_merge.sql b/tests/queries/0_stateless/02942_consider_lwd_when_merge.sql deleted file mode 100644 index a65e8877020..00000000000 --- a/tests/queries/0_stateless/02942_consider_lwd_when_merge.sql +++ /dev/null @@ -1,23 +0,0 @@ -DROP TABLE IF EXISTS lwd_merge; - -CREATE TABLE lwd_merge (id UInt64 CODEC(NONE)) - ENGINE = MergeTree ORDER BY id -SETTINGS max_bytes_to_merge_at_max_space_in_pool = 80000, exclude_deleted_rows_for_part_size_in_merge = 0; - -INSERT INTO lwd_merge SELECT number FROM numbers(10000); -INSERT INTO lwd_merge SELECT number FROM numbers(10000, 10000); - -OPTIMIZE TABLE lwd_merge; -SELECT count() FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_merge' AND active = 1; - -DELETE FROM lwd_merge WHERE id % 10 > 0; - -OPTIMIZE TABLE lwd_merge; -SELECT count() FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_merge' AND active = 1; - -ALTER TABLE lwd_merge MODIFY SETTING exclude_deleted_rows_for_part_size_in_merge = 1; - -OPTIMIZE TABLE lwd_merge; -SELECT count() FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_merge' AND active = 1; - -DROP TABLE IF EXISTS lwd_merge;