Merge pull request #25122 from msaf1980/rollup_rules_type

GraphiteMergeTree Rollup rules type
This commit is contained in:
Mikhail f. Shiryaev 2021-12-07 10:18:08 +01:00 committed by GitHub
commit 5fc20b3e6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 2705 additions and 266 deletions

View File

@ -48,7 +48,10 @@ struct StringRef
std::string toString() const { return std::string(data, size); }
explicit operator std::string() const { return toString(); }
constexpr explicit operator std::string_view() const { return {data, size}; }
std::string_view toView() const { return std::string_view(data, size); }
constexpr explicit operator std::string_view() const { return std::string_view(data, size); }
};
/// Here constexpr doesn't implicate inline, see https://www.viva64.com/en/w/v1043/

View File

@ -547,6 +547,7 @@ if (ENABLE_TESTS AND USE_GTEST)
clickhouse_parsers
clickhouse_storages_system
dbms
clickhouse_common_config
clickhouse_common_zookeeper
string_utils)

View File

@ -0,0 +1,7 @@
#include "gtest_global_context.h"
const ContextHolder & getContext()
{
static ContextHolder holder;
return holder;
}

View File

@ -18,8 +18,4 @@ struct ContextHolder
ContextHolder(ContextHolder &&) = default;
};
inline const ContextHolder & getContext()
{
static ContextHolder holder;
return holder;
}
const ContextHolder & getContext();

View File

