Remove empty parts after they were pruned by TTL

This commit is contained in:
Anton Popov 2020-11-11 19:18:21 +03:00
parent 6f0dc08ba7
commit 2497598a49
7 changed files with 94 additions and 10 deletions

View File

@ -1215,6 +1215,19 @@ void MergeTreeData::clearOldWriteAheadLogs()
}
}
void MergeTreeData::clearEmptyParts()
{
auto parts = getDataPartsVector();
for (const auto & part : parts)
{
if (part->rows_count == 0)
{
ASTPtr literal = std::make_shared<ASTLiteral>(part->name);
dropPartition(literal, /* detach = */ false, /*drop_part = */ true, global_context);
}
}
}
void MergeTreeData::rename(const String & new_table_path, const StorageID & new_table_id)
{
auto disks = getStoragePolicy()->getDisks();

View File

@ -498,6 +498,8 @@ public:
/// Must be called with locked lockForShare() because use relative_data_path.
void clearOldTemporaryDirectories(ssize_t custom_directories_lifetime_seconds = -1);
/// Drops all data parts whose rows_count is zero (e.g. parts left empty after TTL pruning).
void clearEmptyParts();
/// After the call to dropAllData() no method can be called.
/// Deletes the data directory and flushes the uncompressed blocks cache and the marks cache.
void dropAllData();

View File

@ -72,6 +72,7 @@ void ReplicatedMergeTreeCleanupThread::iterate()
clearOldLogs();
clearOldBlocks();
clearOldMutations();
storage.clearEmptyParts();
}
}

View File

@ -99,6 +99,7 @@ void StorageMergeTree::startup()
{
clearOldPartsFromFilesystem();
clearOldWriteAheadLogs();
clearEmptyParts();
/// Temporary directories contain incomplete results of merges (after forced restart)
/// and don't allow to reinitialize them, so delete each of them immediately
@ -933,6 +934,7 @@ std::optional<JobAndPool> StorageMergeTree::getDataProcessingJob()
clearOldTemporaryDirectories();
clearOldWriteAheadLogs();
clearOldMutations();
clearEmptyParts();
}, PoolType::MERGE_MUTATE};
}
return {};

View File

@ -213,6 +213,14 @@ def test_ttl_double_delete_rule_returns_error(started_cluster):
assert False
def optimize_with_retry(node, table_name, retry=20):
    """Run OPTIMIZE ... FINAL on `table_name`, retrying while the query fails.

    The query is issued with optimize_throw_if_noop = 1, so it raises when it
    cannot be performed; in that case we wait half a second and try again.

    :param node: object exposing ``query(sql, settings=...)``.
    :param table_name: name of the table to optimize.
    :param retry: maximum number of attempts.

    The helper is best-effort: if every attempt fails it returns silently and
    leaves it to the caller's subsequent assertions to detect the problem.
    """
    for _ in range(retry):
        try:
            node.query("OPTIMIZE TABLE {name} FINAL SETTINGS optimize_throw_if_noop = 1".format(name=table_name), settings={"optimize_throw_if_noop": "1"})
            break
        # `except e:` would evaluate the undefined name `e` and raise NameError
        # on the first failure, so no retry would ever happen.
        except Exception:
            time.sleep(0.5)
