add profile events for introspection of part types

This commit is contained in:
Anton Popov 2022-05-25 14:54:49 +00:00
parent 4dd447b232
commit 16e839ac71
12 changed files with 115 additions and 2 deletions

View File

@ -144,6 +144,13 @@
M(MergeTreeDataWriterBlocks, "Number of blocks INSERTed to MergeTree tables. Each block forms a data part of level zero.") \
M(MergeTreeDataWriterBlocksAlreadySorted, "Number of blocks INSERTed to MergeTree tables that appeared to be already sorted.") \
\
M(InsertedWideParts, "Number of parts inserted in Wide format.") \
M(InsertedCompactParts, "Number of parts inserted in Compact format.") \
M(InsertedInMemoryParts, "Number of parts inserted in InMemory format.") \
M(MergedIntoWideParts, "Number of parts merged into Wide format.") \
M(MergedIntoCompactParts, "Number of parts merged into Compact format.") \
M(MergedIntoInMemoryParts, "Number of parts in merged into InMemory format.") \
\
M(MergeTreeDataProjectionWriterRows, "Number of rows INSERTed to MergeTree tables projection.") \
M(MergeTreeDataProjectionWriterUncompressedBytes, "Uncompressed bytes (for columns as they stored in memory) INSERTed to MergeTree tables projection.") \
M(MergeTreeDataProjectionWriterCompressedBytes, "Bytes written to filesystem for data INSERTed to MergeTree tables projection.") \

View File

@ -46,6 +46,7 @@ NamesAndTypesList PartLogElement::getNamesAndTypes()
{"table", std::make_shared<DataTypeString>()},
{"part_name", std::make_shared<DataTypeString>()},
{"partition_id", std::make_shared<DataTypeString>()},
{"part_type", std::make_shared<DataTypeString>()},
{"disk_name", std::make_shared<DataTypeString>()},
{"path_on_disk", std::make_shared<DataTypeString>()},
@ -80,6 +81,7 @@ void PartLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(table_name);
columns[i++]->insert(part_name);
columns[i++]->insert(partition_id);
columns[i++]->insert(part_type.toString());
columns[i++]->insert(disk_name);
columns[i++]->insert(path_on_disk);
@ -159,6 +161,7 @@ bool PartLog::addNewParts(
elem.part_name = part->name;
elem.disk_name = part->volume->getDisk()->getName();
elem.path_on_disk = part->getFullPath();
elem.part_type = part->getType();
elem.bytes_compressed_on_disk = part->getBytesOnDisk();
elem.rows = part->rows_count;

View File

@ -1,5 +1,6 @@
#pragma once
#include <Storages/MergeTree/MergeTreeDataPartType.h>
#include <Interpreters/SystemLog.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
@ -35,6 +36,8 @@ struct PartLogElement
String disk_name;
String path_on_disk;
MergeTreeDataPartType part_type;
/// Size of the part
UInt64 rows = 0;

View File

@ -322,6 +322,7 @@ bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrite
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
write_part_log({});
storage.incrementMergedPartsProfileEvent(part->getType());
return true;
}

View File

@ -117,6 +117,7 @@ void MergePlainMergeTreeTask::finish()
new_part = merge_task->getFuture().get();
storage.merger_mutator.renameMergedTemporaryPart(new_part, future_part->parts, txn, nullptr);
write_part_log({});
storage.incrementMergedPartsProfileEvent(new_part->getType());
}
}

View File

