Remove trash

This commit is contained in:
Alexey Milovidov 2022-08-04 06:07:32 +02:00 committed by vdimir
parent 96e4411694
commit f554ff9d99
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
20 changed files with 63 additions and 173 deletions

View File

@ -13,7 +13,6 @@
M(BackgroundMergesAndMutationsPoolTask, "Number of active merges and mutations in an associated background pool") \
M(BackgroundFetchesPoolTask, "Number of active fetches in an associated background pool") \
M(BackgroundCommonPoolTask, "Number of active tasks in an associated background pool") \
M(BackgroundMaintPoolTask, "Number of active tasks in BackgroundProcessingPool (maint)") \
M(BackgroundMovePoolTask, "Number of active tasks in BackgroundProcessingPool for moves") \
M(BackgroundSchedulePoolTask, "Number of active tasks in BackgroundSchedulePool. This pool is used for periodic ReplicatedMergeTree tasks, like cleaning old data parts, altering data parts, replica re-initialization, etc.") \
M(BackgroundBufferFlushSchedulePoolTask, "Number of active tasks in BackgroundBufferFlushSchedulePool. This pool is used for periodic Buffer flushes") \

View File

@ -92,7 +92,7 @@ public:
static void detachQueryIfNotDetached();
/// Initializes query with current thread as master thread in constructor, and detaches it in destructor
struct QueryScope
struct QueryScope : private boost::noncopyable
{
explicit QueryScope(ContextMutablePtr query_context);
explicit QueryScope(ContextPtr query_context);
@ -102,6 +102,20 @@ public:
bool log_peak_memory_usage_in_destructor = true;
};
class ScopedAttach : private boost::noncopyable
{
public:
explicit ScopedAttach(const ThreadGroupStatusPtr & thread_group)
{
CurrentThread::attachTo(thread_group);
}
~ScopedAttach()
{
CurrentThread::detachQuery();
}
};
private:
static void defaultThreadDeleter();

View File

@ -193,7 +193,7 @@ void PartLogElement::appendToBlock(MutableColumns & columns) const
bool PartLog::addNewPart(
ContextPtr current_context, const MutableDataPartPtr & part, UInt64 elapsed_ns, const ExecutionStatus & execution_status, std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters_)
{
return addNewParts(current_context, {part}, elapsed_ns, execution_status, profile_counters_);
return addNewParts(current_context, {part}, elapsed_ns, execution_status, std::move(profile_counters_));
}

View File

@ -29,9 +29,7 @@ class IExecutableTask
{
public:
using TaskResultCallback = std::function<void(bool)>;
virtual bool onResume() = 0;
virtual bool executeStep() = 0;
virtual bool onSuspend() = 0;
virtual void onCompleted() = 0;
virtual StorageID getStorageID() = 0;
virtual UInt64 getPriority() = 0;
@ -56,11 +54,6 @@ public:
, job_result_callback(std::forward<Callback>(job_result_callback_))
, id(id_) {}
bool onResume() override
{
return true;
}
bool executeStep() override
{
res = job_to_execute();
@ -68,11 +61,6 @@ public:
return false;
}
bool onSuspend() override
{
return true;
}
void onCompleted() override { job_result_callback(!res); }
StorageID getStorageID() override { return id; }
UInt64 getPriority() override

View File

@ -294,7 +294,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
auto profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_status.performance_counters.getPartiallyAtomicSnapshot());
storage.writePartLog(
PartLogElement::MERGE_PARTS, execution_status, stopwatch.elapsed(),
entry.new_part_name, part, parts, merge_mutate_entry.get(), profile_counters);
entry.new_part_name, part, parts, merge_mutate_entry.get(), std::move(profile_counters));
}};
}

View File

