Merge pull request #64130 from ClickHouse/more-instrumentation-index

More instrumentation around index
This commit is contained in:
Alexey Milovidov 2024-05-20 17:26:57 +02:00 committed by GitHub
commit c83f1933a3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 28 additions and 4 deletions

View File

@ -288,8 +288,10 @@
M(HTTPConnectionsTotal, "Total count of all sessions: stored in the pool and actively used right now for http hosts") \
\
M(AddressesActive, "Total count of addresses which are used for creation connections with connection pools") \
M(AddressesBanned, "Total count of addresses which are banned as faulty for creation connections with connection pools") \
M(AddressesBanned, "Total count of addresses which are banned as faulty for creation connections with connection pools") \
\
M(FilteringMarksWithPrimaryKey, "Number of threads currently doing filtering of mark ranges by the primary key") \
M(FilteringMarksWithSecondaryKeys, "Number of threads currently doing filtering of mark ranges by secondary keys") \
#ifdef APPLY_FOR_EXTERNAL_METRICS
#define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M)

View File

@ -26,6 +26,8 @@
#include <Common/escapeForFileName.h>
#include <Common/filesystemHelpers.h>
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
namespace fs = std::filesystem;
@ -665,6 +667,7 @@ void DatabaseOnDisk::iterateMetadataFiles(ContextPtr local_context, const Iterat
pool.scheduleOrThrowOnError(
[batch, &process_metadata_file, &process_tmp_drop_metadata_file]() mutable
{
setThreadName("DatabaseOnDisk");
for (const auto & file : batch)
if (file.second)
process_metadata_file(file.first);

View File

@ -129,6 +129,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::migrateToRestorableSchemaRecu
{
pool.scheduleOrThrowOnError([this, path]
{
setThreadName("BackupWorker");
for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
migrateFileToRestorableSchema(it->path());
});

View File

@ -695,7 +695,6 @@ String serializeQuery(const IAST & query, size_t max_length)
}
// static
void AsynchronousInsertQueue::processData(
InsertQuery key, InsertDataPtr data, ContextPtr global_context, QueueShardFlushTimeHistory & queue_shard_flush_time_history)
try
@ -705,6 +704,8 @@ try
SCOPE_EXIT(CurrentMetrics::sub(CurrentMetrics::PendingAsyncInsert, data->entries.size()));
setThreadName("AsyncInsertQ");
const auto log = getLogger("AsynchronousInsertQueue");
const auto & insert_query = assert_cast<const ASTInsertQuery &>(*key.query);

View File

@ -53,6 +53,8 @@ namespace CurrentMetrics
extern const Metric MergeTreeDataSelectExecutorThreads;
extern const Metric MergeTreeDataSelectExecutorThreadsActive;
extern const Metric MergeTreeDataSelectExecutorThreadsScheduled;
extern const Metric FilteringMarksWithPrimaryKey;
extern const Metric FilteringMarksWithSecondaryKeys;
}
namespace DB
@ -664,15 +666,22 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
size_t total_marks_count = part->index_granularity.getMarksCountWithoutFinal();
if (metadata_snapshot->hasPrimaryKey() || part_offset_condition)
{
CurrentMetrics::Increment metric(CurrentMetrics::FilteringMarksWithPrimaryKey);
ranges.ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, part_offset_condition, settings, log);
}
else if (total_marks_count)
{
ranges.ranges = MarkRanges{{MarkRange{0, total_marks_count}}};
}
sum_marks_pk.fetch_add(ranges.getMarksCount(), std::memory_order_relaxed);
if (!ranges.ranges.empty())
sum_parts_pk.fetch_add(1, std::memory_order_relaxed);
CurrentMetrics::Increment metric(CurrentMetrics::FilteringMarksWithSecondaryKeys);
for (size_t idx = 0; idx < skip_indexes.useful_indices.size(); ++idx)
{
if (ranges.ranges.empty())
@ -733,6 +742,8 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
num_threads = std::min<size_t>(num_streams, settings.max_threads_for_indexes);
}
LOG_TRACE(log, "Filtering marks by primary and secondary keys");
if (num_threads <= 1)
{
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
@ -740,7 +751,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
}
else
{
/// Parallel loading of data parts.
/// Parallel loading and filtering of data parts.
ThreadPool pool(
CurrentMetrics::MergeTreeDataSelectExecutorThreads,
CurrentMetrics::MergeTreeDataSelectExecutorThreadsActive,
@ -748,8 +759,11 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
num_threads);
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
{
pool.scheduleOrThrowOnError([&, part_index, thread_group = CurrentThread::getGroup()]
{
setThreadName("MergeTreeIndex");
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
@ -759,6 +773,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
process_part(part_index);
});
}
pool.wait();
}

View File

@ -141,6 +141,8 @@ public:
if (thread_group)
CurrentThread::attachToGroupIfDetached(thread_group);
setThreadName("SystemReplicas");
try
{
ReplicatedTableStatus status;