2021-09-16 21:19:58 +00:00
# include <Storages/MergeTree/MutateFromLogEntryTask.h>
2022-04-27 15:05:45 +00:00
# include <Common/logger_useful.h>
2021-09-16 21:19:58 +00:00
# include <Common/ProfileEvents.h>
# include <Storages/StorageReplicatedMergeTree.h>
namespace ProfileEvents
{
    /// Incremented by MutateFromLogEntryTask::finalize() once the mutated part has been committed.
    extern const Event ReplicatedPartMutations;

    /// Incremented by MutateFromLogEntryTask::finalize() when the commit fails with a
    /// bad-checksums error code and the transaction is rolled back.
    extern const Event DataAfterMutationDiffersFromReplica;
}
namespace DB
{
2022-03-19 16:31:33 +00:00
/// Decides whether the mutation described by `entry` is executed on this replica and, if so,
/// sets up everything the execution needs (space reservation, table lock, transaction, merge list
/// entry, fake query context and the mutate task itself).
///
/// Returns a PrepareResult with `prepared_successfully == false` when the mutation is not going to
/// be executed locally:
///  - the source part is missing, or is covered by a different (bigger) part;
///  - the entry is old and large enough and some replica already has the resulting part;
///  - the merge strategy picker selected another replica to execute it;
///  - (zero-copy replication) another replica already finished the mutation or holds the exclusive
///    zero-copy lock. Only in the "lock is held by someone else" case
///    `need_to_check_missing_part_in_fetch` is false.
/// Otherwise returns `{true, true, part_log_writer}` where the writer records a MUTATE_PART event.
///
/// Can throw (e.g. from `entry.source_parts.at(0)` or from the space reservation).
ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
{
    /// NOTE(review): every string literal below is reproduced byte-for-byte from the reviewed text,
    /// including the leading/trailing space inside the quotes. That padding looks like an extraction
    /// artifact (the same one that split `::` and `->`) - confirm against the repository before merging.

    /// Builds the "do not execute locally" result; the flag is forwarded to
    /// `need_to_check_missing_part_in_fetch`.
    const auto not_prepared = [](bool check_missing_part_in_fetch)
    {
        return PrepareResult{
            .prepared_successfully = false,
            .need_to_check_missing_part_in_fetch = check_missing_part_in_fetch,
            .part_log_writer = {}
        };
    };

    const String & source_part_name = entry.source_parts.at(0);
    const auto storage_settings_ptr = storage.getSettings();
    LOG_TRACE(log, " Executing log entry to mutate part {} to {} ", source_part_name, entry.new_part_name);

    MergeTreeData::DataPartPtr source_part = storage.getActiveContainingPart(source_part_name);
    if (!source_part)
    {
        LOG_DEBUG(log, " Source part {} for {} is not ready; will try to fetch it instead ", source_part_name, entry.new_part_name);
        return not_prepared(true);
    }

    if (source_part->name != source_part_name)
    {
        LOG_WARNING(log,
            " Part {} is covered by {} but should be mutated to {}. "
            " Possibly the mutation of this part is not needed and will be skipped. "
            " This shouldn't happen often. ",
            source_part_name, source_part->name, entry.new_part_name);
        return not_prepared(true);
    }

    /// TODO - some better heuristic?
    size_t estimated_space_for_result = MergeTreeDataMergerMutator::estimateNeededDiskSpace({source_part});

    if (entry.create_time + storage_settings_ptr->prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)
        && estimated_space_for_result >= storage_settings_ptr->prefer_fetch_merged_part_size_threshold)
    {
        /// If entry is old enough, and have enough size, and some replica has the desired part,
        /// then prefer fetching from replica.
        String replica = storage.findReplicaHavingPart(entry.new_part_name, true); /// NOTE excessive ZK requests for same data later, may remove.
        if (!replica.empty())
        {
            LOG_DEBUG(log, " Prefer to fetch {} from replica {} ", entry.new_part_name, replica);
            return not_prepared(true);
        }
    }

