Merging #12548 - Correction to merge_with_ttl_timeout logic by @excitoon (#12982)

* Fixed `merge_with_ttl_timeout` logic.
* Separate TTL-merge timers for each partition.

Co-authored-by: Vladimir Chebotarev <vladimir.chebotarev@gmail.com>
Alexander Kazakov 2020-07-28 17:38:34 +03:00 committed by GitHub
parent ba7c33f806
commit 2bde393499
5 changed files with 112 additions and 42 deletions
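
Before this change, a single `last_merge_with_ttl` timestamp throttled TTL merges for the whole table, so a TTL merge in any one partition pushed back the next TTL merge everywhere. The commit replaces that global timer with a map of per-partition due times. A minimal standalone sketch of the idea (illustrative names, not ClickHouse's actual classes):

```cpp
#include <ctime>
#include <map>
#include <string>

using PartitionIdToTTLs = std::map<std::string, time_t>;

/// Returns true if a TTL merge may start in `partition_id` and, if so,
/// re-arms only that partition's timer; other partitions are unaffected.
bool tryStartTTLMerge(PartitionIdToTTLs & due_times, const std::string & partition_id,
                      time_t now, time_t cooldown)
{
    time_t & due = due_times[partition_id]; /// default-inserts 0 for a new partition
    if (due > now)
        return false; /// this partition is still cooling down
    due = now + cooldown;
    return true;
}
```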

src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp

@@ -30,13 +30,15 @@
 #include <Common/interpolate.h>
 #include <Common/typeid_cast.h>
 #include <Common/escapeForFileName.h>
-#include <cmath>
-#include <numeric>
-#include <iomanip>
+#include <cmath>
+#include <ctime>
+#include <iomanip>
+#include <numeric>
 #include <boost/algorithm/string/replace.hpp>

 namespace ProfileEvents
 {
     extern const Event MergedRows;
@@ -219,14 +221,13 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
         return false;
     }

-    time_t current_time = time(nullptr);
+    time_t current_time = std::time(nullptr);

     IMergeSelector::Partitions partitions;

     const String * prev_partition_id = nullptr;
     /// Previous part only in boundaries of partition frame
     const MergeTreeData::DataPartPtr * prev_part = nullptr;
-    bool has_part_with_expired_ttl = false;

     for (const MergeTreeData::DataPartPtr & part : data_parts)
     {
         /// Check predicate only for first part in each partition.
@@ -258,11 +259,6 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
         part_info.min_ttl = part->ttl_infos.part_min_ttl;
         part_info.max_ttl = part->ttl_infos.part_max_ttl;

-        time_t ttl = data_settings->ttl_only_drop_parts ? part_info.max_ttl : part_info.min_ttl;
-
-        if (ttl && ttl <= current_time)
-            has_part_with_expired_ttl = true;

         partitions.back().emplace_back(part_info);

         /// Check for consistency of data parts. If assertion is failed, it requires immediate investigation.
@@ -275,38 +271,38 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
         prev_part = &part;
     }

-    std::unique_ptr<IMergeSelector> merge_selector;
+    IMergeSelector::PartsInPartition parts_to_merge;

-    SimpleMergeSelector::Settings merge_settings;
-    if (aggressive)
-        merge_settings.base = 1;
-
-    bool can_merge_with_ttl =
-        (current_time - last_merge_with_ttl > data_settings->merge_with_ttl_timeout);
-
     /// NOTE Could allow selection of different merge strategy.
-    if (can_merge_with_ttl && has_part_with_expired_ttl && !ttl_merges_blocker.isCancelled())
+    if (!ttl_merges_blocker.isCancelled())
     {
-        merge_selector = std::make_unique<TTLMergeSelector>(current_time, data_settings->ttl_only_drop_parts);
-        last_merge_with_ttl = current_time;
+        TTLMergeSelector merge_selector(
+                next_ttl_merge_times_by_partition,
+                current_time,
+                data_settings->merge_with_ttl_timeout,
+                data_settings->ttl_only_drop_parts);
+        parts_to_merge = merge_selector.select(partitions, max_total_size_to_merge);
     }
-    else
-        merge_selector = std::make_unique<SimpleMergeSelector>(merge_settings);

-    IMergeSelector::PartsInPartition parts_to_merge = merge_selector->select(
-        partitions,
-        max_total_size_to_merge);

     if (parts_to_merge.empty())
     {
-        if (out_disable_reason)
-            *out_disable_reason = "There is no need to merge parts according to merge selector algorithm";
-        return false;
-    }
+        SimpleMergeSelector::Settings merge_settings;
+        if (aggressive)
+            merge_settings.base = 1;
-    /// Allow to "merge" part with itself if we need remove some values with expired ttl
-    if (parts_to_merge.size() == 1 && !has_part_with_expired_ttl)
-        throw Exception("Logical error: merge selector returned only one part to merge", ErrorCodes::LOGICAL_ERROR);
+        parts_to_merge = SimpleMergeSelector(merge_settings)
+                            .select(partitions, max_total_size_to_merge);
+
+        /// Do not allow to "merge" part with itself for regular merges, unless it is a TTL-merge where it is ok to remove some values with expired ttl
+        if (parts_to_merge.size() == 1)
+            throw Exception("Logical error: merge selector returned only one part to merge", ErrorCodes::LOGICAL_ERROR);
+
+        if (parts_to_merge.empty())
+        {
+            if (out_disable_reason)
+                *out_disable_reason = "There is no need to merge parts according to merge selector algorithm";
+            return false;
+        }
+    }

     MergeTreeData::DataPartsVector parts;
     parts.reserve(parts_to_merge.size());
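
The control-flow change above is the heart of the fix: `TTLMergeSelector` now always runs first and decides internally, per partition, whether a TTL merge is due, while `SimpleMergeSelector` becomes a fallback for when nothing TTL-related was picked. A hedged skeleton of that ordering, with the two selectors stubbed out as callables and all names illustrative:

```cpp
#include <functional>
#include <stdexcept>
#include <vector>

using PartRange = std::vector<int>; /// stand-in for IMergeSelector::PartsInPartition

/// Sketch of the reordered selection in selectPartsToMerge(): TTL merges get
/// first pick; the size-based selector only runs if no TTL merge is due.
PartRange selectSketch(const std::function<PartRange()> & ttl_select,
                       const std::function<PartRange()> & regular_select)
{
    PartRange parts_to_merge = ttl_select(); /// empty if all partitions are on cooldown
    if (parts_to_merge.empty())
    {
        parts_to_merge = regular_select();
        /// A single-part "merge" is only legitimate for TTL merges (rewriting one
        /// part to drop expired values), so on the regular path it is a logic error.
        if (parts_to_merge.size() == 1)
            throw std::logic_error("merge selector returned only one part to merge");
    }
    return parts_to_merge;
}
```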

src/Storages/MergeTree/MergeTreeDataMergerMutator.h

@@ -5,6 +5,7 @@
 #include <atomic>
 #include <functional>
 #include <Common/ActionBlocker.h>
+#include <Storages/MergeTree/TTLMergeSelector.h>

 namespace DB
@@ -242,8 +243,10 @@ private:
     /// When the last time you wrote to the log that the disk space was running out (not to write about this too often).
     time_t disk_space_warning_time = 0;

-    /// Last time when TTLMergeSelector has been used
-    time_t last_merge_with_ttl = 0;
+    /// Stores the next TTL merge due time for each partition (used only by TTLMergeSelector)
+    TTLMergeSelector::PartitionIdToTTLs next_ttl_merge_times_by_partition;
+    /// Performing TTL merges independently for each partition guarantees that
+    /// there is only a limited number of TTL merges and no partition stores data, that is too stale
 };
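
One subtlety worth noting: a fresh `TTLMergeSelector` is constructed on every selection round, so the timers only persist because the selector borrows this map by reference from the long-lived merger-mutator. A minimal sketch of that ownership split (illustrative names, not the real classes):

```cpp
#include <ctime>
#include <map>
#include <string>

using PartitionIdToTTLs = std::map<std::string, time_t>;

struct SelectorSketch
{
    PartitionIdToTTLs & due_times; /// borrowed, not owned: outlives this selector
    explicit SelectorSketch(PartitionIdToTTLs & due_times_) : due_times(due_times_) {}
};

struct MergerSketch
{
    PartitionIdToTTLs next_ttl_merge_times_by_partition; /// long-lived owner

    SelectorSketch makeSelector()
    {
        /// Each call builds a throwaway selector over the same persistent timers.
        return SelectorSketch(next_ttl_merge_times_by_partition);
    }
};
```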

src/Storages/MergeTree/TTLMergeSelector.cpp

@@ -1,12 +1,20 @@
 #include <Storages/MergeTree/TTLMergeSelector.h>
+#include <Storages/MergeTree/MergeTreeData.h>

-#include <cmath>
 #include <algorithm>
+#include <cmath>

 namespace DB
 {

+const String & getPartitionIdForPart(const TTLMergeSelector::Part & part_info)
+{
+    const MergeTreeData::DataPartPtr & part = *static_cast<const MergeTreeData::DataPartPtr *>(part_info.data);
+    return part->info.partition_id;
+}
+
 IMergeSelector::PartsInPartition TTLMergeSelector::select(
     const Partitions & partitions,
     const size_t max_total_size_to_merge)
@@ -18,15 +26,24 @@ IMergeSelector::PartsInPartition TTLMergeSelector::select(
     for (size_t i = 0; i < partitions.size(); ++i)
     {
-        for (auto it = partitions[i].begin(); it != partitions[i].end(); ++it)
+        const auto & mergeable_parts_in_partition = partitions[i];
+        if (mergeable_parts_in_partition.empty())
+            continue;
+
+        const auto & partition_id = getPartitionIdForPart(mergeable_parts_in_partition.front());
+        const auto & next_merge_time_for_partition = merge_due_times[partition_id];
+        if (next_merge_time_for_partition > current_time)
+            continue;
+
+        for (Iterator part_it = mergeable_parts_in_partition.cbegin(); part_it != mergeable_parts_in_partition.cend(); ++part_it)
         {
-            time_t ttl = only_drop_parts ? it->max_ttl : it->min_ttl;
+            time_t ttl = only_drop_parts ? part_it->max_ttl : part_it->min_ttl;

             if (ttl && (partition_to_merge_index == -1 || ttl < partition_to_merge_min_ttl))
             {
                 partition_to_merge_min_ttl = ttl;
                 partition_to_merge_index = i;
-                best_begin = it;
+                best_begin = part_it;
             }
         }
     }
@@ -68,6 +85,9 @@ IMergeSelector::PartsInPartition TTLMergeSelector::select(
         ++best_end;
     }

+    const auto & best_partition_id = getPartitionIdForPart(best_partition.front());
+    merge_due_times[best_partition_id] = current_time + merge_cooldown_time;
+
     return PartsInPartition(best_begin, best_end);
 }
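
Condensing the selection loop above into a standalone sketch may help: partitions whose timer has not expired are skipped outright, the partition holding the earliest expired TTL wins, and only the winner pays the cooldown. All types and names below are illustrative, not ClickHouse's:

```cpp
#include <ctime>
#include <map>
#include <string>
#include <vector>

struct PartSketch { std::string partition_id; time_t ttl; };

/// Returns the index of the partition to TTL-merge, or -1 if none is due,
/// and re-arms the winning partition's timer.
int pickPartitionSketch(const std::vector<std::vector<PartSketch>> & partitions,
                        std::map<std::string, time_t> & due_times,
                        time_t now, time_t cooldown)
{
    int best = -1;
    time_t best_ttl = 0;
    for (size_t i = 0; i < partitions.size(); ++i)
    {
        if (partitions[i].empty())
            continue;
        if (due_times[partitions[i].front().partition_id] > now)
            continue; /// whole partition skipped while its timer is armed
        for (const PartSketch & part : partitions[i])
            if (part.ttl && part.ttl <= now && (best == -1 || part.ttl < best_ttl))
            {
                best_ttl = part.ttl;
                best = static_cast<int>(i);
            }
    }
    if (best != -1) /// only the selected partition is put on cooldown
        due_times[partitions[best].front().partition_id] = now + cooldown;
    return best;
}
```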

src/Storages/MergeTree/TTLMergeSelector.h

@@ -1,7 +1,10 @@
 #pragma once

+#include <Core/Types.h>
 #include <Storages/MergeTree/MergeSelector.h>

+#include <map>
+
 namespace DB
 {
@@ -10,17 +13,27 @@
  * It selects parts to merge by greedy algorithm:
  *  1. Finds part with the most earliest expired ttl and includes it to result.
  *  2. Tries to find the longest range of parts with expired ttl, that includes part from step 1.
+ * Finally, merge selector updates TTL merge timer for the selected partition
  */
 class TTLMergeSelector : public IMergeSelector
 {
 public:
-    explicit TTLMergeSelector(time_t current_time_, bool only_drop_parts_) : current_time(current_time_), only_drop_parts(only_drop_parts_) {}
+    using PartitionIdToTTLs = std::map<String, time_t>;
+
+    explicit TTLMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_, bool only_drop_parts_)
+        : merge_due_times(merge_due_times_),
+          current_time(current_time_),
+          merge_cooldown_time(merge_cooldown_time_),
+          only_drop_parts(only_drop_parts_) {}

     PartsInPartition select(
         const Partitions & partitions,
         const size_t max_total_size_to_merge) override;

 private:
+    PartitionIdToTTLs & merge_due_times;
     time_t current_time;
+    Int64 merge_cooldown_time;
     bool only_drop_parts;
 };
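
The `PartitionIdToTTLs` alias relies on `std::map::operator[]` default-inserting `0`, which makes a never-seen partition immediately eligible for a TTL merge. A tiny runnable check of that contract (the cooldown value is made up for the example):

```cpp
#include <cassert>
#include <ctime>
#include <map>
#include <string>

int main()
{
    std::map<std::string, time_t> due_times; /// PartitionIdToTTLs with std types
    const time_t now = std::time(nullptr);
    const time_t cooldown = 3600; /// e.g. merge_with_ttl_timeout, in seconds

    assert(due_times["2000-10-01"] <= now);   /// new partition: eligible right away
    due_times["2000-10-01"] = now + cooldown; /// selected: re-arm its own timer
    assert(due_times["2000-10-01"] > now);    /// this partition now waits...
    assert(due_times["2000-10-02"] <= now);   /// ...but a sibling is unaffected
    return 0;
}
```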

tests/integration/test_ttl_replicated/test.py

@@ -5,10 +5,12 @@ import helpers.client as client
 from helpers.cluster import ClickHouseCluster
 from helpers.test_tools import TSV

 cluster = ClickHouseCluster(__file__)

 node1 = cluster.add_instance('node1', with_zookeeper=True)
 node2 = cluster.add_instance('node2', with_zookeeper=True)
 @pytest.fixture(scope="module")
 def started_cluster():
     try:
@@ -22,11 +24,13 @@ def started_cluster():
     finally:
         cluster.shutdown()

 def drop_table(nodes, table_name):
     for node in nodes:
         node.query("DROP TABLE IF EXISTS {} NO DELAY".format(table_name))
     time.sleep(1)

 def test_ttl_columns(started_cluster):
     drop_table([node1, node2], "test_ttl")
     for node in [node1, node2]:
@@ -47,6 +51,40 @@ def test_ttl_columns(started_cluster):
     assert TSV(node2.query("SELECT id, a, b FROM test_ttl ORDER BY id")) == TSV(expected)

+
+def test_merge_with_ttl_timeout(started_cluster):
+    table = "test_merge_with_ttl_timeout"
+    drop_table([node1, node2], table)
+    for node in [node1, node2]:
+        node.query(
+            '''
+            CREATE TABLE {table}(date DateTime, id UInt32, a Int32 TTL date + INTERVAL 1 DAY, b Int32 TTL date + INTERVAL 1 MONTH)
+            ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/{table}', '{replica}')
+            ORDER BY id PARTITION BY toDayOfMonth(date);
+            '''.format(replica=node.name, table=table))
+
+    node1.query("SYSTEM STOP TTL MERGES {table}".format(table=table))
+    node2.query("SYSTEM STOP TTL MERGES {table}".format(table=table))
+
+    for i in range(1, 4):
+        node1.query("INSERT INTO {table} VALUES (toDateTime('2000-10-{day:02d} 10:00:00'), 1, 2, 3)".format(day=i, table=table))
+
+    assert node1.query("SELECT countIf(a = 0) FROM {table}".format(table=table)) == "0\n"
+    assert node2.query("SELECT countIf(a = 0) FROM {table}".format(table=table)) == "0\n"
+
+    node1.query("SYSTEM START TTL MERGES {table}".format(table=table))
+    node2.query("SYSTEM START TTL MERGES {table}".format(table=table))
+
+    time.sleep(15) # TTL merges shall happen.
+
+    for i in range(1, 4):
+        node1.query("INSERT INTO {table} VALUES (toDateTime('2000-10-{day:02d} 10:00:00'), 1, 2, 3)".format(day=i, table=table))
+
+    time.sleep(15) # TTL merges shall not happen.
+
+    assert node1.query("SELECT countIf(a = 0) FROM {table}".format(table=table)) == "3\n"
+    assert node2.query("SELECT countIf(a = 0) FROM {table}".format(table=table)) == "3\n"
+
+
 def test_ttl_many_columns(started_cluster):
     drop_table([node1, node2], "test_ttl_2")
     for node in [node1, node2]: