[feat] Add ProfileEvents map to PartLog

closes #10316
This commit is contained in:
John Skopis 2021-11-19 15:19:20 +00:00 committed by vdimir
parent 726fb4bebc
commit f7604cc686
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
18 changed files with 239 additions and 15 deletions

View File

@ -13,6 +13,7 @@
M(BackgroundMergesAndMutationsPoolTask, "Number of active merges and mutations in an associated background pool") \
M(BackgroundFetchesPoolTask, "Number of active fetches in an associated background pool") \
M(BackgroundCommonPoolTask, "Number of active tasks in an associated background pool") \
M(BackgroundMaintPoolTask, "Number of active tasks in BackgroundProcessingPool (maint)") \
M(BackgroundMovePoolTask, "Number of active tasks in BackgroundProcessingPool for moves") \
M(BackgroundSchedulePoolTask, "Number of active tasks in BackgroundSchedulePool. This pool is used for periodic ReplicatedMergeTree tasks, like cleaning old data parts, altering data parts, replica re-initialization, etc.") \
M(BackgroundBufferFlushSchedulePoolTask, "Number of active tasks in BackgroundBufferFlushSchedulePool. This pool is used for periodic Buffer flushes") \

View File

@ -11,6 +11,9 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProfileEventsExt.h>
#include <Common/ProfileEvents.h>
#include <DataTypes/DataTypeMap.h>
#include <Common/CurrentThread.h>
@ -121,6 +124,17 @@ NamesAndTypesList PartLogElement::getNamesAndTypes()
/// Is there an error during the execution or commit
{"error", std::make_shared<DataTypeUInt16>()},
{"exception", std::make_shared<DataTypeString>()},
{"ProfileEvents", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeUInt64>())},
};
}
/// Alias columns for the part log table.
/// "ProfileEvents.Names" is defined by the expression mapKeys(ProfileEvents) and
/// "ProfileEvents.Values" by mapValues(ProfileEvents); both are array-typed
/// (Array(String) and Array(UInt64) respectively).
NamesAndAliases PartLogElement::getNamesAndAliases()
{
    auto event_names_type = std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>());
    auto event_values_type = std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>());

    NamesAndAliases aliases
    {
        {"ProfileEvents.Names", {event_names_type}, "mapKeys(ProfileEvents)"},
        {"ProfileEvents.Values", {event_values_type}, "mapValues(ProfileEvents)"},
    };
    return aliases;
}
@ -163,18 +177,28 @@ void PartLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(error);
columns[i++]->insert(exception);
if (profile_counters)
{
auto * column = columns[i++].get();
ProfileEvents::dumpToMapColumn(*profile_counters, column, true);
}
else
{
columns[i++]->insertDefault();
}
}
bool PartLog::addNewPart(
ContextPtr current_context, const MutableDataPartPtr & part, UInt64 elapsed_ns, const ExecutionStatus & execution_status)
ContextPtr current_context, const MutableDataPartPtr & part, UInt64 elapsed_ns, const ExecutionStatus & execution_status, std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters_)
{
return addNewParts(current_context, {part}, elapsed_ns, execution_status);
return addNewParts(current_context, {part}, elapsed_ns, execution_status, profile_counters_);
}
bool PartLog::addNewParts(
ContextPtr current_context, const PartLog::MutableDataPartsVector & parts, UInt64 elapsed_ns, const ExecutionStatus & execution_status)
ContextPtr current_context, const PartLog::MutableDataPartsVector & parts, UInt64 elapsed_ns, const ExecutionStatus & execution_status, std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters_)
{
if (parts.empty())
return true;
@ -221,6 +245,8 @@ bool PartLog::addNewParts(
elem.error = static_cast<UInt16>(execution_status.code);
elem.exception = execution_status.message;
elem.profile_counters = profile_counters_;
part_log->add(elem);
}
}

View File

