Added GraphiteRollupSortedTransform.

This commit is contained in:
Nikolai Kochetov 2020-04-02 17:33:30 +03:00
parent 5b83ca2fb6
commit 20fc52f02b
2 changed files with 660 additions and 0 deletions

View File

@ -0,0 +1,385 @@
#include <Processors/Merges/GraphiteRollupSortedTransform.h>
namespace DB
{
static GraphiteRollupSortedTransform::ColumnsDefinition defineColumns(
const Block & header, const Graphite::Params & params)
{
GraphiteRollupSortedTransform::ColumnsDefinition def;
def.path_column_num = header.getPositionByName(params.path_column_name);
def.time_column_num = header.getPositionByName(params.time_column_name);
def.value_column_num = header.getPositionByName(params.value_column_name);
def.version_column_num = header.getPositionByName(params.version_column_name);
size_t num_columns = header.columns();
for (size_t i = 0; i < num_columns; ++i)
if (i != def.time_column_num && i != def.value_column_num && i != def.version_column_num)
def.unmodified_column_numbers.push_back(i);
return def;
}
GraphiteRollupSortedTransform::GraphiteRollupSortedTransform(
size_t num_inputs, const Block & header,
SortDescription description_, size_t max_block_size,
Graphite::Params params_, time_t time_of_merge_)
: IMergingTransform(num_inputs, header, header, true)
, merged_data(header.cloneEmptyColumns(), false, max_block_size)
, description(std::move(description_))
, source_chunks(num_inputs)
, cursors(num_inputs)
, params(std::move(params_)), time_of_merge(time_of_merge_)
, chunk_allocator(num_inputs + max_row_refs)
{
size_t max_size_of_aggregate_state = 0;
size_t max_alignment_of_aggregate_state = 1;
for (const auto & pattern : params.patterns)
{
if (pattern.function)
{
max_size_of_aggregate_state = std::max(max_size_of_aggregate_state, pattern.function->sizeOfData());
max_alignment_of_aggregate_state = std::max(max_alignment_of_aggregate_state, pattern.function->alignOfData());
}
}
merged_data.allocMemForAggregates(max_size_of_aggregate_state, max_alignment_of_aggregate_state);
columns_definition = defineColumns(header, params);
}
Graphite::RollupRule GraphiteRollupSortedTransform::selectPatternForPath(StringRef path) const
{
const Graphite::Pattern * first_match = &undef_pattern;
for (const auto & pattern : params.patterns)
{
if (!pattern.regexp)
{
/// Default pattern
if (first_match->type == first_match->TypeUndef && pattern.type == pattern.TypeAll)
{
/// There is only default pattern for both retention and aggregation
return std::pair(&pattern, &pattern);
}
if (pattern.type != first_match->type)
{
if (first_match->type == first_match->TypeRetention)
{
return std::pair(first_match, &pattern);
}
if (first_match->type == first_match->TypeAggregation)
{
return std::pair(&pattern, first_match);
}
}
}
else if (pattern.regexp->match(path.data, path.size))
{
/// General pattern with matched path
if (pattern.type == pattern.TypeAll)
{
/// Only for not default patterns with both function and retention parameters
return std::pair(&pattern, &pattern);
}
if (first_match->type == first_match->TypeUndef)
{
first_match = &pattern;
continue;
}
if (pattern.type != first_match->type)
{
if (first_match->type == first_match->TypeRetention)
{
return std::pair(first_match, &pattern);
}
if (first_match->type == first_match->TypeAggregation)
{
return std::pair(&pattern, first_match);
}
}
}
}
return {nullptr, nullptr};
}
UInt32 GraphiteRollupSortedTransform::selectPrecision(const Graphite::Retentions & retentions, time_t time) const
{
static_assert(is_signed_v<time_t>, "time_t must be signed type");
for (const auto & retention : retentions)
{
if (time_of_merge - time >= static_cast<time_t>(retention.age))
return retention.precision;
}
/// No rounding.
return 1;
}
/** Round the unix timestamp to seconds precision.
* In this case, the date should not change. The date is calculated using the local time zone.
*
* If the rounding value is less than an hour,
* then, assuming that time zones that differ from UTC by a non-integer number of hours are not supported,
* just simply round the unix timestamp down to a multiple of 3600.
* And if the rounding value is greater,
* then we will round down the number of seconds from the beginning of the day in the local time zone.
*
* Rounding to more than a day is not supported.
*/
static time_t roundTimeToPrecision(const DateLUTImpl & date_lut, time_t time, UInt32 precision)
{
if (precision <= 3600)
{
return time / precision * precision;
}
else
{
time_t date = date_lut.toDate(time);
time_t remainder = time - date;
return date + remainder / precision * precision;
}
}
void GraphiteRollupSortedTransform::initializeInputs()
{
queue = SortingHeap<SortCursor>(cursors);
is_queue_initialized = true;
}
void GraphiteRollupSortedTransform::consume(Chunk chunk, size_t input_number)
{
updateCursor(std::move(chunk), input_number);
if (is_queue_initialized)
queue.push(cursors[input_number]);
}
void GraphiteRollupSortedTransform::updateCursor(Chunk chunk, size_t source_num)
{
auto num_rows = chunk.getNumRows();
auto columns = chunk.detachColumns();
for (auto & column : columns)
column = column->convertToFullColumnIfConst();
chunk.setColumns(std::move(columns), num_rows);
auto & source_chunk = source_chunks[source_num];
if (source_chunk)
{
source_chunk = chunk_allocator.alloc(std::move(chunk));
cursors[source_num].reset(source_chunk->getColumns(), {});
}
else
{
if (cursors[source_num].has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
source_chunk = chunk_allocator.alloc(std::move(chunk));
cursors[source_num] = SortCursorImpl(source_chunk->getColumns(), description, source_num);
}
source_chunk->all_columns = cursors[source_num].all_columns;
source_chunk->sort_columns = cursors[source_num].sort_columns;
}
void GraphiteRollupSortedTransform::work()
{
merge();
prepareOutputChunk(merged_data);
}
void GraphiteRollupSortedTransform::merge()
{
const DateLUTImpl & date_lut = DateLUT::instance();
/// Take rows in needed order and put them into `merged_data` until we get `max_block_size` rows.
///
/// Variables starting with current_* refer to the rows previously popped from the queue that will
/// contribute towards current output row.
/// Variables starting with next_* refer to the row at the top of the queue.
while (queue.isValid())
{
SortCursor current = queue.current();
StringRef next_path = current->all_columns[columns_definition.path_column_num]->getDataAt(current->pos);
bool new_path = is_first || next_path != current_group_path;
is_first = false;
time_t next_row_time = current->all_columns[columns_definition.time_column_num]->getUInt(current->pos);
/// Is new key before rounding.
bool is_new_key = new_path || next_row_time != current_time;
if (is_new_key)
{
/// Accumulate the row that has maximum version in the previous group of rows with the same key:
if (merged_data.wasGroupStarted())
accumulateRow(current_subgroup_newest_row);
Graphite::RollupRule next_rule = merged_data.currentRule();
if (new_path)
next_rule = selectPatternForPath(next_path);
const Graphite::RetentionPattern * retention_pattern = std::get<0>(next_rule);
time_t next_time_rounded;
if (retention_pattern)
{
UInt32 precision = selectPrecision(retention_pattern->retentions, next_row_time);
next_time_rounded = roundTimeToPrecision(date_lut, next_row_time, precision);
}
else
{
/// If no pattern has matched - take the value as-is.
next_time_rounded = next_row_time;
}
/// Key will be new after rounding. It means new result row.
bool will_be_new_key = new_path || next_time_rounded != current_time_rounded;
if (will_be_new_key)
{
if (merged_data.wasGroupStarted())
{
finishCurrentGroup();
/// We have enough rows - return, but don't advance the loop. At the beginning of the
/// next call to merge() the same next_cursor will be processed once more and
/// the next output row will be created from it.
if (merged_data.hasEnoughRows())
return;
}
/// At this point previous row has been fully processed, so we can advance the loop
/// (substitute current_* values for next_*, advance the cursor).
startNextGroup(current, next_rule);
current_time_rounded = next_time_rounded;
}
current_time = next_row_time;
}
/// Within all rows with same key, we should leave only one row with maximum version;
/// and for rows with same maximum version - only last row.
if (is_new_key
|| current->all_columns[columns_definition.version_column_num]->compareAt(
current->pos, current_subgroup_newest_row.row_num,
*(*current_subgroup_newest_row.all_columns)[columns_definition.version_column_num],
/* nan_direction_hint = */ 1) >= 0)
{
current_subgroup_newest_row.set(current, source_chunks[current.impl->order]);
/// Small hack: group and subgroups have the same path, so we can set current_group_path here instead of startNextGroup
/// But since we keep in memory current_subgroup_newest_row's block, we could use StringRef for current_group_path and don't
/// make deep copy of the path.
current_group_path = next_path;
}
if (!current->isLast())
{
queue.next();
}
else
{
/// We get the next block from the appropriate source, if there is one.
queue.removeTop();
requestDataForInput(current.impl->order);
return;
}
}
/// Write result row for the last group.
if (merged_data.wasGroupStarted())
{
accumulateRow(current_subgroup_newest_row);
finishCurrentGroup();
}
is_finished = true;
}
void GraphiteRollupSortedTransform::startNextGroup(SortCursor & cursor, Graphite::RollupRule next_rule)
{
merged_data.startNextGroup(cursor->all_columns, cursor->pos, next_rule, columns_definition);
}
void GraphiteRollupSortedTransform::finishCurrentGroup()
{
merged_data.insertRow(current_time_rounded, current_subgroup_newest_row, columns_definition);
}
void GraphiteRollupSortedTransform::accumulateRow(RowRef & row)
{
merged_data.accumulateRow(row, columns_definition);
}
void GraphiteRollupSortedTransform::GraphiteRollupMergedData::startNextGroup(
const ColumnRawPtrs & raw_columns, size_t row,
Graphite::RollupRule next_rule, ColumnsDefinition & def)
{
const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(next_rule);
/// Copy unmodified column values (including path column).
for (size_t j : def.unmodified_column_numbers)
columns[j]->insertFrom(*raw_columns[j], row);
if (aggregation_pattern)
{
aggregation_pattern->function->create(place_for_aggregate_state.data());
aggregate_state_created = true;
}
current_rule = next_rule;
was_group_started = true;
}
void GraphiteRollupSortedTransform::GraphiteRollupMergedData::insertRow(
time_t time, RowRef & row, ColumnsDefinition & def)
{
/// Insert calculated values of the columns `time`, `value`, `version`.
columns[def.time_column_num]->insert(time);
auto & row_ref_version_column = (*row.all_columns)[def.version_column_num];
columns[def.version_column_num]->insertFrom(*row_ref_version_column, row.row_num);
auto & value_column = columns[def.value_column_num];
const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(current_rule);
if (aggregate_state_created)
{
aggregation_pattern->function->insertResultInto(place_for_aggregate_state.data(), *value_column);
aggregation_pattern->function->destroy(place_for_aggregate_state.data());
aggregate_state_created = false;
}
else
value_column->insertFrom(*(*row.all_columns)[def.value_column_num], row.row_num);
++total_merged_rows;
++merged_rows;
/// TODO: sum_blocks_granularity += block_size;
was_group_started = false;
}
void GraphiteRollupSortedTransform::GraphiteRollupMergedData::accumulateRow(RowRef & row, ColumnsDefinition & def)
{
const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(current_rule);
if (aggregate_state_created)
{
auto & column = (*row.all_columns)[def.value_column_num];
aggregation_pattern->function->add(place_for_aggregate_state.data(), &column, row.row_num, nullptr);
}
}
GraphiteRollupSortedTransform::GraphiteRollupMergedData::~GraphiteRollupMergedData()
{
if (aggregate_state_created)
std::get<1>(current_rule)->function->destroy(place_for_aggregate_state.data());
}
}

View File

@ -0,0 +1,275 @@
#pragma once
#include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/RowRef.h>
#include <Processors/Merges/MergedData.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Common/OptimizedRegularExpression.h>
#include <Common/AlignedBuffer.h>
#include <Core/SortDescription.h>
#include <Core/SortCursor.h>
#include <common/logger_useful.h>
namespace DB
{
/** Intended for implementation of "rollup" - aggregation (rounding) of older data
* for a table with Graphite data (Graphite is the system for time series monitoring).
*
* Table with graphite data has at least the following columns (accurate to the name):
* Path, Time, Value, Version
*
* Path - name of metric (sensor);
* Time - time of measurement;
* Value - value of measurement;
* Version - a number, that for equal pairs of Path and Time, need to leave only record with maximum version.
*
* Each row in a table correspond to one value of one sensor.
*
* Pattern should contain function, retention scheme, or both of them. The order of patterns does mean as well:
* * Aggregation OR retention patterns should be first
* * Then aggregation AND retention full patterns have to be placed
* * default pattern without regexp must be the last
*
* Rollup rules are specified in the following way:
*
* pattern
* regexp
* function
* pattern
* regexp
* age -> precision
* age -> precision
* ...
* pattern
* regexp
* function
* age -> precision
* age -> precision
* ...
* pattern
* ...
* default
* function
* age -> precision
* ...
*
* regexp - pattern for sensor name
* default - if no pattern has matched
*
* age - minimal data age (in seconds), to start rounding with specified precision.
* precision - rounding precision (in seconds)
*
* function - name of aggregate function to be applied for values, that time was rounded to same.
*
* Example:
*
* <graphite_rollup>
* <pattern>
* <regexp>\.max$</regexp>
* <function>max</function>
* </pattern>
* <pattern>
* <regexp>click_cost</regexp>
* <function>any</function>
* <retention>
* <age>0</age>
* <precision>5</precision>
* </retention>
* <retention>
* <age>86400</age>
* <precision>60</precision>
* </retention>
* </pattern>
* <default>
* <function>max</function>
* <retention>
* <age>0</age>
* <precision>60</precision>
* </retention>
* <retention>
* <age>3600</age>
* <precision>300</precision>
* </retention>
* <retention>
* <age>86400</age>
* <precision>3600</precision>
* </retention>
* </default>
* </graphite_rollup>
*/
namespace Graphite
{
struct Retention
{
UInt32 age;
UInt32 precision;
};
using Retentions = std::vector<Retention>;
struct Pattern
{
std::shared_ptr<OptimizedRegularExpression> regexp;
std::string regexp_str;
AggregateFunctionPtr function;
Retentions retentions; /// Must be ordered by 'age' descending.
enum { TypeUndef, TypeRetention, TypeAggregation, TypeAll } type = TypeAll; /// The type of defined pattern, filled automatically
};
using Patterns = std::vector<Pattern>;
using RetentionPattern = Pattern;
using AggregationPattern = Pattern;
struct Params
{
String config_name;
String path_column_name;
String time_column_name;
String value_column_name;
String version_column_name;
Graphite::Patterns patterns;
};
using RollupRule = std::pair<const RetentionPattern *, const AggregationPattern *>;
}
/** Merges several sorted ports into one.
*
* For each group of consecutive identical values of the `path` column,
* and the same `time` values, rounded to some precision
* (where rounding accuracy depends on the template set for `path`
* and the amount of time elapsed from `time` to the specified time),
* keeps one line,
* performing the rounding of time,
* merge `value` values using the specified aggregate functions,
* as well as keeping the maximum value of the `version` column.
*/
class GraphiteRollupSortedTransform : public IMergingTransform
{
public:
GraphiteRollupSortedTransform(
size_t num_inputs, const Block & header,
SortDescription description_, size_t max_block_size,
Graphite::Params params_, time_t time_of_merge_);
String getName() const override { return "GraphiteRollupSortedTransform"; }
void work() override;
struct ColumnsDefinition
{
size_t path_column_num;
size_t time_column_num;
size_t value_column_num;
size_t version_column_num;
/// All columns other than 'time', 'value', 'version'. They are unmodified during rollup.
ColumnNumbers unmodified_column_numbers;
};
using RowRef = detail::RowRefWithOwnedChunk;
/// Specialization for SummingSortedTransform.
class GraphiteRollupMergedData : public MergedData
{
public:
using MergedData::MergedData;
~GraphiteRollupMergedData();
void startNextGroup(const ColumnRawPtrs & raw_columns, size_t row,
Graphite::RollupRule next_rule, ColumnsDefinition & def);
void insertRow(time_t time, RowRef & row, ColumnsDefinition & def);
void accumulateRow(RowRef & row, ColumnsDefinition & def);
bool wasGroupStarted() const { return was_group_started; }
const Graphite::RollupRule & currentRule() const { return current_rule; }
void allocMemForAggregates(size_t size, size_t alignment) { place_for_aggregate_state.reset(size, alignment); }
private:
Graphite::RollupRule current_rule = {nullptr, nullptr};
AlignedBuffer place_for_aggregate_state;
bool aggregate_state_created = false; /// Invariant: if true then current_rule is not NULL.
bool was_group_started = false;
};
protected:
void initializeInputs() override;
void consume(Chunk chunk, size_t input_number) override;
private:
Logger * log = &Logger::get("GraphiteRollupSortedBlockInputStream");
GraphiteRollupMergedData merged_data;
SortDescription description;
/// Chunks currently being merged.
using SourceChunks = std::vector<detail::SharedChunkPtr>;
SourceChunks source_chunks;
SortCursorImpls cursors;
SortingHeap<SortCursor> queue;
bool is_queue_initialized = false;
const Graphite::Params params;
ColumnsDefinition columns_definition;
time_t time_of_merge;
/// No data has been read.
bool is_first = true;
/* | path | time | rounded_time | version | value | unmodified |
* -----------------------------------------------------------------------------------
* | A | 11 | 10 | 1 | 1 | a | |
* | A | 11 | 10 | 3 | 2 | b |> subgroup(A, 11) |
* | A | 11 | 10 | 2 | 3 | c | |> group(A, 10)
* ----------------------------------------------------------------------------------|>
* | A | 12 | 10 | 0 | 4 | d | |> Outputs (A, 10, avg(2, 5), a)
* | A | 12 | 10 | 1 | 5 | e |> subgroup(A, 12) |
* -----------------------------------------------------------------------------------
* | A | 21 | 20 | 1 | 6 | f |
* | B | 11 | 10 | 1 | 7 | g |
* ...
*/
/// Path name of current bucket
StringRef current_group_path;
static constexpr size_t max_row_refs = 2; /// current_subgroup_newest_row, current_row.
/// Last row with maximum version for current primary key (time bucket).
RowRef current_subgroup_newest_row;
detail::SharedChunkAllocator chunk_allocator;
/// Time of last read row
time_t current_time = 0;
time_t current_time_rounded = 0;
const Graphite::Pattern undef_pattern =
{ /// temporary empty pattern for selectPatternForPath
.regexp = nullptr,
.regexp_str = "",
.function = nullptr,
.retentions = DB::Graphite::Retentions(),
.type = undef_pattern.TypeUndef,
};
Graphite::RollupRule selectPatternForPath(StringRef path) const;
UInt32 selectPrecision(const Graphite::Retentions & retentions, time_t time) const;
/// Insert the values into the resulting columns, which will not be changed in the future.
void startNextGroup(SortCursor & cursor, Graphite::RollupRule next_rule);
/// Insert the calculated `time`, `value`, `version` values into the resulting columns by the last group of rows.
void finishCurrentGroup();
/// Update the state of the aggregate function with the new `value`.
void accumulateRow(RowRef & row);
void merge();
void updateCursor(Chunk chunk, size_t source_num);
};
}