2021-09-16 21:19:58 +00:00
|
|
|
#include <Storages/MergeTree/MutatePlainMergeTreeTask.h>
|
|
|
|
|
|
|
|
#include <Storages/StorageMergeTree.h>
|
2021-12-14 20:06:34 +00:00
|
|
|
#include <Interpreters/TransactionLog.h>
|
2023-01-23 12:45:28 +00:00
|
|
|
#include <Common/ProfileEventsScope.h>
|
2021-09-16 21:19:58 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
|
|
|
extern const int LOGICAL_ERROR;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
StorageID MutatePlainMergeTreeTask::getStorageID()
|
|
|
|
{
|
|
|
|
return storage.getStorageID();
|
|
|
|
}
|
|
|
|
|
|
|
|
void MutatePlainMergeTreeTask::onCompleted()
|
|
|
|
{
|
|
|
|
bool delay = state == State::SUCCESS;
|
|
|
|
task_result_callback(delay);
|
|
|
|
}
|
|
|
|
|
|
|
|
void MutatePlainMergeTreeTask::prepare()
|
|
|
|
{
|
|
|
|
future_part = merge_mutate_entry->future_part;
|
|
|
|
|
2023-04-03 20:01:18 +00:00
|
|
|
task_context = createTaskContext();
|
2021-10-03 08:21:54 +00:00
|
|
|
merge_list_entry = storage.getContext()->getMergeList().insert(
|
|
|
|
storage.getStorageID(),
|
|
|
|
future_part,
|
2023-04-03 20:01:18 +00:00
|
|
|
task_context);
|
2021-10-03 08:21:54 +00:00
|
|
|
|
2021-09-16 21:19:58 +00:00
|
|
|
stopwatch = std::make_unique<Stopwatch>();
|
|
|
|
|
|
|
|
write_part_log = [this] (const ExecutionStatus & execution_status)
|
|
|
|
{
|
2023-01-23 12:45:28 +00:00
|
|
|
auto profile_counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(profile_counters.getPartiallyAtomicSnapshot());
|
2021-10-03 22:41:35 +00:00
|
|
|
mutate_task.reset();
|
2021-09-16 21:19:58 +00:00
|
|
|
storage.writePartLog(
|
|
|
|
PartLogElement::MUTATE_PART,
|
|
|
|
execution_status,
|
|
|
|
stopwatch->elapsed(),
|
|
|
|
future_part->name,
|
|
|
|
new_part,
|
|
|
|
future_part->parts,
|
2021-11-19 15:19:20 +00:00
|
|
|
merge_list_entry.get(),
|
2023-01-23 12:45:28 +00:00
|
|
|
std::move(profile_counters_snapshot));
|
2021-09-16 21:19:58 +00:00
|
|
|
};
|
|
|
|
|
2023-04-13 13:11:16 +00:00
|
|
|
if (task_context->getSettingsRef().enable_sharing_sets_for_mutations)
|
|
|
|
{
|
|
|
|
/// If we have a prepared sets cache for this mutations, we will use it.
|
|
|
|
auto mutation_id = future_part->part_info.mutation;
|
|
|
|
auto prepared_sets_cache_for_mutation = storage.getPreparedSetsCache(mutation_id);
|
|
|
|
task_context->setPreparedSetsCache(prepared_sets_cache_for_mutation);
|
|
|
|
}
|
2023-03-02 19:19:58 +00:00
|
|
|
|
2021-09-16 21:19:58 +00:00
|
|
|
mutate_task = storage.merger_mutator.mutatePartToTemporaryPart(
|
|
|
|
future_part, metadata_snapshot, merge_mutate_entry->commands, merge_list_entry.get(),
|
2023-04-03 20:01:18 +00:00
|
|
|
time(nullptr), task_context, merge_mutate_entry->txn, merge_mutate_entry->tagger->reserved_space, table_lock_holder);
|
2021-09-16 21:19:58 +00:00
|
|
|
}
|
|
|
|
|
2022-08-04 04:07:32 +00:00
|
|
|
|
2021-09-16 21:19:58 +00:00
|
|
|
bool MutatePlainMergeTreeTask::executeStep()
|
|
|
|
{
|
2023-01-23 12:45:28 +00:00
|
|
|
/// Metrics will be saved in the local profile_counters.
|
2023-01-24 08:24:36 +00:00
|
|
|
ProfileEventsScope profile_events_scope(&profile_counters);
|
2022-08-04 04:07:32 +00:00
|
|
|
|
2021-09-16 21:19:58 +00:00
|
|
|
/// Make out memory tracker a parent of current thread memory tracker
|
2023-03-03 22:09:36 +00:00
|
|
|
std::optional<ThreadGroupSwitcher> switcher;
|
2021-09-16 21:19:58 +00:00
|
|
|
if (merge_list_entry)
|
2023-03-27 12:31:29 +00:00
|
|
|
switcher.emplace((*merge_list_entry)->thread_group);
|
2021-09-16 21:19:58 +00:00
|
|
|
|
|
|
|
switch (state)
|
|
|
|
{
|
2022-06-28 10:51:49 +00:00
|
|
|
case State::NEED_PREPARE:
|
2021-09-16 21:19:58 +00:00
|
|
|
{
|
|
|
|
prepare();
|
|
|
|
state = State::NEED_EXECUTE;
|
|
|
|
return true;
|
|
|
|
}
|
2022-06-28 10:51:49 +00:00
|
|
|
case State::NEED_EXECUTE:
|
2021-09-16 21:19:58 +00:00
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
|
|
|
if (mutate_task->execute())
|
|
|
|
return true;
|
|
|
|
|
|
|
|
new_part = mutate_task->getFuture().get();
|
2023-01-25 17:34:09 +00:00
|
|
|
auto & data_part_storage = new_part->getDataPartStorage();
|
|
|
|
if (data_part_storage.hasActiveTransaction())
|
|
|
|
data_part_storage.precommitTransaction();
|
2021-09-16 21:19:58 +00:00
|
|
|
|
2022-06-24 11:19:29 +00:00
|
|
|
MergeTreeData::Transaction transaction(storage, merge_mutate_entry->txn.get());
|
2022-03-09 20:38:18 +00:00
|
|
|
/// FIXME Transactions: it's too optimistic, better to lock parts before starting transaction
|
2022-10-22 22:51:59 +00:00
|
|
|
storage.renameTempPartAndReplace(new_part, transaction);
|
2022-06-24 11:19:29 +00:00
|
|
|
transaction.commit();
|
|
|
|
|
2021-09-16 21:19:58 +00:00
|
|
|
storage.updateMutationEntriesErrors(future_part, true, "");
|
|
|
|
write_part_log({});
|
|
|
|
|
|
|
|
state = State::NEED_FINISH;
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
2021-12-14 20:06:34 +00:00
|
|
|
if (merge_mutate_entry->txn)
|
|
|
|
merge_mutate_entry->txn->onException();
|
2023-01-17 00:19:44 +00:00
|
|
|
PreformattedMessage exception_message = getCurrentExceptionMessageAndPattern(/* with_stacktrace */ false);
|
|
|
|
LOG_ERROR(&Poco::Logger::get("MutatePlainMergeTreeTask"), exception_message);
|
2023-01-25 20:16:42 +00:00
|
|
|
storage.updateMutationEntriesErrors(future_part, false, exception_message.text);
|
2023-02-03 06:41:27 +00:00
|
|
|
write_part_log(ExecutionStatus::fromCurrentException("", true));
|
2022-03-06 10:39:49 +00:00
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
2021-09-16 21:19:58 +00:00
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
2023-01-25 17:34:09 +00:00
|
|
|
case State::NEED_FINISH:
|
2021-09-16 21:19:58 +00:00
|
|
|
{
|
|
|
|
// Nothing to do
|
|
|
|
state = State::SUCCESS;
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
case State::SUCCESS:
|
|
|
|
{
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Task with state SUCCESS mustn't be executed again");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
2023-04-03 20:01:18 +00:00
|
|
|
ContextMutablePtr MutatePlainMergeTreeTask::createTaskContext() const
|
2023-03-28 12:39:36 +00:00
|
|
|
{
|
|
|
|
auto context = Context::createCopy(storage.getContext());
|
|
|
|
context->makeQueryContext();
|
|
|
|
auto queryId = storage.getStorageID().getShortName() + "::" + future_part->name;
|
2023-03-29 05:41:31 +00:00
|
|
|
context->setCurrentQueryId(queryId);
|
2023-03-28 12:39:36 +00:00
|
|
|
return context;
|
|
|
|
}
|
|
|
|
|
2021-09-16 21:19:58 +00:00
|
|
|
}
|