@ -8,6 +8,10 @@
#include <Storages/MergeTree/MergeType.h>
#include <Storages/MergeTree/MergeAlgorithm.h>
/// Forward declaration of the profile event counters class.
/// NOTE(review): PartLogElement in this header declares a member of type
/// std::shared_ptr<ProfileEvents::Counters::Snapshot>. Naming the nested type
/// `Snapshot` needs the complete definition of `Counters`, so this forward
/// declaration is only sufficient if <Common/ProfileEvents.h> has already been
/// included before this point - confirm, or include that header here instead.
namespace ProfileEvents
{
class Counters;
}
namespace DB
{
@ -81,13 +85,15 @@ struct PartLogElement
UInt16 error = 0;
String exception;
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters;
static std::string name() { return "PartLog"; }
static MergeReasonType getMergeReasonType(MergeType merge_type);
static PartMergeAlgorithm getMergeAlgorithm(MergeAlgorithm merge_algorithm_);
static NamesAndTypesList getNamesAndTypes();
static NamesAndAliases getNamesAndAliases() { return {}; }
static NamesAndAliases getNamesAndAliases();
void appendToBlock(MutableColumns & columns) const;
static const char * getCustomColumnList() { return nullptr; }
};
@ -106,9 +112,11 @@ class PartLog : public SystemLog<PartLogElement>
public:
/// Add a record about creation of new part.
static bool addNewPart(ContextPtr context, const MutableDataPartPtr & part, UInt64 elapsed_ns,
const ExecutionStatus & execution_status = {});
const ExecutionStatus & execution_status = {},
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters_ = {});
static bool addNewParts(ContextPtr context, const MutableDataPartsVector & parts, UInt64 elapsed_ns,
const ExecutionStatus & execution_status = {});
const ExecutionStatus & execution_status = {},
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters_ = {});
};
}

View File

@ -29,7 +29,9 @@ class IExecutableTask
{
public:
using TaskResultCallback = std::function<void(bool)>;
virtual bool onResume() = 0;
virtual bool executeStep() = 0;
virtual bool onSuspend() = 0;
virtual void onCompleted() = 0;
virtual StorageID getStorageID() = 0;
virtual UInt64 getPriority() = 0;
@ -54,6 +56,11 @@ public:
, job_result_callback(std::forward<Callback>(job_result_callback_))
, id(id_) {}
/// This adapter keeps no per-task state to restore on resume; always returns true.
bool onResume() override { return true; }
bool executeStep() override
{
res = job_to_execute();
@ -61,6 +68,11 @@ public:
return false;
}
/// This adapter has nothing to release on suspend; always returns true.
bool onSuspend() override { return true; }
void onCompleted() override { job_result_callback(!res); }
StorageID getStorageID() override { return id; }
UInt64 getPriority() override

View File

@ -289,9 +289,12 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
return {true, true, [this, stopwatch = *stopwatch_ptr] (const ExecutionStatus & execution_status)
{
auto & thread_status = CurrentThread::get();
thread_status.finalizePerformanceCounters();
auto profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_status.performance_counters.getPartiallyAtomicSnapshot());
storage.writePartLog(
PartLogElement::MERGE_PARTS, execution_status, stopwatch.elapsed(),
entry.new_part_name, part, parts, merge_mutate_entry.get());
entry.new_part_name, part, parts, merge_mutate_entry.get(), profile_counters);
}};
}

View File