@ -93,6 +93,12 @@ namespace ProfileEvents
extern const Event DelayedInserts;
extern const Event DelayedInsertsMilliseconds;
extern const Event DuplicatedInsertedBlocks;
extern const Event InsertedWideParts;
extern const Event InsertedCompactParts;
extern const Event InsertedInMemoryParts;
extern const Event MergedIntoWideParts;
extern const Event MergedIntoCompactParts;
extern const Event MergedIntoInMemoryParts;
}
namespace CurrentMetrics
@ -1716,6 +1722,7 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa
part_log_elem.part_name = part->name;
part_log_elem.bytes_compressed_on_disk = part->getBytesOnDisk();
part_log_elem.rows = part->rows_count;
part_log_elem.part_type = part->getType();
part_log->add(part_log_elem);
}
@ -6190,6 +6197,7 @@ try
part_log_elem.path_on_disk = result_part->getFullPath();
part_log_elem.bytes_compressed_on_disk = result_part->getBytesOnDisk();
part_log_elem.rows = result_part->rows_count;
part_log_elem.part_type = result_part->getType();
}
part_log_elem.source_part_names.reserve(source_parts.size());
@ -6755,6 +6763,42 @@ StorageSnapshotPtr MergeTreeData::getStorageSnapshot(const StorageMetadataPtr &
return std::make_shared<StorageSnapshot>(*this, metadata_snapshot, object_columns, std::move(snapshot_data));
}
#define FOR_EACH_PART_TYPE(M) \
M(Wide) \
M(Compact) \
M(InMemory)
#define DECLARE_INCREMENT_EVENT_CASE(Event, Type) \
case MergeTreeDataPartType::Type: \
ProfileEvents::increment(ProfileEvents::Event##Type##Parts); \
break;
#define DECLARE_INCREMENT_EVENT(value, CASE) \
switch (value) \
{ \
FOR_EACH_PART_TYPE(CASE) \
default: \
break; \
}
void MergeTreeData::incrementInsertedPartsProfileEvent(MergeTreeDataPartType type)
{
#define DECLARE_INSERTED_EVENT_CASE(Type) DECLARE_INCREMENT_EVENT_CASE(Inserted, Type)
DECLARE_INCREMENT_EVENT(type.getValue(), DECLARE_INSERTED_EVENT_CASE)
#undef DECLARE_INSERTED_EVENT
}
void MergeTreeData::incrementMergedPartsProfileEvent(MergeTreeDataPartType type)
{
#define DECLARE_MERGED_EVENT_CASE(Type) DECLARE_INCREMENT_EVENT_CASE(MergedInto, Type)
DECLARE_INCREMENT_EVENT(type.getValue(), DECLARE_MERGED_EVENT_CASE)
#undef DECLARE_MERGED_EVENT
}
#undef FOR_EACH_PART_TYPE
#undef DECLARE_INCREMENT_EVENT_CASE
#undef DECLARE_INCREMENT_EVENT
CurrentlySubmergingEmergingTagger::~CurrentlySubmergingEmergingTagger()
{
std::lock_guard lock(storage.currently_submerging_emerging_mutex);

View File

@ -1224,6 +1224,8 @@ protected:
/// Moves part to specified space, used in ALTER ... MOVE ... queries
bool movePartsToSpace(const DataPartsVector & parts, SpacePtr space);
static void incrementInsertedPartsProfileEvent(MergeTreeDataPartType type);
static void incrementMergedPartsProfileEvent(MergeTreeDataPartType type);
private:
/// RAII Wrapper for atomic work with currently moving parts

View File

@ -451,6 +451,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart(
temp_part.streams.emplace_back(std::move(stream));
}
}
auto finalizer = out->finalizePartAsync(
new_data_part,
data_settings->fsync_after_insert,
@ -460,8 +461,6 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart(
temp_part.part = new_data_part;
temp_part.streams.emplace_back(TemporaryPart::Stream{.stream = std::move(out), .finalizer = std::move(finalizer)});
/// out.finish(new_data_part, std::move(written_files), sync_on_insert);
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterRows, block.rows());
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterUncompressedBytes, block.bytes());
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterCompressedBytes, new_data_part->getBytesOnDisk());

View File

@ -137,6 +137,7 @@ void MergeTreeSink::finishDelayedChunk()
if (storage.renameTempPartAndAdd(part, context->getCurrentTransaction().get(), &storage.increment, nullptr, storage.getDeduplicationLog(), partition.block_dedup_token))
{
PartLog::addNewPart(storage.getContext(), part, partition.elapsed_ns);
storage.incrementInsertedPartsProfileEvent(part->getType());
/// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'.
storage.background_operations_assignee.trigger();

View File

@ -261,6 +261,7 @@ void ReplicatedMergeTreeSink::finishDelayedChunk(zkutil::ZooKeeperPtr & zookeepe
/// Set a special error code if the block is duplicate
int error = (deduplicate && part->is_duplicate) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
PartLog::addNewPart(storage.getContext(), part, partition.elapsed_ns, ExecutionStatus(error));
storage.incrementInsertedPartsProfileEvent(part->getType());
}
catch (...)
{

View File

@ -0,0 +1,7 @@
3 1 2
2 1 1
Compact
Compact
Wide
Compact 1
Wide 1

View File

@ -0,0 +1,44 @@
DROP TABLE IF EXISTS t_parts_profile_events;
CREATE TABLE t_parts_profile_events (a UInt32)
ENGINE = MergeTree ORDER BY tuple()
SETTINGS min_rows_for_wide_part = 10, min_bytes_for_wide_part = 0;
SYSTEM STOP MERGES t_parts_profile_events;
SET log_comment = '02306_part_types_profile_events';
INSERT INTO t_parts_profile_events VALUES (1);
INSERT INTO t_parts_profile_events VALUES (1);
SYSTEM START MERGES t_parts_profile_events;
OPTIMIZE TABLE t_parts_profile_events FINAL;
SYSTEM STOP MERGES t_parts_profile_events;
INSERT INTO t_parts_profile_events SELECT number FROM numbers(20);
SYSTEM START MERGES t_parts_profile_events;
OPTIMIZE TABLE t_parts_profile_events FINAL;
SYSTEM STOP MERGES t_parts_profile_events;
SYSTEM FLUSH LOGS;
SELECT count(), sum(ProfileEvents['InsertedWideParts']), sum(ProfileEvents['InsertedCompactParts'])
FROM system.query_log WHERE has(databases, currentDatabase())
AND log_comment = '02306_part_types_profile_events'
AND query ILIKE 'INSERT INTO%' AND type = 'QueryFinish';
SELECT count(), sum(ProfileEvents['MergedIntoWideParts']), sum(ProfileEvents['MergedIntoCompactParts'])
FROM system.query_log WHERE has(databases, currentDatabase())
AND log_comment = '02306_part_types_profile_events'
AND query ILIKE 'OPTIMIZE TABLE%' AND type = 'QueryFinish';
SELECT part_type FROM system.part_log WHERE database = currentDatabase()
AND table = 't_parts_profile_events' AND event_type = 'NewPart'
ORDER BY event_time_microseconds;
SELECT part_type, count() > 0 FROM system.part_log WHERE database = currentDatabase()
AND table = 't_parts_profile_events' AND event_type = 'MergeParts'
GROUP BY part_type;
DROP TABLE t_parts_profile_events;