[wip] ProfileCounters for each part

This commit is contained in:
vdimir 2023-01-23 12:45:28 +00:00
parent 5f9f72f937
commit a228f7f419
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
24 changed files with 277 additions and 99 deletions

View File

@ -40,6 +40,8 @@ ThreadStatus & CurrentThread::get()
ProfileEvents::Counters & CurrentThread::getProfileEvents()
{
if (unlikely(subthread_profile_events))
return *subthread_profile_events;
return current_thread ? current_thread->performance_counters : ProfileEvents::global_counters;
}

View File

@ -102,29 +102,6 @@ public:
bool log_peak_memory_usage_in_destructor = true;
};
class ScopedAttach : private boost::noncopyable
{
private:
bool attached = false;
public:
explicit ScopedAttach(const ThreadGroupStatusPtr & thread_group)
{
if (!CurrentThread::getGroup())
{
CurrentThread::attachToIfDetached(thread_group);
attached = true;
}
}
~ScopedAttach()
{
if (attached)
{
CurrentThread::detachQuery();
}
}
};
private:
static void defaultThreadDeleter();

View File

@ -0,0 +1,32 @@
#include <Common/ProfileEventsScope.h>
namespace DB
{
extern thread_local constinit ProfileEvents::Counters * subthread_profile_events;
ScopedProfileEvents::ScopedProfileEvents()
: performance_counters_holder(std::make_unique<ProfileEvents::Counters>())
, performance_counters_scope(performance_counters_holder.get())
{
CurrentThread::get().attachProfileCountersScope(performance_counters_scope);
}
ScopedProfileEvents::ScopedProfileEvents(ProfileEvents::Counters * performance_counters_scope_)
: performance_counters_scope(performance_counters_scope_)
{
CurrentThread::get().attachProfileCountersScope(performance_counters_scope);
}
std::shared_ptr<ProfileEvents::Counters::Snapshot> ScopedProfileEvents::getSnapshot()
{
return std::make_shared<ProfileEvents::Counters::Snapshot>(performance_counters_scope->getPartiallyAtomicSnapshot());
}
ScopedProfileEvents::~ScopedProfileEvents()
{
subthread_profile_events = nullptr;
}
}

View File

@ -0,0 +1,35 @@
#pragma once
#include <Common/ProfileEvents.h>
#include <Common/CurrentThread.h>
namespace DB
{
/// Use specific performance counters for current thread in the current scope.
class ScopedProfileEvents : private boost::noncopyable
{
public:
/// Counters are owned by this object.
ScopedProfileEvents();
/// Shared counters are stored outisde.
/// Useful when we calculate metrics entering into some scope several times.
explicit ScopedProfileEvents(ProfileEvents::Counters * performance_counters_scope_);
std::shared_ptr<ProfileEvents::Counters::Snapshot> getSnapshot();
~ScopedProfileEvents();
private:
/// If set, then performance_counters_scope is owned by this object.
/// Otherwise, counters are passed to the constructor from outside.
std::unique_ptr<ProfileEvents::Counters> performance_counters_holder;
ProfileEvents::Counters * performance_counters_scope;
};
}

View File

@ -25,6 +25,7 @@ namespace ErrorCodes
}
thread_local ThreadStatus constinit * current_thread = nullptr;
thread_local ProfileEvents::Counters constinit * subthread_profile_events = nullptr;
#if !defined(SANITIZER)
namespace

View File