@ -85,6 +85,9 @@ void MergePlainMergeTreeTask::prepare()
write_part_log = [this] (const ExecutionStatus & execution_status)
{
auto & thread_status = CurrentThread::get();
thread_status.finalizePerformanceCounters();
auto profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_status.performance_counters.getPartiallyAtomicSnapshot());
merge_task.reset();
storage.writePartLog(
PartLogElement::MERGE_PARTS,
@ -93,7 +96,8 @@ void MergePlainMergeTreeTask::prepare()
future_part->name,
new_part,
future_part->parts,
merge_list_entry.get());
merge_list_entry.get(),
profile_counters);
};
merge_task = storage.merger_mutator.mergePartsToTemporaryPart(

View File

@ -5,6 +5,8 @@
#include <Storages/MutationCommands.h>
#include <Storages/MergeTree/MergeMutateSelectedEntry.h>
#include <Interpreters/MergeTreeTransactionHolder.h>
#include <Storages/MergeTree/TaskObserverMetrics.h>
namespace DB
{
@ -34,7 +36,18 @@ public:
priority += item->getBytesOnDisk();
}
/// Forwards to the task's TaskObserverMetrics member and returns its result.
bool onResume() override { return observer.doResume(); }
bool executeStep() override;
/// Forwards to the task's TaskObserverMetrics member and returns its result.
bool onSuspend() override { return observer.doSuspend(); }
void onCompleted() override;
StorageID getStorageID() override;
UInt64 getPriority() override { return priority; }
@ -82,6 +95,8 @@ private:
MergeTreeTransactionHolder txn_holder;
MergeTreeTransactionPtr txn;
TaskObserverMetrics observer;
};

View File

@ -130,6 +130,15 @@ void MergeTreeBackgroundExecutor<Queue>::routine(TaskRuntimeDataPtr item)
bool need_execute_again = false;
try
{
item->task->onResume();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
try
{
ALLOW_ALLOCATIONS_IN_SCOPE;
@ -153,6 +162,15 @@ void MergeTreeBackgroundExecutor<Queue>::routine(TaskRuntimeDataPtr item)
});
}
try
{
item->task->onSuspend();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (need_execute_again)
{
std::lock_guard guard(mutex);

View File

@ -7280,7 +7280,8 @@ void MergeTreeData::writePartLog(
const String & new_part_name,
const DataPartPtr & result_part,
const DataPartsVector & source_parts,
const MergeListEntry * merge_entry)
const MergeListEntry * merge_entry,
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters)
try
{
auto table_id = getStorageID();
@ -7342,6 +7343,11 @@ try
part_log_elem.peak_memory_usage = (*merge_entry)->memory_tracker.getPeak();
}
if (profile_counters)
{
part_log_elem.profile_counters = profile_counters;
}
part_log->add(part_log_elem);
}
catch (...)

View File

@ -1298,7 +1298,8 @@ protected:
const String & new_part_name,
const DataPartPtr & result_part,
const DataPartsVector & source_parts,
const MergeListEntry * merge_entry);
const MergeListEntry * merge_entry,
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters = nullptr);
/// If part is assigned to merge or mutation (possibly replicated)
/// Should be overridden by children, because they can have different

View File

@ -184,9 +184,12 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
return {true, true, [this] (const ExecutionStatus & execution_status)
{
auto & thread_status = CurrentThread::get();
thread_status.finalizePerformanceCounters();
auto profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_status.performance_counters.getPartiallyAtomicSnapshot());
storage.writePartLog(
PartLogElement::MUTATE_PART, execution_status, stopwatch_ptr->elapsed(),
entry.new_part_name, new_part, future_mutated_part->parts, merge_mutate_entry.get());
entry.new_part_name, new_part, future_mutated_part->parts, merge_mutate_entry.get(), profile_counters);
}};
}

View File

@ -38,6 +38,9 @@ void MutatePlainMergeTreeTask::prepare()
write_part_log = [this] (const ExecutionStatus & execution_status)
{
auto & thread_status = CurrentThread::get();
thread_status.finalizePerformanceCounters();
auto profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_status.performance_counters.getPartiallyAtomicSnapshot());
mutate_task.reset();
storage.writePartLog(
PartLogElement::MUTATE_PART,
@ -46,7 +49,8 @@ void MutatePlainMergeTreeTask::prepare()
future_part->name,
new_part,
future_part->parts,
merge_list_entry.get());
merge_list_entry.get(),
profile_counters);
};
fake_query_context = Context::createCopy(storage.getContext());

View File