@pytest.mark.parametrize("name,engine", [
("test_ttl_alter_delete", "MergeTree()"),
("test_replicated_ttl_alter_delete", "ReplicatedMergeTree('/clickhouse/test_replicated_ttl_alter_delete', '1')"),
@ -238,14 +246,6 @@ limitations under the License."""
"""
drop_table([node1], name)
def optimize_with_retry(retry=20):
    # Run OPTIMIZE ... FINAL on the table `name` via node1, retrying up to
    # `retry` times. optimize_throw_if_noop makes the query raise when it
    # cannot be performed; we then wait 0.5 s and try again. Best-effort:
    # returns silently if every attempt fails.
    for _ in range(retry):
        try:
            node1.query("OPTIMIZE TABLE {name} FINAL".format(name=name), settings={"optimize_throw_if_noop": "1"})
            break
        # Catch Exception rather than using a bare `except:` so that
        # KeyboardInterrupt / SystemExit still abort the test run.
        except Exception:
            time.sleep(0.5)
node1.query(
"""
CREATE TABLE {name} (
@ -267,7 +267,7 @@ limitations under the License."""
time.sleep(1)
optimize_with_retry()
optimize_with_retry(node1, name)
r = node1.query("SELECT s1, b1 FROM {name} ORDER BY b1, s1".format(name=name)).splitlines()
assert r == ["\t1", "hello2\t2"]
@ -277,7 +277,49 @@ limitations under the License."""
time.sleep(1)
optimize_with_retry()
optimize_with_retry(node1, name)
r = node1.query("SELECT s1, b1 FROM {name} ORDER BY b1, s1".format(name=name)).splitlines()
assert r == ["\t0", "\t0", "hello2\t2"]
# Checks that parts left with no rows after applying a TTL disappear from the
# set of active parts, and that merges and mutations still work afterwards.
def test_ttl_empty_parts(started_cluster):
drop_table([node1, node2], "test_ttl_empty_parts")
# Create the replicated table on both nodes. The merge size limits are set to 1
# byte (presumably to keep the inserted parts from being merged - confirm) and
# cleanup_delay_period is 1 with no random addition.
for node in [node1, node2]:
node.query(
'''
CREATE TABLE test_ttl_empty_parts(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl', '{replica}')
ORDER BY id
SETTINGS max_bytes_to_merge_at_min_space_in_pool = 1, max_bytes_to_merge_at_max_space_in_pool = 1,
cleanup_delay_period = 1, cleanup_delay_period_random_add = 0
'''.format(replica=node.name))
# Six inserts of 1000 rows each: odd i gets a date in year 2100, even i a date in year 2000.
for i in range (1, 7):
node1.query("INSERT INTO test_ttl_empty_parts SELECT '2{}00-01-0{}', number FROM numbers(1000)".format(i % 2, i))
assert node1.query("SELECT count() FROM test_ttl_empty_parts") == "6000\n"
assert node1.query("SELECT name FROM system.parts WHERE table = 'test_ttl_empty_parts' AND active ORDER BY name") == \
"all_0_0_0\nall_1_1_0\nall_2_2_0\nall_3_3_0\nall_4_4_0\nall_5_5_0\n"
# Use `date` as the table TTL; only the rows of the three year-2100 inserts are expected to remain.
node1.query("ALTER TABLE test_ttl_empty_parts MODIFY TTL date")
assert node1.query("SELECT count() FROM test_ttl_empty_parts") == "3000\n"
time.sleep(3) # Wait for cleanup thread
# Only the parts from the year-2100 inserts (blocks 0, 2 and 4) should still be active.
assert node1.query("SELECT name FROM system.parts WHERE table = 'test_ttl_empty_parts' AND active ORDER BY name") == \
"all_0_0_0_6\nall_2_2_0_6\nall_4_4_0_6\n"
# Raise the merge size limits on both nodes so that the remaining parts can be merged.
for node in [node1, node2]:
node.query("ALTER TABLE test_ttl_empty_parts MODIFY SETTING max_bytes_to_merge_at_min_space_in_pool = 1000000000")
node.query("ALTER TABLE test_ttl_empty_parts MODIFY SETTING max_bytes_to_merge_at_max_space_in_pool = 1000000000")
optimize_with_retry(node1, 'test_ttl_empty_parts')
# After OPTIMIZE a single active part covering blocks 0..4 is expected.
assert node1.query("SELECT name FROM system.parts WHERE table = 'test_ttl_empty_parts' AND active ORDER BY name") == "all_0_4_1_6\n"
# Check that mutations and merges still work after empty parts have been removed
node1.query("INSERT INTO test_ttl_empty_parts SELECT '2100-01-20', number FROM numbers(1000)")
node1.query("ALTER TABLE test_ttl_empty_parts DELETE WHERE id % 2 = 0 SETTINGS mutations_sync = 2")
# 3000 + 1000 rows, minus the half with an even id.
assert node1.query("SELECT count() FROM test_ttl_empty_parts") == "2000\n"
optimize_with_retry(node1, 'test_ttl_empty_parts')
assert node1.query("SELECT name FROM system.parts WHERE table = 'test_ttl_empty_parts' AND active ORDER BY name") == "all_0_7_2_8\n"

View File

@ -0,0 +1,4 @@
1000
2
500
1

View File

@ -0,0 +1,20 @@
-- Test: parts that become empty after a TTL is applied are no longer counted as active parts.
DROP TABLE IF EXISTS ttl_empty_parts;
-- Partitioned by id, so each of the two inserts below goes to its own partition.
CREATE TABLE ttl_empty_parts (id UInt32, d Date) ENGINE = MergeTree ORDER BY tuple() PARTITION BY id;
-- id = 0: 500 consecutive dates starting at 2005-01-01.
INSERT INTO ttl_empty_parts SELECT 0, toDate('2005-01-01') + number from numbers(500);
-- id = 1: 500 consecutive dates starting at 2050-01-01.
INSERT INTO ttl_empty_parts SELECT 1, toDate('2050-01-01') + number from numbers(500);
-- Row count and number of active parts before the TTL is set.
SELECT count() FROM ttl_empty_parts;
SELECT count() FROM system.parts WHERE table = 'ttl_empty_parts' AND database = currentDatabase() AND active;
-- Use column d as the table TTL expression.
ALTER TABLE ttl_empty_parts MODIFY TTL d;
-- Detach and re-attach the table to be sure that the task which clears outdated parts has been executed.
DETACH TABLE ttl_empty_parts;
ATTACH TABLE ttl_empty_parts;
-- Row count and number of active parts after the TTL has been applied.
SELECT count() FROM ttl_empty_parts;
SELECT count() FROM system.parts WHERE table = 'ttl_empty_parts' AND database = currentDatabase() AND active;
DROP TABLE ttl_empty_parts;