@ -107,6 +107,7 @@ using ThreadGroupStatusPtr = std::shared_ptr<ThreadGroupStatus>;
* - https://github.com/ClickHouse/ClickHouse/pull/40078
*/
extern thread_local constinit ThreadStatus * current_thread;
extern thread_local constinit ProfileEvents::Counters * subthread_profile_events;
/** Encapsulates all per-thread info (ProfileEvents, MemoryTracker, query_id, query context, etc.).
* The object must be created in thread function and destroyed in the same thread before the exit.
@ -139,6 +140,7 @@ public:
Deleter deleter;
protected:
/// Group of threads, to which this thread attached
ThreadGroupStatusPtr thread_group;
std::atomic<int> thread_state{ThreadState::DetachedFromQuery};
@ -244,6 +246,8 @@ public:
/// Attaches slave thread to existing thread group
void attachQuery(const ThreadGroupStatusPtr & thread_group_, bool check_detached = true);
void attachProfileCountersScope(ProfileEvents::Counters * performance_counters_scope);
InternalTextLogsQueuePtr getInternalTextLogsQueue() const
{
return thread_state == Died ? nullptr : logs_queue_ptr.lock();

View File

@ -189,16 +189,8 @@ void PartLogElement::appendToBlock(MutableColumns & columns) const
}
}
bool PartLog::addNewPart(
ContextPtr current_context, const MutableDataPartPtr & part, UInt64 elapsed_ns, const ExecutionStatus & execution_status, std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters_)
{
return addNewParts(current_context, {part}, elapsed_ns, execution_status, std::move(profile_counters_));
}
bool PartLog::addNewParts(
ContextPtr current_context, const PartLog::MutableDataPartsVector & parts, UInt64 elapsed_ns, const ExecutionStatus & execution_status, std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters_)
ContextPtr current_context, const PartLog::PartLogEntries & parts, const ExecutionStatus & execution_status)
{
if (parts.empty())
return true;
@ -207,15 +199,17 @@ bool PartLog::addNewParts(
try
{
auto table_id = parts.front()->storage.getStorageID();
auto table_id = parts.front().part->storage.getStorageID();
part_log = current_context->getPartLog(table_id.database_name); // assume parts belong to the same table
if (!part_log)
return false;
auto query_id = CurrentThread::getQueryId();
for (const auto & part : parts)
for (const auto & part_log_entry : parts)
{
const auto & part = part_log_entry.part;
PartLogElement elem;
if (!query_id.empty())
@ -228,7 +222,7 @@ bool PartLog::addNewParts(
const auto time_now = std::chrono::system_clock::now();
elem.event_time = timeInSeconds(time_now);
elem.event_time_microseconds = timeInMicroseconds(time_now);
elem.duration_ms = elapsed_ns / 1000000;
elem.duration_ms = part_log_entry.elapsed_ns / 1000000;
elem.database_name = table_id.database_name;
elem.table_name = table_id.table_name;
@ -245,7 +239,7 @@ bool PartLog::addNewParts(
elem.error = static_cast<UInt16>(execution_status.code);
elem.exception = execution_status.message;
elem.profile_counters = profile_counters_;
elem.profile_counters = part_log_entry.profile_counters;
part_log->add(elem);
}
@ -259,4 +253,28 @@ bool PartLog::addNewParts(
return true;
}
bool PartLog::addNewPart(ContextPtr context, const PartLog::PartLogEntry & part, const ExecutionStatus & execution_status)
{
return addNewParts(context, {part}, execution_status);
}
bool PartLog::addNewParts(ContextPtr context, const PartLog::MutableDataPartsVector & parts, UInt64 elapsed_ns,
const ExecutionStatus & execution_status)
{
PartLog::PartLogEntries part_log_entries;
part_log_entries.reserve(parts.size());
for (const auto & part : parts)
part_log_entries.emplace_back(part, elapsed_ns);
return addNewParts(context, part_log_entries, execution_status);
}
bool PartLog::addNewPart(ContextPtr context, const PartLog::MutableDataPartPtr & part, UInt64 elapsed_ns, const ExecutionStatus & execution_status)
{
PartLog::PartLogEntries part_log_entries;
part_log_entries.emplace_back(part, elapsed_ns);
return addNewParts(context, part_log_entries, execution_status);
}
}

View File

@ -109,14 +109,42 @@ class PartLog : public SystemLog<PartLogElement>
using MutableDataPartPtr = std::shared_ptr<IMergeTreeDataPart>;
using MutableDataPartsVector = std::vector<MutableDataPartPtr>;
using ProfileCountersSnapshotPtr = std::shared_ptr<ProfileEvents::Counters::Snapshot>;
public:
struct PartLogEntry
{
std::shared_ptr<IMergeTreeDataPart> part;
ProfileCountersSnapshotPtr profile_counters;
UInt64 elapsed_ns;
PartLogEntry(std::shared_ptr<IMergeTreeDataPart> part_, UInt64 elapsed_ns_)
: part(std::move(part_)), elapsed_ns(elapsed_ns_)
{
}
PartLogEntry(std::shared_ptr<IMergeTreeDataPart> part_, UInt64 elapsed_ns_, ProfileCountersSnapshotPtr profile_counters_)
: part(std::move(part_))
, profile_counters(std::move(profile_counters_))
, elapsed_ns(elapsed_ns_)
{
}
};
using PartLogEntries = std::vector<PartLogEntry>;
/// Add a record about creation of new part.
static bool addNewPart(ContextPtr context, const PartLogEntry & part,
const ExecutionStatus & execution_status = {});
static bool addNewParts(ContextPtr context, const PartLogEntries & parts,
const ExecutionStatus & execution_status = {});
/// Add a record about creation of new part.
static bool addNewPart(ContextPtr context, const MutableDataPartPtr & part, UInt64 elapsed_ns,
const ExecutionStatus & execution_status = {},
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters_ = {});
const ExecutionStatus & execution_status = {});
static bool addNewParts(ContextPtr context, const MutableDataPartsVector & parts, UInt64 elapsed_ns,
const ExecutionStatus & execution_status = {},
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters_ = {});
const ExecutionStatus & execution_status = {});
};
}

View File

@ -19,6 +19,22 @@ std::shared_ptr<DB::DataTypeEnum8> TypeEnum = std::make_shared<DB::DataTypeEnum8
{ "gauge", static_cast<Int8>(GAUGE)},
});
String dumpToString(const Counters::Snapshot & counters, bool nonzero_only)
{
std::vector<String> ss;
for (Event event = 0; event < Counters::num_counters; ++event)
{
UInt64 value = counters[event];
if (nonzero_only && 0 == value)
continue;
const char * desc = getName(event);
ss.push_back(fmt::format("{}: {}", desc, value));
}
return fmt::format("[{}]", fmt::join(ss, ", "));
}
/// Put implementation here to avoid extra linking dependencies for clickhouse_common_io
void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, bool nonzero_only)
{

View File

@ -21,6 +21,8 @@ struct ProfileEventsSnapshot
using ThreadIdToCountersSnapshot = std::unordered_map<UInt64, Counters::Snapshot>;
String dumpToString(const Counters::Snapshot & counters, bool nonzero_only = true);
/// Dumps profile events to columns Map(String, UInt64)
void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, bool nonzero_only = true);

View File

@ -161,6 +161,12 @@ void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool
setupState(thread_group_);
}
void ThreadStatus::attachProfileCountersScope(ProfileEvents::Counters * performance_counters_scope)
{
subthread_profile_events = performance_counters_scope;
performance_counters_scope->setParent(&performance_counters);
}
void ThreadStatus::initPerformanceCounters()
{
performance_counters_finalized = false;

View File

@ -3,6 +3,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Common/ProfileEventsScope.h>
namespace DB
{
@ -28,7 +29,7 @@ void MergePlainMergeTreeTask::onCompleted()
bool MergePlainMergeTreeTask::executeStep()
{
/// Metrics will be saved in the thread_group.
CurrentThread::ScopedAttach scoped_attach(thread_group);
ScopedProfileEvents profile_events_scope(&profile_counters);
/// Make out memory tracker a parent of current thread memory tracker
MemoryTrackerThreadSwitcherPtr switcher;
@ -90,7 +91,7 @@ void MergePlainMergeTreeTask::prepare()
{
auto & thread_status = CurrentThread::get();
thread_status.finalizePerformanceCounters();
auto profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_status.performance_counters.getPartiallyAtomicSnapshot());
auto profile_counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(profile_counters.getPartiallyAtomicSnapshot());
merge_task.reset();
storage.writePartLog(
PartLogElement::MERGE_PARTS,
@ -100,7 +101,7 @@ void MergePlainMergeTreeTask::prepare()
new_part,
future_part->parts,
merge_list_entry.get(),
profile_counters);
std::move(profile_counters_snapshot));
};
merge_task = storage.merger_mutator.mergePartsToTemporaryPart(

View File

@ -83,7 +83,7 @@ private:
MergeTreeTransactionHolder txn_holder;
MergeTreeTransactionPtr txn;
ThreadGroupStatusPtr thread_group = std::make_shared<ThreadGroupStatus>();
ProfileEvents::Counters profile_counters;
};

View File

@ -10,6 +10,7 @@
#include <Common/escapeForFileName.h>
#include <Common/Increment.h>
#include <Common/noexcept_scope.h>
#include <Common/ProfileEventsScope.h>
#include <Common/quoteString.h>
#include <Common/scope_guard_safe.h>
#include <Common/SimpleIncrement.h>
@ -7347,6 +7348,10 @@ try
{
part_log_elem.profile_counters = profile_counters;
}
else
{
LOG_WARNING(log, "Profile counters are not set");
}
part_log->add(part_log_elem);
}
@ -7483,6 +7488,7 @@ bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagge
{
Stopwatch stopwatch;
MutableDataPartPtr cloned_part;
ScopedProfileEvents profile_events_scope;
auto write_part_log = [&](const ExecutionStatus & execution_status)
{
@ -7494,7 +7500,7 @@ bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagge
cloned_part,
{moving_part.part},
nullptr,
nullptr);
profile_events_scope.getSnapshot());
};
// Register in global moves list (StorageSystemMoves)

View File

@ -3,6 +3,7 @@
#include <Storages/StorageMergeTree.h>
#include <Interpreters/PartLog.h>
#include <DataTypes/ObjectUtils.h>
#include <Common/ProfileEventsScope.h>
namespace ProfileEvents
{
@ -47,6 +48,7 @@ struct MergeTreeSink::DelayedChunk
MergeTreeDataWriter::TemporaryPart temp_part;
UInt64 elapsed_ns;
String block_dedup_token;
ProfileEvents::Counters part_counters;
};
std::vector<Partition> partitions;
@ -70,12 +72,18 @@ void MergeTreeSink::consume(Chunk chunk)
for (auto & current_block : part_blocks)
{
Stopwatch watch;
String block_dedup_token;
ProfileEvents::Counters part_counters;
auto temp_part = storage.writer.writeTempPart(current_block, metadata_snapshot, context);
UInt64 elapsed_ns = 0;
MergeTreeDataWriter::TemporaryPart temp_part;
UInt64 elapsed_ns = watch.elapsed();
{
ScopedProfileEvents scoped_attach(&part_counters);
Stopwatch watch;
temp_part = storage.writer.writeTempPart(current_block, metadata_snapshot, context);
elapsed_ns = watch.elapsed();
}
/// If optimize_on_insert setting is true, current_block could become empty after merge
/// and we didn't create part.
@ -85,6 +93,7 @@ void MergeTreeSink::consume(Chunk chunk)
if (!support_parallel_write && temp_part.part->getDataPartStorage().supportParallelWrite())
support_parallel_write = true;
String block_dedup_token;
if (storage.getDeduplicationLog())
{
const String & dedup_token = settings.insert_deduplication_token;
@ -119,7 +128,8 @@ void MergeTreeSink::consume(Chunk chunk)
{
.temp_part = std::move(temp_part),
.elapsed_ns = elapsed_ns,
.block_dedup_token = std::move(block_dedup_token)
.block_dedup_token = std::move(block_dedup_token),
.part_counters = std::move(part_counters),
});
}
@ -135,6 +145,8 @@ void MergeTreeSink::finishDelayedChunk()
for (auto & partition : delayed_chunk->partitions)
{
ScopedProfileEvents scoped_attach(&partition.part_counters);
partition.temp_part.finalize();
auto & part = partition.temp_part.part;
@ -168,7 +180,8 @@ void MergeTreeSink::finishDelayedChunk()
/// Part can be deduplicated, so increment counters and add to part log only if it's really added
if (added)
{
PartLog::addNewPart(storage.getContext(), part, partition.elapsed_ns);
auto counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(partition.part_counters.getPartiallyAtomicSnapshot());
PartLog::addNewPart(storage.getContext(), PartLog::PartLogEntry(part, partition.elapsed_ns, counters_snapshot));
storage.incrementInsertedPartsProfileEvent(part->getType());
/// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'.

View File

@ -184,12 +184,10 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
return {true, true, [this] (const ExecutionStatus & execution_status)
{
auto & thread_status = CurrentThread::get();
thread_status.finalizePerformanceCounters();
auto profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_status.performance_counters.getPartiallyAtomicSnapshot());
auto profile_counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(profile_counters.getPartiallyAtomicSnapshot());
storage.writePartLog(
PartLogElement::MUTATE_PART, execution_status, stopwatch_ptr->elapsed(),
entry.new_part_name, new_part, future_mutated_part->parts, merge_mutate_entry.get(), std::move(profile_counters));
entry.new_part_name, new_part, future_mutated_part->parts, merge_mutate_entry.get(), std::move(profile_counters_snapshot));
}};
}

View File

@ -2,6 +2,7 @@
#include <Storages/StorageMergeTree.h>
#include <Interpreters/TransactionLog.h>
#include <Common/ProfileEventsScope.h>
namespace DB
{
@ -38,9 +39,7 @@ void MutatePlainMergeTreeTask::prepare()
write_part_log = [this] (const ExecutionStatus & execution_status)
{
auto & thread_status = CurrentThread::get();
thread_status.finalizePerformanceCounters();
auto profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_status.performance_counters.getPartiallyAtomicSnapshot());
auto profile_counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(profile_counters.getPartiallyAtomicSnapshot());
mutate_task.reset();
storage.writePartLog(
PartLogElement::MUTATE_PART,
@ -50,7 +49,7 @@ void MutatePlainMergeTreeTask::prepare()
new_part,
future_part->parts,
merge_list_entry.get(),
profile_counters);
std::move(profile_counters_snapshot));
};
fake_query_context = Context::createCopy(storage.getContext());
@ -65,8 +64,8 @@ void MutatePlainMergeTreeTask::prepare()
bool MutatePlainMergeTreeTask::executeStep()
{
/// Metrics will be saved in the thread_group.
CurrentThread::ScopedAttach scoped_attach(thread_group);
/// Metrics will be saved in the local profile_counters.
ScopedProfileEvents profile_events_scope(&profile_counters);
/// Make out memory tracker a parent of current thread memory tracker
MemoryTrackerThreadSwitcherPtr switcher;

View File

@ -78,7 +78,7 @@ private:
ContextMutablePtr fake_query_context;
MutateTaskPtr mutate_task;
ThreadGroupStatusPtr thread_group = std::make_shared<ThreadGroupStatus>();
ProfileEvents::Counters profile_counters;
};

View File

@ -20,6 +20,7 @@
#include <Storages/MutationCommands.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <boost/algorithm/string/replace.hpp>
#include <Common/ProfileEventsScope.h>
namespace CurrentMetrics
@ -814,8 +815,8 @@ public:
bool executeStep() override
{
/// Metrics will be saved in the thread_group.
CurrentThread::ScopedAttach scoped_attach(thread_group);
/// Metrics will be saved in the local profile_counters.
ScopedProfileEvents profile_events_scope(&profile_counters);
auto & current_level_parts = level_parts[current_level];
auto & next_level_parts = level_parts[next_level];
@ -918,7 +919,7 @@ private:
/// TODO(nikitamikhaylov): make this constant a setting
static constexpr size_t max_parts_to_merge_in_one_level = 10;
ThreadGroupStatusPtr thread_group = std::make_shared<ThreadGroupStatus>();
ProfileEvents::Counters profile_counters;
};
@ -929,7 +930,9 @@ private:
// In short it executed a mutation for the part an original part and for its every projection
/** An overview of how the process of mutation works for projections:
/**
*
* An overview of how the process of mutation works for projections:
*
* The mutation for original parts is executed block by block,
* but additionally we execute a SELECT query for each projection over a current block.
@ -1140,8 +1143,8 @@ public:
bool executeStep() override
{
/// Metrics will be saved in the thread_group.
CurrentThread::ScopedAttach scoped_attach(thread_group);
/// Metrics will be saved in the local profile_counters.
ScopedProfileEvents profile_events_scope(&profile_counters);
switch (state)
{
@ -1176,6 +1179,7 @@ public:
}
private:
void prepare()
{
if (ctx->new_data_part->isStoredOnDisk())
@ -1258,7 +1262,7 @@ private:
std::unique_ptr<PartMergerWriter> part_merger_writer_task;
ThreadGroupStatusPtr thread_group = std::make_shared<ThreadGroupStatus>();
ProfileEvents::Counters profile_counters;
};
@ -1273,8 +1277,8 @@ public:
bool executeStep() override
{
/// Metrics will be saved in the thread_group.
CurrentThread::ScopedAttach scoped_attach(thread_group);
/// Metrics will be saved in the local profile_counters.
ScopedProfileEvents profile_events_scope(&profile_counters);
switch (state)
{
@ -1308,6 +1312,7 @@ public:
}
private:
void prepare()
{
if (ctx->execute_ttl_type != ExecuteTTLType::NONE)
@ -1466,7 +1471,7 @@ private:
MergedColumnOnlyOutputStreamPtr out;
std::unique_ptr<PartMergerWriter> part_merger_writer_task{nullptr};
ThreadGroupStatusPtr thread_group = std::make_shared<ThreadGroupStatus>();
ProfileEvents::Counters profile_counters;
};

View File

@ -2,6 +2,7 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Common/ProfileEventsScope.h>
namespace DB
@ -29,8 +30,8 @@ void ReplicatedMergeMutateTaskBase::onCompleted()
bool ReplicatedMergeMutateTaskBase::executeStep()
{
/// Metrics will be saved in the thread_group.
CurrentThread::ScopedAttach scoped_attach(thread_group);
/// Metrics will be saved in the local profile_counters.
ScopedProfileEvents profile_events_scope(&profile_counters);
std::exception_ptr saved_exception;

View File

@ -11,9 +11,9 @@ namespace DB
class StorageReplicatedMergeTree;
/** This is used as a base of MergeFromLogEntryTask and MutateFromLogEntryTaskBase
*/
/**
* This is used as a base of MergeFromLogEntryTask and MutateFromLogEntryTaskBase
*/
class ReplicatedMergeMutateTaskBase : public IExecutableTask
{
public:
@ -33,11 +33,8 @@ public:
}
~ReplicatedMergeMutateTaskBase() override = default;
void onCompleted() override;
StorageID getStorageID() override;
bool executeStep() override;
protected:
@ -63,6 +60,8 @@ protected:
MergeList::EntryPtr merge_mutate_entry{nullptr};
Poco::Logger * log;
StorageReplicatedMergeTree & storage;
/// ProfileEvents for current part will be stored here
ProfileEvents::Counters profile_counters;
private:
enum class CheckExistingPartResult
@ -86,7 +85,6 @@ private:
PartLogWriter part_log_writer{};
State state{State::NEED_PREPARE};
IExecutableTask::TaskResultCallback task_result_callback;
ThreadGroupStatusPtr thread_group = std::make_shared<ThreadGroupStatus>();
};
}

View File

@ -2,8 +2,10 @@
#include <cstddef>
#include <ranges>
#include "Common/hex.h"
#include <Common/hex.h>
#include <Common/Macros.h>
#include <Common/ProfileEventsScope.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/Types.h>
@ -1592,6 +1594,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
if (entry.type == LogEntry::ATTACH_PART)
{
ScopedProfileEvents profile_events_scope;
if (MutableDataPartPtr part = attachPartHelperFoundValidPart(entry))
{
LOG_TRACE(log, "Found valid local part for {}, preparing the transaction", part->name);
@ -1602,11 +1606,9 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
renameTempPartAndReplace(part, transaction);
checkPartChecksumsAndCommit(transaction, part);
auto & thread_status = CurrentThread::get();
thread_status.finalizePerformanceCounters();
auto profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_status.performance_counters.getPartiallyAtomicSnapshot());
writePartLog(PartLogElement::Type::NEW_PART, {}, 0 /** log entry is fake so we don't measure the time */,
part->name, part, {} /** log entry is fake so there are no initial parts */, nullptr, std::move(profile_counters));
part->name, part, {} /** log entry is fake so there are no initial parts */, nullptr,
profile_events_scope.getSnapshot());
return true;
}
@ -4007,15 +4009,14 @@ bool StorageReplicatedMergeTree::fetchPart(
Stopwatch stopwatch;
MutableDataPartPtr part;
DataPartsVector replaced_parts;
ScopedProfileEvents profile_events_scope;
auto write_part_log = [&] (const ExecutionStatus & execution_status)
{
auto & thread_status = CurrentThread::get();
thread_status.finalizePerformanceCounters();
auto profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_status.performance_counters.getPartiallyAtomicSnapshot());
writePartLog(
PartLogElement::DOWNLOAD_PART, execution_status, stopwatch.elapsed(),
part_name, part, replaced_parts, nullptr, std::move(profile_counters));
part_name, part, replaced_parts, nullptr,
profile_events_scope.getSnapshot());
};
DataPartPtr part_to_clone;
@ -4246,15 +4247,14 @@ MutableDataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
Stopwatch stopwatch;
MutableDataPartPtr part;
DataPartsVector replaced_parts;
ScopedProfileEvents profile_events_scope;
auto write_part_log = [&] (const ExecutionStatus & execution_status)
{
auto & thread_status = CurrentThread::get();
thread_status.finalizePerformanceCounters();
auto profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_status.performance_counters.getPartiallyAtomicSnapshot());
writePartLog(
PartLogElement::DOWNLOAD_PART, execution_status, stopwatch.elapsed(),
part_name, part, replaced_parts, nullptr, std::move(profile_counters));
part_name, part, replaced_parts, nullptr,
profile_events_scope.getSnapshot());
};
std::function<MutableDataPartPtr()> get_part;