@ -8,6 +8,7 @@
#include <Storages/MergeTree/MutateTask.h>
#include <Storages/MutationCommands.h>
#include <Storages/MergeTree/MergeMutateSelectedEntry.h>
#include <Storages/MergeTree/TaskObserverMetrics.h>
namespace DB
{
@ -38,7 +39,18 @@ public:
priority += part->getBytesOnDisk();
}
/// Hands the suspend notification to the observer member; returns what it returns.
bool onSuspend() override { return observer.doSuspend(); }
bool executeStep() override;
/// Hands the resume notification to the observer member; returns what it returns.
bool onResume() override { return observer.doResume(); }
void onCompleted() override;
StorageID getStorageID() override;
UInt64 getPriority() override { return priority; }
@ -76,6 +88,8 @@ private:
ContextMutablePtr fake_query_context;
MutateTaskPtr mutate_task;
TaskObserverMetrics observer;
};

View File

@ -17,6 +17,7 @@
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/TaskObserverMetrics.h>
#include <Storages/MutationCommands.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <boost/algorithm/string/replace.hpp>
@ -812,6 +813,11 @@ public:
StorageID getStorageID() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
UInt64 getPriority() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
/// Suspend hook: delegates to observer.doSuspend() and returns its result.
bool onSuspend() override { return observer.doSuspend(); }
bool executeStep() override
{
auto & current_level_parts = level_parts[current_level];
@ -898,6 +904,12 @@ public:
/// Need execute again
return true;
}
/// Resume hook: delegates to observer.doResume() and returns its result.
bool onResume() override { return observer.doResume(); }
private:
String name;
MergeTreeData::MutableDataPartsVector parts;
@ -913,6 +925,8 @@ private:
/// TODO(nikitamikhaylov): make this constant a setting
static constexpr size_t max_parts_to_merge_in_one_level = 10;
TaskObserverMetrics observer;
};
@ -1134,6 +1148,12 @@ public:
StorageID getStorageID() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
UInt64 getPriority() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
/// Suspend hook: delegates to observer.doSuspend() and returns its result.
bool onSuspend() override { return observer.doSuspend(); }
bool executeStep() override
{
switch (state)
@ -1168,6 +1188,11 @@ public:
return false;
}
/// Resume hook: delegates to observer.doResume() and returns its result.
bool onResume() override { return observer.doResume(); }
private:
void prepare()
@ -1251,6 +1276,8 @@ private:
MutationContextPtr ctx;
std::unique_ptr<PartMergerWriter> part_merger_writer_task;
TaskObserverMetrics observer;
};
class MutateSomePartColumnsTask : public IExecutableTask
@ -1262,6 +1289,11 @@ public:
StorageID getStorageID() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
UInt64 getPriority() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
/// Suspend hook: delegates to observer.doSuspend() and returns its result.
bool onSuspend() override { return observer.doSuspend(); }
bool executeStep() override
{
switch (state)
@ -1295,6 +1327,11 @@ public:
return false;
}
/// Resume hook: delegates to observer.doResume() and returns its result.
bool onResume() override { return observer.doResume(); }
private:
void prepare()
@ -1455,6 +1492,7 @@ private:
MergedColumnOnlyOutputStreamPtr out;
std::unique_ptr<PartMergerWriter> part_merger_writer_task{nullptr};
TaskObserverMetrics observer;
};

View File

@ -4,6 +4,7 @@
#include <Storages/MergeTree/IExecutableTask.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Storages/MergeTree/TaskObserverMetrics.h>
namespace DB
{
@ -34,7 +35,15 @@ public:
~ReplicatedMergeMutateTaskBase() override = default;
void onCompleted() override;
StorageID getStorageID() override;
/// Passes the suspend notification on to the observer member; returns its result.
bool onSuspend() override { return observer.doSuspend(); }
bool executeStep() override;
/// Passes the resume notification on to the observer member; returns its result.
bool onResume() override { return observer.doResume(); }
protected:
using PartLogWriter = std::function<void(const ExecutionStatus &)>;
@ -83,6 +92,7 @@ private:
PartLogWriter part_log_writer{};
State state{State::NEED_PREPARE};
IExecutableTask::TaskResultCallback task_result_callback;
TaskObserverMetrics observer;
};
}

View File

