mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Merge pull request #37536 from CurtizJ/profile-events-for-part-types
Add profile events for introspection of part types
This commit is contained in:
commit
1d9b3be7da
@ -144,6 +144,13 @@
|
|||||||
M(MergeTreeDataWriterBlocks, "Number of blocks INSERTed to MergeTree tables. Each block forms a data part of level zero.") \
|
M(MergeTreeDataWriterBlocks, "Number of blocks INSERTed to MergeTree tables. Each block forms a data part of level zero.") \
|
||||||
M(MergeTreeDataWriterBlocksAlreadySorted, "Number of blocks INSERTed to MergeTree tables that appeared to be already sorted.") \
|
M(MergeTreeDataWriterBlocksAlreadySorted, "Number of blocks INSERTed to MergeTree tables that appeared to be already sorted.") \
|
||||||
\
|
\
|
||||||
|
M(InsertedWideParts, "Number of parts inserted in Wide format.") \
|
||||||
|
M(InsertedCompactParts, "Number of parts inserted in Compact format.") \
|
||||||
|
M(InsertedInMemoryParts, "Number of parts inserted in InMemory format.") \
|
||||||
|
M(MergedIntoWideParts, "Number of parts merged into Wide format.") \
|
||||||
|
M(MergedIntoCompactParts, "Number of parts merged into Compact format.") \
|
||||||
|
M(MergedIntoInMemoryParts, "Number of parts in merged into InMemory format.") \
|
||||||
|
\
|
||||||
M(MergeTreeDataProjectionWriterRows, "Number of rows INSERTed to MergeTree tables projection.") \
|
M(MergeTreeDataProjectionWriterRows, "Number of rows INSERTed to MergeTree tables projection.") \
|
||||||
M(MergeTreeDataProjectionWriterUncompressedBytes, "Uncompressed bytes (for columns as they stored in memory) INSERTed to MergeTree tables projection.") \
|
M(MergeTreeDataProjectionWriterUncompressedBytes, "Uncompressed bytes (for columns as they stored in memory) INSERTed to MergeTree tables projection.") \
|
||||||
M(MergeTreeDataProjectionWriterCompressedBytes, "Bytes written to filesystem for data INSERTed to MergeTree tables projection.") \
|
M(MergeTreeDataProjectionWriterCompressedBytes, "Bytes written to filesystem for data INSERTed to MergeTree tables projection.") \
|
||||||
|
@ -46,6 +46,7 @@ NamesAndTypesList PartLogElement::getNamesAndTypes()
|
|||||||
{"table", std::make_shared<DataTypeString>()},
|
{"table", std::make_shared<DataTypeString>()},
|
||||||
{"part_name", std::make_shared<DataTypeString>()},
|
{"part_name", std::make_shared<DataTypeString>()},
|
||||||
{"partition_id", std::make_shared<DataTypeString>()},
|
{"partition_id", std::make_shared<DataTypeString>()},
|
||||||
|
{"part_type", std::make_shared<DataTypeString>()},
|
||||||
{"disk_name", std::make_shared<DataTypeString>()},
|
{"disk_name", std::make_shared<DataTypeString>()},
|
||||||
{"path_on_disk", std::make_shared<DataTypeString>()},
|
{"path_on_disk", std::make_shared<DataTypeString>()},
|
||||||
|
|
||||||
@ -80,6 +81,7 @@ void PartLogElement::appendToBlock(MutableColumns & columns) const
|
|||||||
columns[i++]->insert(table_name);
|
columns[i++]->insert(table_name);
|
||||||
columns[i++]->insert(part_name);
|
columns[i++]->insert(part_name);
|
||||||
columns[i++]->insert(partition_id);
|
columns[i++]->insert(partition_id);
|
||||||
|
columns[i++]->insert(part_type.toString());
|
||||||
columns[i++]->insert(disk_name);
|
columns[i++]->insert(disk_name);
|
||||||
columns[i++]->insert(path_on_disk);
|
columns[i++]->insert(path_on_disk);
|
||||||
|
|
||||||
@ -159,6 +161,7 @@ bool PartLog::addNewParts(
|
|||||||
elem.part_name = part->name;
|
elem.part_name = part->name;
|
||||||
elem.disk_name = part->volume->getDisk()->getName();
|
elem.disk_name = part->volume->getDisk()->getName();
|
||||||
elem.path_on_disk = part->getFullPath();
|
elem.path_on_disk = part->getFullPath();
|
||||||
|
elem.part_type = part->getType();
|
||||||
|
|
||||||
elem.bytes_compressed_on_disk = part->getBytesOnDisk();
|
elem.bytes_compressed_on_disk = part->getBytesOnDisk();
|
||||||
elem.rows = part->rows_count;
|
elem.rows = part->rows_count;
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <Storages/MergeTree/MergeTreeDataPartType.h>
|
||||||
#include <Interpreters/SystemLog.h>
|
#include <Interpreters/SystemLog.h>
|
||||||
#include <Core/NamesAndTypes.h>
|
#include <Core/NamesAndTypes.h>
|
||||||
#include <Core/NamesAndAliases.h>
|
#include <Core/NamesAndAliases.h>
|
||||||
@ -35,6 +36,8 @@ struct PartLogElement
|
|||||||
String disk_name;
|
String disk_name;
|
||||||
String path_on_disk;
|
String path_on_disk;
|
||||||
|
|
||||||
|
MergeTreeDataPartType part_type;
|
||||||
|
|
||||||
/// Size of the part
|
/// Size of the part
|
||||||
UInt64 rows = 0;
|
UInt64 rows = 0;
|
||||||
|
|
||||||
|
@ -322,6 +322,7 @@ bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrite
|
|||||||
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
|
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
|
||||||
|
|
||||||
write_part_log({});
|
write_part_log({});
|
||||||
|
storage.incrementMergedPartsProfileEvent(part->getType());
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -117,6 +117,7 @@ void MergePlainMergeTreeTask::finish()
|
|||||||
new_part = merge_task->getFuture().get();
|
new_part = merge_task->getFuture().get();
|
||||||
storage.merger_mutator.renameMergedTemporaryPart(new_part, future_part->parts, txn, nullptr);
|
storage.merger_mutator.renameMergedTemporaryPart(new_part, future_part->parts, txn, nullptr);
|
||||||
write_part_log({});
|
write_part_log({});
|
||||||
|
storage.incrementMergedPartsProfileEvent(new_part->getType());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -93,6 +93,12 @@ namespace ProfileEvents
|
|||||||
extern const Event DelayedInserts;
|
extern const Event DelayedInserts;
|
||||||
extern const Event DelayedInsertsMilliseconds;
|
extern const Event DelayedInsertsMilliseconds;
|
||||||
extern const Event DuplicatedInsertedBlocks;
|
extern const Event DuplicatedInsertedBlocks;
|
||||||
|
extern const Event InsertedWideParts;
|
||||||
|
extern const Event InsertedCompactParts;
|
||||||
|
extern const Event InsertedInMemoryParts;
|
||||||
|
extern const Event MergedIntoWideParts;
|
||||||
|
extern const Event MergedIntoCompactParts;
|
||||||
|
extern const Event MergedIntoInMemoryParts;
|
||||||
}
|
}
|
||||||
|
|
||||||
namespace CurrentMetrics
|
namespace CurrentMetrics
|
||||||
@ -1716,6 +1722,7 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa
|
|||||||
part_log_elem.part_name = part->name;
|
part_log_elem.part_name = part->name;
|
||||||
part_log_elem.bytes_compressed_on_disk = part->getBytesOnDisk();
|
part_log_elem.bytes_compressed_on_disk = part->getBytesOnDisk();
|
||||||
part_log_elem.rows = part->rows_count;
|
part_log_elem.rows = part->rows_count;
|
||||||
|
part_log_elem.part_type = part->getType();
|
||||||
|
|
||||||
part_log->add(part_log_elem);
|
part_log->add(part_log_elem);
|
||||||
}
|
}
|
||||||
@ -6190,6 +6197,7 @@ try
|
|||||||
part_log_elem.path_on_disk = result_part->getFullPath();
|
part_log_elem.path_on_disk = result_part->getFullPath();
|
||||||
part_log_elem.bytes_compressed_on_disk = result_part->getBytesOnDisk();
|
part_log_elem.bytes_compressed_on_disk = result_part->getBytesOnDisk();
|
||||||
part_log_elem.rows = result_part->rows_count;
|
part_log_elem.rows = result_part->rows_count;
|
||||||
|
part_log_elem.part_type = result_part->getType();
|
||||||
}
|
}
|
||||||
|
|
||||||
part_log_elem.source_part_names.reserve(source_parts.size());
|
part_log_elem.source_part_names.reserve(source_parts.size());
|
||||||
@ -6755,6 +6763,42 @@ StorageSnapshotPtr MergeTreeData::getStorageSnapshot(const StorageMetadataPtr &
|
|||||||
return std::make_shared<StorageSnapshot>(*this, metadata_snapshot, object_columns, std::move(snapshot_data));
|
return std::make_shared<StorageSnapshot>(*this, metadata_snapshot, object_columns, std::move(snapshot_data));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void MergeTreeData::incrementInsertedPartsProfileEvent(MergeTreeDataPartType type)
|
||||||
|
{
|
||||||
|
switch (type.getValue())
|
||||||
|
{
|
||||||
|
case MergeTreeDataPartType::Wide:
|
||||||
|
ProfileEvents::increment(ProfileEvents::InsertedWideParts);
|
||||||
|
break;
|
||||||
|
case MergeTreeDataPartType::Compact:
|
||||||
|
ProfileEvents::increment(ProfileEvents::InsertedCompactParts);
|
||||||
|
break;
|
||||||
|
case MergeTreeDataPartType::InMemory:
|
||||||
|
ProfileEvents::increment(ProfileEvents::InsertedInMemoryParts);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void MergeTreeData::incrementMergedPartsProfileEvent(MergeTreeDataPartType type)
|
||||||
|
{
|
||||||
|
switch (type.getValue())
|
||||||
|
{
|
||||||
|
case MergeTreeDataPartType::Wide:
|
||||||
|
ProfileEvents::increment(ProfileEvents::MergedIntoWideParts);
|
||||||
|
break;
|
||||||
|
case MergeTreeDataPartType::Compact:
|
||||||
|
ProfileEvents::increment(ProfileEvents::MergedIntoCompactParts);
|
||||||
|
break;
|
||||||
|
case MergeTreeDataPartType::InMemory:
|
||||||
|
ProfileEvents::increment(ProfileEvents::MergedIntoInMemoryParts);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
CurrentlySubmergingEmergingTagger::~CurrentlySubmergingEmergingTagger()
|
CurrentlySubmergingEmergingTagger::~CurrentlySubmergingEmergingTagger()
|
||||||
{
|
{
|
||||||
std::lock_guard lock(storage.currently_submerging_emerging_mutex);
|
std::lock_guard lock(storage.currently_submerging_emerging_mutex);
|
||||||
|
@ -1224,6 +1224,8 @@ protected:
|
|||||||
/// Moves part to specified space, used in ALTER ... MOVE ... queries
|
/// Moves part to specified space, used in ALTER ... MOVE ... queries
|
||||||
bool movePartsToSpace(const DataPartsVector & parts, SpacePtr space);
|
bool movePartsToSpace(const DataPartsVector & parts, SpacePtr space);
|
||||||
|
|
||||||
|
static void incrementInsertedPartsProfileEvent(MergeTreeDataPartType type);
|
||||||
|
static void incrementMergedPartsProfileEvent(MergeTreeDataPartType type);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/// RAII Wrapper for atomic work with currently moving parts
|
/// RAII Wrapper for atomic work with currently moving parts
|
||||||
|
@ -451,6 +451,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart(
|
|||||||
temp_part.streams.emplace_back(std::move(stream));
|
temp_part.streams.emplace_back(std::move(stream));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto finalizer = out->finalizePartAsync(
|
auto finalizer = out->finalizePartAsync(
|
||||||
new_data_part,
|
new_data_part,
|
||||||
data_settings->fsync_after_insert,
|
data_settings->fsync_after_insert,
|
||||||
@ -460,8 +461,6 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart(
|
|||||||
temp_part.part = new_data_part;
|
temp_part.part = new_data_part;
|
||||||
temp_part.streams.emplace_back(TemporaryPart::Stream{.stream = std::move(out), .finalizer = std::move(finalizer)});
|
temp_part.streams.emplace_back(TemporaryPart::Stream{.stream = std::move(out), .finalizer = std::move(finalizer)});
|
||||||
|
|
||||||
/// out.finish(new_data_part, std::move(written_files), sync_on_insert);
|
|
||||||
|
|
||||||
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterRows, block.rows());
|
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterRows, block.rows());
|
||||||
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterUncompressedBytes, block.bytes());
|
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterUncompressedBytes, block.bytes());
|
||||||
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterCompressedBytes, new_data_part->getBytesOnDisk());
|
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterCompressedBytes, new_data_part->getBytesOnDisk());
|
||||||
|
@ -137,6 +137,7 @@ void MergeTreeSink::finishDelayedChunk()
|
|||||||
if (storage.renameTempPartAndAdd(part, context->getCurrentTransaction().get(), &storage.increment, nullptr, storage.getDeduplicationLog(), partition.block_dedup_token))
|
if (storage.renameTempPartAndAdd(part, context->getCurrentTransaction().get(), &storage.increment, nullptr, storage.getDeduplicationLog(), partition.block_dedup_token))
|
||||||
{
|
{
|
||||||
PartLog::addNewPart(storage.getContext(), part, partition.elapsed_ns);
|
PartLog::addNewPart(storage.getContext(), part, partition.elapsed_ns);
|
||||||
|
storage.incrementInsertedPartsProfileEvent(part->getType());
|
||||||
|
|
||||||
/// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'.
|
/// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'.
|
||||||
storage.background_operations_assignee.trigger();
|
storage.background_operations_assignee.trigger();
|
||||||
|
@ -261,6 +261,7 @@ void ReplicatedMergeTreeSink::finishDelayedChunk(zkutil::ZooKeeperPtr & zookeepe
|
|||||||
/// Set a special error code if the block is duplicate
|
/// Set a special error code if the block is duplicate
|
||||||
int error = (deduplicate && part->is_duplicate) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
|
int error = (deduplicate && part->is_duplicate) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
|
||||||
PartLog::addNewPart(storage.getContext(), part, partition.elapsed_ns, ExecutionStatus(error));
|
PartLog::addNewPart(storage.getContext(), part, partition.elapsed_ns, ExecutionStatus(error));
|
||||||
|
storage.incrementInsertedPartsProfileEvent(part->getType());
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
|
@ -0,0 +1,7 @@
|
|||||||
|
3 1 2
|
||||||
|
2 1 1
|
||||||
|
Compact
|
||||||
|
Compact
|
||||||
|
Wide
|
||||||
|
Compact 1
|
||||||
|
Wide 1
|
@ -0,0 +1,44 @@
|
|||||||
|
DROP TABLE IF EXISTS t_parts_profile_events;
|
||||||
|
|
||||||
|
CREATE TABLE t_parts_profile_events (a UInt32)
|
||||||
|
ENGINE = MergeTree ORDER BY tuple()
|
||||||
|
SETTINGS min_rows_for_wide_part = 10, min_bytes_for_wide_part = 0;
|
||||||
|
|
||||||
|
SYSTEM STOP MERGES t_parts_profile_events;
|
||||||
|
|
||||||
|
SET log_comment = '02306_part_types_profile_events';
|
||||||
|
|
||||||
|
INSERT INTO t_parts_profile_events VALUES (1);
|
||||||
|
INSERT INTO t_parts_profile_events VALUES (1);
|
||||||
|
|
||||||
|
SYSTEM START MERGES t_parts_profile_events;
|
||||||
|
OPTIMIZE TABLE t_parts_profile_events FINAL;
|
||||||
|
SYSTEM STOP MERGES t_parts_profile_events;
|
||||||
|
|
||||||
|
INSERT INTO t_parts_profile_events SELECT number FROM numbers(20);
|
||||||
|
|
||||||
|
SYSTEM START MERGES t_parts_profile_events;
|
||||||
|
OPTIMIZE TABLE t_parts_profile_events FINAL;
|
||||||
|
SYSTEM STOP MERGES t_parts_profile_events;
|
||||||
|
|
||||||
|
SYSTEM FLUSH LOGS;
|
||||||
|
|
||||||
|
SELECT count(), sum(ProfileEvents['InsertedWideParts']), sum(ProfileEvents['InsertedCompactParts'])
|
||||||
|
FROM system.query_log WHERE current_database = currentDatabase()
|
||||||
|
AND log_comment = '02306_part_types_profile_events'
|
||||||
|
AND query ILIKE 'INSERT INTO%' AND type = 'QueryFinish';
|
||||||
|
|
||||||
|
SELECT count(), sum(ProfileEvents['MergedIntoWideParts']), sum(ProfileEvents['MergedIntoCompactParts'])
|
||||||
|
FROM system.query_log WHERE current_database = currentDatabase()
|
||||||
|
AND log_comment = '02306_part_types_profile_events'
|
||||||
|
AND query ILIKE 'OPTIMIZE TABLE%' AND type = 'QueryFinish';
|
||||||
|
|
||||||
|
SELECT part_type FROM system.part_log WHERE database = currentDatabase()
|
||||||
|
AND table = 't_parts_profile_events' AND event_type = 'NewPart'
|
||||||
|
ORDER BY event_time_microseconds;
|
||||||
|
|
||||||
|
SELECT part_type, count() > 0 FROM system.part_log WHERE database = currentDatabase()
|
||||||
|
AND table = 't_parts_profile_events' AND event_type = 'MergeParts'
|
||||||
|
GROUP BY part_type ORDER BY part_type;
|
||||||
|
|
||||||
|
DROP TABLE t_parts_profile_events;
|
Loading…
Reference in New Issue
Block a user