@ -0,0 +1,493 @@
#include <base/find_symbols.h>
#include <Interpreters/Context.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/parseAggregateFunctionParameters.h>
#include <DataTypes/DataTypesNumber.h>
#include <Processors/Merges/Algorithms/Graphite.h>
#include <string_view>
#include <vector>
#include <unordered_map>
#include <fmt/format.h>
#include <Poco/Util/AbstractConfiguration.h>
using namespace std::literals;
namespace DB::ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int NO_ELEMENTS_IN_CONFIG;
}
namespace DB::Graphite
{
static std::unordered_map<RuleType, const String> ruleTypeMap =
{
{ RuleTypeAll, "all" },
{ RuleTypePlain, "plain" },
{ RuleTypeTagged, "tagged"},
{ RuleTypeTagList, "tag_list"}
};
const String & ruleTypeStr(RuleType rule_type)
{
try
{
return ruleTypeMap.at(rule_type);
}
catch (...)
{
throw Exception("invalid rule type: " + std::to_string(rule_type), DB::ErrorCodes::BAD_ARGUMENTS);
}
}
RuleType ruleType(const String & s)
{
if (s == "all")
return RuleTypeAll;
else if (s == "plain")
return RuleTypePlain;
else if (s == "tagged")
return RuleTypeTagged;
else if (s == "tag_list")
return RuleTypeTagList;
else
throw Exception("invalid rule type: " + s, DB::ErrorCodes::BAD_ARGUMENTS);
}
static const Graphite::Pattern undef_pattern =
{ /// empty pattern for selectPatternForPath
.rule_type = RuleTypeAll,
.regexp = nullptr,
.regexp_str = "",
.function = nullptr,
.retentions = Graphite::Retentions(),
.type = undef_pattern.TypeUndef,
};
inline static const Patterns & selectPatternsForMetricType(const Graphite::Params & params, const StringRef path)
{
if (params.patterns_typed)
{
std::string_view path_view = path.toView();
if (path_view.find("?"sv) == path_view.npos)
return params.patterns_plain;
else
return params.patterns_tagged;
}
else
{
return params.patterns;
}
}
Graphite::RollupRule selectPatternForPath(
const Graphite::Params & params,
const StringRef path)
{
const Graphite::Pattern * first_match = &undef_pattern;
const Patterns & patterns_check = selectPatternsForMetricType(params, path);
for (const auto & pattern : patterns_check)
{
if (!pattern.regexp)
{
/// Default pattern
if (first_match->type == first_match->TypeUndef && pattern.type == pattern.TypeAll)
{
/// There is only default pattern for both retention and aggregation
return std::pair(&pattern, &pattern);
}
if (pattern.type != first_match->type)
{
if (first_match->type == first_match->TypeRetention)
{
return std::pair(first_match, &pattern);
}
if (first_match->type == first_match->TypeAggregation)
{
return std::pair(&pattern, first_match);
}
}
}
else
{
if (pattern.regexp->match(path.data, path.size))
{
/// General pattern with matched path
if (pattern.type == pattern.TypeAll)
{
/// Only for not default patterns with both function and retention parameters
return std::pair(&pattern, &pattern);
}
if (first_match->type == first_match->TypeUndef)
{
first_match = &pattern;
continue;
}
if (pattern.type != first_match->type)
{
if (first_match->type == first_match->TypeRetention)
{
return std::pair(first_match, &pattern);
}
if (first_match->type == first_match->TypeAggregation)
{
return std::pair(&pattern, first_match);
}
}
}
}
}
return {nullptr, nullptr};
}
/** Is used to order Graphite::Retentions by age and precision descending.
* Throws exception if not both age and precision are less or greater then another.
*/
static bool compareRetentions(const Retention & a, const Retention & b)
{
if (a.age > b.age && a.precision > b.precision)
{
return true;
}
else if (a.age < b.age && a.precision < b.precision)
{
return false;
}
String error_msg = "age and precision should only grow up: "
+ std::to_string(a.age) + ":" + std::to_string(a.precision) + " vs "
+ std::to_string(b.age) + ":" + std::to_string(b.precision);
throw Exception(
error_msg,
DB::ErrorCodes::BAD_ARGUMENTS);
}
bool operator==(const Retention & a, const Retention & b)
{
return a.age == b.age && a.precision == b.precision;
}
std::ostream & operator<<(std::ostream & stream, const Retentions & a)
{
stream << "{ ";
for (size_t i = 0; i < a.size(); i++)
{
if (i > 0)
stream << ",";
stream << " { age = " << a[i].age << ", precision = " << a[i].precision << " }";
}
stream << " }";
return stream;
}
bool operator==(const Pattern & a, const Pattern & b)
{
// equal
// Retentions retentions; /// Must be ordered by 'age' descending.
if (a.type != b.type || a.regexp_str != b.regexp_str || a.rule_type != b.rule_type)
return false;
if (a.function == nullptr)
{
if (b.function != nullptr)
return false;
}
else if (b.function == nullptr)
{
return false;
}
else if (a.function->getName() != b.function->getName())
{
return false;
}
return a.retentions == b.retentions;
}
std::ostream & operator<<(std::ostream & stream, const Pattern & a)
{
stream << "{ rule_type = " << ruleTypeStr(a.rule_type);
if (!a.regexp_str.empty())
stream << ", regexp = '" << a.regexp_str << "'";
if (a.function != nullptr)
stream << ", function = " << a.function->getName();
if (!a.retentions.empty())
{
stream << ",\n retentions = {\n";
for (size_t i = 0; i < a.retentions.size(); i++)
{
stream << " { " << a.retentions[i].age << ", " << a.retentions[i].precision << " }";
if (i < a.retentions.size() - 1)
stream << ",";
stream << "\n";
}
stream << " }\n";
}
else
stream << " ";
stream << "}";
return stream;
}
std::string buildTaggedRegex(std::string regexp_str)
{
/*
* tags list in format (for name or any value can use regexp, alphabet sorting not needed)
* spaces are not stiped and used as tag and value part
* name must be first (if used)
*
* tag1=value1; tag2=VALUE2_REGEX;tag3=value3
* or
* name;tag1=value1;tag2=VALUE2_REGEX;tag3=value3
* or for one tag
* tag1=value1
*
* Resulting regex against metric like
* name?tag1=value1&tag2=value2
*
* So,
*
* name
* produce
* name\?
*
* tag2=val2
* produce
* [\?&]tag2=val2(&.*)?$
*
* nam.* ; tag1=val1 ; tag2=val2
* produce
* nam.*\?(.*&)?tag1=val1&(.*&)?tag2=val2(&.*)?$
*/
std::vector<std::string> tags;
splitInto<';'>(tags, regexp_str);
/* remove empthy elements */
using namespace std::string_literals;
tags.erase(std::remove(tags.begin(), tags.end(), ""s), tags.end());
if (tags[0].find('=') == tags[0].npos)
{
if (tags.size() == 1) /* only name */
return "^" + tags[0] + "\\?";
/* start with name value */
regexp_str = "^" + tags[0] + "\\?(.*&)?";
tags.erase(std::begin(tags));
}
else
regexp_str = "[\\?&]";
std::sort(std::begin(tags), std::end(tags)); /* sorted tag keys */
regexp_str += fmt::format(
"{}{}",
fmt::join(tags, "&(.*&)?"),
"(&.*)?$" /* close regex */
);
return regexp_str;
}
/** Read the settings for Graphite rollup from config.
* Example
*
* <graphite_rollup>
* <path_column_name>Path</path_column_name>
* <pattern>
* <regexp>click_cost</regexp>
* <function>any</function>
* <retention>
* <age>0</age>
* <precision>3600</precision>
* </retention>
* <retention>
* <age>86400</age>
* <precision>60</precision>
* </retention>
* </pattern>
* <default>
* <function>max</function>
* <retention>
* <age>0</age>
* <precision>60</precision>
* </retention>
* <retention>
* <age>3600</age>
* <precision>300</precision>
* </retention>
* <retention>
* <age>86400</age>
* <precision>3600</precision>
* </retention>
* </default>
* </graphite_rollup>
*/
static const Pattern &
appendGraphitePattern(
const Poco::Util::AbstractConfiguration & config,
const String & config_element, Patterns & patterns,
bool default_rule,
ContextPtr context)
{
Pattern pattern;
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_element, keys);
for (const auto & key : keys)
{
if (key == "regexp")
{
pattern.regexp_str = config.getString(config_element + ".regexp");
}
else if (key == "function")
{
String aggregate_function_name_with_params = config.getString(config_element + ".function");
String aggregate_function_name;
Array params_row;
getAggregateFunctionNameAndParametersArray(
aggregate_function_name_with_params, aggregate_function_name, params_row, "GraphiteMergeTree storage initialization", context);
/// TODO Not only Float64
AggregateFunctionProperties properties;
pattern.function = AggregateFunctionFactory::instance().get(
aggregate_function_name, {std::make_shared<DataTypeFloat64>()}, params_row, properties);
}
else if (key == "rule_type")
{
String rule_type = config.getString(config_element + ".rule_type");
pattern.rule_type = ruleType(rule_type);
}
else if (startsWith(key, "retention"))
{
pattern.retentions.emplace_back(Graphite::Retention{
.age = config.getUInt(config_element + "." + key + ".age"),
.precision = config.getUInt(config_element + "." + key + ".precision")});
}
else
throw Exception("Unknown element in config: " + key, DB::ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
if (!pattern.regexp_str.empty())
{
if (pattern.rule_type == RuleTypeTagList)
{
// construct tagged regexp
pattern.regexp_str = buildTaggedRegex(pattern.regexp_str);
pattern.rule_type = RuleTypeTagged;
}
pattern.regexp = std::make_shared<OptimizedRegularExpression>(pattern.regexp_str);
}
if (!pattern.function && pattern.retentions.empty())
throw Exception(
"At least one of an aggregate function or retention rules is mandatory for rollup patterns in GraphiteMergeTree",
DB::ErrorCodes::NO_ELEMENTS_IN_CONFIG);
if (default_rule && pattern.rule_type != RuleTypeAll)
{
throw Exception(
"Default must have rule_type all for rollup patterns in GraphiteMergeTree",
DB::ErrorCodes::BAD_ARGUMENTS);
}
if (!pattern.function)
{
pattern.type = pattern.TypeRetention;
}
else if (pattern.retentions.empty())
{
pattern.type = pattern.TypeAggregation;
}
else
{
pattern.type = pattern.TypeAll;
}
if (pattern.type & pattern.TypeAggregation) /// TypeAggregation or TypeAll
if (pattern.function->allocatesMemoryInArena())
throw Exception(
"Aggregate function " + pattern.function->getName() + " isn't supported in GraphiteMergeTree", DB::ErrorCodes::NOT_IMPLEMENTED);
/// retention should be in descending order of age.
if (pattern.type & pattern.TypeRetention) /// TypeRetention or TypeAll
std::sort(pattern.retentions.begin(), pattern.retentions.end(), compareRetentions);
patterns.emplace_back(pattern);
return patterns.back();
}
void setGraphitePatternsFromConfig(ContextPtr context, const String & config_element, Graphite::Params & params)
{
const auto & config = context->getConfigRef();
if (!config.has(config_element))
throw Exception("No '" + config_element + "' element in configuration file", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
params.config_name = config_element;
params.path_column_name = config.getString(config_element + ".path_column_name", "Path");
params.time_column_name = config.getString(config_element + ".time_column_name", "Time");
params.value_column_name = config.getString(config_element + ".value_column_name", "Value");
params.version_column_name = config.getString(config_element + ".version_column_name", "Timestamp");
params.patterns_typed = false;
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_element, keys);
for (const auto & key : keys)
{
if (startsWith(key, "pattern"))
{
if (appendGraphitePattern(config, config_element + "." + key, params.patterns, false, context).rule_type != RuleTypeAll)
params.patterns_typed = true;
}
else if (key == "default")
{
/// See below.
}
else if (key == "path_column_name" || key == "time_column_name" || key == "value_column_name" || key == "version_column_name")
{
/// See above.
}
else
throw Exception("Unknown element in config: " + key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
if (config.has(config_element + ".default"))
appendGraphitePattern(config, config_element + "." + ".default", params.patterns, true, context);
for (const auto & pattern : params.patterns)
{
if (pattern.rule_type == RuleTypeAll)
{
if (params.patterns_typed)
{
params.patterns_plain.push_back(pattern);
params.patterns_tagged.push_back(pattern);
}
}
else if (pattern.rule_type == RuleTypePlain)
{
params.patterns_plain.push_back(pattern);
}
else if (pattern.rule_type == RuleTypeTagged)
{
params.patterns_tagged.push_back(pattern);
}
else
{
throw Exception("Unhandled rule_type in config: " + ruleTypeStr(pattern.rule_type), ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
}
}
}

View File

@ -1,13 +1,8 @@
#pragma once
#include <base/StringRef.h>
#include <Common/OptimizedRegularExpression.h>
namespace DB
{
class IAggregateFunction;
using AggregateFunctionPtr = std::shared_ptr<const IAggregateFunction>;
}
#include <AggregateFunctions/IAggregateFunction.h>
/** Intended for implementation of "rollup" - aggregation (rounding) of older data
* for a table with Graphite data (Graphite is the system for time series monitoring).
@ -97,16 +92,32 @@ using AggregateFunctionPtr = std::shared_ptr<const IAggregateFunction>;
namespace DB::Graphite
{
// sync with rule_types_str
enum RuleType
{
RuleTypeAll = 0, // default, with regex, compatible with old scheme
RuleTypePlain = 1, // plain metrics, with regex, compatible with old scheme
RuleTypeTagged = 2, // tagged metrics, with regex, compatible with old scheme
RuleTypeTagList = 3 // tagged metrics, with regex (converted to RuleTypeTagged from string like 'retention=10min ; env=(staging|prod)')
};
const String & ruleTypeStr(RuleType rule_type);
struct Retention
{
UInt32 age;
UInt32 precision;
};
bool operator==(const Retention & a, const Retention & b);
using Retentions = std::vector<Retention>;
std::ostream &operator<<(std::ostream & stream, const Retentions & a);
struct Pattern
{
RuleType rule_type = RuleTypeAll;
std::shared_ptr<OptimizedRegularExpression> regexp;
std::string regexp_str;
AggregateFunctionPtr function;
@ -114,6 +125,9 @@ struct Pattern
enum { TypeUndef, TypeRetention, TypeAggregation, TypeAll } type = TypeAll; /// The type of defined pattern, filled automatically
};
bool operator==(const Pattern & a, const Pattern & b);
std::ostream &operator<<(std::ostream & stream, const Pattern & a);
using Patterns = std::vector<Pattern>;
using RetentionPattern = Pattern;
using AggregationPattern = Pattern;
@ -125,9 +139,16 @@ struct Params
String time_column_name;
String value_column_name;
String version_column_name;
bool patterns_typed;
Graphite::Patterns patterns;
Graphite::Patterns patterns_plain;
Graphite::Patterns patterns_tagged;
};
using RollupRule = std::pair<const RetentionPattern *, const AggregationPattern *>;
Graphite::RollupRule selectPatternForPath(const Graphite::Params & params, const StringRef path);
void setGraphitePatternsFromConfig(ContextPtr context, const String & config_element, Graphite::Params & params);
}

View File

@ -1,3 +1,4 @@
#include <Processors/Merges/Algorithms/Graphite.h>
#include <Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <base/DateLUTImpl.h>
@ -52,62 +53,6 @@ GraphiteRollupSortedAlgorithm::GraphiteRollupSortedAlgorithm(
columns_definition = defineColumns(header, params);
}
Graphite::RollupRule GraphiteRollupSortedAlgorithm::selectPatternForPath(StringRef path) const
{
const Graphite::Pattern * first_match = &undef_pattern;
for (const auto & pattern : params.patterns)
{
if (!pattern.regexp)
{
/// Default pattern
if (first_match->type == first_match->TypeUndef && pattern.type == pattern.TypeAll)
{
/// There is only default pattern for both retention and aggregation
return std::pair(&pattern, &pattern);
}
if (pattern.type != first_match->type)
{
if (first_match->type == first_match->TypeRetention)
{
return std::pair(first_match, &pattern);
}
if (first_match->type == first_match->TypeAggregation)
{
return std::pair(&pattern, first_match);
}
}
}
else if (pattern.regexp->match(path.data, path.size))
{
/// General pattern with matched path
if (pattern.type == pattern.TypeAll)
{
/// Only for not default patterns with both function and retention parameters
return std::pair(&pattern, &pattern);
}
if (first_match->type == first_match->TypeUndef)
{
first_match = &pattern;
continue;
}
if (pattern.type != first_match->type)
{
if (first_match->type == first_match->TypeRetention)
{
return std::pair(first_match, &pattern);
}
if (first_match->type == first_match->TypeAggregation)
{
return std::pair(&pattern, first_match);
}
}
}
}
return {nullptr, nullptr};
}
UInt32 GraphiteRollupSortedAlgorithm::selectPrecision(const Graphite::Retentions & retentions, time_t time) const
{
static_assert(is_signed_v<time_t>, "time_t must be signed type");
@ -188,7 +133,7 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge()
Graphite::RollupRule next_rule = merged_data.currentRule();
if (new_path)
next_rule = selectPatternForPath(next_path);
next_rule = selectPatternForPath(this->params, next_path);
const Graphite::RetentionPattern * retention_pattern = std::get<0>(next_rule);
time_t next_time_rounded;

View File

@ -102,16 +102,6 @@ private:
time_t current_time = 0;
time_t current_time_rounded = 0;
const Graphite::Pattern undef_pattern =
{ /// temporary empty pattern for selectPatternForPath
.regexp = nullptr,
.regexp_str = "",
.function = nullptr,
.retentions = DB::Graphite::Retentions(),
.type = undef_pattern.TypeUndef,
};
Graphite::RollupRule selectPatternForPath(StringRef path) const;
UInt32 selectPrecision(const Graphite::Retentions & retentions, time_t time) const;
/// Insert the values into the resulting columns, which will not be changed in the future.

View File

@ -0,0 +1,597 @@
#include <cstring>
#include <filesystem>
#include <fstream>
#include <stdexcept>
#include <gtest/gtest.h>
#include <Common/tests/gtest_global_context.h>
#include <Common/tests/gtest_global_register.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Processors/Merges/Algorithms/Graphite.h>
#include <Common/Config/ConfigProcessor.h>
using namespace DB;
static int regAggregateFunctions = 0;
void tryRegisterAggregateFunctions()
{
if (!regAggregateFunctions)
{
registerAggregateFunctions();
regAggregateFunctions = 1;
}
}
static ConfigProcessor::LoadedConfig loadConfiguration(const std::string & config_path)
{
ConfigProcessor config_processor(config_path, true, true);
ConfigProcessor::LoadedConfig config = config_processor.loadConfig(false);
return config;
}
static ConfigProcessor::LoadedConfig loadConfigurationFromString(std::string & s)
{
char tmp_file[19];
strcpy(tmp_file, "/tmp/rollup-XXXXXX");
int fd = mkstemp(tmp_file);
if (fd == -1)
{
throw std::runtime_error(strerror(errno));
}
try {
if (write(fd, s.c_str(), s.size()) < s.size())
{
throw std::runtime_error("unable write to temp file");
}
if (write(fd, "\n", 1) != 1)
{
throw std::runtime_error("unable write to temp file");
}
close(fd);
auto config_path = std::string(tmp_file) + ".xml";
if (std::rename(tmp_file, config_path.c_str()))
{
int err = errno;
remove(tmp_file);
throw std::runtime_error(strerror(err));
}
ConfigProcessor::LoadedConfig config = loadConfiguration(config_path);
remove(tmp_file);
return config;
}
catch (...)
{
remove(tmp_file);
throw;
}
}
static Graphite::Params setGraphitePatterns(ContextMutablePtr context, ConfigProcessor::LoadedConfig & config)
{
context->setConfig(config.configuration);
Graphite::Params params;
setGraphitePatternsFromConfig(context, "graphite_rollup", params);
return params;
}
struct PatternForCheck
{
Graphite::RuleType rule_type;
std::string regexp_str;
String function;
Graphite::Retentions retentions;
};
bool checkRule(const Graphite::Pattern & pattern, const struct PatternForCheck & pattern_check,
const std::string & typ, const std::string & path, std::string & message)
{
bool rule_type_eq = (pattern.rule_type == pattern_check.rule_type);
bool regexp_eq = (pattern.regexp_str == pattern_check.regexp_str);
bool function_eq = (pattern.function == nullptr && pattern_check.function.empty())
|| (pattern.function != nullptr && pattern.function->getName() == pattern_check.function);
bool retentions_eq = (pattern.retentions == pattern_check.retentions);
if (rule_type_eq && regexp_eq && function_eq && retentions_eq)
return true;
message = typ + " rollup rule mismatch for '" + path + "'," +
(rule_type_eq ? "" : "rule_type ") +
(regexp_eq ? "" : "regexp ") +
(function_eq ? "" : "function ") +
(retentions_eq ? "" : "retentions ");
return false;
}
std::ostream & operator<<(std::ostream & stream, const PatternForCheck & a)
{
stream << "{ rule_type = " << ruleTypeStr(a.rule_type);
if (!a.regexp_str.empty())
stream << ", regexp = '" << a.regexp_str << "'";
if (!a.function.empty())
stream << ", function = " << a.function;
if (!a.retentions.empty())
{
stream << ",\n retentions = {\n";
for (size_t i = 0; i < a.retentions.size(); i++)
{
stream << " { " << a.retentions[i].age << ", " << a.retentions[i].precision << " }";
if (i < a.retentions.size() - 1)
stream << ",";
stream << "\n";
}
stream << " }\n";
}
else
stream << " ";
stream << "}";
return stream;
}
struct PatternsForPath
{
std::string path;
PatternForCheck retention_want;
PatternForCheck aggregation_want;
};
TEST(GraphiteTest, testSelectPattern)
{
tryRegisterAggregateFunctions();
using namespace std::literals;
std::string
xml(R"END(<yandex>
<graphite_rollup>
<pattern>
<regexp>\.sum$</regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp>^((.*)|.)sum\?</regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp>\.max$</regexp>
<function>max</function>
</pattern>
<pattern>
<regexp>^((.*)|.)max\?</regexp>
<function>max</function>
</pattern>
<pattern>
<regexp>\.min$</regexp>
<function>min</function>
</pattern>
<pattern>
<regexp>^((.*)|.)min\?</regexp>
<function>min</function>
</pattern>
<pattern>
<regexp>\.(count|sum|sum_sq)$</regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp>^((.*)|.)(count|sum|sum_sq)\?</regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp>^retention\.</regexp>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<default>
<function>avg</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>3600</age>
<precision>300</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</default>
</graphite_rollup>
</yandex>
)END");
// Retentions must be ordered by 'age' descending.
std::vector<struct PatternsForPath> tests
{
{
"test.sum",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypeAll, R"END(\.sum$)END", "sum", { } }
},
{
"val.sum?env=test&tag=Fake3",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypeAll, R"END(^((.*)|.)sum\?)END", "sum", { } }
},
{
"test.max",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypeAll, R"END(\.max$)END", "max", { } },
},
{
"val.max?env=test&tag=Fake4",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypeAll, R"END(^((.*)|.)max\?)END", "max", { } },
},
{
"test.min",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypeAll, R"END(\.min$)END", "min", { } },
},
{
"val.min?env=test&tag=Fake5",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypeAll, R"END(^((.*)|.)min\?)END", "min", { } },
},
{
"retention.count",
{ Graphite::RuleTypeAll, R"END(^retention\.)END", "", { { 86400, 3600 }, { 0, 60 } } }, // ^retention
{ Graphite::RuleTypeAll, R"END(\.(count|sum|sum_sq)$)END", "sum", { } },
},
{
"val.retention.count?env=test&tag=Fake5",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypeAll, R"END(^((.*)|.)(count|sum|sum_sq)\?)END", "sum", { } },
},
{
"val.count?env=test&tag=Fake5",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypeAll, R"END(^((.*)|.)(count|sum|sum_sq)\?)END", "sum", { } },
},
{
"test.p95",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
},
{
"val.p95?env=test&tag=FakeNo",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
},
{
"default",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
},
{
"val.default?env=test&tag=FakeNo",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
}
};
auto config = loadConfigurationFromString(xml);
ContextMutablePtr context = getContext().context;
Graphite::Params params = setGraphitePatterns(context, config);
for (const auto & t : tests)
{
auto rule = DB::Graphite::selectPatternForPath(params, t.path);
std:: string message;
if (!checkRule(*rule.first, t.retention_want, "retention", t.path, message))
ADD_FAILURE() << message << ", got\n" << *rule.first << "\n, want\n" << t.retention_want << "\n";
if (!checkRule(*rule.second, t.aggregation_want, "aggregation", t.path, message))
ADD_FAILURE() << message << ", got\n" << *rule.second << "\n, want\n" << t.aggregation_want << "\n";
}
}
namespace DB::Graphite
{
std::string buildTaggedRegex(std::string regexp_str);
}
struct RegexCheck
{
std::string regex;
std::string regex_want;
std::string match;
std::string nomatch;
};
TEST(GraphiteTest, testBuildTaggedRegex)
{
std::vector<struct RegexCheck> tests
{
{
"cpu\\.loadavg;project=DB.*;env=st.*",
R"END(^cpu\.loadavg\?(.*&)?env=st.*&(.*&)?project=DB.*(&.*)?$)END",
R"END(cpu.loadavg?env=staging&project=DBAAS)END",
R"END(cpu.loadavg?env=staging&project=D)END"
},
{
R"END(project=DB.*;env=staging;)END",
R"END([\?&]env=staging&(.*&)?project=DB.*(&.*)?$)END",
R"END(cpu.loadavg?env=staging&project=DBPG)END",
R"END(cpu.loadavg?env=stagingN&project=DBAAS)END"
},
{
"env=staging;",
R"END([\?&]env=staging(&.*)?$)END",
R"END(cpu.loadavg?env=staging&project=DPG)END",
R"END(cpu.loadavg?env=stagingN)END"
},
{
" env = staging ;", // spaces are allowed,
R"END([\?&] env = staging (&.*)?$)END",
R"END(cpu.loadavg? env = staging &project=DPG)END",
R"END(cpu.loadavg?env=stagingN)END"
},
{
"name;",
R"END(^name\?)END",
R"END(name?env=staging&project=DPG)END",
R"END(nameN?env=stagingN)END",
},
{
"name",
R"END(^name\?)END",
R"END(name?env=staging&project=DPG)END",
R"END(nameN?env=stagingN)END",
}
};
for (const auto & t : tests)
{
auto s = DB::Graphite::buildTaggedRegex(t.regex);
EXPECT_EQ(t.regex_want, s) << "result for '" << t.regex_want << "' mismatch";
auto regexp = OptimizedRegularExpression(s);
EXPECT_TRUE(regexp.match(t.match.data(), t.match.size())) << t.match << " match for '" << s << "' failed";
EXPECT_FALSE(regexp.match(t.nomatch.data(), t.nomatch.size())) << t.nomatch << " ! match for '" << s << "' failed";
}
}
TEST(GraphiteTest, testSelectPatternTyped)
{
tryRegisterAggregateFunctions();
using namespace std::literals;
std::string
xml(R"END(<yandex>
<graphite_rollup>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.sum$</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp>^((.*)|.)sum\?</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.max$</regexp>
<function>max</function>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp>^((.*)|.)max\?</regexp>
<function>max</function>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.min$</regexp>
<function>min</function>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp>^((.*)|.)min\?</regexp>
<function>min</function>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.(count|sum|sum_sq)$</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp>^((.*)|.)(count|sum|sum_sq)\?</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>^retention\.</regexp>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp><![CDATA[[\?&]retention=hour(&.*)?$]]></regexp>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<rule_type>tag_list</rule_type>
<regexp>retention=10min;env=staging</regexp>
<retention>
<age>0</age>
<precision>600</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<rule_type>tag_list</rule_type>
<regexp>retention=10min;env=[A-Za-z-]+rod[A-Za-z-]+</regexp>
<retention>
<age>0</age>
<precision>600</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<rule_type>tag_list</rule_type>
<regexp>cpu\.loadavg</regexp>
<retention>
<age>0</age>
<precision>600</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<default>
<function>avg</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>3600</age>
<precision>300</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</default>
</graphite_rollup>
</yandex>
)END");
// Retentions must be ordered by 'age' descending.
std::vector<PatternsForPath> tests
{
{
"test.sum",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypePlain, R"END(\.sum$)END", "sum", { } }
},
{
"val.sum?env=test&tag=Fake3",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypeTagged, R"END(^((.*)|.)sum\?)END", "sum", { } }
},
{
"test.max",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypePlain, R"END(\.max$)END", "max", { } },
},
{
"val.max?env=test&tag=Fake4",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypeTagged, R"END(^((.*)|.)max\?)END", "max", { } },
},
{
"test.min",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypePlain, R"END(\.min$)END", "min", { } },
},
{
"val.min?env=test&tag=Fake5",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypeTagged, R"END(^((.*)|.)min\?)END", "min", { } },
},
{
"retention.count",
{ Graphite::RuleTypePlain, R"END(^retention\.)END", "", { { 86400, 3600 }, { 0, 60 } } }, // ^retention
{ Graphite::RuleTypePlain, R"END(\.(count|sum|sum_sq)$)END", "sum", { } },
},
{
"val.count?env=test&retention=hour&tag=Fake5",
{ Graphite::RuleTypeTagged, R"END([\?&]retention=hour(&.*)?$)END", "", { { 86400, 3600 }, { 0, 60 } } }, // tagged retention=hour
{ Graphite::RuleTypeTagged, R"END(^((.*)|.)(count|sum|sum_sq)\?)END", "sum", { } },
},
{
"val.count?env=test&retention=hour",
{ Graphite::RuleTypeTagged, R"END([\?&]retention=hour(&.*)?$)END", "", { { 86400, 3600 }, { 0, 60 } } }, // tagged retention=hour
{ Graphite::RuleTypeTagged, R"END(^((.*)|.)(count|sum|sum_sq)\?)END", "sum", { } },
},
{
"val.count?env=staging&retention=10min",
{ Graphite::RuleTypeTagged, R"END([\?&]env=staging&(.*&)?retention=10min(&.*)?$)END", "", { { 86400, 3600 }, { 0, 600 } } }, // retention=10min ; env=staging
{ Graphite::RuleTypeTagged, R"END(^((.*)|.)(count|sum|sum_sq)\?)END", "sum", { } },
},
{
"val.count?env=production&retention=10min",
{ Graphite::RuleTypeTagged, R"END([\?&]env=[A-Za-z-]+rod[A-Za-z-]+&(.*&)?retention=10min(&.*)?$)END", "", { { 86400, 3600 }, { 0, 600 } } }, // retention=10min ; env=[A-Za-z-]+rod[A-Za-z-]+
{ Graphite::RuleTypeTagged, R"END(^((.*)|.)(count|sum|sum_sq)\?)END", "sum", { } },
},
{
"val.count?env=test&tag=Fake5",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypeTagged, R"END(^((.*)|.)(count|sum|sum_sq)\?)END", "sum", { } },
},
{
"cpu.loadavg?env=test&tag=FakeNo",
{ Graphite::RuleTypeTagged, R"END(^cpu\.loadavg\?)END", "", { { 86400, 3600 }, { 0, 600 } } }, // name=cpu\.loadavg
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } },
},
{
"test.p95",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
},
{
"val.p95?env=test&tag=FakeNo",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
},
{
"default",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
},
{
"val.default?env=test&tag=FakeNo",
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
{ Graphite::RuleTypeAll, "", "avg", { { 86400, 3600 }, { 3600, 300 }, { 0, 60 } } }, //default
}
};
auto config = loadConfigurationFromString(xml);
ContextMutablePtr context = getContext().context;
Graphite::Params params = setGraphitePatterns(context, config);
for (const auto & t : tests)
{
auto rule = DB::Graphite::selectPatternForPath(params, t.path);
std:: string message;
if (!checkRule(*rule.first, t.retention_want, "retention", t.path, message))
ADD_FAILURE() << message << ", got\n" << *rule.first << "\n, want\n" << t.retention_want << "\n";
if (!checkRule(*rule.second, t.aggregation_want, "aggregation", t.path, message))
ADD_FAILURE() << message << ", got\n" << *rule.second << "\n, want\n" << t.aggregation_want << "\n";
}
}

View File

@ -22,17 +22,13 @@
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Databases/DatabaseReplicatedHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int BAD_ARGUMENTS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int UNKNOWN_STORAGE;
extern const int NO_REPLICA_NAME_GIVEN;
}
@ -62,171 +58,6 @@ static Names extractColumnNames(const ASTPtr & node)
}
}
/** Is used to order Graphite::Retentions by age and precision descending.
* Throws exception if not both age and precision are less or greater then another.
*/
static bool compareRetentions(const Graphite::Retention & a, const Graphite::Retention & b)
{
if (a.age > b.age && a.precision > b.precision)
{
return true;
}
else if (a.age < b.age && a.precision < b.precision)
{
return false;
}
String error_msg = "age and precision should only grow up: "
+ std::to_string(a.age) + ":" + std::to_string(a.precision) + " vs "
+ std::to_string(b.age) + ":" + std::to_string(b.precision);
throw Exception(
error_msg,
ErrorCodes::BAD_ARGUMENTS);
}
/** Read the settings for Graphite rollup from config.
* Example
*
* <graphite_rollup>
* <path_column_name>Path</path_column_name>
* <pattern>
* <regexp>click_cost</regexp>
* <function>any</function>
* <retention>
* <age>0</age>
* <precision>3600</precision>
* </retention>
* <retention>
* <age>86400</age>
* <precision>60</precision>
* </retention>
* </pattern>
* <default>
* <function>max</function>
* <retention>
* <age>0</age>
* <precision>60</precision>
* </retention>
* <retention>
* <age>3600</age>
* <precision>300</precision>
* </retention>
* <retention>
* <age>86400</age>
* <precision>3600</precision>
* </retention>
* </default>
* </graphite_rollup>
*/
static void appendGraphitePattern(
const Poco::Util::AbstractConfiguration & config,
const String & config_element,
Graphite::Patterns & out_patterns,
ContextPtr context)
{
Graphite::Pattern pattern;
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_element, keys);
for (const auto & key : keys)
{
if (key == "regexp")
{
pattern.regexp_str = config.getString(config_element + ".regexp");
pattern.regexp = std::make_shared<OptimizedRegularExpression>(pattern.regexp_str);
}
else if (key == "function")
{
String aggregate_function_name_with_params = config.getString(config_element + ".function");
String aggregate_function_name;
Array params_row;
getAggregateFunctionNameAndParametersArray(
aggregate_function_name_with_params, aggregate_function_name, params_row, "GraphiteMergeTree storage initialization", context);
/// TODO Not only Float64
AggregateFunctionProperties properties;
pattern.function = AggregateFunctionFactory::instance().get(
aggregate_function_name, {std::make_shared<DataTypeFloat64>()}, params_row, properties);
}
else if (startsWith(key, "retention"))
{
pattern.retentions.emplace_back(Graphite::Retention{
.age = config.getUInt(config_element + "." + key + ".age"),
.precision = config.getUInt(config_element + "." + key + ".precision")});
}
else
throw Exception("Unknown element in config: " + key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
if (!pattern.function && pattern.retentions.empty())
throw Exception(
"At least one of an aggregate function or retention rules is mandatory for rollup patterns in GraphiteMergeTree",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
if (!pattern.function)
{
pattern.type = pattern.TypeRetention;
}
else if (pattern.retentions.empty())
{
pattern.type = pattern.TypeAggregation;
}
else
{
pattern.type = pattern.TypeAll;
}
if (pattern.type & pattern.TypeAggregation) /// TypeAggregation or TypeAll
if (pattern.function->allocatesMemoryInArena())
throw Exception(
"Aggregate function " + pattern.function->getName() + " isn't supported in GraphiteMergeTree", ErrorCodes::NOT_IMPLEMENTED);
/// retention should be in descending order of age.
if (pattern.type & pattern.TypeRetention) /// TypeRetention or TypeAll
std::sort(pattern.retentions.begin(), pattern.retentions.end(), compareRetentions);
out_patterns.emplace_back(pattern);
}
static void setGraphitePatternsFromConfig(ContextPtr context, const String & config_element, Graphite::Params & params)
{
const auto & config = context->getConfigRef();
if (!config.has(config_element))
throw Exception("No '" + config_element + "' element in configuration file", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
params.config_name = config_element;
params.path_column_name = config.getString(config_element + ".path_column_name", "Path");
params.time_column_name = config.getString(config_element + ".time_column_name", "Time");
params.value_column_name = config.getString(config_element + ".value_column_name", "Value");
params.version_column_name = config.getString(config_element + ".version_column_name", "Timestamp");
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_element, keys);
for (const auto & key : keys)
{
if (startsWith(key, "pattern"))
{
appendGraphitePattern(config, config_element + "." + key, params.patterns, context);
}
else if (key == "default")
{
/// See below.
}
else if (key == "path_column_name" || key == "time_column_name" || key == "value_column_name" || key == "version_column_name")
{
/// See above.
}
else
throw Exception("Unknown element in config: " + key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
if (config.has(config_element + ".default"))
appendGraphitePattern(config, config_element + "." + ".default", params.patterns, context);
}
static String getMergeTreeVerboseHelp(bool)
{
using namespace std::string_literals;
@ -542,12 +373,6 @@ static StoragePtr create(const StorageFactory::Arguments & args)
/// to make possible copying metadata files between replicas.
Macros::MacroExpansionInfo info;
info.table_id = args.table_id;
if (is_replicated_database)
{
auto database = DatabaseCatalog::instance().getDatabase(args.table_id.database_name);
info.shard = getReplicatedDatabaseShardName(database);
info.replica = getReplicatedDatabaseReplicaName(database);
}
if (!allow_uuid_macro)
info.table_id.uuid = UUIDHelpers::Nil;
zookeeper_path = args.getContext()->getMacros()->expand(zookeeper_path, info);

View File

@ -10,6 +10,7 @@ NamesAndTypesList StorageSystemGraphite::getNamesAndTypes()
{
return {
{"config_name", std::make_shared<DataTypeString>()},
{"rule_type", std::make_shared<DataTypeString>()},
{"regexp", std::make_shared<DataTypeString>()},
{"function", std::make_shared<DataTypeString>()},
{"age", std::make_shared<DataTypeUInt64>()},
@ -85,6 +86,7 @@ void StorageSystemGraphite::fillData(MutableColumns & res_columns, ContextPtr co
bool is_default = pattern.regexp == nullptr;
String regexp;
String function;
const String & rule_type = ruleTypeStr(pattern.rule_type);
if (is_default)
{
@ -107,6 +109,7 @@ void StorageSystemGraphite::fillData(MutableColumns & res_columns, ContextPtr co
{
size_t i = 0;
res_columns[i++]->insert(config.first);
res_columns[i++]->insert(rule_type);
res_columns[i++]->insert(regexp);
res_columns[i++]->insert(function);
res_columns[i++]->insert(retention.age);
@ -121,6 +124,7 @@ void StorageSystemGraphite::fillData(MutableColumns & res_columns, ContextPtr co
{
size_t i = 0;
res_columns[i++]->insert(config.first);
res_columns[i++]->insert(rule_type);
res_columns[i++]->insert(regexp);
res_columns[i++]->insert(function);
res_columns[i++]->insertDefault();

View File

@ -100,3 +100,19 @@ def exec_query_with_retry(instance, query, retry_count=40, sleep_time=0.5, silen
time.sleep(sleep_time)
else:
raise exception
def csv_compare(result, expected):
csv_result = TSV(result)
csv_expected = TSV(expected)
mismatch = []
max_len = len(csv_result) if len(csv_result) > len(csv_expected) else len(csv_expected)
for i in range(max_len):
if i >= len(csv_result):
mismatch.append("-[%d]=%s" % (i, csv_expected.lines[i]))
elif i >= len(csv_expected):
mismatch.append("+[%d]=%s" % (i, csv_result.lines[i]))
elif csv_expected.lines[i] != csv_result.lines[i]:
mismatch.append("-[%d]=%s" % (i, csv_expected.lines[i]))
mismatch.append("+[%d]=%s" % (i, csv_result.lines[i]))
return "\n".join(mismatch)

View File

@ -6,6 +6,7 @@ import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
from helpers.test_tools import csv_compare
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
@ -234,18 +235,19 @@ SELECT * FROM test.graphite;
def test_system_graphite_retentions(graphite_table):
expected = '''
graphite_rollup \\\\.count$ sum 0 0 1 0 ['test'] ['graphite']
graphite_rollup \\\\.max$ max 0 0 2 0 ['test'] ['graphite']
graphite_rollup ^five_min\\\\. 31536000 14400 3 0 ['test'] ['graphite']
graphite_rollup ^five_min\\\\. 5184000 3600 3 0 ['test'] ['graphite']
graphite_rollup ^five_min\\\\. 0 300 3 0 ['test'] ['graphite']
graphite_rollup ^one_min avg 31536000 600 4 0 ['test'] ['graphite']
graphite_rollup ^one_min avg 7776000 300 4 0 ['test'] ['graphite']
graphite_rollup ^one_min avg 0 60 4 0 ['test'] ['graphite']
graphite_rollup all \\\\.count$ sum 0 0 1 0 ['test'] ['graphite']
graphite_rollup all \\\\.max$ max 0 0 2 0 ['test'] ['graphite']
graphite_rollup all ^five_min\\\\. 31536000 14400 3 0 ['test'] ['graphite']
graphite_rollup all ^five_min\\\\. 5184000 3600 3 0 ['test'] ['graphite']
graphite_rollup all ^five_min\\\\. 0 300 3 0 ['test'] ['graphite']
graphite_rollup all ^one_min avg 31536000 600 4 0 ['test'] ['graphite']
graphite_rollup all ^one_min avg 7776000 300 4 0 ['test'] ['graphite']
graphite_rollup all ^one_min avg 0 60 4 0 ['test'] ['graphite']
'''
result = q('SELECT * from system.graphite_retentions')
assert TSV(result) == TSV(expected)
mismatch = csv_compare(result, expected)
assert len(mismatch) == 0, f"got\n{result}\nwant\n{expected}\ndiff\n{mismatch}\n"
q('''
DROP TABLE IF EXISTS test.graphite2;

View File

@ -0,0 +1,120 @@
<clickhouse>
<!-- retention scheme for GraphiteMergeTree engine-->
<graphite_rollup>
<path_column_name>metric</path_column_name>
<time_column_name>timestamp</time_column_name>
<value_column_name>value</value_column_name>
<version_column_name>updated</version_column_name>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.count$</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.max$</regexp>
<function>max</function>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>^five_min\.</regexp>
<retention>
<age>0</age>
<precision>300</precision>
</retention>
<retention>
<age>5184000</age>
<precision>3600</precision>
</retention>
<retention>
<age>31536000</age>
<precision>14400</precision>
</retention>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>^one_min</regexp>
<function>avg</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>7776000</age>
<precision>300</precision>
</retention>
<retention>
<age>31536000</age>
<precision>600</precision>
</retention>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp><![CDATA[[\?&]retention=one_min(&.*)?$]]></regexp>
<function>avg</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>7776000</age>
<precision>300</precision>
</retention>
<retention>
<age>31536000</age>
<precision>600</precision>
</retention>
</pattern>
<pattern>
<rule_type>tag_list</rule_type>
<regexp>retention=five_min</regexp>
<function>avg</function>
<retention>
<age>0</age>
<precision>300</precision>
</retention>
<retention>
<age>5184000</age>
<precision>3600</precision>
</retention>
<retention>
<age>31536000</age>
<precision>14400</precision>
</retention>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp>^for_taggged</regexp>
<function>avg</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>7776000</age>
<precision>300</precision>
</retention>
<retention>
<age>31536000</age>
<precision>600</precision>
</retention>
</pattern>
<pattern>
<rule_type>all</rule_type>
<regexp>^ten_min\.</regexp>
<function>sum</function>
<retention>
<age>0</age>
<precision>600</precision>
</retention>
<retention>
<age>5184000</age>
<precision>7200</precision>
</retention>
<retention>
<age>31536000</age>
<precision>28800</precision>
</retention>
</pattern>
</graphite_rollup>
</clickhouse>

View File

@ -0,0 +1,8 @@
<?xml version="1.0"?>
<clickhouse>
<profiles>
<default>
<optimize_on_insert>0</optimize_on_insert>
</default>
</profiles>
</clickhouse>

View File

@ -0,0 +1,580 @@
import datetime
import os.path as p
import time
import sys
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
from helpers.test_tools import csv_compare
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
main_configs=['configs/graphite_rollup.xml'],
user_configs=["configs/users.xml"])
q = instance.query
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
q('CREATE DATABASE test')
yield cluster
finally:
cluster.shutdown()
@pytest.fixture
def graphite_table(started_cluster):
q('''
DROP TABLE IF EXISTS test.graphite;
CREATE TABLE test.graphite
(metric String, value Float64, timestamp UInt32, date Date, updated UInt32)
ENGINE = GraphiteMergeTree('graphite_rollup')
PARTITION BY toYYYYMM(date)
ORDER BY (metric, timestamp)
SETTINGS index_granularity=8192;
''')
yield
q('DROP TABLE test.graphite')
def test_rollup_versions_plain(graphite_table):
timestamp = int(time.time())
rounded_timestamp = timestamp - timestamp % 60
date = datetime.date.today().isoformat()
# Insert rows with timestamps relative to the current time so that the
# first retention clause is active.
# Two parts are created.
q('''
INSERT INTO test.graphite (metric, value, timestamp, date, updated)
VALUES ('one_min.x1', 100, {timestamp}, '{date}', 1);
INSERT INTO test.graphite (metric, value, timestamp, date, updated)
VALUES ('one_min.x1', 200, {timestamp}, '{date}', 2);
'''.format(timestamp=timestamp, date=date))
expected1 = '''\
one_min.x1 100 {timestamp} {date} 1
one_min.x1 200 {timestamp} {date} 2
'''.format(timestamp=timestamp, date=date)
assert TSV(
q('SELECT * FROM test.graphite ORDER BY updated')
) == TSV(expected1)
q('OPTIMIZE TABLE test.graphite')
# After rollup only the row with max version is retained.
expected2 = '''\
one_min.x1 200 {timestamp} {date} 2
'''.format(timestamp=rounded_timestamp, date=date)
assert TSV(q('SELECT * FROM test.graphite')) == TSV(expected2)
def test_rollup_versions_tagged(graphite_table):
timestamp = int(time.time())
rounded_timestamp = timestamp - timestamp % 60
date = datetime.date.today().isoformat()
# Insert rows with timestamps relative to the current time so that the
# first retention clause is active.
# Two parts are created.
q('''
INSERT INTO test.graphite (metric, value, timestamp, date, updated)
VALUES ('x1?retention=one_min', 100, {timestamp}, '{date}', 1);
INSERT INTO test.graphite (metric, value, timestamp, date, updated)
VALUES ('x1?retention=one_min', 200, {timestamp}, '{date}', 2);
'''.format(timestamp=timestamp, date=date))
expected1 = '''\
x1?retention=one_min 100 {timestamp} {date} 1
x1?retention=one_min 200 {timestamp} {date} 2
'''.format(timestamp=timestamp, date=date)
result = q('SELECT * FROM test.graphite ORDER BY metric, updated')
mismatch = csv_compare(result, expected1)
assert len(mismatch) == 0, f"got\n{result}\nwant\n{expected1}\ndiff\n{mismatch}\n"
q('OPTIMIZE TABLE test.graphite')
# After rollup only the row with max version is retained.
expected2 = '''\
x1?retention=one_min 200 {timestamp} {date} 2
'''.format(timestamp=rounded_timestamp, date=date)
result = q('SELECT * FROM test.graphite ORDER BY metric, updated')
mismatch = csv_compare(result, expected2)
assert len(mismatch) == 0, f"got\n{result}\nwant\n{expected2}\ndiff\n{mismatch}\n"
def test_rollup_versions_all(graphite_table):
timestamp = int(time.time())
rounded_timestamp = timestamp - timestamp % 600
date = datetime.date.today().isoformat()
# Insert rows with timestamps relative to the current time so that the
# first retention clause is active.
# Two parts are created.
q('''
INSERT INTO test.graphite (metric, value, timestamp, date, updated)
VALUES ('ten_min.x1', 100, {timestamp}, '{date}', 1);
INSERT INTO test.graphite (metric, value, timestamp, date, updated)
VALUES ('ten_min.x1', 200, {timestamp}, '{date}', 2);
INSERT INTO test.graphite (metric, value, timestamp, date, updated)
VALUES ('ten_min.x1?env=staging', 100, {timestamp}, '{date}', 1);
INSERT INTO test.graphite (metric, value, timestamp, date, updated)
VALUES ('ten_min.x1?env=staging', 200, {timestamp}, '{date}', 2);
'''.format(timestamp=timestamp, date=date))
expected1 = '''\
ten_min.x1 100 {timestamp} {date} 1
ten_min.x1 200 {timestamp} {date} 2
ten_min.x1?env=staging 100 {timestamp} {date} 1
ten_min.x1?env=staging 200 {timestamp} {date} 2
'''.format(timestamp=timestamp, date=date)
result = q('SELECT * FROM test.graphite ORDER BY metric, updated')
mismatch = csv_compare(result, expected1)
assert len(mismatch) == 0, f"got\n{result}\nwant\n{expected1}\ndiff\n{mismatch}\n"
q('OPTIMIZE TABLE test.graphite')
# After rollup only the row with max version is retained.
expected2 = '''\
ten_min.x1 200 {timestamp} {date} 2
ten_min.x1?env=staging 200 {timestamp} {date} 2
'''.format(timestamp=rounded_timestamp, date=date)
result = q('SELECT * FROM test.graphite ORDER BY metric, updated')
mismatch = csv_compare(result, expected2)
assert len(mismatch) == 0, f"got\n{result}\nwant\n{expected2}\ndiff\n{mismatch}\n"
def test_rollup_aggregation_plain(graphite_table):
# This query essentially emulates what rollup does.
result1 = q('''
SELECT avg(v), max(upd)
FROM (SELECT timestamp,
argMax(value, (updated, number)) AS v,
max(updated) AS upd
FROM (SELECT 'one_min.x5' AS metric,
toFloat64(number) AS value,
toUInt32(1111111111 + intDiv(number, 3)) AS timestamp,
toDate('2017-02-02') AS date,
toUInt32(intDiv(number, 2)) AS updated,
number
FROM system.numbers LIMIT 1000000)
WHERE intDiv(timestamp, 600) * 600 = 1111444200
GROUP BY timestamp)
''')
expected1 = '''\
999634.9918367347 499999
'''
assert TSV(result1) == TSV(expected1)
# Timestamp 1111111111 is in sufficiently distant past
# so that the last retention clause is active.
result2 = q('''
INSERT INTO test.graphite
SELECT 'one_min.x' AS metric,
toFloat64(number) AS value,
toUInt32(1111111111 + intDiv(number, 3)) AS timestamp,
toDate('2017-02-02') AS date, toUInt32(intDiv(number, 2)) AS updated
FROM (SELECT * FROM system.numbers LIMIT 1000000)
WHERE intDiv(timestamp, 600) * 600 = 1111444200;
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
''')
expected2 = '''\
one_min.x 999634.9918367347 1111444200 2017-02-02 499999
'''
assert TSV(result2) == TSV(expected2)
def test_rollup_aggregation_tagged(graphite_table):
# This query essentially emulates what rollup does.
result1 = q('''
SELECT avg(v), max(upd)
FROM (SELECT timestamp,
argMax(value, (updated, number)) AS v,
max(updated) AS upd
FROM (SELECT 'x?retention=one_min' AS metric,
toFloat64(number) AS value,
toUInt32(1111111111 + intDiv(number, 3)) AS timestamp,
toDate('2017-02-02') AS date,
toUInt32(intDiv(number, 2)) AS updated,
number
FROM system.numbers LIMIT 1000000)
WHERE intDiv(timestamp, 600) * 600 = 1111444200
GROUP BY timestamp)
''')
expected1 = '''\
999634.9918367347 499999
'''
assert TSV(result1) == TSV(expected1)
# Timestamp 1111111111 is in sufficiently distant past
# so that the last retention clause is active.
result2 = q('''
INSERT INTO test.graphite
SELECT 'x?retention=one_min' AS metric,
toFloat64(number) AS value,
toUInt32(1111111111 + intDiv(number, 3)) AS timestamp,
toDate('2017-02-02') AS date, toUInt32(intDiv(number, 2)) AS updated
FROM (SELECT * FROM system.numbers LIMIT 1000000)
WHERE intDiv(timestamp, 600) * 600 = 1111444200;
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
''')
expected2 = '''\
x?retention=one_min 999634.9918367347 1111444200 2017-02-02 499999
'''
assert TSV(result2) == TSV(expected2)
def test_rollup_aggregation_2_plain(graphite_table):
result = q('''
INSERT INTO test.graphite
SELECT 'one_min.x' AS metric,
toFloat64(number) AS value,
toUInt32(1111111111 - intDiv(number, 3)) AS timestamp,
toDate('2017-02-02') AS date,
toUInt32(100 - number) AS updated
FROM (SELECT * FROM system.numbers LIMIT 50);
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
''')
expected = '''\
one_min.x 24 1111110600 2017-02-02 100
'''
assert TSV(result) == TSV(expected)
def test_rollup_aggregation_2_tagged(graphite_table):
result = q('''
INSERT INTO test.graphite
SELECT 'x?retention=one_min' AS metric,
toFloat64(number) AS value,
toUInt32(1111111111 - intDiv(number, 3)) AS timestamp,
toDate('2017-02-02') AS date,
toUInt32(100 - number) AS updated
FROM (SELECT * FROM system.numbers LIMIT 50);
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
''')
expected = '''\
x?retention=one_min 24 1111110600 2017-02-02 100
'''
assert TSV(result) == TSV(expected)
def test_multiple_paths_and_versions_plain(graphite_table):
result = q('''
INSERT INTO test.graphite
SELECT 'one_min.x' AS metric,
toFloat64(number) AS value,
toUInt32(1111111111 + intDiv(number, 3) * 600) AS timestamp,
toDate('2017-02-02') AS date,
toUInt32(100 - number) AS updated
FROM (SELECT * FROM system.numbers LIMIT 50);
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
INSERT INTO test.graphite
SELECT 'one_min.y' AS metric,
toFloat64(number) AS value,
toUInt32(1111111111 + number * 600) AS timestamp,
toDate('2017-02-02') AS date,
toUInt32(100 - number) AS updated
FROM (SELECT * FROM system.numbers LIMIT 50);
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
''')
with open(p.join(p.dirname(__file__),
'test_multiple_paths_and_versions.reference.plain')
) as reference:
assert TSV(result) == TSV(reference)
def test_multiple_paths_and_versions_tagged(graphite_table):
result = q('''
INSERT INTO test.graphite
SELECT 'x?retention=one_min' AS metric,
toFloat64(number) AS value,
toUInt32(1111111111 + intDiv(number, 3) * 600) AS timestamp,
toDate('2017-02-02') AS date,
toUInt32(100 - number) AS updated
FROM (SELECT * FROM system.numbers LIMIT 50);
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
INSERT INTO test.graphite
SELECT 'y?retention=one_min' AS metric,
toFloat64(number) AS value,
toUInt32(1111111111 + number * 600) AS timestamp,
toDate('2017-02-02') AS date,
toUInt32(100 - number) AS updated
FROM (SELECT * FROM system.numbers LIMIT 50);
OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
''')
with open(p.join(p.dirname(__file__),
'test_multiple_paths_and_versions.reference.tagged')
) as reference:
assert TSV(result) == TSV(reference)
def test_multiple_output_blocks(graphite_table):
MERGED_BLOCK_SIZE = 8192
to_insert = ''
expected = ''
for i in range(2 * MERGED_BLOCK_SIZE + 1):
rolled_up_time = 1000000200 + 600 * i
for j in range(3):
cur_time = rolled_up_time + 100 * j
to_insert += 'one_min.x1 {} {} 2001-09-09 1\n'.format(
10 * j, cur_time
)
to_insert += 'one_min.x1 {} {} 2001-09-09 2\n'.format(
10 * (j + 1), cur_time
)
expected += 'one_min.x1 20 {} 2001-09-09 2\n'.format(rolled_up_time)
q('INSERT INTO test.graphite FORMAT TSV', to_insert)
result = q('''
OPTIMIZE TABLE test.graphite PARTITION 200109 FINAL;
SELECT * FROM test.graphite;
''')
assert TSV(result) == TSV(expected)
def test_paths_not_matching_any_pattern(graphite_table):
to_insert = '''\
one_min.x1 100 1000000000 2001-09-09 1
zzzzzzzz 100 1000000001 2001-09-09 1
zzzzzzzz 200 1000000001 2001-09-09 2
'''
q('INSERT INTO test.graphite FORMAT TSV', to_insert)
expected = '''\
one_min.x1 100 999999600 2001-09-09 1
zzzzzzzz 200 1000000001 2001-09-09 2
'''
result = q('''
OPTIMIZE TABLE test.graphite PARTITION 200109 FINAL;
SELECT * FROM test.graphite;
''')
assert TSV(result) == TSV(expected)
def test_rules_isolation(graphite_table):
to_insert = '''\
one_min.x1 100 1000000000 2001-09-09 1
for_taggged 100 1000000001 2001-09-09 1
for_taggged 200 1000000001 2001-09-09 2
one_min?env=staging 100 1000000001 2001-09-09 1
one_min?env=staging 200 1000000001 2001-09-09 2
'''
q('INSERT INTO test.graphite FORMAT TSV', to_insert)
expected = '''\
for_taggged 200 1000000001 2001-09-09 2
one_min.x1 100 999999600 2001-09-09 1
one_min?env=staging 200 1000000001 2001-09-09 2
'''
result = q('''
OPTIMIZE TABLE test.graphite PARTITION 200109 FINAL;
SELECT * FROM test.graphite;
''')
result = q('SELECT * FROM test.graphite ORDER BY metric, updated')
mismatch = csv_compare(result, expected)
assert len(mismatch) == 0, f"got\n{result}\nwant\n{expected}\ndiff\n{mismatch}\n"
def test_system_graphite_retentions(graphite_table):
expected = '''
graphite_rollup plain \\\\.count$ sum 0 0 1 0 ['test'] ['graphite']
graphite_rollup plain \\\\.max$ max 0 0 2 0 ['test'] ['graphite']
graphite_rollup plain ^five_min\\\\. 31536000 14400 3 0 ['test'] ['graphite']
graphite_rollup plain ^five_min\\\\. 5184000 3600 3 0 ['test'] ['graphite']
graphite_rollup plain ^five_min\\\\. 0 300 3 0 ['test'] ['graphite']
graphite_rollup plain ^one_min avg 31536000 600 4 0 ['test'] ['graphite']
graphite_rollup plain ^one_min avg 7776000 300 4 0 ['test'] ['graphite']
graphite_rollup plain ^one_min avg 0 60 4 0 ['test'] ['graphite']
graphite_rollup tagged [\\\\?&]retention=one_min(&.*)?$ avg 31536000 600 5 0 ['test'] ['graphite']
graphite_rollup tagged [\\\\?&]retention=one_min(&.*)?$ avg 7776000 300 5 0 ['test'] ['graphite']
graphite_rollup tagged [\\\\?&]retention=one_min(&.*)?$ avg 0 60 5 0 ['test'] ['graphite']
graphite_rollup tagged [\\\\?&]retention=five_min(&.*)?$ avg 31536000 14400 6 0 ['test'] ['graphite']
graphite_rollup tagged [\\\\?&]retention=five_min(&.*)?$ avg 5184000 3600 6 0 ['test'] ['graphite']
graphite_rollup tagged [\\\\?&]retention=five_min(&.*)?$ avg 0 300 6 0 ['test'] ['graphite']
graphite_rollup tagged ^for_taggged avg 31536000 600 7 0 ['test'] ['graphite']
graphite_rollup tagged ^for_taggged avg 7776000 300 7 0 ['test'] ['graphite']
graphite_rollup tagged ^for_taggged avg 0 60 7 0 ['test'] ['graphite']
graphite_rollup all ^ten_min\\\\. sum 31536000 28800 8 0 ['test'] ['graphite']
graphite_rollup all ^ten_min\\\\. sum 5184000 7200 8 0 ['test'] ['graphite']
graphite_rollup all ^ten_min\\\\. sum 0 600 8 0 ['test'] ['graphite']
'''
result = q('SELECT * from system.graphite_retentions')
mismatch = csv_compare(result, expected)
assert len(mismatch) == 0, f"got\n{result}\nwant\n{expected}\ndiff\n{mismatch}\n"
q('''
DROP TABLE IF EXISTS test.graphite2;
CREATE TABLE test.graphite2
(metric String, value Float64, timestamp UInt32, date Date, updated UInt32)
ENGINE = GraphiteMergeTree('graphite_rollup')
PARTITION BY toYYYYMM(date)
ORDER BY (metric, timestamp)
SETTINGS index_granularity=8192;
''')
expected = '''
graphite_rollup ['test','test'] ['graphite','graphite2']
graphite_rollup ['test','test'] ['graphite','graphite2']
graphite_rollup ['test','test'] ['graphite','graphite2']
graphite_rollup ['test','test'] ['graphite','graphite2']
graphite_rollup ['test','test'] ['graphite','graphite2']
graphite_rollup ['test','test'] ['graphite','graphite2']
graphite_rollup ['test','test'] ['graphite','graphite2']
graphite_rollup ['test','test'] ['graphite','graphite2']
'''
result = q('''
SELECT
config_name,
Tables.database,
Tables.table
FROM system.graphite_retentions
''')
assert csv_compare(result, expected), f"got\n{result}\nwant\n{expected}"
def test_path_dangling_pointer(graphite_table):
q('''
DROP TABLE IF EXISTS test.graphite2;
CREATE TABLE test.graphite2
(metric String, value Float64, timestamp UInt32, date Date, updated UInt32)
ENGINE = GraphiteMergeTree('graphite_rollup')
PARTITION BY toYYYYMM(date)
ORDER BY (metric, timestamp)
SETTINGS index_granularity=1;
''')
path = 'abcd' * 4000000 # 16MB
q('INSERT INTO test.graphite2 FORMAT TSV',
"{}\t0.0\t0\t2018-01-01\t100\n".format(path))
q('INSERT INTO test.graphite2 FORMAT TSV',
"{}\t0.0\t0\t2018-01-01\t101\n".format(path))
for version in range(10):
q('INSERT INTO test.graphite2 FORMAT TSV',
"{}\t0.0\t0\t2018-01-01\t{}\n".format(path, version))
while True:
q('OPTIMIZE TABLE test.graphite2 PARTITION 201801 FINAL')
parts = int(q("SELECT count() FROM system.parts "
"WHERE active AND database='test' "
"AND table='graphite2'"))
if parts == 1:
break
print(('Parts', parts))
assert TSV(
q("SELECT value, timestamp, date, updated FROM test.graphite2")
) == TSV("0\t0\t2018-01-01\t101\n")
q('DROP TABLE test.graphite2')
def test_combined_rules(graphite_table):
# 1487970000 ~ Sat 25 Feb 00:00:00 MSK 2017
to_insert = 'INSERT INTO test.graphite VALUES '
expected_unmerged = ''
for i in range(384):
to_insert += "('five_min.count', {v}, {t}, toDate({t}), 1), ".format(
v=1, t=1487970000 + (i * 300)
)
to_insert += "('five_min.max', {v}, {t}, toDate({t}), 1), ".format(
v=i, t=1487970000 + (i * 300)
)
expected_unmerged += ("five_min.count\t{v1}\t{t}\n"
"five_min.max\t{v2}\t{t}\n").format(
v1=1, v2=i,
t=1487970000 + (i * 300)
)
q(to_insert)
assert TSV(q('SELECT metric, value, timestamp FROM test.graphite'
' ORDER BY (timestamp, metric)')) == TSV(expected_unmerged)
q('OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL')
expected_merged = '''
five_min.count 48 1487970000 2017-02-25 1
five_min.count 48 1487984400 2017-02-25 1
five_min.count 48 1487998800 2017-02-25 1
five_min.count 48 1488013200 2017-02-25 1
five_min.count 48 1488027600 2017-02-25 1
five_min.count 48 1488042000 2017-02-25 1
five_min.count 48 1488056400 2017-02-26 1
five_min.count 48 1488070800 2017-02-26 1
five_min.max 47 1487970000 2017-02-25 1
five_min.max 95 1487984400 2017-02-25 1
five_min.max 143 1487998800 2017-02-25 1
five_min.max 191 1488013200 2017-02-25 1
five_min.max 239 1488027600 2017-02-25 1
five_min.max 287 1488042000 2017-02-25 1
five_min.max 335 1488056400 2017-02-26 1
five_min.max 383 1488070800 2017-02-26 1
'''
assert TSV(q('SELECT * FROM test.graphite'
' ORDER BY (metric, timestamp)')) == TSV(expected_merged)

View File

@ -0,0 +1,84 @@
one_min.x 0 1111110600 2017-02-02 100
one_min.x 3 1111111200 2017-02-02 97
one_min.x 6 1111111800 2017-02-02 94
one_min.x 9 1111112400 2017-02-02 91
one_min.x 12 1111113000 2017-02-02 88
one_min.x 15 1111113600 2017-02-02 85
one_min.x 18 1111114200 2017-02-02 82
one_min.x 21 1111114800 2017-02-02 79
one_min.x 24 1111115400 2017-02-02 76
one_min.x 27 1111116000 2017-02-02 73
one_min.x 30 1111116600 2017-02-02 70
one_min.x 33 1111117200 2017-02-02 67
one_min.x 36 1111117800 2017-02-02 64
one_min.x 39 1111118400 2017-02-02 61
one_min.x 42 1111119000 2017-02-02 58
one_min.x 45 1111119600 2017-02-02 55
one_min.x 48 1111120200 2017-02-02 52
one_min.x 0 1111110600 2017-02-02 100
one_min.x 3 1111111200 2017-02-02 97
one_min.x 6 1111111800 2017-02-02 94
one_min.x 9 1111112400 2017-02-02 91
one_min.x 12 1111113000 2017-02-02 88
one_min.x 15 1111113600 2017-02-02 85
one_min.x 18 1111114200 2017-02-02 82
one_min.x 21 1111114800 2017-02-02 79
one_min.x 24 1111115400 2017-02-02 76
one_min.x 27 1111116000 2017-02-02 73
one_min.x 30 1111116600 2017-02-02 70
one_min.x 33 1111117200 2017-02-02 67
one_min.x 36 1111117800 2017-02-02 64
one_min.x 39 1111118400 2017-02-02 61
one_min.x 42 1111119000 2017-02-02 58
one_min.x 45 1111119600 2017-02-02 55
one_min.x 48 1111120200 2017-02-02 52
one_min.y 0 1111110600 2017-02-02 100
one_min.y 1 1111111200 2017-02-02 99
one_min.y 2 1111111800 2017-02-02 98
one_min.y 3 1111112400 2017-02-02 97
one_min.y 4 1111113000 2017-02-02 96
one_min.y 5 1111113600 2017-02-02 95
one_min.y 6 1111114200 2017-02-02 94
one_min.y 7 1111114800 2017-02-02 93
one_min.y 8 1111115400 2017-02-02 92
one_min.y 9 1111116000 2017-02-02 91
one_min.y 10 1111116600 2017-02-02 90
one_min.y 11 1111117200 2017-02-02 89
one_min.y 12 1111117800 2017-02-02 88
one_min.y 13 1111118400 2017-02-02 87
one_min.y 14 1111119000 2017-02-02 86
one_min.y 15 1111119600 2017-02-02 85
one_min.y 16 1111120200 2017-02-02 84
one_min.y 17 1111120800 2017-02-02 83
one_min.y 18 1111121400 2017-02-02 82
one_min.y 19 1111122000 2017-02-02 81
one_min.y 20 1111122600 2017-02-02 80
one_min.y 21 1111123200 2017-02-02 79
one_min.y 22 1111123800 2017-02-02 78
one_min.y 23 1111124400 2017-02-02 77
one_min.y 24 1111125000 2017-02-02 76
one_min.y 25 1111125600 2017-02-02 75
one_min.y 26 1111126200 2017-02-02 74
one_min.y 27 1111126800 2017-02-02 73
one_min.y 28 1111127400 2017-02-02 72
one_min.y 29 1111128000 2017-02-02 71
one_min.y 30 1111128600 2017-02-02 70
one_min.y 31 1111129200 2017-02-02 69
one_min.y 32 1111129800 2017-02-02 68
one_min.y 33 1111130400 2017-02-02 67
one_min.y 34 1111131000 2017-02-02 66
one_min.y 35 1111131600 2017-02-02 65
one_min.y 36 1111132200 2017-02-02 64
one_min.y 37 1111132800 2017-02-02 63
one_min.y 38 1111133400 2017-02-02 62
one_min.y 39 1111134000 2017-02-02 61
one_min.y 40 1111134600 2017-02-02 60
one_min.y 41 1111135200 2017-02-02 59
one_min.y 42 1111135800 2017-02-02 58
one_min.y 43 1111136400 2017-02-02 57
one_min.y 44 1111137000 2017-02-02 56
one_min.y 45 1111137600 2017-02-02 55
one_min.y 46 1111138200 2017-02-02 54
one_min.y 47 1111138800 2017-02-02 53
one_min.y 48 1111139400 2017-02-02 52
one_min.y 49 1111140000 2017-02-02 51

View File

@ -0,0 +1,84 @@
x?retention=one_min 0 1111110600 2017-02-02 100
x?retention=one_min 3 1111111200 2017-02-02 97
x?retention=one_min 6 1111111800 2017-02-02 94
x?retention=one_min 9 1111112400 2017-02-02 91
x?retention=one_min 12 1111113000 2017-02-02 88
x?retention=one_min 15 1111113600 2017-02-02 85
x?retention=one_min 18 1111114200 2017-02-02 82
x?retention=one_min 21 1111114800 2017-02-02 79
x?retention=one_min 24 1111115400 2017-02-02 76
x?retention=one_min 27 1111116000 2017-02-02 73
x?retention=one_min 30 1111116600 2017-02-02 70
x?retention=one_min 33 1111117200 2017-02-02 67
x?retention=one_min 36 1111117800 2017-02-02 64
x?retention=one_min 39 1111118400 2017-02-02 61
x?retention=one_min 42 1111119000 2017-02-02 58
x?retention=one_min 45 1111119600 2017-02-02 55
x?retention=one_min 48 1111120200 2017-02-02 52
x?retention=one_min 0 1111110600 2017-02-02 100
x?retention=one_min 3 1111111200 2017-02-02 97
x?retention=one_min 6 1111111800 2017-02-02 94
x?retention=one_min 9 1111112400 2017-02-02 91
x?retention=one_min 12 1111113000 2017-02-02 88
x?retention=one_min 15 1111113600 2017-02-02 85
x?retention=one_min 18 1111114200 2017-02-02 82
x?retention=one_min 21 1111114800 2017-02-02 79
x?retention=one_min 24 1111115400 2017-02-02 76
x?retention=one_min 27 1111116000 2017-02-02 73
x?retention=one_min 30 1111116600 2017-02-02 70
x?retention=one_min 33 1111117200 2017-02-02 67
x?retention=one_min 36 1111117800 2017-02-02 64
x?retention=one_min 39 1111118400 2017-02-02 61
x?retention=one_min 42 1111119000 2017-02-02 58
x?retention=one_min 45 1111119600 2017-02-02 55
x?retention=one_min 48 1111120200 2017-02-02 52
y?retention=one_min 0 1111110600 2017-02-02 100
y?retention=one_min 1 1111111200 2017-02-02 99
y?retention=one_min 2 1111111800 2017-02-02 98
y?retention=one_min 3 1111112400 2017-02-02 97
y?retention=one_min 4 1111113000 2017-02-02 96
y?retention=one_min 5 1111113600 2017-02-02 95
y?retention=one_min 6 1111114200 2017-02-02 94
y?retention=one_min 7 1111114800 2017-02-02 93
y?retention=one_min 8 1111115400 2017-02-02 92
y?retention=one_min 9 1111116000 2017-02-02 91
y?retention=one_min 10 1111116600 2017-02-02 90
y?retention=one_min 11 1111117200 2017-02-02 89
y?retention=one_min 12 1111117800 2017-02-02 88
y?retention=one_min 13 1111118400 2017-02-02 87
y?retention=one_min 14 1111119000 2017-02-02 86
y?retention=one_min 15 1111119600 2017-02-02 85
y?retention=one_min 16 1111120200 2017-02-02 84
y?retention=one_min 17 1111120800 2017-02-02 83
y?retention=one_min 18 1111121400 2017-02-02 82
y?retention=one_min 19 1111122000 2017-02-02 81
y?retention=one_min 20 1111122600 2017-02-02 80
y?retention=one_min 21 1111123200 2017-02-02 79
y?retention=one_min 22 1111123800 2017-02-02 78
y?retention=one_min 23 1111124400 2017-02-02 77
y?retention=one_min 24 1111125000 2017-02-02 76
y?retention=one_min 25 1111125600 2017-02-02 75
y?retention=one_min 26 1111126200 2017-02-02 74
y?retention=one_min 27 1111126800 2017-02-02 73
y?retention=one_min 28 1111127400 2017-02-02 72
y?retention=one_min 29 1111128000 2017-02-02 71
y?retention=one_min 30 1111128600 2017-02-02 70
y?retention=one_min 31 1111129200 2017-02-02 69
y?retention=one_min 32 1111129800 2017-02-02 68
y?retention=one_min 33 1111130400 2017-02-02 67
y?retention=one_min 34 1111131000 2017-02-02 66
y?retention=one_min 35 1111131600 2017-02-02 65
y?retention=one_min 36 1111132200 2017-02-02 64
y?retention=one_min 37 1111132800 2017-02-02 63
y?retention=one_min 38 1111133400 2017-02-02 62
y?retention=one_min 39 1111134000 2017-02-02 61
y?retention=one_min 40 1111134600 2017-02-02 60
y?retention=one_min 41 1111135200 2017-02-02 59
y?retention=one_min 42 1111135800 2017-02-02 58
y?retention=one_min 43 1111136400 2017-02-02 57
y?retention=one_min 44 1111137000 2017-02-02 56
y?retention=one_min 45 1111137600 2017-02-02 55
y?retention=one_min 46 1111138200 2017-02-02 54
y?retention=one_min 47 1111138800 2017-02-02 53
y?retention=one_min 48 1111139400 2017-02-02 52
y?retention=one_min 49 1111140000 2017-02-02 51

View File

@ -21,7 +21,7 @@ CREATE TABLE system.events\n(\n `event` String,\n `value` UInt64,\n `de
CREATE TABLE system.formats\n(\n `name` String,\n `is_input` UInt8,\n `is_output` UInt8\n)\nENGINE = SystemFormats()\nCOMMENT \'SYSTEM TABLE is built on the fly.\'
CREATE TABLE system.functions\n(\n `name` String,\n `is_aggregate` UInt8,\n `case_insensitive` UInt8,\n `alias_to` String,\n `create_query` String,\n `origin` Enum8(\'System\' = 0, \'SQLUserDefined\' = 1, \'ExecutableUserDefined\' = 2)\n)\nENGINE = SystemFunctions()\nCOMMENT \'SYSTEM TABLE is built on the fly.\'
CREATE TABLE system.grants\n(\n `user_name` Nullable(String),\n `role_name` Nullable(String),\n `access_type` Enum8(\'SQLITE\' = -128, \'ODBC\' = -127, \'JDBC\' = -126, \'HDFS\' = -125, \'S3\' = -124, \'SOURCES\' = -123, \'ALL\' = -122, \'NONE\' = -121, \'SHOW DATABASES\' = 0, \'SHOW TABLES\' = 1, \'SHOW COLUMNS\' = 2, \'SHOW DICTIONARIES\' = 3, \'SHOW\' = 4, \'SELECT\' = 5, \'INSERT\' = 6, \'ALTER UPDATE\' = 7, \'ALTER DELETE\' = 8, \'ALTER ADD COLUMN\' = 9, \'ALTER MODIFY COLUMN\' = 10, \'ALTER DROP COLUMN\' = 11, \'ALTER COMMENT COLUMN\' = 12, \'ALTER CLEAR COLUMN\' = 13, \'ALTER RENAME COLUMN\' = 14, \'ALTER MATERIALIZE COLUMN\' = 15, \'ALTER COLUMN\' = 16, \'ALTER MODIFY COMMENT\' = 17, \'ALTER ORDER BY\' = 18, \'ALTER SAMPLE BY\' = 19, \'ALTER ADD INDEX\' = 20, \'ALTER DROP INDEX\' = 21, \'ALTER MATERIALIZE INDEX\' = 22, \'ALTER CLEAR INDEX\' = 23, \'ALTER INDEX\' = 24, \'ALTER ADD PROJECTION\' = 25, \'ALTER DROP PROJECTION\' = 26, \'ALTER MATERIALIZE PROJECTION\' = 27, \'ALTER CLEAR PROJECTION\' = 28, \'ALTER PROJECTION\' = 29, \'ALTER ADD CONSTRAINT\' = 30, \'ALTER DROP CONSTRAINT\' = 31, \'ALTER CONSTRAINT\' = 32, \'ALTER TTL\' = 33, \'ALTER MATERIALIZE TTL\' = 34, \'ALTER SETTINGS\' = 35, \'ALTER MOVE PARTITION\' = 36, \'ALTER FETCH PARTITION\' = 37, \'ALTER FREEZE PARTITION\' = 38, \'ALTER DATABASE SETTINGS\' = 39, \'ALTER TABLE\' = 40, \'ALTER DATABASE\' = 41, \'ALTER VIEW REFRESH\' = 42, \'ALTER VIEW MODIFY QUERY\' = 43, \'ALTER VIEW\' = 44, \'ALTER\' = 45, \'CREATE DATABASE\' = 46, \'CREATE TABLE\' = 47, \'CREATE VIEW\' = 48, \'CREATE DICTIONARY\' = 49, \'CREATE TEMPORARY TABLE\' = 50, \'CREATE FUNCTION\' = 51, \'CREATE\' = 52, \'DROP DATABASE\' = 53, \'DROP TABLE\' = 54, \'DROP VIEW\' = 55, \'DROP DICTIONARY\' = 56, \'DROP FUNCTION\' = 57, \'DROP\' = 58, \'TRUNCATE\' = 59, \'OPTIMIZE\' = 60, \'KILL QUERY\' = 61, \'MOVE PARTITION BETWEEN SHARDS\' = 62, \'CREATE USER\' = 63, \'ALTER USER\' = 64, \'DROP USER\' = 65, \'CREATE ROLE\' = 66, \'ALTER ROLE\' = 67, \'DROP ROLE\' = 68, \'ROLE ADMIN\' = 69, \'CREATE ROW POLICY\' = 70, \'ALTER ROW POLICY\' = 71, \'DROP ROW POLICY\' = 72, \'CREATE QUOTA\' = 73, \'ALTER QUOTA\' = 74, \'DROP QUOTA\' = 75, \'CREATE SETTINGS PROFILE\' = 76, \'ALTER SETTINGS PROFILE\' = 77, \'DROP SETTINGS PROFILE\' = 78, \'SHOW USERS\' = 79, \'SHOW ROLES\' = 80, \'SHOW ROW POLICIES\' = 81, \'SHOW QUOTAS\' = 82, \'SHOW SETTINGS PROFILES\' = 83, \'SHOW ACCESS\' = 84, \'ACCESS MANAGEMENT\' = 85, \'SYSTEM SHUTDOWN\' = 86, \'SYSTEM DROP DNS CACHE\' = 87, \'SYSTEM DROP MARK CACHE\' = 88, \'SYSTEM DROP UNCOMPRESSED CACHE\' = 89, \'SYSTEM DROP MMAP CACHE\' = 90, \'SYSTEM DROP COMPILED EXPRESSION CACHE\' = 91, \'SYSTEM DROP CACHE\' = 92, \'SYSTEM RELOAD CONFIG\' = 93, \'SYSTEM RELOAD SYMBOLS\' = 94, \'SYSTEM RELOAD DICTIONARY\' = 95, \'SYSTEM RELOAD MODEL\' = 96, \'SYSTEM RELOAD FUNCTION\' = 97, \'SYSTEM RELOAD EMBEDDED DICTIONARIES\' = 98, \'SYSTEM RELOAD\' = 99, \'SYSTEM RESTART DISK\' = 100, \'SYSTEM MERGES\' = 101, \'SYSTEM TTL MERGES\' = 102, \'SYSTEM FETCHES\' = 103, \'SYSTEM MOVES\' = 104, \'SYSTEM DISTRIBUTED SENDS\' = 105, \'SYSTEM REPLICATED SENDS\' = 106, \'SYSTEM SENDS\' = 107, \'SYSTEM REPLICATION QUEUES\' = 108, \'SYSTEM DROP REPLICA\' = 109, \'SYSTEM SYNC REPLICA\' = 110, \'SYSTEM RESTART REPLICA\' = 111, \'SYSTEM RESTORE REPLICA\' = 112, \'SYSTEM FLUSH DISTRIBUTED\' = 113, \'SYSTEM FLUSH LOGS\' = 114, \'SYSTEM FLUSH\' = 115, \'SYSTEM\' = 116, \'dictGet\' = 117, \'addressToLine\' = 118, \'addressToSymbol\' = 119, \'demangle\' = 120, \'INTROSPECTION\' = 121, \'FILE\' = 122, \'URL\' = 123, \'REMOTE\' = 124, \'MONGO\' = 125, \'MYSQL\' = 126, \'POSTGRES\' = 127),\n `database` Nullable(String),\n `table` Nullable(String),\n `column` Nullable(String),\n `is_partial_revoke` UInt8,\n `grant_option` UInt8\n)\nENGINE = SystemGrants()\nCOMMENT \'SYSTEM TABLE is built on the fly.\'
CREATE TABLE system.graphite_retentions\n(\n `config_name` String,\n `regexp` String,\n `function` String,\n `age` UInt64,\n `precision` UInt64,\n `priority` UInt16,\n `is_default` UInt8,\n `Tables.database` Array(String),\n `Tables.table` Array(String)\n)\nENGINE = SystemGraphite()\nCOMMENT \'SYSTEM TABLE is built on the fly.\'
CREATE TABLE system.graphite_retentions\n(\n `config_name` String,\n `rule_type` String,\n `regexp` String,\n `function` String,\n `age` UInt64,\n `precision` UInt64,\n `priority` UInt16,\n `is_default` UInt8,\n `Tables.database` Array(String),\n `Tables.table` Array(String)\n)\nENGINE = SystemGraphite()\nCOMMENT \'SYSTEM TABLE is built on the fly.\'
CREATE TABLE system.licenses\n(\n `library_name` String,\n `license_type` String,\n `license_path` String,\n `license_text` String\n)\nENGINE = SystemLicenses()\nCOMMENT \'SYSTEM TABLE is built on the fly.\'
CREATE TABLE system.macros\n(\n `macro` String,\n `substitution` String\n)\nENGINE = SystemMacros()\nCOMMENT \'SYSTEM TABLE is built on the fly.\'
CREATE TABLE system.merge_tree_settings\n(\n `name` String,\n `value` String,\n `changed` UInt8,\n `description` String,\n `type` String\n)\nENGINE = SystemMergeTreeSettings()\nCOMMENT \'SYSTEM TABLE is built on the fly.\'

View File

@ -32,6 +32,7 @@ if (NOT DEFINED ENABLE_UTILS OR ENABLE_UTILS)
add_subdirectory (wal-dump)
add_subdirectory (check-mysql-binlog)
add_subdirectory (keeper-bench)
add_subdirectory (graphite-rollup)
if (USE_NURAFT)
add_subdirectory (keeper-data-dumper)

View File

@ -0,0 +1,23 @@
add_executable(graphite-rollup-bench graphite-rollup-bench.cpp)
target_link_libraries(
graphite-rollup-bench
PRIVATE
clickhouse_storages_system
clickhouse_aggregate_functions
clickhouse_common_config
dbms
)
target_include_directories(
graphite-rollup-bench
PRIVATE
${ClickHouse_SOURCE_DIR}/src ${CMAKE_BINARY_DIR}/src
${ClickHouse_SOURCE_DIR}/base ${ClickHouse_SOURCE_DIR}/base/pcg-random
${CMAKE_BINARY_DIR}/src/Core/include
${POCO_INCLUDE_DIR}
${ClickHouse_SOURCE_DIR}/contrib/double-conversion ${ClickHouse_SOURCE_DIR}/contrib/dragonbox/include
${ClickHouse_SOURCE_DIR}/contrib/fmtlib/include
${ClickHouse_SOURCE_DIR}/contrib/cityhash102/include
${RE2_INCLUDE_DIR} ${CMAKE_BINARY_DIR}/contrib/re2_st
)
target_compile_definitions(graphite-rollup-bench PRIVATE RULES_DIR="${CMAKE_CURRENT_SOURCE_DIR}")

View File

@ -0,0 +1,147 @@
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <iomanip>
#include <iostream>
#include <stdexcept>
#include <system_error>
#include <boost/program_options.hpp>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Processors/Merges/Algorithms/Graphite.h>
#include <Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.h>
#include <Storages/System/StorageSystemGraphite.h>
#include <Common/Config/ConfigProcessor.h>
#include <Common/Exception.h>
#include <Interpreters/Context.h>
using namespace DB;
static SharedContextHolder shared_context = Context::createShared();
std::vector<StringRef> loadMetrics(const std::string & metrics_file)
{
std::vector<StringRef> metrics;
FILE * stream;
char * line = nullptr;
size_t len = 0;
ssize_t nread;
stream = fopen(metrics_file.c_str(), "r");
if (stream == nullptr)
{
throw std::runtime_error(strerror(errno));
}
while ((nread = getline(&line, &len, stream)) != -1)
{
size_t l = strlen(line);
if (l > 0)
{
if (line[l - 1] == '\n')
{
line[l - 1] = '\0';
l--;
}
if (l > 0)
{
metrics.push_back(StringRef(strdup(line), l));
}
}
}
free(line);
if (ferror(stream))
{
fclose(stream);
throw std::runtime_error(strerror(errno));
}
fclose(stream);
return metrics;
}
ConfigProcessor::LoadedConfig loadConfiguration(const std::string & config_path)
{
ConfigProcessor config_processor(config_path, true, true);
ConfigProcessor::LoadedConfig config = config_processor.loadConfig(false);
return config;
}
void bench(const std::string & config_path, const std::string & metrics_file, size_t n, bool verbose)
{
auto config = loadConfiguration(config_path);
auto context = Context::createGlobal(shared_context.get());
context->setConfig(config.configuration.get());
Graphite::Params params;
setGraphitePatternsFromConfig(context, "graphite_rollup", params);
std::vector<StringRef> metrics = loadMetrics(metrics_file);
std::vector<double> durations(metrics.size());
size_t j, i;
for (j = 0; j < n; j++)
{
for (i = 0; i < metrics.size(); i++)
{
auto start = std::chrono::high_resolution_clock::now();
auto rule = DB::Graphite::selectPatternForPath(params, metrics[i]);
(void)rule;
auto end = std::chrono::high_resolution_clock::now();
double duration = (duration_cast<std::chrono::duration<double>>(end - start)).count() * 1E9;
durations[i] += duration;
if (j == 0 && verbose)
{
std::cout << metrics[i].data << ": rule with regexp '" << rule.second->regexp_str << "' found\n";
}
}
}
for (i = 0; i < metrics.size(); i++)
{
std::cout << metrics[i].data << " " << durations[i] / n << " ns\n";
free(const_cast<void *>(static_cast<const void *>(metrics[i].data)));
}
}
int main(int argc, char ** argv)
{
registerAggregateFunctions();
std::string config_file, metrics_file;
using namespace std::literals;
std::string config_default = RULES_DIR + "/rollup.xml"s;
std::string metrics_default = RULES_DIR + "/metrics.txt"s;
namespace po = boost::program_options;
po::variables_map vm;
po::options_description desc;
desc.add_options()("help,h", "produce help")(
"config,c", po::value<std::string>()->default_value(config_default), "XML config with rollup rules")(
"metrics,m", po::value<std::string>()->default_value(metrics_default), "metrcis files (one metric per line) for run benchmark")(
"verbose,V", po::bool_switch()->default_value(false), "verbose output (print found rule)");
po::parsed_options parsed = po::command_line_parser(argc, argv).options(desc).run();
po::store(parsed, vm);
po::notify(vm);
if (vm.count("help"))
{
std::cout << desc << '\n';
exit(1);
}
bench(vm["config"].as<std::string>(), vm["metrics"].as<std::string>(), 10000, vm["verbose"].as<bool>());
return 0;
}

View File

@ -0,0 +1,11 @@
test.sum
sum?env=test&tag=Fake3
test.max
max?env=test&tag=Fake4
test.min
min?env=test&tag=Fake5
fake5?env=test&tag=Fake5
test.p95
p95?env=test&tag=FakeNo
default
default?env=test&tag=FakeNo

View File

@ -0,0 +1,167 @@
<yandex>
<graphite_rollup>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.sum$</regexp>
<function>sum</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp>^((.*)|.)sum\?</regexp>
<function>sum</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.max$</regexp>
<function>max</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp>^((.*)|.)max\?</regexp>
<function>max</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.min$</regexp>
<function>min</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp>^((.*)|.)min\?</regexp>
<function>min</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.fake1\..*\.Fake1\.</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>tag_list</rule_type>
<regexp>fake1;tag=Fake1</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.fake2\..*\.Fake2\.</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>tag_list</rule_type>
<regexp>fake2;tag=Fake2</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.fake3\..*\.Fake3\.</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>tag_list</rule_type>
<regexp>fake3;tag=Fake3</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.fake4\..*\.Fake4\.</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>tag_list</rule_type>
<regexp>fake4;tag=Fake4</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.fake5\..*\.Fake5\.</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>tag_list</rule_type>
<regexp>fake5;tag=Fake5</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.fake6\..*\.Fake6\.</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>tag_list</rule_type>
<regexp>fake6;tag=Fake6</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.fake7\..*\.Fake7\.</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>tag_list</rule_type>
<regexp>fake7;tag=Fake7</regexp>
<function>sum</function>
</pattern>
<default>
<function>avg</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>3600</age>
<precision>300</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</default>
</graphite_rollup>
</yandex>

View File

@ -0,0 +1,167 @@
<yandex>
<graphite_rollup>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.sum$</regexp>
<function>sum</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp>^((.*)|.)sum\?</regexp>
<function>sum</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.max$</regexp>
<function>max</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp>^((.*)|.)max\?</regexp>
<function>max</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.min$</regexp>
<function>min</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp>^((.*)|.)min\?</regexp>
<function>min</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.fake1\..*\.Fake1\.</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp><![CDATA[^fake1\?(.*&)*tag=Fake1(&|$)]]></regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.fake2\..*\.Fake2\.</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp><![CDATA[^fake2\?(.*&)*tag=Fake2(&|$)]]></regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.fake3\..*\.Fake3\.</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp><![CDATA[^fake3\?(.*&)*tag=Fake3(&|$)]]></regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.fake4\..*\.Fake4\.</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp><![CDATA[^fake4\?(.*&)*tag=Fake4(&|$)]]></regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.fake5\..*\.Fake5\.</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp><![CDATA[^fake5\?(.*&)*tag=Fake5(&|$)]]></regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.fake6\..*\.Fake6\.</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp><![CDATA[^fake6\?(.*&)*tag=Fake6(&|$)]]></regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>plain</rule_type>
<regexp>\.fake7\..*\.Fake7\.</regexp>
<function>sum</function>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp><![CDATA[^fake7\?(.*&)*tag=Fake7(&|$)]]></regexp>
<function>sum</function>
</pattern>
<default>
<function>avg</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>3600</age>
<precision>300</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</default>
</graphite_rollup>
</yandex>

View File

@ -0,0 +1,147 @@
<yandex>
<graphite_rollup>
<pattern>
<regexp>\.sum$</regexp>
<function>sum</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<regexp>^((.*)|.)sum\?</regexp>
<function>sum</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<regexp>\.max$</regexp>
<function>max</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<regexp>^((.*)|.)max\?</regexp>
<function>max</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<regexp>\.min$</regexp>
<function>min</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<regexp>^((.*)|.)min\?</regexp>
<function>min</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</pattern>
<pattern>
<regexp>\.fake1\..*\.Fake1\.</regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp><![CDATA[^fake1\?(.*&)*tag=Fake1(&|$)]]></regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp>\.fake2\..*\.Fake2\.</regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp><![CDATA[^fake2\?(.*&)*tag=Fake2(&|$)]]></regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp>\.fake3\..*\.Fake3\.</regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp><![CDATA[^fake3\?(.*&)*tag=Fake3(&|$)]]></regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp>\.fake4\..*\.Fake4\.</regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp><![CDATA[^fake4\?(.*&)*tag=Fake4(&|$)]]></regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp>\.fake5\..*\.Fake5\.</regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp><![CDATA[^fake5\?(.*&)*tag=Fake5(&|$)]]></regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp>\.fake6\..*\.Fake6\.</regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp><![CDATA[^fake6\?(.*&)*tag=Fake6(&|$)]]></regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp>\.fake7\..*\.Fake7\.</regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp><![CDATA[^fake7\?(.*&)*tag=Fake7(&|$)]]></regexp>
<function>sum</function>
</pattern>
<default>
<function>avg</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>3600</age>
<precision>300</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</default>
</graphite_rollup>
</yandex>