Merge pull request #4426 from Felixoid/graphite-rollup

Graphite rollup: combined rules
This commit is contained in:
alexey-milovidov 2019-03-06 03:10:45 +03:00 committed by GitHub
commit 1df9c1720e
11 changed files with 452 additions and 464 deletions


@ -4,7 +4,6 @@
<default>
<networks replace="replace">
<ip>::1</ip>
<ip>0.0.0.0</ip>
<ip>127.0.0.1</ip>
</networks>
</default>


@ -23,8 +23,11 @@ GraphiteRollupSortedBlockInputStream::GraphiteRollupSortedBlockInputStream(
for (const auto & pattern : params.patterns)
{
max_size_of_aggregate_state = std::max(max_size_of_aggregate_state, pattern.function->sizeOfData());
max_alignment_of_aggregate_state = std::max(max_alignment_of_aggregate_state, pattern.function->alignOfData());
if (pattern.function)
{
max_size_of_aggregate_state = std::max(max_size_of_aggregate_state, pattern.function->sizeOfData());
max_alignment_of_aggregate_state = std::max(max_alignment_of_aggregate_state, pattern.function->alignOfData());
}
}
place_for_aggregate_state.reset(max_size_of_aggregate_state, max_alignment_of_aggregate_state);
@ -41,13 +44,60 @@ GraphiteRollupSortedBlockInputStream::GraphiteRollupSortedBlockInputStream(
}
const Graphite::Pattern * GraphiteRollupSortedBlockInputStream::selectPatternForPath(StringRef path) const
Graphite::RollupRule GraphiteRollupSortedBlockInputStream::selectPatternForPath(StringRef path) const
{
for (const auto & pattern : params.patterns)
if (!pattern.regexp || pattern.regexp->match(path.data, path.size))
return &pattern;
const Graphite::Pattern * first_match = &undef_pattern;
return nullptr;
for (const auto & pattern : params.patterns)
{
if (!pattern.regexp)
{
/// Default pattern
if (first_match->type == first_match->TypeUndef && pattern.type == pattern.TypeAll)
{
/// There is only default pattern for both retention and aggregation
return std::pair(&pattern, &pattern);
}
if (pattern.type != first_match->type)
{
if (first_match->type == first_match->TypeRetention)
{
return std::pair(first_match, &pattern);
}
if (first_match->type == first_match->TypeAggregation)
{
return std::pair(&pattern, first_match);
}
}
}
else if (pattern.regexp->match(path.data, path.size))
{
/// General pattern with matched path
if (pattern.type == pattern.TypeAll)
{
/// Only for not default patterns with both function and retention parameters
return std::pair(&pattern, &pattern);
}
if (first_match->type == first_match->TypeUndef)
{
first_match = &pattern;
continue;
}
if (pattern.type != first_match->type)
{
if (first_match->type == first_match->TypeRetention)
{
return std::pair(first_match, &pattern);
}
if (first_match->type == first_match->TypeAggregation)
{
return std::pair(&pattern, first_match);
}
}
}
}
return {nullptr, nullptr};
}
@ -142,14 +192,15 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns
if (started_rows)
accumulateRow(current_subgroup_newest_row);
const Graphite::Pattern * next_pattern = current_pattern;
Graphite::RollupRule next_rule = current_rule;
if (new_path)
next_pattern = selectPatternForPath(next_path);
next_rule = selectPatternForPath(next_path);
const Graphite::RetentionPattern * retention_pattern = std::get<0>(next_rule);
time_t next_time_rounded;
if (next_pattern)
if (retention_pattern)
{
UInt32 precision = selectPrecision(next_pattern->retentions, next_row_time);
UInt32 precision = selectPrecision(retention_pattern->retentions, next_row_time);
next_time_rounded = roundTimeToPrecision(date_lut, next_row_time, precision);
}
else
@ -177,7 +228,7 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns
/// At this point previous row has been fully processed, so we can advance the loop
/// (substitute current_* values for next_*, advance the cursor).
startNextGroup(merged_columns, next_cursor, next_pattern);
startNextGroup(merged_columns, next_cursor, next_rule);
++started_rows;
current_time_rounded = next_time_rounded;
@ -229,8 +280,10 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns
template <typename TSortCursor>
void GraphiteRollupSortedBlockInputStream::startNextGroup(MutableColumns & merged_columns, TSortCursor & cursor,
const Graphite::Pattern * next_pattern)
Graphite::RollupRule next_rule)
{
const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(next_rule);
/// Copy unmodified column values (including path column).
for (size_t i = 0, size = unmodified_column_numbers.size(); i < size; ++i)
{
@ -238,13 +291,13 @@ void GraphiteRollupSortedBlockInputStream::startNextGroup(MutableColumns & merge
merged_columns[j]->insertFrom(*cursor->all_columns[j], cursor->pos);
}
if (next_pattern)
if (aggregation_pattern)
{
next_pattern->function->create(place_for_aggregate_state.data());
aggregation_pattern->function->create(place_for_aggregate_state.data());
aggregate_state_created = true;
}
current_pattern = next_pattern;
current_rule = next_rule;
}
@ -255,10 +308,11 @@ void GraphiteRollupSortedBlockInputStream::finishCurrentGroup(MutableColumns & m
merged_columns[version_column_num]->insertFrom(
*(*current_subgroup_newest_row.columns)[version_column_num], current_subgroup_newest_row.row_num);
const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(current_rule);
if (aggregate_state_created)
{
current_pattern->function->insertResultInto(place_for_aggregate_state.data(), *merged_columns[value_column_num]);
current_pattern->function->destroy(place_for_aggregate_state.data());
aggregation_pattern->function->insertResultInto(place_for_aggregate_state.data(), *merged_columns[value_column_num]);
aggregation_pattern->function->destroy(place_for_aggregate_state.data());
aggregate_state_created = false;
}
else
@ -269,8 +323,9 @@ void GraphiteRollupSortedBlockInputStream::finishCurrentGroup(MutableColumns & m
void GraphiteRollupSortedBlockInputStream::accumulateRow(RowRef & row)
{
const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(current_rule);
if (aggregate_state_created)
current_pattern->function->add(place_for_aggregate_state.data(), &(*row.columns)[value_column_num], row.row_num, nullptr);
aggregation_pattern->function->add(place_for_aggregate_state.data(), &(*row.columns)[value_column_num], row.row_num, nullptr);
}
}


@ -27,11 +27,24 @@ namespace DB
*
* Each row in a table corresponds to one value of one sensor.
*
* A pattern should contain a function, a retention scheme, or both. The order of patterns matters as well:
* * Patterns with only an aggregation function OR only retention rules should come first
* * Then come full patterns with both aggregation AND retention
* * The default pattern without a regexp must be the last one
*
* Rollup rules are specified in the following way:
*
* pattern
* regexp
* function
* pattern
* regexp
* age -> precision
* age -> precision
* ...
* pattern
* regexp
* function
* age -> precision
* age -> precision
* ...
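As a rough illustration of how such combined rules resolve for a concrete metric path, the sketch below (illustrative Python, not the ClickHouse implementation; the pattern values are made up) pairs a retention-only match with an aggregation-only match, lets a pattern that carries both win outright, and uses the default to fill whichever half is still missing:
```python
import re
from collections import namedtuple

# Illustrative model of combined-rule selection (not the ClickHouse code).
Pattern = namedtuple('Pattern', ['regexp', 'function', 'retentions'])

def select_rule(path, patterns):
    """Return a (retention_pattern, aggregation_pattern) pair for `path`."""
    first_match = None  # first partial (aggregation-only or retention-only) match
    for p in patterns:
        if p.regexp is not None and not re.search(p.regexp, path):
            continue
        has_func, has_ret = p.function is not None, bool(p.retentions)
        if has_func and has_ret:
            if p.regexp is None and first_match is not None:
                # The default fills whichever half the earlier partial match lacks.
                return (first_match, p) if first_match.retentions else (p, first_match)
            return (p, p)  # a full pattern covers both halves at once
        if first_match is None:
            first_match = p
        elif bool(first_match.retentions) != has_ret:
            # Pair a retention-only match with an aggregation-only match.
            return (first_match, p) if first_match.retentions else (p, first_match)
    return (None, None)

patterns = [
    Pattern(r'\.count$', 'sum', []),                              # aggregation only
    Pattern(r'^five_min\.', None, [(0, 300), (5184000, 3600)]),   # retention only
    Pattern(None, 'avg', [(0, 60)]),                              # default
]
ret, agg = select_rule('five_min.count', patterns)
print(ret.regexp, agg.function)  # ^five_min\. sum
```
For `five_min.count` this yields the retention scheme of `^five_min\.` combined with the `sum` function of `\.count$`.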
@ -54,6 +67,10 @@ namespace DB
*
* <graphite_rollup>
* <pattern>
* <regexp>\.max$</regexp>
* <function>max</function>
* </pattern>
* <pattern>
* <regexp>click_cost</regexp>
* <function>any</function>
* <retention>
@ -98,9 +115,12 @@ namespace Graphite
std::shared_ptr<OptimizedRegularExpression> regexp;
AggregateFunctionPtr function;
Retentions retentions; /// Must be ordered by 'age' descending.
enum { TypeUndef, TypeRetention, TypeAggregation, TypeAll } type = TypeAll; /// The type of defined pattern, filled automatically
};
using Patterns = std::vector<Pattern>;
using RetentionPattern = Pattern;
using AggregationPattern = Pattern;
struct Params
{
@ -110,6 +130,8 @@ namespace Graphite
String version_column_name;
Graphite::Patterns patterns;
};
using RollupRule = std::pair<const RetentionPattern *, const AggregationPattern *>;
}
/** Merges several sorted streams into one.
@ -135,7 +157,7 @@ public:
~GraphiteRollupSortedBlockInputStream() override
{
if (aggregate_state_created)
current_pattern->function->destroy(place_for_aggregate_state.data());
std::get<1>(current_rule)->function->destroy(place_for_aggregate_state.data());
}
protected:
@ -186,11 +208,18 @@ private:
time_t current_time = 0;
time_t current_time_rounded = 0;
const Graphite::Pattern * current_pattern = nullptr;
Graphite::RollupRule current_rule = {nullptr, nullptr};
AlignedBuffer place_for_aggregate_state;
bool aggregate_state_created = false; /// Invariant: if true then current_pattern is not NULL.
bool aggregate_state_created = false; /// Invariant: if true then current_rule is not NULL.
const Graphite::Pattern * selectPatternForPath(StringRef path) const;
const Graphite::Pattern undef_pattern =
{ /// temporary empty pattern for selectPatternForPath
nullptr,
nullptr,
DB::Graphite::Retentions(),
undef_pattern.TypeUndef,
};
Graphite::RollupRule selectPatternForPath(StringRef path) const;
UInt32 selectPrecision(const Graphite::Retentions & retentions, time_t time) const;
@ -198,7 +227,7 @@ private:
/// Insert the values into the resulting columns, which will not be changed in the future.
template <typename TSortCursor>
void startNextGroup(MutableColumns & merged_columns, TSortCursor & cursor, const Graphite::Pattern * next_pattern);
void startNextGroup(MutableColumns & merged_columns, TSortCursor & cursor, Graphite::RollupRule next_pattern);
/// Insert the calculated `time`, `value`, `version` values into the resulting columns by the last group of rows.
void finishCurrentGroup(MutableColumns & merged_columns);


@ -126,17 +126,32 @@ static void appendGraphitePattern(
throw Exception("Unknown element in config: " + key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
if (!pattern.function)
throw Exception("Aggregate function is mandatory for retention patterns in GraphiteMergeTree",
if (!pattern.function && pattern.retentions.empty())
throw Exception("At least one of an aggregate function or retention rules is mandatory for rollup patterns in GraphiteMergeTree",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
if (pattern.function->allocatesMemoryInArena())
throw Exception("Aggregate function " + pattern.function->getName() + " isn't supported in GraphiteMergeTree",
ErrorCodes::NOT_IMPLEMENTED);
if (!pattern.function)
{
pattern.type = pattern.TypeRetention;
}
else if (pattern.retentions.empty())
{
pattern.type = pattern.TypeAggregation;
}
else
{
pattern.type = pattern.TypeAll;
}
if (pattern.type & pattern.TypeAggregation) /// TypeAggregation or TypeAll
if (pattern.function->allocatesMemoryInArena())
throw Exception("Aggregate function " + pattern.function->getName() + " isn't supported in GraphiteMergeTree",
ErrorCodes::NOT_IMPLEMENTED);
/// retention should be in descending order of age.
std::sort(pattern.retentions.begin(), pattern.retentions.end(),
[] (const Graphite::Retention & a, const Graphite::Retention & b) { return a.age > b.age; });
if (pattern.type & pattern.TypeRetention) /// TypeRetention or TypeAll
std::sort(pattern.retentions.begin(), pattern.retentions.end(),
[] (const Graphite::Retention & a, const Graphite::Retention & b) { return a.age > b.age; });
patterns.emplace_back(pattern);
}
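The `pattern.type & pattern.TypeAggregation` and `pattern.type & pattern.TypeRetention` checks work because the enum in the header lists `TypeUndef, TypeRetention, TypeAggregation, TypeAll` in order (0, 1, 2, 3), so `TypeAll` is the bitwise union of the two partial types. A small sketch of the same classification under those values (illustrative Python, not the ClickHouse code):
```python
# Values matching the enum order in the header: Undef=0, Retention=1,
# Aggregation=2, All=3 (== Retention | Aggregation).
TYPE_UNDEF, TYPE_RETENTION, TYPE_AGGREGATION, TYPE_ALL = 0, 1, 2, 3

def classify_pattern(has_function, has_retentions):
    """Mirror of the config-time classification above (illustrative only)."""
    if not has_function and not has_retentions:
        raise ValueError('a pattern needs an aggregate function, retention rules, or both')
    if not has_function:
        return TYPE_RETENTION
    if not has_retentions:
        return TYPE_AGGREGATION
    return TYPE_ALL

assert classify_pattern(True, False) & TYPE_AGGREGATION   # aggregation-only pattern
assert classify_pattern(False, True) & TYPE_RETENTION     # retention-only pattern
assert classify_pattern(True, True) == TYPE_ALL           # combined pattern
```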


@ -148,13 +148,26 @@ void StorageSystemGraphite::fillData(MutableColumns & res_columns, const Context
const auto patterns = readPatterns(config, section);
for (const auto & pattern : patterns)
{
for (const auto & ret : pattern.retentions)
if (!pattern.retentions.empty())
{
for (const auto & ret : pattern.retentions)
{
res_columns[0]->insert(section);
res_columns[1]->insert(pattern.regexp);
res_columns[2]->insert(pattern.function);
res_columns[3]->insert(ret.age);
res_columns[4]->insert(ret.precision);
res_columns[5]->insert(pattern.priority);
res_columns[6]->insert(pattern.is_default);
}
}
else
{
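/// A pattern without retention clauses is still listed; age and precision are filled with zeroes.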
res_columns[0]->insert(section);
res_columns[1]->insert(pattern.regexp);
res_columns[2]->insert(pattern.function);
res_columns[3]->insert(ret.age);
res_columns[4]->insert(ret.precision);
res_columns[3]->insert(0);
res_columns[4]->insert(0);
res_columns[5]->insert(pattern.priority);
res_columns[6]->insert(pattern.is_default);
}


@ -5,6 +5,29 @@
<time_column_name>timestamp</time_column_name>
<value_column_name>value</value_column_name>
<version_column_name>updated</version_column_name>
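<!-- Aggregation-only patterns: for combined rules they are paired with the retention-only ^five_min\. pattern below. -->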
<pattern>
<regexp>\.count$</regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp>\.max$</regexp>
<function>max</function>
</pattern>
<pattern>
<regexp>^five_min\.</regexp>
<retention>
<age>0</age>
<precision>300</precision>
</retention>
<retention>
<age>5184000</age>
<precision>3600</precision>
</retention>
<retention>
<age>31536000</age>
<precision>14400</precision>
</retention>
</pattern>
<pattern>
<regexp>^one_min</regexp>
<function>avg</function>
@ -22,4 +45,53 @@
</retention>
</pattern>
</graphite_rollup>
<graphite_rollup_with_default>
<path_column_name>metric</path_column_name>
<time_column_name>timestamp</time_column_name>
<value_column_name>value</value_column_name>
<version_column_name>updated</version_column_name>
<pattern>
<regexp>\.count$</regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp>\.max$</regexp>
<function>max</function>
</pattern>
<default>
<function>any</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>7776000</age>
<precision>300</precision>
</retention>
<retention>
<age>31536000</age>
<precision>600</precision>
</retention>
</default>
</graphite_rollup_with_default>
<graphite_rollup_broken>
<path_column_name>metric</path_column_name>
<time_column_name>timestamp</time_column_name>
<value_column_name>value</value_column_name>
<version_column_name>updated</version_column_name>
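<!-- Intentionally partial: a default with retention clauses but no aggregate function (see test_broken_partial_rollup). -->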
<default>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>7776000</age>
<precision>300</precision>
</retention>
<retention>
<age>31536000</age>
<precision>600</precision>
</retention>
</default>
</graphite_rollup_broken>
</yandex>


@ -8,31 +8,38 @@ from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance', main_configs=['configs/graphite_rollup.xml'])
instance = cluster.add_instance('instance',
main_configs=['configs/graphite_rollup.xml'])
q = instance.query
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
instance.query('CREATE DATABASE test')
q('CREATE DATABASE test')
yield cluster
finally:
cluster.shutdown()
@pytest.fixture
def graphite_table(started_cluster):
instance.query('''
q('''
DROP TABLE IF EXISTS test.graphite;
CREATE TABLE test.graphite
(metric String, value Float64, timestamp UInt32, date Date, updated UInt32)
ENGINE = GraphiteMergeTree(date, (metric, timestamp), 8192, 'graphite_rollup');
ENGINE = GraphiteMergeTree('graphite_rollup')
PARTITION BY toYYYYMM(date)
ORDER BY (metric, timestamp)
SETTINGS index_granularity=8192;
''')
yield
instance.query('DROP TABLE test.graphite')
q('DROP TABLE test.graphite')
def test_rollup_versions(graphite_table):
@ -40,13 +47,14 @@ def test_rollup_versions(graphite_table):
rounded_timestamp = timestamp - timestamp % 60
date = datetime.date.today().isoformat()
q = instance.query
# Insert rows with timestamps relative to the current time so that the first retention clause is active.
# Insert rows with timestamps relative to the current time so that the
# first retention clause is active.
# Two parts are created.
q('''
INSERT INTO test.graphite (metric, value, timestamp, date, updated) VALUES ('one_min.x1', 100, {timestamp}, '{date}', 1);
INSERT INTO test.graphite (metric, value, timestamp, date, updated) VALUES ('one_min.x1', 200, {timestamp}, '{date}', 2);
INSERT INTO test.graphite (metric, value, timestamp, date, updated)
VALUES ('one_min.x1', 100, {timestamp}, '{date}', 1);
INSERT INTO test.graphite (metric, value, timestamp, date, updated)
VALUES ('one_min.x1', 200, {timestamp}, '{date}', 2);
'''.format(timestamp=timestamp, date=date))
expected1 = '''\
@ -54,7 +62,9 @@ one_min.x1 100 {timestamp} {date} 1
one_min.x1 200 {timestamp} {date} 2
'''.format(timestamp=timestamp, date=date)
assert TSV(q('SELECT * FROM test.graphite ORDER BY updated')) == TSV(expected1)
assert TSV(
q('SELECT * FROM test.graphite ORDER BY updated')
) == TSV(expected1)
q('OPTIMIZE TABLE test.graphite')
@ -67,8 +77,6 @@ one_min.x1 200 {timestamp} {date} 2
def test_rollup_aggregation(graphite_table):
q = instance.query
# This query essentially emulates what rollup does.
result1 = q('''
SELECT avg(v), max(upd)
@ -91,7 +99,8 @@ FROM (SELECT timestamp,
'''
assert TSV(result1) == TSV(expected1)
# Timestamp 1111111111 is in sufficiently distant past so that the last retention clause is active.
# Timestamp 1111111111 is in sufficiently distant past
# so that the last retention clause is active.
result2 = q('''
INSERT INTO test.graphite
SELECT 'one_min.x' AS metric,
@ -114,7 +123,7 @@ one_min.x 999634.9918367347 1111444200 2017-02-02 499999
def test_rollup_aggregation_2(graphite_table):
result = instance.query('''
result = q('''
INSERT INTO test.graphite
SELECT 'one_min.x' AS metric,
toFloat64(number) AS value,
@ -136,7 +145,7 @@ one_min.x 24 1111110600 2017-02-02 100
def test_multiple_paths_and_versions(graphite_table):
result = instance.query('''
result = q('''
INSERT INTO test.graphite
SELECT 'one_min.x' AS metric,
toFloat64(number) AS value,
@ -163,7 +172,9 @@ OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL;
SELECT * FROM test.graphite;
''')
with open(p.join(p.dirname(__file__), 'test_multiple_paths_and_versions.reference')) as reference:
with open(p.join(p.dirname(__file__),
'test_multiple_paths_and_versions.reference')
) as reference:
assert TSV(result) == TSV(reference)
@ -177,14 +188,18 @@ def test_multiple_output_blocks(graphite_table):
for j in range(3):
cur_time = rolled_up_time + 100 * j
to_insert += 'one_min.x1 {} {} 2001-09-09 1\n'.format(10 * j, cur_time)
to_insert += 'one_min.x1 {} {} 2001-09-09 2\n'.format(10 * (j + 1), cur_time)
to_insert += 'one_min.x1 {} {} 2001-09-09 1\n'.format(
10 * j, cur_time
)
to_insert += 'one_min.x1 {} {} 2001-09-09 2\n'.format(
10 * (j + 1), cur_time
)
expected += 'one_min.x1 20 {} 2001-09-09 2\n'.format(rolled_up_time)
instance.query('INSERT INTO test.graphite FORMAT TSV', to_insert)
q('INSERT INTO test.graphite FORMAT TSV', to_insert)
result = instance.query('''
result = q('''
OPTIMIZE TABLE test.graphite PARTITION 200109 FINAL;
SELECT * FROM test.graphite;
@ -200,14 +215,14 @@ zzzzzzzz 100 1000000001 2001-09-09 1
zzzzzzzz 200 1000000001 2001-09-09 2
'''
instance.query('INSERT INTO test.graphite FORMAT TSV', to_insert)
q('INSERT INTO test.graphite FORMAT TSV', to_insert)
expected = '''\
one_min.x1 100 999999600 2001-09-09 1
zzzzzzzz 200 1000000001 2001-09-09 2
'''
result = instance.query('''
result = q('''
OPTIMIZE TABLE test.graphite PARTITION 200109 FINAL;
SELECT * FROM test.graphite;
@ -215,27 +230,171 @@ SELECT * FROM test.graphite;
assert TSV(result) == TSV(expected)
def test_path_dangling_pointer(graphite_table):
instance.query('''
q('''
DROP TABLE IF EXISTS test.graphite2;
CREATE TABLE test.graphite2
(metric String, value Float64, timestamp UInt32, date Date, updated UInt32)
ENGINE = GraphiteMergeTree(date, (metric, timestamp), 1, 'graphite_rollup');
''')
(metric String, value Float64, timestamp UInt32, date Date, updated UInt32)
ENGINE = GraphiteMergeTree('graphite_rollup')
PARTITION BY toYYYYMM(date)
ORDER BY (metric, timestamp)
SETTINGS index_granularity=1;
''')
path = 'abcd' * 4000000 # 16MB
instance.query('INSERT INTO test.graphite2 FORMAT TSV', "{}\t0.0\t0\t2018-01-01\t100\n".format(path))
instance.query('INSERT INTO test.graphite2 FORMAT TSV', "{}\t0.0\t0\t2018-01-01\t101\n".format(path))
path = 'abcd' * 4000000 # 16MB
q('INSERT INTO test.graphite2 FORMAT TSV',
"{}\t0.0\t0\t2018-01-01\t100\n".format(path))
q('INSERT INTO test.graphite2 FORMAT TSV',
"{}\t0.0\t0\t2018-01-01\t101\n".format(path))
for version in range(10):
instance.query('INSERT INTO test.graphite2 FORMAT TSV', "{}\t0.0\t0\t2018-01-01\t{}\n".format(path, version))
q('INSERT INTO test.graphite2 FORMAT TSV',
"{}\t0.0\t0\t2018-01-01\t{}\n".format(path, version))
while True:
instance.query('OPTIMIZE TABLE test.graphite2 PARTITION 201801 FINAL')
parts = int(instance.query("SELECT count() FROM system.parts WHERE active AND database='test' AND table='graphite2'"))
if parts == 1:
break
print "Parts", parts
q('OPTIMIZE TABLE test.graphite2 PARTITION 201801 FINAL')
parts = int(q("SELECT count() FROM system.parts "
"WHERE active AND database='test' "
"AND table='graphite2'"))
if parts == 1:
break
print('Parts', parts)
assert TSV(instance.query("SELECT value, timestamp, date, updated FROM test.graphite2")) == TSV("0\t0\t2018-01-01\t101\n")
assert TSV(
q("SELECT value, timestamp, date, updated FROM test.graphite2")
) == TSV("0\t0\t2018-01-01\t101\n")
instance.query('DROP TABLE test.graphite2')
q('DROP TABLE test.graphite2')
def test_combined_rules(graphite_table):
# 1487970000 ~ Sat 25 Feb 00:00:00 MSK 2017
to_insert = 'INSERT INTO test.graphite VALUES '
expected_unmerged = ''
for i in range(384):
to_insert += "('five_min.count', {v}, {t}, toDate({t}), 1), ".format(
v=1, t=1487970000+(i*300)
)
to_insert += "('five_min.max', {v}, {t}, toDate({t}), 1), ".format(
v=i, t=1487970000+(i*300)
)
expected_unmerged += ("five_min.count\t{v1}\t{t}\n"
"five_min.max\t{v2}\t{t}\n").format(
v1=1, v2=i,
t=1487970000+(i*300)
)
q(to_insert)
assert TSV(q('SELECT metric, value, timestamp FROM test.graphite'
' ORDER BY (timestamp, metric)')) == TSV(expected_unmerged)
q('OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL')
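# By the time this test runs, the inserted timestamps (Feb 2017) are more than
# a year old, so the last retention clause of the ^five_min\. pattern applies
# (precision 14400 s = 4 h). Each merged bucket therefore contains 48
# five-minute rows: sum() over the .count metric gives 48, and max() over the
# .max metric gives the largest i in that bucket (47, 95, ...).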
expected_merged = '''
five_min.count 48 1487970000 2017-02-25 1
five_min.count 48 1487984400 2017-02-25 1
five_min.count 48 1487998800 2017-02-25 1
five_min.count 48 1488013200 2017-02-25 1
five_min.count 48 1488027600 2017-02-25 1
five_min.count 48 1488042000 2017-02-25 1
five_min.count 48 1488056400 2017-02-26 1
five_min.count 48 1488070800 2017-02-26 1
five_min.max 47 1487970000 2017-02-25 1
five_min.max 95 1487984400 2017-02-25 1
five_min.max 143 1487998800 2017-02-25 1
five_min.max 191 1488013200 2017-02-25 1
five_min.max 239 1488027600 2017-02-25 1
five_min.max 287 1488042000 2017-02-25 1
five_min.max 335 1488056400 2017-02-26 1
five_min.max 383 1488070800 2017-02-26 1
'''
assert TSV(q('SELECT * FROM test.graphite'
' ORDER BY (metric, timestamp)')) == TSV(expected_merged)
def test_combined_rules_with_default(graphite_table):
q('''
DROP TABLE IF EXISTS test.graphite;
CREATE TABLE test.graphite
(metric String, value Float64, timestamp UInt32, date Date, updated UInt32)
ENGINE = GraphiteMergeTree('graphite_rollup_with_default')
PARTITION BY toYYYYMM(date)
ORDER BY (metric, timestamp)
SETTINGS index_granularity=1;
''')
# 1487970000 ~ Sat 25 Feb 00:00:00 MSK 2017
to_insert = 'INSERT INTO test.graphite VALUES '
expected_unmerged = ''
for i in range(100):
to_insert += "('top_level.count', {v}, {t}, toDate({t}), 1), ".format(
v=1, t=1487970000+(i*60)
)
to_insert += "('top_level.max', {v}, {t}, toDate({t}), 1), ".format(
v=i, t=1487970000+(i*60)
)
expected_unmerged += ("top_level.count\t{v1}\t{t}\n"
"top_level.max\t{v2}\t{t}\n").format(
v1=1, v2=i,
t=1487970000+(i*60)
)
q(to_insert)
assert TSV(q('SELECT metric, value, timestamp FROM test.graphite'
' ORDER BY (timestamp, metric)')) == TSV(expected_unmerged)
q('OPTIMIZE TABLE test.graphite PARTITION 201702 FINAL')
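# Again the data is over a year old, so the default's last retention clause
# (precision 600 s) applies: every merged bucket covers ten one-minute rows,
# hence count sums of 10 and max values of 9, 19, 29, ...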
expected_merged = '''
top_level.count 10 1487970000 2017-02-25 1
top_level.count 10 1487970600 2017-02-25 1
top_level.count 10 1487971200 2017-02-25 1
top_level.count 10 1487971800 2017-02-25 1
top_level.count 10 1487972400 2017-02-25 1
top_level.count 10 1487973000 2017-02-25 1
top_level.count 10 1487973600 2017-02-25 1
top_level.count 10 1487974200 2017-02-25 1
top_level.count 10 1487974800 2017-02-25 1
top_level.count 10 1487975400 2017-02-25 1
top_level.max 9 1487970000 2017-02-25 1
top_level.max 19 1487970600 2017-02-25 1
top_level.max 29 1487971200 2017-02-25 1
top_level.max 39 1487971800 2017-02-25 1
top_level.max 49 1487972400 2017-02-25 1
top_level.max 59 1487973000 2017-02-25 1
top_level.max 69 1487973600 2017-02-25 1
top_level.max 79 1487974200 2017-02-25 1
top_level.max 89 1487974800 2017-02-25 1
top_level.max 99 1487975400 2017-02-25 1
'''
assert TSV(q('SELECT * FROM test.graphite'
' ORDER BY (metric, timestamp)')) == TSV(expected_merged)
def test_broken_partial_rollup(graphite_table):
q('''
DROP TABLE IF EXISTS test.graphite;
CREATE TABLE test.graphite
(metric String, value Float64, timestamp UInt32, date Date, updated UInt32)
ENGINE = GraphiteMergeTree('graphite_rollup_broken')
PARTITION BY toYYYYMM(date)
ORDER BY (metric, timestamp)
SETTINGS index_granularity=1;
''')
to_insert = '''\
one_min.x1 100 1000000000 2001-09-09 1
zzzzzzzz 100 1000000001 2001-09-09 1
zzzzzzzz 200 1000000001 2001-09-09 2
'''
q('INSERT INTO test.graphite FORMAT TSV', to_insert)
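# graphite_rollup_broken defines a default section with retention clauses but
# no aggregate function, so values are not aggregated during the merge: rows
# falling into the same rounded-time bucket are only collapsed by version,
# keeping the row with the highest `updated` value.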
expected = '''\
one_min.x1 100 1000000000 2001-09-09 1
zzzzzzzz 200 1000000001 2001-09-09 2
'''
result = q('''
OPTIMIZE TABLE test.graphite PARTITION 200109 FINAL;
SELECT * FROM test.graphite;
''')
assert TSV(result) == TSV(expected)


@ -75,6 +75,13 @@ Rollup configuration structure:
```
required-columns
pattern
regexp
function
pattern
regexp
age + precision
...
pattern
regexp
function
@ -88,15 +95,20 @@ default
...
```
When processing a row, ClickHouse checks the rules in the `pattern` section. If the metric name matches the `regexp`, the rules from the `pattern` section are applied; otherwise, the rules from the `default` section are used.
**Important:** The order of patterns should be as follows:
The rules are defined with fields `function` and `age + precision`.
1. Patterns with either `function` *or* `retention` (but not both).
1. Patterns *with* both `function` *and* `retention`.
1. The `default` pattern.
When processing a row, ClickHouse checks the rules in the `pattern` sections. Each `pattern` section (including `default`) can contain a `function` parameter for aggregation, `retention` parameters, or both. If the metric name matches the `regexp`, the rules from the matching `pattern` section (or sections) are applied; otherwise, the rules from the `default` section are used.
Fields for `pattern` and `default` sections:
- `regexp` A pattern for the metric name.
- `age` The minimum age of the data in seconds.
- `precision` How precisely to define the age of the data in seconds.
- `precision` How precisely to define the age of the data in seconds. Should be a divisor of 86400 (the number of seconds in a day).
- `function` The name of the aggregating function to apply to data whose age falls within the range `[age, age + precision]`.
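To make the `age`/`precision` semantics concrete, here is a rough sketch of choosing a precision for a row and rounding its timestamp (illustrative Python, not the server implementation; the exact server-side rounding may differ in details):
```python
import time

# Illustrative sketch (not the server code): pick the precision for a row from
# retention clauses ordered by age descending, then round its timestamp.
def select_precision(retentions, row_time, now=None):
    """retentions: list of (age, precision) pairs, ordered by age descending."""
    if now is None:
        now = int(time.time())
    age = now - row_time
    for min_age, precision in retentions:
        if age >= min_age:
            return precision
    return retentions[-1][1]  # younger than every clause: use the finest precision

retentions = [(86400, 3600), (0, 60)]            # older than one day -> hourly buckets
row_time = int(time.time()) - 2 * 86400          # a two-day-old data point
precision = select_precision(retentions, row_time)
rounded_time = row_time - row_time % precision   # rows in this bucket merge into one
print(precision, rounded_time)                   # 3600 and the bucket start
```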
The `required-columns`:


@ -72,12 +72,19 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
## Rollup configuration
The settings for data rollup are defined by the [graphite_rollup](../server_settings/settings.md#server_settings-graphite_rollup) parameter The parameter name can be anything. You can create several configurations and use them for different tables.
The settings for data rollup are defined by the [graphite_rollup](../server_settings/settings.md#server_settings-graphite_rollup) parameter. The parameter name can be anything. You can create several configurations and use them for different tables.
Rollup configuration structure:
```
required-columns
pattern
regexp
function
pattern
regexp
age + precision
...
pattern
regexp
function
@ -91,15 +98,19 @@ default
...
```
When processing a row, ClickHouse checks the rules in the `pattern` section. If the metric name matches the `regexp`, the rules from the `pattern` section are applied; otherwise, the rules from the `default` section are used.
**Important**: the order of the `pattern` sections must be as follows:
The rules are defined with the `function` and `age + precision` fields.
1. Sections *without* the `function` *or* `retention` parameter.
1. Sections *with* both the `function` *and* `retention` parameters.
1. The `default` section.
Fields for the `pattern` and `default` sections:
When processing a row, ClickHouse checks the rules in the `pattern` sections. Each `pattern` section (including `default`) can contain a `function` parameter for aggregation, `retention` rules for thinning data, or both. If the metric name matches the `regexp`, the rules from the matching `pattern` section (or sections) are applied; otherwise, the rules from the `default` section are used.
Fields for the `pattern` and `default` sections:
- `regexp` A pattern for the metric name.
- `age` The minimum age of the data in seconds.
- `precision` How precisely to define the age of the data in seconds.
- `precision` How precisely to define the age of the data in seconds. Should be a divisor of 86400 (the number of seconds in a day).
- `function` The name of the aggregating function to apply to data whose age falls within the range `[age, age + precision]`.
`required-columns`:
@ -117,6 +128,10 @@ default
<time_column_name>Time</time_column_name>
<value_column_name>Value</value_column_name>
<version_column_name>Version</version_column_name>
<pattern>
<regexp>\.count$</regexp>
<function>sum</function>
</pattern>
<pattern>
<regexp>click_cost</regexp>
<function>any</function>


@ -1,148 +0,0 @@
# GraphiteMergeTree
This engine is designed for rollup (thinning and aggregating/averaging) [Graphite](http://graphite.readthedocs.io/en/latest/index.html) data. It may be helpful to developers who want to use ClickHouse as a data store for Graphite.
You can use any ClickHouse table engine to store the Graphite data if you don't need rollup, but if you do need rollup, use `GraphiteMergeTree`. The engine reduces the volume of storage and increases the efficiency of queries from Graphite.
The engine inherits properties from [MergeTree](mergetree.md).
## Creating a Table
```sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
Path String,
Time DateTime,
Value <Numeric_type>,
Version <Numeric_type>
...
) ENGINE = GraphiteMergeTree(config_section)
[PARTITION BY expr]
[ORDER BY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]
```
For a description of request parameters, see [request description](../../query_language/create.md).
A table for Graphite data should have the following columns:
- Column with the metric name (Graphite sensor). Data type: `String`.
- Column with the time for measuring the metric. Data type: `DateTime`.
- Column with the value of the metric. Data type: any numeric.
- Column with the version of the metric with the same name and time of measurement. Data type: any numeric.
ClickHouse saves the row with the highest version, or the last written one if the versions are the same. Other rows are deleted during the merge of data parts.
The names of these columns should be set in the rollup configuration.
**GraphiteMergeTree parameters**
- `config_section` — Name of the section in the configuration file where the rollup rules are set.
**Query clauses**
When creating a `GraphiteMergeTree` table, the same [clauses](mergetree.md) are required as when creating a `MergeTree` table.
<details markdown="1"><summary>Deprecated Method for Creating a Table</summary>
!!! attention
Do not use this method in new projects and, if possible, switch the old projects to the method described above.
```sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
EventDate Date,
Path String,
Time DateTime,
Value <Numeric_type>,
Version <Numeric_type>
...
) ENGINE [=] GraphiteMergeTree(date-column [, sampling_expression], (primary, key), index_granularity, config_section)
```
All of the parameters except `config_section` have the same meaning as in `MergeTree`.
- `config_section` — Name of the section in the configuration file where the rollup rules are set.
</details>
## Rollup configuration
The settings for rollup are defined by the [graphite_rollup](../server_settings/settings.md) parameter in the server configuration. The parameter name can be anything. You can create several configurations and use them for different tables.
Rollup configuration structure:
```
required-columns
pattern
regexp
function
age + precision
...
pattern
...
default
function
age + precision
...
```
When processing a row, ClickHouse checks the rules in the `pattern` section. If the metric name matches the `regexp`, the rules from the `pattern` section are applied; otherwise, the rules from the `default` section are used.
The rules are defined with fields `function` and `age + precision`.
Fields for `pattern` and `default` sections:
- `regexp` A pattern for the metric name.
- `age` The minimum age of the data in seconds.
- `precision` How precisely to define the age of the data in seconds.
- `function` The name of the aggregating function to apply to data whose age falls within the range `[age, age + precision]`.
The `required-columns`:
- `path_column_name` — Column with the metric name (Graphite sensor).
- `time_column_name` — Column with the time for measuring the metric.
- `value_column_name` — Column with the value of the metric at the time set in `time_column_name`.
- `version_column_name` — Column with the version of the metric. The row with the highest version (for the same metric name and time) remains in the database.
Example of settings:
```xml
<graphite_rollup>
<path_column_name>Path</path_column_name>
<time_column_name>Time</time_column_name>
<value_column_name>Value</value_column_name>
<version_column_name>Version</version_column_name>
<pattern>
<regexp>click_cost</regexp>
<function>any</function>
<retention>
<age>0</age>
<precision>5</precision>
</retention>
<retention>
<age>86400</age>
<precision>60</precision>
</retention>
</pattern>
<default>
<function>max</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>3600</age>
<precision>300</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</default>
</graphite_rollup>
```
[Original article](https://clickhouse.yandex/docs/en/operations/table_engines/graphitemergetree/) <!--hide-->


@ -0,0 +1 @@
../../../en/operations/table_engines/graphitemergetree.md


@ -1,235 +0,0 @@
# MergeTree {#table_engines-mergetree}
The `MergeTree` engine and other engines of this family (`*MergeTree`) are the most robust ClickHouse table engines.
The basic idea behind the `MergeTree` family of engines is the following. When you have a tremendous amount of data to insert into a table, you write it quickly part by part and then merge the parts by some rules in the background. This method is much more efficient than continually rewriting the data in storage on insert.
Main features:
- Stores data sorted by primary key.
This allows you to create a small sparse index that helps find data faster.
- Partitions can be used if the [partitioning key](custom_partitioning_key.md) is specified.
ClickHouse supports certain operations with partitions that are more effective than general operations on the same data with the same result. ClickHouse also automatically cuts off the partition data where the partitioning key is specified in the query. This also increases the query performance.
- Data replication support.
The family of `ReplicatedMergeTree` tables is used for this. For more information, see the [Data replication](replication.md) section.
- Data sampling support.
If necessary, you can set the data sampling method in the table.
!!! info
The [Merge](merge.md) engine does not belong to the `*MergeTree` family.
## Creating a Table
```
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = MergeTree()
[PARTITION BY expr]
[ORDER BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]
```
For a description of request parameters, see [request description](../../query_language/create.md).
**Query clauses**
- `ENGINE` - Name and parameters of the engine. `ENGINE = MergeTree()`. `MergeTree` engine does not have parameters.
- `PARTITION BY` — The [partitioning key](custom_partitioning_key.md).
For partitioning by month, use the `toYYYYMM(date_column)` expression, where `date_column` is a column with a date of the type [Date](../../data_types/date.md). The partition names here have the `"YYYYMM"` format.
- `ORDER BY` — The sorting key.
A tuple of columns or arbitrary expressions. Example: `ORDER BY (CounterID, EventDate)`.
- `PRIMARY KEY` - The primary key if it [differs from the sorting key](mergetree.md).
By default the primary key is the same as the sorting key (which is specified by the `ORDER BY` clause).
Thus in most cases it is unnecessary to specify a separate `PRIMARY KEY` clause.
- `SAMPLE BY` — An expression for sampling.
If a sampling expression is used, the primary key must contain it. Example:
`SAMPLE BY intHash32(UserID) ORDER BY (CounterID, EventDate, intHash32(UserID))`.
- `SETTINGS` — Additional parameters that control the behavior of the `MergeTree`:
- `index_granularity` — The granularity of an index. The number of data rows between the "marks" of an index. By default, 8192.
**Example of sections setting**
```
ENGINE MergeTree() PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID) SETTINGS index_granularity=8192
```
In the example, we set partitioning by month.
We also set an expression for sampling as a hash by the user ID. This allows you to pseudorandomize the data in the table for each `CounterID` and `EventDate`. If, when selecting the data, you define a [SAMPLE](../../query_language/select.md#select-sample-clause) clause, ClickHouse will return an evenly pseudorandom data sample for a subset of users.
`index_granularity` could be omitted because 8192 is the default value.
<details markdown="1"><summary>Deprecated Method for Creating a Table</summary>
!!! attention
Do not use this method in new projects and, if possible, switch the old projects to the method described above.
```
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE [=] MergeTree(date-column [, sampling_expression], (primary, key), index_granularity)
```
**MergeTree() parameters**
- `date-column` — The name of a column of the type [Date](../../data_types/date.md). ClickHouse automatically creates partitions by month on the basis of this column. The partition names are in the `"YYYYMM"` format.
- `sampling_expression` — an expression for sampling.
- `(primary, key)` — primary key. Type: [Tuple()](../../data_types/tuple.md).
- `index_granularity` — The granularity of an index. The number of data rows between the "marks" of an index. The value 8192 is appropriate for most tasks.
**Example**
```
MergeTree(EventDate, intHash32(UserID), (CounterID, EventDate, intHash32(UserID)), 8192)
```
The `MergeTree` engine is configured in the same way as in the example above for the main engine configuration method.
</details>
## Data Storage
A table consists of data *parts* sorted by primary key.
When data is inserted in a table, separate data parts are created and each of them is lexicographically sorted by primary key. For example, if the primary key is `(CounterID, Date)`, the data in the part is sorted by `CounterID`, and within each `CounterID`, it is ordered by `Date`.
Data belonging to different partitions are separated into different parts. In the background, ClickHouse merges data parts for more efficient storage. Parts belonging to different partitions are not merged. The merge mechanism does not guarantee that all rows with the same primary key will be in the same data part.
For each data part, ClickHouse creates an index file that contains the primary key value for each index row ("mark"). Index row numbers are defined as `n * index_granularity`. The maximum value `n` is equal to the integer part of dividing the total number of rows by the `index_granularity`. For each column, the "marks" are also written for the same index rows as the primary key. These "marks" allow you to find the data directly in the columns.
You can use a single large table and continually add data to it in small chunks; this is what the `MergeTree` engine is intended for.
## Primary Keys and Indexes in Queries
Let's take the `(CounterID, Date)` primary key. In this case, the sorting and index can be illustrated as follows:
```
Whole data: [-------------------------------------------------------------------------]
CounterID: [aaaaaaaaaaaaaaaaaabbbbcdeeeeeeeeeeeeefgggggggghhhhhhhhhiiiiiiiiikllllllll]
Date: [1111111222222233331233211111222222333211111112122222223111112223311122333]
Marks: | | | | | | | | | | |
a,1 a,2 a,3 b,3 e,2 e,3 g,1 h,2 i,1 i,3 l,3
Marks numbers: 0 1 2 3 4 5 6 7 8 9 10
```
If the data query specifies:
- `CounterID in ('a', 'h')`, the server reads the data in the ranges of marks `[0, 3)` and `[6, 8)`.
- `CounterID IN ('a', 'h') AND Date = 3`, the server reads the data in the ranges of marks `[1, 3)` and `[7, 8)`.
- `Date = 3`, the server reads the data in the range of marks `[1, 10]`.
The examples above show that it is always more effective to use an index than a full scan.
A sparse index allows extra rows to be read. When reading a single range of the primary key, up to `index_granularity * 2` extra rows in each data block can be read. In most cases, ClickHouse performance does not degrade when `index_granularity = 8192`.
Sparse indexes allow you to work with a very large number of table rows, because such indexes are always stored in the computer's RAM.
ClickHouse does not require a unique primary key. You can insert multiple rows with the same primary key.
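As a rough illustration, the mark ranges from the example above can be derived from the sorted `CounterID` values with a couple of binary searches (illustrative Python, not the actual implementation):
```python
from bisect import bisect_left, bisect_right

# A toy model of sparse-index pruning (illustrative, not ClickHouse code).
# `marks` holds the CounterID value written at every index_granularity-th row,
# taken from the picture above.
marks = ['a', 'a', 'a', 'b', 'e', 'e', 'g', 'h', 'i', 'i', 'l']

def mark_range_for_key(marks, key):
    """Half-open range of marks whose granules may contain `key`."""
    left, right = bisect_left(marks, key), bisect_right(marks, key)
    # The granule preceding the first matching mark may already contain the key.
    return (max(left - 1, 0), right)

print(mark_range_for_key(marks, 'a'))  # (0, 3)
print(mark_range_for_key(marks, 'h'))  # (6, 8)
```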
### Selecting the Primary Key
The number of columns in the primary key is not explicitly limited. Depending on the data structure, you can include more or fewer columns in the primary key. This may:
- Improve the performance of an index.
If the primary key is `(a, b)`, then adding another column `c` will improve the performance if the following conditions are met:
- There are queries with a condition on column `c`.
- Long data ranges (several times longer than the `index_granularity`) with identical values for `(a, b)` are common. In other words, adding another column allows you to skip quite long data ranges.
- Improve data compression.
ClickHouse sorts data by primary key, so the higher the consistency, the better the compression.
- Provide additional logic when merging data parts in the [CollapsingMergeTree](collapsingmergetree.md#table_engine-collapsingmergetree) and [SummingMergeTree](summingmergetree.md) engines.
In this case it makes sense to specify the *sorting key* that is different from the primary key.
A long primary key will negatively affect the insert performance and memory consumption, but extra columns in the primary key do not affect ClickHouse performance during `SELECT` queries.
### Choosing the Primary Key that differs from the Sorting Key
It is possible to specify the primary key (the expression, values of which are written into the index file
for each mark) that is different from the sorting key (the expression for sorting the rows in data parts).
In this case the primary key expression tuple must be a prefix of the sorting key expression tuple.
This feature is helpful when using the [SummingMergeTree](summingmergetree.md) and
[AggregatingMergeTree](aggregatingmergetree.md) table engines. In a common case when using these engines the
table has two types of columns: *dimensions* and *measures*. Typical queries aggregate values of measure
columns with arbitrary `GROUP BY` and filtering by dimensions. As SummingMergeTree and AggregatingMergeTree
aggregate rows with the same value of the sorting key, it is natural to add all dimensions to it. As a result
the key expression consists of a long list of columns and this list must be frequently updated with newly
added dimensions.
In this case it makes sense to leave only a few columns in the primary key that will provide efficient
range scans and add the remaining dimension columns to the sorting key tuple.
[ALTER of the sorting key](../../query_language/alter.md) is a
lightweight operation because when a new column is simultaneously added to the table and to the sorting key
data parts need not be changed (they remain sorted by the new sorting key expression).
### Use of Indexes and Partitions in Queries
For `SELECT` queries, ClickHouse analyzes whether an index can be used. An index can be used if the `WHERE/PREWHERE` clause has an expression (as one of the conjunction elements, or entirely) that represents an equality or inequality comparison operation, or if it has `IN` or `LIKE` with a fixed prefix on columns or expressions that are in the primary key or partitioning key, or on certain partially repetitive functions of these columns, or logical relationships of these expressions.
Thus, it is possible to quickly run queries on one or many ranges of the primary key. In this example, queries will be fast when run for a specific tracking tag; for a specific tag and date range; for a specific tag and date; for multiple tags with a date range, and so on.
Let's look at the engine configured as follows:
```
ENGINE MergeTree() PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate) SETTINGS index_granularity=8192
```
In this case, in queries:
``` sql
SELECT count() FROM table WHERE EventDate = toDate(now()) AND CounterID = 34
SELECT count() FROM table WHERE EventDate = toDate(now()) AND (CounterID = 34 OR CounterID = 42)
SELECT count() FROM table WHERE ((EventDate >= toDate('2014-01-01') AND EventDate <= toDate('2014-01-31')) OR EventDate = toDate('2014-05-01')) AND CounterID IN (101500, 731962, 160656) AND (CounterID = 101500 OR EventDate != toDate('2014-05-01'))
```
ClickHouse will use the primary key index to trim improper data and the monthly partitioning key to trim partitions that are in improper date ranges.
The queries above show that the index is used even for complex expressions. Reading from the table is organized so that using the index can't be slower than a full scan.
In the example below, the index can't be used.
``` sql
SELECT count() FROM table WHERE CounterID = 34 OR URL LIKE '%upyachka%'
```
To check whether ClickHouse can use the index when running a query, use the settings [force_index_by_date](../settings/settings.md#settings-force_index_by_date) and [force_primary_key](../settings/settings.md).
The key for partitioning by month allows reading only those data blocks which contain dates from the proper range. In this case, the data block may contain data for many dates (up to an entire month). Within a block, data is sorted by primary key, which might not contain the date as the first column. Because of this, using a query with only a date condition that does not specify the primary key prefix will cause more data to be read than for a single date.
## Concurrent Data Access
For concurrent table access, we use multi-versioning. In other words, when a table is simultaneously read and updated, data is read from a set of parts that is current at the time of the query. There are no lengthy locks. Inserts do not get in the way of read operations.
Reading from a table is automatically parallelized.
[Original article](https://clickhouse.yandex/docs/en/operations/table_engines/mergetree/) <!--hide-->


@ -0,0 +1 @@
../../../en/operations/table_engines/mergetree.md