mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Revert "Consider lightweight deleted rows when selecting parts to merge"
This commit is contained in:
parent
e0a790de1e
commit
7be39a27cc
@ -593,23 +593,6 @@ UInt64 IMergeTreeDataPart::getMarksCount() const
|
||||
return index_granularity.getMarksCount();
|
||||
}
|
||||
|
||||
UInt64 IMergeTreeDataPart::getExistingBytesOnDisk() const
|
||||
{
|
||||
if (!supportLightweightDeleteMutate() || !hasLightweightDelete() || !rows_count
|
||||
|| !storage.getSettings()->exclude_deleted_rows_for_part_size_in_merge)
|
||||
return bytes_on_disk;
|
||||
|
||||
/// Uninitialized existing_rows_count
|
||||
/// (if existing_rows_count equals rows_count, it means that previously we failed to read existing_rows_count)
|
||||
if (existing_rows_count > rows_count)
|
||||
readExistingRowsCount();
|
||||
|
||||
if (existing_rows_count < rows_count)
|
||||
return bytes_on_disk * existing_rows_count / rows_count;
|
||||
else /// Load failed
|
||||
return bytes_on_disk;
|
||||
}
|
||||
|
||||
size_t IMergeTreeDataPart::getFileSizeOrZero(const String & file_name) const
|
||||
{
|
||||
auto checksum = checksums.files.find(file_name);
|
||||
@ -1304,85 +1287,6 @@ void IMergeTreeDataPart::loadRowsCount()
|
||||
}
|
||||
}
|
||||
|
||||
void IMergeTreeDataPart::readExistingRowsCount() const
|
||||
{
|
||||
if (!supportLightweightDeleteMutate() || !hasLightweightDelete() || !storage.getSettings()->exclude_deleted_rows_for_part_size_in_merge
|
||||
|| existing_rows_count < rows_count || !getMarksCount())
|
||||
return;
|
||||
|
||||
std::lock_guard lock(existing_rows_count_mutex);
|
||||
|
||||
/// Already read by another thread
|
||||
if (existing_rows_count < rows_count)
|
||||
return;
|
||||
|
||||
NamesAndTypesList cols;
|
||||
cols.push_back(LightweightDeleteDescription::FILTER_COLUMN);
|
||||
|
||||
StorageMetadataPtr metadata_ptr = storage.getInMemoryMetadataPtr();
|
||||
StorageSnapshotPtr storage_snapshot_ptr = std::make_shared<StorageSnapshot>(storage, metadata_ptr);
|
||||
|
||||
MergeTreeReaderPtr reader = getReader(
|
||||
cols,
|
||||
storage_snapshot_ptr,
|
||||
MarkRanges{MarkRange(0, getMarksCount())},
|
||||
nullptr,
|
||||
storage.getContext()->getMarkCache().get(),
|
||||
std::make_shared<AlterConversions>(),
|
||||
MergeTreeReaderSettings{},
|
||||
ValueSizeMap{},
|
||||
ReadBufferFromFileBase::ProfileCallback{});
|
||||
|
||||
if (!reader)
|
||||
{
|
||||
LOG_WARNING(storage.log, "Create reader failed while reading existing rows count");
|
||||
existing_rows_count = rows_count;
|
||||
return;
|
||||
}
|
||||
|
||||
size_t current_mark = 0;
|
||||
const size_t total_mark = getMarksCount();
|
||||
|
||||
bool continue_reading = false;
|
||||
size_t current_row = 0;
|
||||
size_t existing_count = 0;
|
||||
|
||||
while (current_row < rows_count)
|
||||
{
|
||||
size_t rows_to_read = index_granularity.getMarkRows(current_mark);
|
||||
continue_reading = (current_mark != 0);
|
||||
|
||||
Columns result;
|
||||
result.resize(1);
|
||||
|
||||
size_t rows_read = reader->readRows(current_mark, total_mark, continue_reading, rows_to_read, result);
|
||||
if (!rows_read)
|
||||
{
|
||||
LOG_WARNING(storage.log, "Part {} has lightweight delete, but _row_exists column not found", name);
|
||||
existing_rows_count = rows_count;
|
||||
return;
|
||||
}
|
||||
|
||||
current_row += rows_read;
|
||||
current_mark += (rows_to_read == rows_read);
|
||||
|
||||
const ColumnUInt8 * row_exists_col = typeid_cast<const ColumnUInt8 *>(result[0].get());
|
||||
if (!row_exists_col)
|
||||
{
|
||||
LOG_WARNING(storage.log, "Part {} _row_exists column type is not UInt8", name);
|
||||
existing_rows_count = rows_count;
|
||||
return;
|
||||
}
|
||||
|
||||
for (UInt8 row_exists : row_exists_col->getData())
|
||||
if (row_exists)
|
||||
existing_count++;
|
||||
}
|
||||
|
||||
existing_rows_count = existing_count;
|
||||
LOG_DEBUG(storage.log, "Part {} existing_rows_count = {}", name, existing_rows_count);
|
||||
}
|
||||
|
||||
void IMergeTreeDataPart::appendFilesOfRowsCount(Strings & files)
|
||||
{
|
||||
files.push_back("count.txt");
|
||||
|
@ -229,13 +229,6 @@ public:
|
||||
|
||||
size_t rows_count = 0;
|
||||
|
||||
/// Existing rows count (excluding lightweight deleted rows)
|
||||
/// UINT64_MAX -> uninitialized
|
||||
/// 0 -> all rows were deleted
|
||||
/// if reading failed, it will be set to rows_count
|
||||
mutable size_t existing_rows_count = UINT64_MAX;
|
||||
mutable std::mutex existing_rows_count_mutex;
|
||||
|
||||
time_t modification_time = 0;
|
||||
/// When the part is removed from the working set. Changes once.
|
||||
mutable std::atomic<time_t> remove_time { std::numeric_limits<time_t>::max() };
|
||||
@ -381,10 +374,6 @@ public:
|
||||
void setBytesOnDisk(UInt64 bytes_on_disk_) { bytes_on_disk = bytes_on_disk_; }
|
||||
void setBytesUncompressedOnDisk(UInt64 bytes_uncompressed_on_disk_) { bytes_uncompressed_on_disk = bytes_uncompressed_on_disk_; }
|
||||
|
||||
/// Returns estimated size of existing rows if setting exclude_deleted_rows_for_part_size_in_merge is true
|
||||
/// Otherwise returns bytes_on_disk
|
||||
UInt64 getExistingBytesOnDisk() const;
|
||||
|
||||
size_t getFileSizeOrZero(const String & file_name) const;
|
||||
auto getFilesChecksums() const { return checksums.files; }
|
||||
|
||||
@ -511,9 +500,6 @@ public:
|
||||
/// True if here is lightweight deleted mask file in part.
|
||||
bool hasLightweightDelete() const { return columns.contains(LightweightDeleteDescription::FILTER_COLUMN.name); }
|
||||
|
||||
/// Read existing rows count from _row_exists column
|
||||
void readExistingRowsCount() const;
|
||||
|
||||
void writeChecksums(const MergeTreeDataPartChecksums & checksums_, const WriteSettings & settings);
|
||||
|
||||
/// Checks the consistency of this data part.
|
||||
|
@ -160,7 +160,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
|
||||
}
|
||||
|
||||
/// Start to make the main work
|
||||
size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts, true);
|
||||
size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts);
|
||||
|
||||
/// Can throw an exception while reserving space.
|
||||
IMergeTreeDataPart::TTLInfos ttl_infos;
|
||||
|
@ -405,7 +405,7 @@ MergeTreeDataMergerMutator::MergeSelectingInfo MergeTreeDataMergerMutator::getPo
|
||||
}
|
||||
|
||||
IMergeSelector::Part part_info;
|
||||
part_info.size = part->getExistingBytesOnDisk();
|
||||
part_info.size = part->getBytesOnDisk();
|
||||
part_info.age = res.current_time - part->modification_time;
|
||||
part_info.level = part->info.level;
|
||||
part_info.data = ∂
|
||||
@ -611,7 +611,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinParti
|
||||
return SelectPartsDecision::CANNOT_SELECT;
|
||||
}
|
||||
|
||||
sum_bytes += (*it)->getExistingBytesOnDisk();
|
||||
sum_bytes += (*it)->getBytesOnDisk();
|
||||
|
||||
prev_it = it;
|
||||
++it;
|
||||
@ -791,7 +791,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart
|
||||
}
|
||||
|
||||
|
||||
size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts, const bool & is_merge)
|
||||
size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts)
|
||||
{
|
||||
size_t res = 0;
|
||||
time_t current_time = std::time(nullptr);
|
||||
@ -802,9 +802,6 @@ size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::
|
||||
if (part_max_ttl && part_max_ttl <= current_time)
|
||||
continue;
|
||||
|
||||
if (is_merge && part->storage.getSettings()->exclude_deleted_rows_for_part_size_in_merge)
|
||||
res += part->getExistingBytesOnDisk();
|
||||
else
|
||||
res += part->getBytesOnDisk();
|
||||
}
|
||||
|
||||
|
@ -192,7 +192,7 @@ public:
|
||||
|
||||
|
||||
/// The approximate amount of disk space needed for merge or mutation. With a surplus.
|
||||
static size_t estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts, const bool & is_merge);
|
||||
static size_t estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts);
|
||||
|
||||
private:
|
||||
/** Select all parts belonging to the same partition.
|
||||
|
@ -78,7 +78,6 @@ struct Settings;
|
||||
M(UInt64, number_of_mutations_to_throw, 1000, "If table has at least that many unfinished mutations, throw 'Too many mutations' exception. Disabled if set to 0", 0) \
|
||||
M(UInt64, min_delay_to_mutate_ms, 10, "Min delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
|
||||
M(UInt64, max_delay_to_mutate_ms, 1000, "Max delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
|
||||
M(Bool, exclude_deleted_rows_for_part_size_in_merge, false, "Use an estimated source part size (excluding lightweight deleted rows) when selecting parts to merge", 0) \
|
||||
\
|
||||
/** Inserts settings. */ \
|
||||
M(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \
|
||||
|
@ -49,7 +49,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
|
||||
}
|
||||
|
||||
/// TODO - some better heuristic?
|
||||
size_t estimated_space_for_result = MergeTreeDataMergerMutator::estimateNeededDiskSpace({source_part}, false);
|
||||
size_t estimated_space_for_result = MergeTreeDataMergerMutator::estimateNeededDiskSpace({source_part});
|
||||
|
||||
if (entry.create_time + storage_settings_ptr->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)
|
||||
&& estimated_space_for_result >= storage_settings_ptr->prefer_fetch_merged_part_size_threshold)
|
||||
|
@ -1349,7 +1349,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
|
||||
if (auto part_in_memory = asInMemoryPart(part))
|
||||
sum_parts_size_in_bytes += part_in_memory->block.bytes();
|
||||
else
|
||||
sum_parts_size_in_bytes += part->getExistingBytesOnDisk();
|
||||
sum_parts_size_in_bytes += part->getBytesOnDisk();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1085,7 +1085,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
|
||||
if (isTTLMergeType(future_part->merge_type))
|
||||
getContext()->getMergeList().bookMergeWithTTL();
|
||||
|
||||
merging_tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part->parts, true), *this, metadata_snapshot, false);
|
||||
merging_tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part->parts), *this, metadata_snapshot, false);
|
||||
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(merging_tagger), std::make_shared<MutationCommands>());
|
||||
}
|
||||
|
||||
@ -1301,7 +1301,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
|
||||
future_part->name = part->getNewName(new_part_info);
|
||||
future_part->part_format = part->getFormat();
|
||||
|
||||
tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}, false), *this, metadata_snapshot, true);
|
||||
tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true);
|
||||
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(tagger), commands, txn);
|
||||
}
|
||||
}
|
||||
|
@ -1,3 +0,0 @@
|
||||
2
|
||||
2
|
||||
1
|
@ -1,23 +0,0 @@
|
||||
DROP TABLE IF EXISTS lwd_merge;
|
||||
|
||||
CREATE TABLE lwd_merge (id UInt64 CODEC(NONE))
|
||||
ENGINE = MergeTree ORDER BY id
|
||||
SETTINGS max_bytes_to_merge_at_max_space_in_pool = 80000, exclude_deleted_rows_for_part_size_in_merge = 0;
|
||||
|
||||
INSERT INTO lwd_merge SELECT number FROM numbers(10000);
|
||||
INSERT INTO lwd_merge SELECT number FROM numbers(10000, 10000);
|
||||
|
||||
OPTIMIZE TABLE lwd_merge;
|
||||
SELECT count() FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_merge' AND active = 1;
|
||||
|
||||
DELETE FROM lwd_merge WHERE id % 10 > 0;
|
||||
|
||||
OPTIMIZE TABLE lwd_merge;
|
||||
SELECT count() FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_merge' AND active = 1;
|
||||
|
||||
ALTER TABLE lwd_merge MODIFY SETTING exclude_deleted_rows_for_part_size_in_merge = 1;
|
||||
|
||||
OPTIMIZE TABLE lwd_merge;
|
||||
SELECT count() FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_merge' AND active = 1;
|
||||
|
||||
DROP TABLE IF EXISTS lwd_merge;
|
Loading…
Reference in New Issue
Block a user