@ -27,6 +27,9 @@ void MergePlainMergeTreeTask::onCompleted()
bool MergePlainMergeTreeTask::executeStep()
{
/// Metrics will be saved in the thread_group.
CurrentThread::ScopedAttach scoped_attach(thread_group);
/// Make out memory tracker a parent of current thread memory tracker
MemoryTrackerThreadSwitcherPtr switcher;
if (merge_list_entry)

View File

@ -5,7 +5,6 @@
#include <Storages/MutationCommands.h>
#include <Storages/MergeTree/MergeMutateSelectedEntry.h>
#include <Interpreters/MergeTreeTransactionHolder.h>
#include <Storages/MergeTree/TaskObserverMetrics.h>
namespace DB
@ -36,18 +35,7 @@ public:
priority += item->getBytesOnDisk();
}
bool onResume() override
{
return observer.doResume();
}
bool executeStep() override;
bool onSuspend() override
{
return observer.doSuspend();
}
void onCompleted() override;
StorageID getStorageID() override;
UInt64 getPriority() override { return priority; }
@ -59,7 +47,6 @@ public:
}
private:
void prepare();
void finish();
@ -95,8 +82,8 @@ private:
MergeTreeTransactionHolder txn_holder;
MergeTreeTransactionPtr txn;
TaskObserverMetrics observer;
ThreadGroupStatusPtr thread_group = std::make_shared<ThreadGroupStatus>();
};

View File

@ -130,15 +130,6 @@ void MergeTreeBackgroundExecutor<Queue>::routine(TaskRuntimeDataPtr item)
bool need_execute_again = false;
try
{
item->task->onResume();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
try
{
ALLOW_ALLOCATIONS_IN_SCOPE;
@ -162,15 +153,6 @@ void MergeTreeBackgroundExecutor<Queue>::routine(TaskRuntimeDataPtr item)
});
}
try
{
item->task->onSuspend();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (need_execute_again)
{
std::lock_guard guard(mutex);

View File

@ -17,6 +17,7 @@
#include <base/defines.h>
#include <Storages/MergeTree/IExecutableTask.h>
namespace DB
{
namespace ErrorCodes

View File

@ -7345,7 +7345,7 @@ try
if (profile_counters)
{
part_log_elem.profile_counters = profile_counters;
part_log_elem.profile_counters = profile_counters;
}
part_log->add(part_log_elem);
@ -7493,6 +7493,7 @@ bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagge
moving_part.part->name,
cloned_part,
{moving_part.part},
nullptr,
nullptr);
};

View File

@ -1299,7 +1299,7 @@ protected:
const DataPartPtr & result_part,
const DataPartsVector & source_parts,
const MergeListEntry * merge_entry,
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters = nullptr);
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters);
/// If part is assigned to merge or mutation (possibly replicated)
/// Should be overridden by children, because they can have different

View File

@ -189,7 +189,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
auto profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_status.performance_counters.getPartiallyAtomicSnapshot());
storage.writePartLog(
PartLogElement::MUTATE_PART, execution_status, stopwatch_ptr->elapsed(),
entry.new_part_name, new_part, future_mutated_part->parts, merge_mutate_entry.get(), profile_counters);
entry.new_part_name, new_part, future_mutated_part->parts, merge_mutate_entry.get(), std::move(profile_counters));
}};
}

View File

@ -62,8 +62,12 @@ void MutatePlainMergeTreeTask::prepare()
time(nullptr), fake_query_context, merge_mutate_entry->txn, merge_mutate_entry->tagger->reserved_space, table_lock_holder);
}
bool MutatePlainMergeTreeTask::executeStep()
{
/// Metrics will be saved in the thread_group.
CurrentThread::ScopedAttach scoped_attach(thread_group);
/// Make out memory tracker a parent of current thread memory tracker
MemoryTrackerThreadSwitcherPtr switcher;
if (merge_list_entry)
@ -127,5 +131,4 @@ bool MutatePlainMergeTreeTask::executeStep()
return false;
}
}

View File

@ -8,7 +8,7 @@
#include <Storages/MergeTree/MutateTask.h>
#include <Storages/MutationCommands.h>
#include <Storages/MergeTree/MergeMutateSelectedEntry.h>
#include <Storages/MergeTree/TaskObserverMetrics.h>
namespace DB
{
@ -39,18 +39,7 @@ public:
priority += part->getBytesOnDisk();
}
bool onSuspend() override
{
return observer.doSuspend();
}
bool executeStep() override;
bool onResume() override
{
return observer.doResume();
}
void onCompleted() override;
StorageID getStorageID() override;
UInt64 getPriority() override { return priority; }
@ -89,7 +78,7 @@ private:
ContextMutablePtr fake_query_context;
MutateTaskPtr mutate_task;
TaskObserverMetrics observer;
ThreadGroupStatusPtr thread_group = std::make_shared<ThreadGroupStatus>();
};

View File

