ClickHouse/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

164 lines
4.6 KiB
C++
Raw Normal View History

#include <Storages/MergeTree/MergePlainMergeTreeTask.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
2023-01-23 12:45:28 +00:00
#include <Common/ProfileEventsScope.h>
#include <Common/ProfileEvents.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
StorageID MergePlainMergeTreeTask::getStorageID()
{
return storage.getStorageID();
}
void MergePlainMergeTreeTask::onCompleted()
{
bool delay = state == State::SUCCESS;
task_result_callback(delay);
}
bool MergePlainMergeTreeTask::executeStep()
{
/// All metrics will be saved in the thread_group, including all scheduled tasks.
/// In profile_counters only metrics from this thread will be saved.
ProfileEventsScope profile_events_scope(&profile_counters);
2022-08-04 04:07:32 +00:00
/// Make out memory tracker a parent of current thread memory tracker
2023-03-03 22:09:36 +00:00
std::optional<ThreadGroupSwitcher> switcher;
if (merge_list_entry)
{
2023-03-27 12:31:29 +00:00
switcher.emplace((*merge_list_entry)->thread_group);
}
switch (state)
{
case State::NEED_PREPARE :
{
prepare();
state = State::NEED_EXECUTE;
return true;
}
case State::NEED_EXECUTE :
{
try
{
if (merge_task->execute())
return true;
state = State::NEED_FINISH;
return true;
}
catch (...)
{
2023-02-03 06:41:27 +00:00
write_part_log(ExecutionStatus::fromCurrentException("", true));
throw;
}
}
case State::NEED_FINISH :
{
finish();
state = State::SUCCESS;
return false;
}
case State::SUCCESS:
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Task with state SUCCESS mustn't be executed again");
}
}
return false;
}
void MergePlainMergeTreeTask::prepare()
{
future_part = merge_mutate_entry->future_part;
stopwatch_ptr = std::make_unique<Stopwatch>();
task_context = createTaskContext();
merge_list_entry = storage.getContext()->getMergeList().insert(
storage.getStorageID(),
future_part,
task_context);
write_part_log = [this] (const ExecutionStatus & execution_status)
{
2023-01-23 12:45:28 +00:00
auto profile_counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(profile_counters.getPartiallyAtomicSnapshot());
merge_task.reset();
storage.writePartLog(
PartLogElement::MERGE_PARTS,
execution_status,
stopwatch_ptr->elapsed(),
future_part->name,
new_part,
future_part->parts,
merge_list_entry.get(),
2023-01-23 12:45:28 +00:00
std::move(profile_counters_snapshot));
};
transfer_profile_counters_to_initial_query = [this, query_thread_group = CurrentThread::getGroup()] ()
{
if (query_thread_group)
{
auto task_thread_group = (*merge_list_entry)->thread_group;
auto task_counters_snapshot = task_thread_group->performance_counters.getPartiallyAtomicSnapshot();
auto & query_counters = query_thread_group->performance_counters;
for (ProfileEvents::Event i = ProfileEvents::Event(0); i < ProfileEvents::end(); ++i)
query_counters.incrementNoTrace(i, task_counters_snapshot[i]);
}
};
merge_task = storage.merger_mutator.mergePartsToTemporaryPart(
future_part,
metadata_snapshot,
merge_list_entry.get(),
2021-09-24 13:57:44 +00:00
{} /* projection_merge_list_element */,
table_lock_holder,
time(nullptr),
task_context,
merge_mutate_entry->tagger->reserved_space,
deduplicate,
deduplicate_by_columns,
[RFC] Replacing merge tree new engine (#41005) * Add new engine to ReplacingMergeTree corresponding to the ReplacingCollapsingMergeTree * Add new test for the new ReplacingMergeTree engine * Limit sign value to -1/1 * Add new engine to ReplacingMergeTree corresponding to the ReplacingCollapsingMergeTree * Add new test for the new ReplacingMergeTree engine * Limit sign value to -1/1 * Replace sign column(Int8) by is_deleted(UInt8) * Add new engine to ReplacingMergeTree corresponding to the ReplacingCollapsingMergeTree * Add new test for the new ReplacingMergeTree engine * Limit sign value to -1/1 * Replace sign column(Int8) by is_deleted(UInt8) * Add new engine to ReplacingMergeTree corresponding to the ReplacingCollapsingMergeTree * Add new test for the new ReplacingMergeTree engine * Limit sign value to -1/1 * Replace sign column(Int8) by is_deleted(UInt8) * Add keyword 'CLEANUP' when OPTIMIZE * Cleanup uniquely when it's a replacingMergeTree * Propagate CLEANUP information and change from 'with_cleanup' to 'cleanup' * Cleanup data flagged as 'is_deleted' * Fix merge when optimize and add a test * Fix OPTIMIZE and INSERT + add tests * New fix for cleanup at the merge * Cleanup debug logs * Add the SETTINGS option 'clean_deleted_rows' that can be 'never' or 'always' * Fix regression bug; Now REplicatedMergeTree can be called as before without 'is_deleted' * Add Replicated tests * Disable tag 'long' for our test and cleanup some white spaces * Update tests * Fix tests and remove additional useless whitespace * Fix replica test * Style clean && add condition check for is_deleted values * clean_deleted_rows settings is nom an enum * Add valid default value to the clean_deleted_rows settings * Update cleanup checkers to use the enum and fix typos in the test * Fix submodule contrib/AMQP-CPP pointer * Add missing messages in test reference and remove a print with non derterministic order * fix replica test reference * Fix edge case * Fix a typo for the spell checker * Fix reference * Fix a condition to raise an error if is_deleted differ from 0/1 and cleanup * Change tests file name and update number * This should fix the ReplacingMergeTree parameter set * Fix replicated parameters * Disable allow_deprecated_syntax_for_merge_tree for our new column * Fix a test * Remove non deterministic order print in the test * Test on replicas * Remove a condition, when checking optional parameters, that should not be sueful since we disabled the deprected_syntaxe * Revert "Remove a condition, when checking optional parameters, that should not be useful since we disabled the deprected_syntaxe" This reverts commit b65d64c05e482945ac20fcfcf0311e1b028ea137. * Fix replica management and limit the number of argument to two maximum, due to the possiblity of deprecated table create/attach failing otherwise * Test a fix for replicated log information error * Try to add sync to have consistent results * Change path of replicas that should cause one issue and add few prints in case it's not that * Get cleanup info on replicas only if information found * Fix style issues * Try to avoid replication error 'cannot select parts...' and and replica read/write field order * Cleanup according to PR reviews and add tests on error raised. * Update src/Storages/MergeTree/registerStorageMergeTree.cpp Co-authored-by: Alexander Tokmakov <tavplubix@gmail.com> * Select ... FINAL don't show rows with is_deleted = true * Update and fix SELECT ... FINAL merge parameter * Remove is_deleted rows only on the version inserted when merge * Fix (master) updates issues * Revert changes that should not be commited * Add changes according to review * Revert changes that should not be commited - part 2 --------- Co-authored-by: Alexander Tokmakov <tavplubix@gmail.com>
2023-02-16 13:03:16 +00:00
cleanup,
2022-02-14 19:50:08 +00:00
storage.merging_params,
txn);
}
void MergePlainMergeTreeTask::finish()
{
new_part = merge_task->getFuture().get();
2022-06-24 11:19:29 +00:00
MergeTreeData::Transaction transaction(storage, txn.get());
2022-10-22 22:51:59 +00:00
storage.merger_mutator.renameMergedTemporaryPart(new_part, future_part->parts, txn, transaction);
2022-06-24 11:19:29 +00:00
transaction.commit();
write_part_log({});
storage.incrementMergedPartsProfileEvent(new_part->getType());
transfer_profile_counters_to_initial_query();
}
ContextMutablePtr MergePlainMergeTreeTask::createTaskContext() const
{
auto context = Context::createCopy(storage.getContext());
context->makeQueryContext();
auto queryId = storage.getStorageID().getShortName() + "::" + future_part->name;
2023-03-29 05:41:31 +00:00
context->setCurrentQueryId(queryId);
return context;
}
}