View File

@ -1,13 +1,47 @@
DROP TABLE IF EXISTS test;
CREATE TABLE test (x UInt64) ENGINE = MergeTree ORDER BY x;
SET max_block_size = 1;
INSERT INTO test SELECT * FROM system.numbers LIMIT 1000;
CREATE TABLE test (key UInt64, val UInt64) engine = MergeTree Order by key PARTITION BY key >= 128;
SET max_block_size = 64, max_insert_block_size = 64, min_insert_block_size_rows = 64;
INSERT INTO test SELECT number AS key, sipHash64(number) AS val FROM numbers(512);
SYSTEM FLUSH LOGS;
SELECT
count(DISTINCT query_id) == 1
AND count() >= 512 / 64 -- 512 rows inserted, 64 rows per block
AND SUM(ProfileEvents['MergeTreeDataWriterRows']) == 512
AND SUM(ProfileEvents['MergeTreeDataWriterUncompressedBytes']) > 1024
AND SUM(ProfileEvents['MergeTreeDataWriterCompressedBytes']) > 1024
AND SUM(ProfileEvents['MergeTreeDataWriterBlocks']) >= 8
FROM system.part_log
WHERE event_time > now() - INTERVAL 10 MINUTE
AND database == currentDatabase() AND table == 'test'
AND event_type == 'NewPart'
;
OPTIMIZE TABLE test FINAL;
SYSTEM FLUSH LOGS;
SELECT
count() >= 2 AND SUM(ProfileEvents['MergedRows']) >= 512
FROM system.part_log
WHERE event_time > now() - INTERVAL 10 MINUTE
AND database == currentDatabase() AND table == 'test'
AND event_type == 'MergeParts'
;
-- ProfileEvents field exist and contains something plausible:
SELECT count() > 0 FROM system.part_log WHERE ProfileEvents['MergedRows'] = 1000 AND table = 'test' AND database = currentDatabase() AND event_time >= now() - 600;
ALTER TABLE test UPDATE val = 0 WHERE key % 2 == 0 SETTINGS mutations_sync = 2;
SYSTEM FLUSH LOGS;
SELECT
count() == 2
AND SUM(ProfileEvents['SelectedRows']) == 512
AND SUM(ProfileEvents['FileOpen']) > 2
FROM system.part_log
WHERE event_time > now() - INTERVAL 10 MINUTE
AND database == currentDatabase() AND table == 'test'
AND event_type == 'MutatePart'
;
DROP TABLE test;