@ -17,7 +17,6 @@
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/TaskObserverMetrics.h>
#include <Storages/MutationCommands.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <boost/algorithm/string/replace.hpp>
@ -813,13 +812,11 @@ public:
StorageID getStorageID() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
UInt64 getPriority() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
bool onSuspend() override
{
return observer.doSuspend();
}
bool executeStep() override
{
/// Metrics will be saved in the thread_group.
CurrentThread::ScopedAttach scoped_attach(thread_group);
auto & current_level_parts = level_parts[current_level];
auto & next_level_parts = level_parts[next_level];
@ -905,11 +902,6 @@ public:
return true;
}
bool onResume() override
{
return observer.doResume();
}
private:
String name;
MergeTreeData::MutableDataPartsVector parts;
@ -926,7 +918,7 @@ private:
/// TODO(nikitamikhaylov): make this constant a setting
static constexpr size_t max_parts_to_merge_in_one_level = 10;
TaskObserverMetrics observer;
ThreadGroupStatusPtr thread_group = std::make_shared<ThreadGroupStatus>();
};
@ -937,9 +929,7 @@ private:
// In short it executed a mutation for the part an original part and for its every projection
/**
*
* An overview of how the process of mutation works for projections:
/** An overview of how the process of mutation works for projections:
*
* The mutation for original parts is executed block by block,
* but additionally we execute a SELECT query for each projection over a current block.
@ -1148,14 +1138,11 @@ public:
StorageID getStorageID() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
UInt64 getPriority() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
bool onSuspend() override
{
return observer.doSuspend();
}
bool executeStep() override
{
/// Metrics will be saved in the thread_group.
CurrentThread::ScopedAttach scoped_attach(thread_group);
switch (state)
{
case State::NEED_PREPARE:
@ -1188,13 +1175,7 @@ public:
return false;
}
bool onResume() override
{
return observer.doResume();
}
private:
void prepare()
{
if (ctx->new_data_part->isStoredOnDisk())
@ -1277,9 +1258,10 @@ private:
std::unique_ptr<PartMergerWriter> part_merger_writer_task;
TaskObserverMetrics observer;
ThreadGroupStatusPtr thread_group = std::make_shared<ThreadGroupStatus>();
};
class MutateSomePartColumnsTask : public IExecutableTask
{
public:
@ -1289,13 +1271,11 @@ public:
StorageID getStorageID() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
UInt64 getPriority() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
bool onSuspend() override
{
return observer.doSuspend();
}
bool executeStep() override
{
/// Metrics will be saved in the thread_group.
CurrentThread::ScopedAttach scoped_attach(thread_group);
switch (state)
{
case State::NEED_PREPARE:
@ -1327,13 +1307,7 @@ public:
return false;
}
bool onResume() override
{
return observer.doResume();
}
private:
void prepare()
{
if (ctx->execute_ttl_type != ExecuteTTLType::NONE)
@ -1492,7 +1466,7 @@ private:
MergedColumnOnlyOutputStreamPtr out;
std::unique_ptr<PartMergerWriter> part_merger_writer_task{nullptr};
TaskObserverMetrics observer;
ThreadGroupStatusPtr thread_group = std::make_shared<ThreadGroupStatus>();
};

View File

@ -29,6 +29,9 @@ void ReplicatedMergeMutateTaskBase::onCompleted()
bool ReplicatedMergeMutateTaskBase::executeStep()
{
/// Metrics will be saved in the thread_group.
CurrentThread::ScopedAttach scoped_attach(thread_group);
std::exception_ptr saved_exception;
bool retryable_error = false;
@ -83,7 +86,6 @@ bool ReplicatedMergeMutateTaskBase::executeStep()
saved_exception = std::current_exception();
}
if (!retryable_error && saved_exception)
{
std::lock_guard lock(storage.queue.state_mutex);

View File

@ -4,16 +4,16 @@
#include <Storages/MergeTree/IExecutableTask.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Storages/MergeTree/TaskObserverMetrics.h>
namespace DB
{
class StorageReplicatedMergeTree;
/**
* This is used as a base of MergeFromLogEntryTask and MutateFromLogEntryTaskBase
*/
/** This is used as a base of MergeFromLogEntryTask and MutateFromLogEntryTaskBase
*/
class ReplicatedMergeMutateTaskBase : public IExecutableTask
{
public:
@ -33,17 +33,12 @@ public:
}
~ReplicatedMergeMutateTaskBase() override = default;
void onCompleted() override;
StorageID getStorageID() override;
bool onSuspend() override
{
return observer.doSuspend();
}
bool executeStep() override;
bool onResume() override
{
return observer.doResume();
}
protected:
using PartLogWriter = std::function<void(const ExecutionStatus &)>;
@ -70,7 +65,6 @@ protected:
StorageReplicatedMergeTree & storage;
private:
enum class CheckExistingPartResult
{
PART_EXISTS,
@ -78,7 +72,7 @@ private:
};
CheckExistingPartResult checkExistingPart();
bool executeImpl() ;
bool executeImpl();
enum class State
{
@ -92,7 +86,7 @@ private:
PartLogWriter part_log_writer{};
State state{State::NEED_PREPARE};
IExecutableTask::TaskResultCallback task_result_callback;
TaskObserverMetrics observer;
ThreadGroupStatusPtr thread_group = std::make_shared<ThreadGroupStatus>();
};
}