    /// In some use cases merging can be more expensive than fetching
    /// and it may be better to spread merges tasks across the replicas
    /// instead of doing exactly the same merge cluster-wise
    if (storage.merge_strategy_picker.shouldMergeOnSingleReplica(entry))
    {
        std::optional<String> replica_to_execute_merge = storage.merge_strategy_picker.pickReplicaToExecuteMerge(entry);
        if (replica_to_execute_merge)
        {
            LOG_DEBUG(log,
                " Prefer fetching part {} from replica {} due to execute_merges_on_single_replica_time_threshold ",
                entry.new_part_name, replica_to_execute_merge.value());
            return not_prepared(true);
        }
    }

    new_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, storage.format_version);
    commands = std::make_shared<MutationCommands>(storage.queue.getMutationCommands(source_part, new_part_info.mutation));

    /// Once we mutate part, we must reserve space on the same disk, because mutations can possibly create hardlinks.
    /// Can throw an exception.
    reserved_space = storage.reserveSpace(estimated_space_for_result, source_part->data_part_storage);

    table_lock_holder = storage.lockForShare(
        RWLockImpl::NO_QUERY, storage_settings_ptr->lock_acquire_timeout_for_background_operations);
    StorageMetadataPtr metadata_snapshot = storage.getInMemoryMetadataPtr();

    transaction_ptr = std::make_unique<MergeTreeData::Transaction>(storage, NO_TRANSACTION_RAW);

    future_mutated_part = std::make_shared<FutureMergedMutatedPart>();
    future_mutated_part->name = entry.new_part_name;
    future_mutated_part->uuid = entry.new_part_uuid;
    future_mutated_part->parts.push_back(source_part);
    future_mutated_part->part_info = new_part_info;
    future_mutated_part->updatePath(storage, reserved_space.get());
    future_mutated_part->type = source_part->getType();

    if (storage_settings_ptr->allow_remote_fs_zero_copy_replication)
    {
        if (auto disk = reserved_space->getDisk(); disk->supportZeroCopyReplication())
        {
            String dummy;
            if (!storage.findReplicaHavingCoveringPart(entry.new_part_name, true, dummy).empty())
            {
                LOG_DEBUG(log, " Mutation of part {} finished by some other replica, will download mutated part ", entry.new_part_name);
                return not_prepared(true);
            }

            zero_copy_lock = storage.tryCreateZeroCopyExclusiveLock(entry.new_part_name, disk);

            if (!zero_copy_lock)
            {
                LOG_DEBUG(log, " Mutation of part {} started by some other replica, will wait it and mutated merged part ", entry.new_part_name);
                return not_prepared(false);
            }
            else if (!storage.findReplicaHavingCoveringPart(entry.new_part_name, true, dummy).empty())
            {
                /// Why is this check still needed? We can check for the part in ZooKeeper, not find it and sleep for any amount of time.
                /// During this sleep the part can actually be committed by another replica and the exclusive zero copy lock released.
                /// We would then take the lock and execute the mutation one more time, while it was possible just to download
                /// the part from the other replica.
                ///
                /// It's also possible just because reads in [Zoo]Keeper are not linearizable.
                ///
                /// NOTE: In case of mutation and hardlinks it can even lead to extremely rare data loss (we will produce a new part
                /// with the same hardlinks, and won't fetch the same from the other replica), so this check is important.
                zero_copy_lock->lock->unlock();
                LOG_DEBUG(log, " We took zero copy lock, but mutation of part {} finished by some other replica, will release lock and download mutated part to avoid data duplication ", entry.new_part_name);
                return not_prepared(true);
            }
            else
            {
                LOG_DEBUG(log, " Zero copy lock taken, will mutate part {} ", entry.new_part_name);
            }
        }
    }

    const Settings & settings = storage.getContext()->getSettingsRef();

    /// `settings` is handed to the merge list entry. According to the description of the change that
    /// introduced this argument ("Fix possible memory_tracker use-after-free for merges/mutations"),
    /// the entry's own memory tracker takes its settings by copy instead of the thread group's
    /// tracker being re-parented to it: re-parenting could leave a nested ThreadPool (e.g. S3 async
    /// writes) pointing at the merge list entry's tracker after the entry is gone.
    merge_mutate_entry = storage.getContext()->getMergeList().insert(
        storage.getStorageID(),
        future_mutated_part,
        settings);

    stopwatch_ptr = std::make_unique<Stopwatch>();

    /// The mutation runs under a copy of the storage context turned into a query context.
    fake_query_context = Context::createCopy(storage.getContext());
    fake_query_context->makeQueryContext();
    fake_query_context->setCurrentQueryId(" ");

    mutate_task = storage.merger_mutator.mutatePartToTemporaryPart(
        future_mutated_part, metadata_snapshot, commands, merge_mutate_entry.get(),
        entry.create_time, fake_query_context, NO_TRANSACTION_PTR, reserved_space, table_lock_holder);

    /// Adjust priority
    for (auto & item : future_mutated_part->parts)
        priority += item->getBytesOnDisk();

    /// The writer captures `this` and reads members that are filled in later (e.g. `new_part`),
    /// so it must only be invoked while this task is alive.
    return {true, true, [this](const ExecutionStatus & execution_status)
    {
        storage.writePartLog(
            PartLogElement::MUTATE_PART, execution_status, stopwatch_ptr->elapsed(),
            entry.new_part_name, new_part, future_mutated_part->parts, merge_mutate_entry.get());
    }};
}
bool MutateFromLogEntryTask : : finalize ( ReplicatedMergeMutateTaskBase : : PartLogWriter write_part_log )
{
new_part = mutate_task - > getFuture ( ) . get ( ) ;
2022-06-28 10:51:49 +00:00
auto builder = mutate_task - > getBuilder ( ) ;
2021-09-16 21:19:58 +00:00
2022-06-28 12:41:22 +00:00
if ( ! builder )
builder = new_part - > data_part_storage - > getBuilder ( ) ;
2022-06-28 10:51:49 +00:00
storage . renameTempPartAndReplace ( new_part , * transaction_ptr , builder ) ;
2021-09-16 21:19:58 +00:00
try
{
2022-06-30 20:51:27 +00:00
storage . checkPartChecksumsAndCommit ( * transaction_ptr , new_part , mutate_task - > getHardlinkedFiles ( ) ) ;
2021-09-16 21:19:58 +00:00
}
catch ( const Exception & e )
{
if ( MergeTreeDataPartChecksums : : isBadChecksumsErrorCode ( e . code ( ) ) )
{
transaction_ptr - > rollback ( ) ;
ProfileEvents : : increment ( ProfileEvents : : DataAfterMutationDiffersFromReplica ) ;
2022-03-25 14:54:05 +00:00
LOG_ERROR ( log , " {}. Data after mutation is not byte-identical to data on another replicas. "
" We will download merged part from replica to force byte-identical result. " , getCurrentExceptionMessage ( false ) ) ;
2021-09-16 21:19:58 +00:00
write_part_log ( ExecutionStatus : : fromCurrentException ( ) ) ;
if ( storage . getSettings ( ) - > detach_not_byte_identical_parts )
storage . forgetPartAndMoveToDetached ( std : : move ( new_part ) , " mutate-not-byte-identical " ) ;
else
storage . tryRemovePartImmediately ( std : : move ( new_part ) ) ;
/// No need to delete the part from ZK because we can be sure that the commit transaction
/// didn't go through.
return false ;
}
throw ;
}
2022-02-10 19:45:52 +00:00
if ( zero_copy_lock )
{
LOG_DEBUG ( log , " Removing zero-copy lock " ) ;
zero_copy_lock - > lock - > unlock ( ) ;
}
2021-09-16 21:19:58 +00:00
/** With `ZSESSIONEXPIRED` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts.
* This is not a problem , because in this case the entry will remain in the queue , and we will try again .
*/
storage . merge_selecting_task - > schedule ( ) ;
ProfileEvents : : increment ( ProfileEvents : : ReplicatedPartMutations ) ;
write_part_log ( { } ) ;
return true ;
}
}