ClickHouse/src/Storages/MergeTree/MergeFromLogEntryTask.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

378 lines
16 KiB
C++
Raw Normal View History

#include <Storages/MergeTree/MergeFromLogEntryTask.h>
2022-04-27 15:05:45 +00:00
#include <Common/logger_useful.h>
#include <Common/ProfileEvents.h>
2023-01-24 11:23:17 +00:00
#include <Common/ProfileEventsScope.h>
#include <Storages/StorageReplicatedMergeTree.h>
/// Profile event counters used by this file. They are only declared here (extern);
/// the definitions live outside of this translation unit.
namespace ProfileEvents
{
    /// Incremented by MergeFromLogEntryTask::finalize() once a merged part has been committed.
    extern const Event ReplicatedPartMerges;
    /// Incremented by MergeFromLogEntryTask::finalize() when the commit fails with a bad-checksums error code.
    extern const Event DataAfterMergeDiffersFromReplica;
}
namespace DB
{
/// Error codes thrown by this file. They are only declared here (extern);
/// the definitions live outside of this translation unit.
namespace ErrorCodes
{
    /// Thrown by prepare() when a local part covers a source part but does not contain the part to be produced.
    extern const int LOGICAL_ERROR;
    /// Thrown by prepare() when the name computed for the future part differs from the name in the log entry.
    extern const int BAD_DATA_PART_NAME;
}
MergeFromLogEntryTask::MergeFromLogEntryTask(
ReplicatedMergeTreeQueue::SelectedEntryPtr selected_entry_,
StorageReplicatedMergeTree & storage_,
IExecutableTask::TaskResultCallback & task_result_callback_)
: ReplicatedMergeMutateTaskBase(
&Poco::Logger::get(
storage_.getStorageID().getShortName() + "::" + selected_entry_->log_entry->new_part_name + " (MergeFromLogEntryTask)"),
storage_,
selected_entry_,
task_result_callback_)
{
}
ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
{
LOG_TRACE(log, "Executing log entry to merge parts {} to {}",
fmt::join(entry.source_parts, ", "), entry.new_part_name);
const auto storage_settings_ptr = storage.getSettings();
if (storage_settings_ptr->always_fetch_merged_part)
{
LOG_INFO(log, "Will fetch part {} because setting 'always_fetch_merged_part' is true", entry.new_part_name);
return PrepareResult{
.prepared_successfully = false,
.need_to_check_missing_part_in_fetch = true,
.part_log_writer = {}
};
}
2022-03-14 14:42:09 +00:00
if (entry.merge_type == MergeType::TTLRecompress &&
(time(nullptr) - entry.create_time) <= storage_settings_ptr->try_fetch_recompressed_part_timeout.totalSeconds() &&
entry.source_replica != storage.replica_name)
{
LOG_INFO(log, "Will try to fetch part {} until '{}' because this part assigned to recompression merge. "
"Source replica {} will try to merge this part first", entry.new_part_name,
DateLUT::instance().timeToString(entry.create_time + storage_settings_ptr->try_fetch_recompressed_part_timeout.totalSeconds()), entry.source_replica);
/// Waiting other replica to recompress part. No need to check it.
return PrepareResult{
.prepared_successfully = false,
.need_to_check_missing_part_in_fetch = false,
.part_log_writer = {}
};
}
/// In some use cases merging can be more expensive than fetching
/// and it may be better to spread merges tasks across the replicas
/// instead of doing exactly the same merge cluster-wise
2022-02-10 19:45:52 +00:00
if (storage.merge_strategy_picker.shouldMergeOnSingleReplica(entry))
{
2022-02-10 19:45:52 +00:00
std::optional<String> replica_to_execute_merge = storage.merge_strategy_picker.pickReplicaToExecuteMerge(entry);
if (replica_to_execute_merge)
{
LOG_DEBUG(log,
"Prefer fetching part {} from replica {} due to execute_merges_on_single_replica_time_threshold",
entry.new_part_name, replica_to_execute_merge.value());
return PrepareResult{
.prepared_successfully = false,
.need_to_check_missing_part_in_fetch = true,
.part_log_writer = {}
};
}
}
for (const String & source_part_name : entry.source_parts)
{
MergeTreeData::DataPartPtr source_part_or_covering = storage.getActiveContainingPart(source_part_name);
if (!source_part_or_covering)
{
/// We do not have one of source parts locally, try to take some already merged part from someone.
LOG_DEBUG(log, "Don't have all parts (at least {} is missing) for merge {}; will try to fetch it instead. "
"Either pool for fetches is starving, see background_fetches_pool_size, or none of active replicas has it",
source_part_name, entry.new_part_name);
return PrepareResult{
.prepared_successfully = false,
.need_to_check_missing_part_in_fetch = true,
.part_log_writer = {}
};
}
if (source_part_or_covering->name != source_part_name)
{
/// We do not have source part locally, but we have some covering part. Possible options:
/// 1. We already have merged part (source_part_or_covering->name == new_part_name)
/// 2. We have some larger merged part which covers new_part_name (and therefore it covers source_part_name too)
/// 3. We have two intersecting parts, both cover source_part_name. It's logical error.
/// TODO Why 1 and 2 can happen? Do we need more assertions here or somewhere else?
2022-12-23 18:40:29 +00:00
constexpr auto fmt_string = "Part {} is covered by {} but should be merged into {}. This shouldn't happen often.";
2023-01-13 19:34:31 +00:00
String message;
LOG_WARNING(LogToStr(message, log), fmt_string, source_part_name, source_part_or_covering->name, entry.new_part_name);
if (!source_part_or_covering->info.contains(MergeTreePartInfo::fromPartName(entry.new_part_name, storage.format_version)))
2023-01-26 09:52:47 +00:00
throw Exception::createDeprecated(message, ErrorCodes::LOGICAL_ERROR);
return PrepareResult{
.prepared_successfully = false,
.need_to_check_missing_part_in_fetch = true,
.part_log_writer = {}
};
}
parts.push_back(source_part_or_covering);
}
/// All source parts are found locally, we can execute merge
if (entry.create_time + storage_settings_ptr->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr))
{
/// If entry is old enough, and have enough size, and part are exists in any replica,
/// then prefer fetching of merged part from replica.
size_t sum_parts_bytes_on_disk = 0;
for (const auto & item : parts)
sum_parts_bytes_on_disk += item->getBytesOnDisk();
if (sum_parts_bytes_on_disk >= storage_settings_ptr->prefer_fetch_merged_part_size_threshold)
{
String replica = storage.findReplicaHavingPart(entry.new_part_name, true); /// NOTE excessive ZK requests for same data later, may remove.
if (!replica.empty())
{
LOG_DEBUG(log, "Prefer to fetch {} from replica {}", entry.new_part_name, replica);
/// We found covering part, no checks for missing part.
return PrepareResult{
.prepared_successfully = false,
.need_to_check_missing_part_in_fetch = false,
.part_log_writer = {}
};
}
}
}
/// Start to make the main work
size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts);
/// Can throw an exception while reserving space.
IMergeTreeDataPart::TTLInfos ttl_infos;
size_t max_volume_index = 0;
for (auto & part_ptr : parts)
{
ttl_infos.update(part_ptr->ttl_infos);
2022-10-25 22:14:06 +00:00
auto disk_name = part_ptr->getDataPartStorage().getDiskName();
size_t volume_index = storage.getStoragePolicy()->getVolumeIndexByDiskName(disk_name);
max_volume_index = std::max(max_volume_index, volume_index);
}
/// It will live until the whole task is being destroyed
table_lock_holder = storage.lockForShare(RWLockImpl::NO_QUERY, storage_settings_ptr->lock_acquire_timeout_for_background_operations);
StorageMetadataPtr metadata_snapshot = storage.getInMemoryMetadataPtr();
auto future_merged_part = std::make_shared<FutureMergedMutatedPart>(parts, entry.new_part_format);
if (future_merged_part->name != entry.new_part_name)
{
throw Exception(ErrorCodes::BAD_DATA_PART_NAME, "Future merged part name {} differs from part name in log entry: {}",
backQuote(future_merged_part->name), backQuote(entry.new_part_name));
}
std::optional<CurrentlySubmergingEmergingTagger> tagger;
ReservationSharedPtr reserved_space = storage.balancedReservation(
metadata_snapshot,
estimated_space_for_merge,
max_volume_index,
future_merged_part->name,
future_merged_part->part_info,
future_merged_part->parts,
&tagger,
&ttl_infos);
if (!reserved_space)
reserved_space = storage.reserveSpacePreferringTTLRules(
metadata_snapshot, estimated_space_for_merge, ttl_infos, time(nullptr), max_volume_index);
future_merged_part->uuid = entry.new_part_uuid;
future_merged_part->updatePath(storage, reserved_space.get());
future_merged_part->merge_type = entry.merge_type;
if (storage_settings_ptr->allow_remote_fs_zero_copy_replication)
{
2022-05-11 22:04:54 +00:00
if (auto disk = reserved_space->getDisk(); disk->supportZeroCopyReplication())
{
2022-02-10 19:49:33 +00:00
String dummy;
2022-02-10 19:50:15 +00:00
if (!storage.findReplicaHavingCoveringPart(entry.new_part_name, true, dummy).empty())
{
2022-02-10 19:45:52 +00:00
LOG_DEBUG(log, "Merge of part {} finished by some other replica, will fetch merged part", entry.new_part_name);
/// We found covering part, no checks for missing part.
return PrepareResult{
.prepared_successfully = false,
.need_to_check_missing_part_in_fetch = false,
.part_log_writer = {}
};
2022-02-10 19:45:52 +00:00
}
zero_copy_lock = storage.tryCreateZeroCopyExclusiveLock(entry.new_part_name, disk);
2023-03-16 19:17:29 +00:00
if (!zero_copy_lock || !zero_copy_lock->isLocked())
2022-02-10 19:45:52 +00:00
{
LOG_DEBUG(log, "Merge of part {} started by some other replica, will wait it and fetch merged part", entry.new_part_name);
2023-03-17 16:45:02 +00:00
storage.watchZeroCopyLock(entry.new_part_name, disk);
/// Don't check for missing part -- it's missing because other replica still not
/// finished merge.
return PrepareResult{
.prepared_successfully = false,
.need_to_check_missing_part_in_fetch = false,
.part_log_writer = {}
};
}
else if (!storage.findReplicaHavingCoveringPart(entry.new_part_name, /* active */ false, dummy).empty())
{
2022-09-15 16:56:16 +00:00
/// Why this if still needed? We can check for part in zookeeper, don't find it and sleep for any amount of time. During this sleep part will be actually committed from other replica
/// and exclusive zero copy lock will be released. We will take the lock and execute merge one more time, while it was possible just to download the part from other replica.
///
2022-09-15 16:56:16 +00:00
/// It's also possible just because reads in [Zoo]Keeper are not lineariazable.
///
/// NOTE: In case of mutation and hardlinks it can even lead to extremely rare dataloss (we will produce new part with the same hardlinks, don't fetch the same from other replica), so this check is important.
zero_copy_lock->lock->unlock();
LOG_DEBUG(log, "We took zero copy lock, but merge of part {} finished by some other replica, will release lock and download merged part to avoid data duplication", entry.new_part_name);
return PrepareResult{
.prepared_successfully = false,
.need_to_check_missing_part_in_fetch = true,
.part_log_writer = {}
};
}
else
{
LOG_DEBUG(log, "Zero copy lock taken, will merge part {}", entry.new_part_name);
}
}
}
/// Account TTL merge
if (isTTLMergeType(future_merged_part->merge_type))
storage.getContext()->getMergeList().bookMergeWithTTL();
auto table_id = storage.getStorageID();
/// Add merge to list
const Settings & settings = storage.getContext()->getSettingsRef();
merge_mutate_entry = storage.getContext()->getMergeList().insert(
storage.getStorageID(),
future_merged_part,
Fix possible memory_tracker use-after-free for merges/mutations There are two possible cases for execution merges/mutations: 1) from background thread 2) from OPTIMIZE TABLE query 1) is pretty simple, it's memory tracking structure is as follow: current_thread::memory_tracker = level=Thread / description="(for thread)" == background_thread_memory_tracker = level=Thread / description="(for thread)" current_thread::memory_tracker.parent = level=Global / description="(total)" So as you can see it is pretty simple and MemoryTrackerThreadSwitcher does not do anything icky for this case. 2) is complex, it's memory tracking structure is as follow: current_thread::memory_tracker = level=Thread / description="(for thread)" current_thread::memory_tracker.parent = level=Process / description="(for query)" == background_thread_memory_tracker = level=Process / description="(for query)" Before this patch to track memory (and related things, like sampling, profiling and so on) for OPTIMIZE TABLE query dirty hacks was done to do this, since current_thread memory_tracker was of Thread scope, that does not have any limits. And so if will change parent for it to Merge/Mutate memory tracker (which also does not have some of settings) it will not be correctly tracked. To address this Merge/Mutate was set as parent not to the current_thread memory_tracker but to it's parent, since it's scope is Process with all settings. But that parent's memory_tracker is the memory_tracker of the thread_group, and so if you will have nested ThreadPool inside merge/mutate (this is the case for s3 async writes, which has been added in #33291) you may get use-after-free of memory_tracker. 
Consider the following example: MemoryTrackerThreadSwitcher() thread_group.memory_tracker.parent = merge_list_entry->memory_tracker (see also background_thread_memory_tracker above) CurrentThread::attachTo() current_thread.memory_tracker.parent = thread_group.memory_tracker CurrentThread::detachQuery() current_thread.memory_tracker.parent = thread_group.memory_tracker.parent # and this is equal to merge_list_entry->memory_tracker ~MemoryTrackerThreadSwitcher() thread_group.memory_tracker = thread_group.memory_tracker.parent So after the following we will get incorrect memory_tracker (from the mege_list_entry) when the next job in that ThreadPool will not have thread_group, since in this case it will not try to update the current_thread.memory_tracker.parent and use-after-free will happens. So to address the (2) issue, settings from the parent memory_tracker should be copied to the merge_list_entry->memory_tracker, to avoid playing with parent memory tracker. Note, that settings from the query (OPTIMIZE TABLE) is not available at that time, so it cannot be used (instead of parent's memory tracker settings). v2: remove memory_tracker.setOrRaiseHardLimit() from settings Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-02-18 07:45:29 +00:00
settings);
2022-03-16 19:16:26 +00:00
transaction_ptr = std::make_unique<MergeTreeData::Transaction>(storage, NO_TRANSACTION_RAW);
stopwatch_ptr = std::make_unique<Stopwatch>();
merge_task = storage.merger_mutator.mergePartsToTemporaryPart(
future_merged_part,
metadata_snapshot,
merge_mutate_entry.get(),
2021-09-24 13:57:44 +00:00
{} /* projection_merge_list_element */,
table_lock_holder,
entry.create_time,
storage.getContext(),
reserved_space,
entry.deduplicate,
entry.deduplicate_by_columns,
[RFC] Replacing merge tree new engine (#41005) * Add new engine to ReplacingMergeTree corresponding to the ReplacingCollapsingMergeTree * Add new test for the new ReplacingMergeTree engine * Limit sign value to -1/1 * Add new engine to ReplacingMergeTree corresponding to the ReplacingCollapsingMergeTree * Add new test for the new ReplacingMergeTree engine * Limit sign value to -1/1 * Replace sign column(Int8) by is_deleted(UInt8) * Add new engine to ReplacingMergeTree corresponding to the ReplacingCollapsingMergeTree * Add new test for the new ReplacingMergeTree engine * Limit sign value to -1/1 * Replace sign column(Int8) by is_deleted(UInt8) * Add new engine to ReplacingMergeTree corresponding to the ReplacingCollapsingMergeTree * Add new test for the new ReplacingMergeTree engine * Limit sign value to -1/1 * Replace sign column(Int8) by is_deleted(UInt8) * Add keyword 'CLEANUP' when OPTIMIZE * Cleanup uniquely when it's a replacingMergeTree * Propagate CLEANUP information and change from 'with_cleanup' to 'cleanup' * Cleanup data flagged as 'is_deleted' * Fix merge when optimize and add a test * Fix OPTIMIZE and INSERT + add tests * New fix for cleanup at the merge * Cleanup debug logs * Add the SETTINGS option 'clean_deleted_rows' that can be 'never' or 'always' * Fix regression bug; Now REplicatedMergeTree can be called as before without 'is_deleted' * Add Replicated tests * Disable tag 'long' for our test and cleanup some white spaces * Update tests * Fix tests and remove additional useless whitespace * Fix replica test * Style clean && add condition check for is_deleted values * clean_deleted_rows settings is nom an enum * Add valid default value to the clean_deleted_rows settings * Update cleanup checkers to use the enum and fix typos in the test * Fix submodule contrib/AMQP-CPP pointer * Add missing messages in test reference and remove a print with non derterministic order * fix replica test reference * Fix edge case * Fix a typo for the spell checker * 
Fix reference * Fix a condition to raise an error if is_deleted differ from 0/1 and cleanup * Change tests file name and update number * This should fix the ReplacingMergeTree parameter set * Fix replicated parameters * Disable allow_deprecated_syntax_for_merge_tree for our new column * Fix a test * Remove non deterministic order print in the test * Test on replicas * Remove a condition, when checking optional parameters, that should not be sueful since we disabled the deprected_syntaxe * Revert "Remove a condition, when checking optional parameters, that should not be useful since we disabled the deprected_syntaxe" This reverts commit b65d64c05e482945ac20fcfcf0311e1b028ea137. * Fix replica management and limit the number of argument to two maximum, due to the possiblity of deprecated table create/attach failing otherwise * Test a fix for replicated log information error * Try to add sync to have consistent results * Change path of replicas that should cause one issue and add few prints in case it's not that * Get cleanup info on replicas only if information found * Fix style issues * Try to avoid replication error 'cannot select parts...' and and replica read/write field order * Cleanup according to PR reviews and add tests on error raised. * Update src/Storages/MergeTree/registerStorageMergeTree.cpp Co-authored-by: Alexander Tokmakov <tavplubix@gmail.com> * Select ... FINAL don't show rows with is_deleted = true * Update and fix SELECT ... FINAL merge parameter * Remove is_deleted rows only on the version inserted when merge * Fix (master) updates issues * Revert changes that should not be commited * Add changes according to review * Revert changes that should not be commited - part 2 --------- Co-authored-by: Alexander Tokmakov <tavplubix@gmail.com>
2023-02-16 13:03:16 +00:00
entry.cleanup,
2022-02-14 19:50:08 +00:00
storage.merging_params,
2022-03-16 19:16:26 +00:00
NO_TRANSACTION_PTR);
/// Adjust priority
for (auto & item : future_merged_part->parts)
priority += item->getBytesOnDisk();
return {true, true, [this, stopwatch = *stopwatch_ptr] (const ExecutionStatus & execution_status)
{
2023-01-24 11:23:17 +00:00
auto profile_counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(profile_counters.getPartiallyAtomicSnapshot());
storage.writePartLog(
PartLogElement::MERGE_PARTS, execution_status, stopwatch.elapsed(),
2023-01-24 11:23:17 +00:00
entry.new_part_name, part, parts, merge_mutate_entry.get(), std::move(profile_counters_snapshot));
}};
}
/// Completes the merge started by prepare(): takes the merged part from the merge task, renames it
/// within the local transaction, then checks its checksums and commits it.
///
/// write_part_log is invoked exactly once on the paths that return: with the status built from the
/// current exception on a checksum mismatch, or with a default-constructed status on success.
///
/// Returns true when the part has been committed.
/// Returns false when the commit failed with a bad-checksums error code; in that case the local
/// transaction is rolled back and the freshly merged part is detached or removed.
/// Any other DB::Exception thrown by the commit is rethrown unchanged.
bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWriter write_part_log)
{
    part = merge_task->getFuture().get();

    storage.merger_mutator.renameMergedTemporaryPart(part, parts, NO_TRANSACTION_PTR, *transaction_ptr);

    /// Why we reset task here? Because it holds shared pointer to part and tryRemovePartImmediately will
    /// not able to remove the part and will throw an exception (because someone holds the pointer).
    ///
    /// Why we cannot reset task right after obtaining part from getFuture()? Because it holds RAII wrapper for
    /// temp directories which guards temporary dir from background removal. So it's right place to reset the task
    /// and it's really needed.
    merge_task.reset();

    try
    {
        storage.checkPartChecksumsAndCommit(*transaction_ptr, part);
    }
    catch (const Exception & e)
    {
        /// Only a checksum mismatch is handled here; everything else goes to the caller as is.
        if (!MergeTreeDataPartChecksums::isBadChecksumsErrorCode(e.code()))
            throw;

        transaction_ptr->rollback();

        ProfileEvents::increment(ProfileEvents::DataAfterMergeDiffersFromReplica);

        LOG_ERROR(log,
            "{}. Data after merge is not byte-identical to data on another replicas. There could be several reasons:"
            " 1. Using newer version of compression library after server update."
            " 2. Using another compression method."
            " 3. Non-deterministic compression algorithm (highly unlikely)."
            " 4. Non-deterministic merge algorithm due to logical error in code."
            " 5. Data corruption in memory due to bug in code."
            " 6. Data corruption in memory due to hardware issue."
            " 7. Manual modification of source data after server startup."
            " 8. Manual modification of checksums stored in ZooKeeper."
            " 9. Part format related settings like 'enable_mixed_granularity_parts' are different on different replicas."
            " We will download merged part from replica to force byte-identical result.",
            getCurrentExceptionMessage(false));

        write_part_log(ExecutionStatus::fromCurrentException("", true));

        /// Either keep the mismatching part in "detached" or drop it right away, depending on the setting.
        if (storage.getSettings()->detach_not_byte_identical_parts)
            storage.forcefullyMovePartToDetachedAndRemoveFromMemory(std::move(part), "merge-not-byte-identical");
        else
            storage.tryRemovePartImmediately(std::move(part));

        /// No need to delete the part from ZK because we can be sure that the commit transaction
        /// didn't go through.

        return false;
    }

    /// The part is committed, so the exclusive zero-copy lock (if prepare() took one) can be released.
    if (zero_copy_lock)
        zero_copy_lock->lock->unlock();

    /** Removing old parts from ZK and from the disk is delayed - see ReplicatedMergeTreeCleanupThread, clearOldParts.
      */

    /** With `ZSESSIONEXPIRED` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts.
      * This is not a problem, because in this case the merge will remain in the queue, and we will try again.
      */
    storage.merge_selecting_task->schedule();
    ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);

    write_part_log({});
    storage.incrementMergedPartsProfileEvent(part->getType());

    return true;
}
}