ClickHouse/src/Storages/MergeTree/MergeProgress.h

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

96 lines
2.7 KiB
C++
Raw Normal View History

#pragma once
2021-10-02 08:30:38 +00:00
#include <base/types.h>
#include <Common/ProfileEvents.h>
#include <IO/Progress.h>
#include <Storages/MergeTree/MergeList.h>
/// Profile event counters used by this header. They are only declared here (`extern`);
/// the definitions live elsewhere. All four are incremented by DB::MergeProgressCallback.
namespace ProfileEvents
{
/// Incremented by the number of milliseconds elapsed on the merge's stopwatch since the previous update.
extern const Event MergesTimeMilliseconds;
/// Incremented by `read_bytes` of every reported Progress.
extern const Event MergedUncompressedBytes;
/// Incremented by `read_rows` of every reported Progress, but only while the stage is marked `is_first`.
extern const Event MergedRows;
/// Incremented by one on every progress callback invocation while the stage is marked `is_first`.
extern const Event Merge;
}
namespace DB
{
/** Progress callback for merges and mutations (see MergeProgressCallback below).
* On every invocation it should update:
* - the approximate progress;
* - the number of rows read;
* - various metrics;
* - the time elapsed for the current merge.
*/
/// Bookkeeping for the progress of one merge stage.
/// Stages are: the horizontal stage, plus one stage per gathered column when a Vertical merge
/// is performed, or the mutation of a single part. Every stage reads all rows once.
struct MergeStageProgress
{
    /// Creates the first stage: overall progress starts at zero and `is_first` stays true.
    explicit MergeStageProgress(Float64 weight_)
        : weight(weight_)
    {
    }

    /// Creates a subsequent stage that continues from `initial_progress_`; `is_first` is false.
    MergeStageProgress(Float64 initial_progress_, Float64 weight_)
        : initial_progress(initial_progress_)
        , is_first(false)
        , weight(weight_)
    {
    }

    /// Overall progress already reached when this stage begins.
    Float64 initial_progress = 0.0;

    /// True only for a stage built with the single-argument constructor.
    bool is_first = true;

    /// Share of the overall progress contributed by this stage.
    Float64 weight;

    /// Accumulated `total_rows_to_read` reported for this stage so far.
    UInt64 total_rows = 0;

    /// Accumulated `read_rows` reported for this stage so far.
    UInt64 rows_read = 0;
};
class MergeProgressCallback
{
public:
MergeProgressCallback(
2021-09-24 13:57:44 +00:00
MergeListElement * merge_list_element_ptr_, UInt64 & watch_prev_elapsed_, MergeStageProgress & stage_)
: merge_list_element_ptr(merge_list_element_ptr_)
, watch_prev_elapsed(watch_prev_elapsed_)
, stage(stage_)
{
updateWatch();
}
2021-09-24 13:57:44 +00:00
MergeListElement * merge_list_element_ptr;
UInt64 & watch_prev_elapsed;
MergeStageProgress & stage;
void updateWatch()
{
2021-09-24 13:57:44 +00:00
UInt64 watch_curr_elapsed = merge_list_element_ptr->watch.elapsed();
ProfileEvents::increment(ProfileEvents::MergesTimeMilliseconds, (watch_curr_elapsed - watch_prev_elapsed) / 1000000);
watch_prev_elapsed = watch_curr_elapsed;
}
void operator() (const Progress & value)
{
ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.read_bytes);
if (stage.is_first)
{
ProfileEvents::increment(ProfileEvents::MergedRows, value.read_rows);
ProfileEvents::increment(ProfileEvents::Merge);
}
updateWatch();
2021-09-24 13:57:44 +00:00
merge_list_element_ptr->bytes_read_uncompressed += value.read_bytes;
if (stage.is_first)
2021-09-24 13:57:44 +00:00
merge_list_element_ptr->rows_read += value.read_rows;
stage.total_rows += value.total_rows_to_read;
stage.rows_read += value.read_rows;
if (stage.total_rows > 0)
{
2021-09-24 13:57:44 +00:00
merge_list_element_ptr->progress.store(
stage.initial_progress + stage.weight * stage.rows_read / stage.total_rows,
std::memory_order_relaxed);
}
}
};
}