View File

@ -1,31 +0,0 @@
#pragma once
#include <Common/ThreadStatus.h>
namespace DB
{
class TaskObserverMetrics : public boost::noncopyable
{
public:
TaskObserverMetrics() : thread_group(std::make_shared<ThreadGroupStatus>()) { }
~TaskObserverMetrics() { }
bool doResume()
{
CurrentThread::attachTo(thread_group);
return true;
}
bool doSuspend()
{
CurrentThread::detachQueryIfNotDetached();
return true;
}
private:
ThreadGroupStatusPtr thread_group;
};
}

View File

@ -24,11 +24,6 @@ public:
{
}
bool onSuspend() override
{
suspend_calls++
}
bool executeStep() override
{
auto sleep_time = distribution(generator);
@ -41,11 +36,6 @@ public:
return false;
}
bool onSuspend() override
{
resume_calls++
}
StorageID getStorageID() override
{
return {"test", name};
@ -65,9 +55,7 @@ private:
std::uniform_int_distribution<> distribution;
String name;
size_t suspend_calls;
std::function<void()> on_completed;
size_t resume_calls;
};
@ -87,8 +75,7 @@ TEST(Executor, RemoveTasks)
for (size_t i = 0; i < batch; ++i)
for (size_t j = 0; j < tasks_kinds; ++j)
ASSERT_TRUE(
executor->trySchedule(std::make_shared<FakeExecutableTask>(std::to_string(j)))
);
executor->trySchedule(std::make_shared<FakeExecutableTask>(std::to_string(j))));
std::vector<std::thread> threads(batch);
@ -105,9 +92,6 @@ TEST(Executor, RemoveTasks)
thread.join();
ASSERT_EQ(CurrentMetrics::values[CurrentMetrics::BackgroundMergesAndMutationsPoolTask], 0);
/// TODO: move to a test by itself
ASSERT_EQ(batch*tasks_kinds, suspend_calls);
ASSERT_EQ(batch*tasks_kinds, resume_calls);
executor->wait();
}

View File

@ -1606,7 +1606,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
thread_status.finalizePerformanceCounters();
auto profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_status.performance_counters.getPartiallyAtomicSnapshot());
writePartLog(PartLogElement::Type::NEW_PART, {}, 0 /** log entry is fake so we don't measure the time */,
part->name, part, {} /** log entry is fake so there are no initial parts */, nullptr, profile_counters);
part->name, part, {} /** log entry is fake so there are no initial parts */, nullptr, std::move(profile_counters));
return true;
}
@ -4015,7 +4015,7 @@ bool StorageReplicatedMergeTree::fetchPart(
auto profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_status.performance_counters.getPartiallyAtomicSnapshot());
writePartLog(
PartLogElement::DOWNLOAD_PART, execution_status, stopwatch.elapsed(),
part_name, part, replaced_parts, nullptr, profile_counters);
part_name, part, replaced_parts, nullptr, std::move(profile_counters));
};
DataPartPtr part_to_clone;
@ -4254,7 +4254,7 @@ MutableDataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
auto profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_status.performance_counters.getPartiallyAtomicSnapshot());
writePartLog(
PartLogElement::DOWNLOAD_PART, execution_status, stopwatch.elapsed(),
part_name, part, replaced_parts, nullptr, profile_counters);
part_name, part, replaced_parts, nullptr, std::move(profile_counters));
};
std::function<MutableDataPartPtr()> get_part;