@ -0,0 +1,37 @@
#pragma once
#include <memory>
#include <boost/noncopyable.hpp>
#include <Common/CurrentThread.h>
#include <Common/ThreadStatus.h>
namespace DB
{
/// Error code declaration following the per-file `extern` convention.
/// NOTE(review): LOGICAL_ERROR is not referenced anywhere in the visible part of
/// this header - confirm whether the declaration is still needed.
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class TaskObserverMetrics : public boost::noncopyable
{
public:
TaskObserverMetrics() : thread_group(std::make_shared<ThreadGroupStatus>()) {}
~TaskObserverMetrics() {}
bool doResume()
{
CurrentThread::attachTo(thread_group);
return true;
}
bool doSuspend()
{
CurrentThread::detachQueryIfNotDetached();
return true;
}
private:
ThreadGroupStatusPtr thread_group;
};
}

View File

@ -24,6 +24,11 @@ public:
{
}
/// Test hook: counts how many times the fake task was suspended.
/// Always returns true. The original body lacked both the statement
/// terminator and a return value for this bool function.
bool onSuspend() override
{
    ++suspend_calls;
    return true;
}
bool executeStep() override
{
auto sleep_time = distribution(generator);
@ -36,6 +41,11 @@ public:
return false;
}
bool onSuspend() override
{
resume_calls++
}
StorageID getStorageID() override
{
return {"test", name};
@ -55,7 +65,9 @@ private:
std::uniform_int_distribution<> distribution;
String name;
/// Number of onSuspend() calls seen by this fake task. Starts at zero so the
/// increments in onSuspend() never read an indeterminate value.
size_t suspend_calls = 0;
std::function<void()> on_completed;
/// Number of resume notifications seen by this fake task. Starts at zero so
/// the increments never read an indeterminate value.
size_t resume_calls = 0;
};
@ -93,6 +105,9 @@ TEST(Executor, RemoveTasks)
thread.join();
ASSERT_EQ(CurrentMetrics::values[CurrentMetrics::BackgroundMergesAndMutationsPoolTask], 0);
/// TODO: move to a test by itself
ASSERT_EQ(batch*tasks_kinds, suspend_calls);
ASSERT_EQ(batch*tasks_kinds, resume_calls);
executor->wait();
}

View File

@ -1602,8 +1602,11 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
renameTempPartAndReplace(part, transaction);
checkPartChecksumsAndCommit(transaction, part);
auto & thread_status = CurrentThread::get();
thread_status.finalizePerformanceCounters();
auto profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_status.performance_counters.getPartiallyAtomicSnapshot());
writePartLog(PartLogElement::Type::NEW_PART, {}, 0 /** log entry is fake so we don't measure the time */,
part->name, part, {} /** log entry is fake so there are no initial parts */, nullptr);
part->name, part, {} /** log entry is fake so there are no initial parts */, nullptr, profile_counters);
return true;
}
@ -4007,9 +4010,12 @@ bool StorageReplicatedMergeTree::fetchPart(
auto write_part_log = [&] (const ExecutionStatus & execution_status)
{
auto & thread_status = CurrentThread::get();
thread_status.finalizePerformanceCounters();
auto profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_status.performance_counters.getPartiallyAtomicSnapshot());
writePartLog(
PartLogElement::DOWNLOAD_PART, execution_status, stopwatch.elapsed(),
part_name, part, replaced_parts, nullptr);
part_name, part, replaced_parts, nullptr, profile_counters);
};
DataPartPtr part_to_clone;
@ -4243,9 +4249,12 @@ MutableDataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
auto write_part_log = [&] (const ExecutionStatus & execution_status)
{
auto & thread_status = CurrentThread::get();
thread_status.finalizePerformanceCounters();
auto profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_status.performance_counters.getPartiallyAtomicSnapshot());
writePartLog(
PartLogElement::DOWNLOAD_PART, execution_status, stopwatch.elapsed(),
part_name, part, replaced_parts, nullptr);
part_name, part, replaced_parts, nullptr, profile_counters);
};
std::function<MutableDataPartPtr()> get_part;