mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-16 03:12:43 +00:00
96 lines
2.7 KiB
C++
96 lines
2.7 KiB
C++
#pragma once
|
|
|
|
#include <base/types.h>
|
|
#include <Common/ProfileEvents.h>
|
|
#include <IO/Progress.h>
|
|
#include <Storages/MergeTree/MergeList.h>
|
|
|
|
|
|
namespace ProfileEvents
|
|
{
|
|
extern const Event MergesTimeMilliseconds;
|
|
extern const Event MergedUncompressedBytes;
|
|
extern const Event MergedRows;
|
|
extern const Event Merge;
|
|
}
|
|
|
|
namespace DB
|
|
{
|
|
|
|
/** Progress callback.
  * What it should update:
  * - approximate progress
  * - amount of read rows
  * - various metrics
  * - time elapsed for current merge.
  */
|
|
|
|
/// Auxiliary struct that for each merge stage stores its current progress.
|
|
/// A stage is: the horizontal stage + a stage for each gathered column (if we are doing a
|
|
/// Vertical merge) or a mutation of a single part. During a single stage all rows are read.
|
|
struct MergeStageProgress
|
|
{
|
|
explicit MergeStageProgress(Float64 weight_)
|
|
: is_first(true) , weight(weight_) {}
|
|
|
|
MergeStageProgress(Float64 initial_progress_, Float64 weight_)
|
|
: initial_progress(initial_progress_), is_first(false), weight(weight_) {}
|
|
|
|
Float64 initial_progress = 0.0;
|
|
bool is_first;
|
|
Float64 weight;
|
|
|
|
UInt64 total_rows = 0;
|
|
UInt64 rows_read = 0;
|
|
};
|
|
|
|
class MergeProgressCallback
|
|
{
|
|
public:
|
|
MergeProgressCallback(
|
|
MergeListElement * merge_list_element_ptr_, UInt64 & watch_prev_elapsed_, MergeStageProgress & stage_)
|
|
: merge_list_element_ptr(merge_list_element_ptr_)
|
|
, watch_prev_elapsed(watch_prev_elapsed_)
|
|
, stage(stage_)
|
|
{
|
|
updateWatch();
|
|
}
|
|
|
|
MergeListElement * merge_list_element_ptr;
|
|
UInt64 & watch_prev_elapsed;
|
|
MergeStageProgress & stage;
|
|
|
|
void updateWatch()
|
|
{
|
|
UInt64 watch_curr_elapsed = merge_list_element_ptr->watch.elapsed();
|
|
ProfileEvents::increment(ProfileEvents::MergesTimeMilliseconds, (watch_curr_elapsed - watch_prev_elapsed) / 1000000);
|
|
watch_prev_elapsed = watch_curr_elapsed;
|
|
}
|
|
|
|
void operator() (const Progress & value)
|
|
{
|
|
ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.read_bytes);
|
|
if (stage.is_first)
|
|
{
|
|
ProfileEvents::increment(ProfileEvents::MergedRows, value.read_rows);
|
|
ProfileEvents::increment(ProfileEvents::Merge);
|
|
}
|
|
updateWatch();
|
|
|
|
merge_list_element_ptr->bytes_read_uncompressed += value.read_bytes;
|
|
if (stage.is_first)
|
|
merge_list_element_ptr->rows_read += value.read_rows;
|
|
|
|
stage.total_rows += value.total_rows_to_read;
|
|
stage.rows_read += value.read_rows;
|
|
if (stage.total_rows > 0)
|
|
{
|
|
merge_list_element_ptr->progress.store(
|
|
stage.initial_progress + stage.weight * stage.rows_read / stage.total_rows,
|
|
std::memory_order_relaxed);
|
|
}
|
|
}
|
|
};
|
|
|
|
}
|