Lock parts with dirty hack

alesapin 2019-08-20 21:00:48 +03:00
parent 9c8ec6e4fc
commit 120e27a385
5 changed files with 217 additions and 84 deletions

View File

@@ -39,7 +39,7 @@ void ReplicatedMergeTreeQueue::addVirtualParts(const MergeTreeData::DataParts &
}
void ReplicatedMergeTreeQueue::disableMergesForParts(const MergeTreeData::DataPartsVector & data_parts)
Names ReplicatedMergeTreeQueue::disableMergesForParts(const MergeTreeData::DataPartsVector & data_parts)
{
std::lock_guard lock(state_mutex);
for (const auto & data_part : data_parts)
@@ -49,8 +49,16 @@ void ReplicatedMergeTreeQueue::disableMergesForParts(const MergeTreeData::DataPa
+ " already assigned to background operation.", ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
}
Names result;
for (const auto & data_part : data_parts)
virtual_parts.add(data_part->name);
{
MergeTreePartInfo info_copy = data_part->info;
info_copy.partition_id = "disabled"; /// Fake name to block this part
auto fake_part_name = info_copy.getPartName();
virtual_parts.add(fake_part_name);
result.emplace_back(fake_part_name);
}
return result;
}
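
The "dirty hack" of the commit title is visible in this hunk: instead of registering the real part names in virtual_parts, the queue copies each part's info, overwrites its partition_id with "disabled", and registers the resulting fake name, returning those names so the caller can later remove exactly the entries it created without touching real virtual parts. A minimal sketch of the name mangling, using a simplified PartInfo in place of MergeTreePartInfo and a plain std::set standing in for the queue's virtual-parts set (both are assumptions for illustration):

```cpp
#include <cstdint>
#include <set>
#include <string>
#include <vector>

/// Simplified stand-in for MergeTreePartInfo; the real struct has more state.
struct PartInfo
{
    std::string partition_id;
    int64_t min_block = 0;
    int64_t max_block = 0;
    uint32_t level = 0;

    /// Standard "partition_minblock_maxblock_level" part-name format.
    std::string getPartName() const
    {
        return partition_id + "_" + std::to_string(min_block) + "_"
             + std::to_string(max_block) + "_" + std::to_string(level);
    }
};

std::vector<std::string> disableMergesForParts(
    const std::vector<PartInfo> & parts, std::set<std::string> & virtual_parts)
{
    std::vector<std::string> fake_names;
    for (const auto & part : parts)
    {
        PartInfo info_copy = part;
        info_copy.partition_id = "disabled"; /// fake partition blocks this part
        fake_names.push_back(info_copy.getPartName());
        virtual_parts.insert(fake_names.back());
    }
    return fake_names; /// caller removes exactly these names later
}
```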
@@ -401,10 +409,15 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri
bool ReplicatedMergeTreeQueue::removeFromVirtualParts(const MergeTreePartInfo & part_info)
{
std::unique_lock lock(state_mutex);
std::lock_guard lock(state_mutex);
return virtual_parts.remove(part_info);
}
bool ReplicatedMergeTreeQueue::removeFromVirtualParts(const String & part_name)
{
std::lock_guard lock(state_mutex);
return virtual_parts.remove(part_name);
}
void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback)
{

View File

@@ -253,6 +253,8 @@ public:
bool removeFromVirtualParts(const MergeTreePartInfo & part_info);
bool removeFromVirtualParts(const String & part_name);
/** Copy the new entries from the shared log to the queue of this replica. Set the log_pointer to the appropriate value.
* If watch_callback is not empty, will call it when new entries appear in the log.
* If there were new entries, notifies storage.queue_task_handle.
@@ -331,7 +333,7 @@ public:
/// Add part to virtual_parts, which means that part must exist
/// after processing replication log up to log_pointer.
/// Throws an exception if any part is already in virtual parts.
void disableMergesForParts(const MergeTreeData::DataPartsVector & data_parts);
Names disableMergesForParts(const MergeTreeData::DataPartsVector & data_parts);
/// Checks whether the part is already in virtual parts
bool isPartAssignedToBackgroundOperation(const MergeTreeData::DataPartPtr & data_part) const;

View File

@@ -2176,6 +2176,8 @@ struct CurrentlyMovingPartsTagger
ReplicatedMergeTreeQueue & queue;
Names remove_names;
public:
CurrentlyMovingPartsTagger(MergeTreeMovingParts parts_, ReplicatedMergeTreeQueue & queue_)
: parts(std::move(parts_))
@@ -2186,14 +2188,15 @@ public:
data_parts.emplace_back(moving_part.part);
/// Throws an exception if some parts already exist
queue.disableMergesForParts(data_parts);
remove_names = queue.disableMergesForParts(data_parts);
}
~CurrentlyMovingPartsTagger()
{
/// May return false, but we don't care, it's ok.
for (auto & part : parts)
queue.removeFromVirtualParts(part.part->info);
for (auto & part_name : remove_names)
if (!queue.removeFromVirtualParts(part_name))
std::terminate();
}
};
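
This is the RAII pattern that makes the lock safe: the tagger registers the fake names in its constructor and removes every one of them in its destructor. The commit deliberately replaces the old "we don't care" removal with std::terminate(), because failing to remove a name the tagger itself inserted means the virtual-parts state is corrupted. A hedged sketch of the pattern with simplified types (Queue here is an illustrative stand-in for ReplicatedMergeTreeQueue, not its real interface):

```cpp
#include <exception>
#include <set>
#include <string>
#include <vector>

/// Stand-in for ReplicatedMergeTreeQueue: tracks blocked part names.
struct Queue
{
    std::set<std::string> virtual_parts;

    std::vector<std::string> disableMergesForParts(const std::vector<std::string> & fake_names)
    {
        for (const auto & name : fake_names)
            virtual_parts.insert(name);
        return fake_names;
    }

    bool removeFromVirtualParts(const std::string & part_name)
    {
        return virtual_parts.erase(part_name) > 0;
    }
};

class CurrentlyMovingPartsTagger
{
    Queue & queue;
    std::vector<std::string> remove_names; /// exactly the names we inserted

public:
    CurrentlyMovingPartsTagger(const std::vector<std::string> & fake_names, Queue & queue_)
        : queue(queue_), remove_names(queue_.disableMergesForParts(fake_names))
    {
    }

    ~CurrentlyMovingPartsTagger()
    {
        /// Failing to remove a name we inserted ourselves is an invariant
        /// violation; crash loudly instead of silently leaking the lock.
        for (const auto & part_name : remove_names)
            if (!queue.removeFromVirtualParts(part_name))
                std::terminate();
    }
};
```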

View File

@@ -2,20 +2,16 @@
<storage_configuration>
<disks>
<default> <!--path for default disk is provided in main config-->
<keep_free_space_bytes>1000</keep_free_space_bytes>
<default>
<keep_free_space_bytes>1024</keep_free_space_bytes>
</default>
<!-- <mainstorage>-->
<!-- <path>/mainstorage/</path> (trailing slash is mandatory) -->
<!-- <keep_free_space_bytes>1000000</keep_free_space_bytes>-->
<!-- </mainstorage>-->
<jbod1>
<path>/jbod1/</path>
<keep_free_space_bytes>10000000</keep_free_space_bytes>
</jbod1>
<jbod2>
<path>/jbod2/</path>
<keep_free_space_bytes>10000000</keep_free_space_bytes>
<keep_free_space_bytes>10485760</keep_free_space_bytes>
<!-- 10MB -->
</jbod2>
<external>
<path>/external/</path>
@@ -23,63 +19,64 @@
</disks>
<policies>
<!-- default: store on mainstorage -->
<!-- <default>-->
<!-- <volumes>-->
<!-- <volume>-->
<!-- <disk>mainstorage</disk>-->
<!-- <max_data_part_size_bytes>20000000</max_data_part_size_bytes>-->
<!-- </volume>-->
<!-- </volumes>-->
<!-- </default>-->
<!-- store on JBOD by default (round-robin), store big parts on external -->
<small_jbod_with_external>
<volumes>
<main>
<disk>jbod1</disk>
</main>
<external>
<disk>external</disk>
</external>
</volumes>
</small_jbod_with_external>
<!-- store on JBOD by default (round-robin), store big parts on external -->
<jbods_with_external>
<volumes>
<main>
<disk>jbod1</disk>
<disk>jbod2</disk>
<max_data_part_size_bytes>10485760</max_data_part_size_bytes>
<!-- 10MB -->
</main>
<external>
<disk>external</disk>
</external>
</volumes>
</jbods_with_external>
<!-- Move parts from jbod1 to external when free space on jbod1 drops below 70% (move_factor = 0.7) -->
<moving_jbod_with_external>
<volumes>
<main>
<disk>jbod1</disk>
</main>
<external>
<disk>external</disk>
</external>
</volumes>
<move_factor>0.7</move_factor>
</moving_jbod_with_external>
<!-- store local by default, store big parts on external -->
<default_disk_with_external>
<volumes>
<small>
<!-- volumes must be named so that parts can be moved between volumes with DDL commands -->
<disk>default</disk>
<max_data_part_size_bytes>2000000</max_data_part_size_bytes>
<max_data_part_size_bytes>2097152</max_data_part_size_bytes>
<!-- 2MB -->
</small>
<big>
<disk>external</disk>
<max_data_part_size_bytes>20000000</max_data_part_size_bytes>
<max_data_part_size_bytes>20971520</max_data_part_size_bytes>
<!-- 20MB -->
</big>
</volumes>
</default_disk_with_external>
<!-- store on JBOD by default (round-robin), store big parts on external -->
<jbod_with_external>
<volumes>
<main>
<disk>jbod1</disk>
<disk>jbod2</disk>
<max_data_part_size_bytes>10000000</max_data_part_size_bytes>
<!-- max_data_part_size_ratio>0.2</max_data_part_size_ratio -->
</main>
<external>
<disk>external</disk>
<!--max_data_part_size_bytes>30000000</max_data_part_size_bytes-->
</external>
</volumes>
</jbod_with_external>
<!-- <mainstorage_only>-->
<!-- <volumes>-->
<!-- <main>-->
<!-- <disk>mainstorage</disk>-->
<!-- <max_data_part_size_bytes>9000000</max_data_part_size_bytes>-->
<!-- </main>-->
<!-- <extra>-->
<!-- <disk>external</disk>-->
<!-- <max_data_part_size_bytes>20000000</max_data_part_size_bytes>-->
<!-- </extra>-->
<!-- </volumes>-->
<!-- </mainstorage_only>-->
</policies>
</storage_configuration>
</yandex>
</yandex>
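
For context on the policies above: within a volume, inserts rotate round-robin across its disks, and max_data_part_size_bytes caps what a volume will accept, so an oversized part falls through to the next volume (here, external). A conceptual sketch of that selection logic, under these stated assumptions rather than as ClickHouse's actual implementation (which also checks free space and reservations):

```cpp
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

struct Volume
{
    std::vector<std::string> disks;
    uint64_t max_data_part_size = UINT64_MAX; /// max_data_part_size_bytes
    size_t next_disk = 0;                     /// round-robin cursor
};

std::optional<std::string> reserveDisk(std::vector<Volume> & volumes, uint64_t part_size)
{
    for (auto & volume : volumes) /// volumes are tried in declaration order
    {
        if (part_size > volume.max_data_part_size)
            continue; /// too big for this volume, fall through to the next
        const std::string disk = volume.disks[volume.next_disk];
        volume.next_disk = (volume.next_disk + 1) % volume.disks.size();
        return disk;
    }
    return std::nullopt; /// no volume can take the part
}
```

Under the jbods_with_external layout, a part larger than 10485760 bytes skips the main volume and lands on external, which is exactly what test_max_data_part_size in the test file below asserts.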

View File

@ -1,6 +1,8 @@
import time
import pytest
import os
import random
import string
from helpers.cluster import ClickHouseCluster
@@ -44,43 +46,159 @@ def start_cluster():
# Check that configuration is valid
def test_config(start_cluster):
def test_config_parser(start_cluster):
assert node1.query("select name, path, keep_free_space from system.disks") == "default\t/var/lib/clickhouse/data/\t1000\nexternal\t/external/\t0\njbod1\t/jbod1/\t10000000\njbod2\t/jbod2/\t10000000\n"
assert node2.query("select name, path, keep_free_space from system.disks") == "default\t/var/lib/clickhouse/data/\t1000\nexternal\t/external/\t0\njbod1\t/jbod1/\t10000000\njbod2\t/jbod2/\t10000000\n"
assert node1.query("select * from system.storage_policies") == "" \
"default\tdefault\t0\t['default']\t18446744073709551615\n" \
"default_disk_with_external\tsmall\t0\t['default']\t2000000\n" \
"default_disk_with_external\tbig\t1\t['external']\t20000000\n" \
"jbod_with_external\tmain\t0\t['jbod1','jbod2']\t10000000\n" \
"jbod_with_external\texternal\t1\t['external']\t18446744073709551615\n"
"jbods_with_external\tmain\t0\t['jbod1','jbod2']\t10000000\n" \
"jbods_with_external\texternal\t1\t['external']\t18446744073709551615\n"
assert node2.query("select * from system.storage_policies") == "" \
"default\tdefault\t0\t['default']\t18446744073709551615\n" \
"default_disk_with_external\tsmall\t0\t['default']\t2000000\n" \
"default_disk_with_external\tbig\t1\t['external']\t20000000\n" \
"jbod_with_external\tmain\t0\t['jbod1','jbod2']\t10000000\n" \
"jbod_with_external\texternal\t1\t['external']\t18446744073709551615\n"
"jbods_with_external\tmain\t0\t['jbod1','jbod2']\t10000000\n" \
"jbods_with_external\texternal\t1\t['external']\t18446744073709551615\n"
def test_write_on_second_volume(start_cluster):
node1.query("create table node1_mt ( d UInt64 ) ENGINE = MergeTree() ORDER BY d SETTINGS storage_policy_name='jbod_with_external'")
n = 1000
flag = True
i = 0
used_disks = set()
retries = 0
# Keep inserting while the external disk does not contain parts
while flag and i < 1000:
s = ["(" + str(n * i + k) + ")" for k in range(n)]
node1.query("insert into node1_mt values " + ', '.join(s))
used_disks_ = node1.query("select distinct disk_name from system.parts where table == 'node1_mt'").strip().split("\n")
used_disks.update(used_disks_)
flag = "external" not in used_disks_
i += 1
def get_random_string(length):
return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(length))
def get_used_disks_for_table(node, table_name):
return node.query("select disk_name from system.parts where table == '{}' order by modification_time".format(table_name)).strip().split('\n')
def test_round_robin(start_cluster):
try:
node1.query("""
CREATE TABLE mt_on_jbod (
d UInt64
) ENGINE = MergeTree()
ORDER BY d
SETTINGS storage_policy_name='jbods_with_external'
""")
# the first insert should go to jbod1
node1.query("insert into mt_on_jbod select * from numbers(10000)")
used_disk = get_used_disks_for_table(node1, 'mt_on_jbod')
assert len(used_disk) == 1, 'More than one disk used for single insert'
assert used_disk[0] == 'jbod1', 'First disk should be jbod1'
node1.query("insert into mt_on_jbod select * from numbers(10000)")
used_disks = get_used_disks_for_table(node1, 'mt_on_jbod')
assert len(used_disks) == 2, 'Two disks should be used for two parts'
assert used_disks[0] == 'jbod1'
assert used_disks[1] == 'jbod2'
node1.query("insert into mt_on_jbod select * from numbers(10000)")
used_disks = get_used_disks_for_table(node1, 'mt_on_jbod')
assert len(used_disks) == 3
assert used_disks[0] == 'jbod1'
assert used_disks[1] == 'jbod2'
assert used_disks[2] == 'jbod1'
finally:
node1.query("DROP TABLE IF EXISTS mt_ob_jbod")
def test_max_data_part_size(start_cluster):
try:
node1.query("""
CREATE TABLE mt_with_huge_part (
s1 String
) ENGINE = MergeTree()
ORDER BY tuple()
SETTINGS storage_policy_name='jbods_with_external'
""")
data = [] # 10MB in total
for i in range(10):
data.append(get_random_string(1024 * 1024)) # 1MB row
node1.query("INSERT INTO mt_with_huge_part VALUES {}".format(','.join(["('" + x + "')" for x in data])))
used_disks = get_used_disks_for_table(node1, 'mt_with_huge_part')
assert len(used_disks) == 1
assert used_disks[0] == 'external'
finally:
node1.query("DROP TABLE IF EXISTS mt_with_huge_part")
def test_jbod_overflow(start_cluster):
try:
node1.query("""
CREATE TABLE mt_with_overflow (
s1 String
) ENGINE = MergeTree()
ORDER BY tuple()
SETTINGS storage_policy_name='small_jbod_with_external'
""")
data = [] # 5MB in total
for i in range(5):
data.append(get_random_string(1024 * 1024)) # 1MB row
node1.query("SYSTEM STOP MERGES")
# small jbod size is 40MB, so let's insert a 5MB batch 7 times
for i in range(7):
node1.query("INSERT INTO mt_with_overflow VALUES {}".format(','.join(["('" + x + "')" for x in data])))
used_disks = get_used_disks_for_table(node1, 'mt_with_overflow')
assert all(disk == 'jbod1' for disk in used_disks)
# should go to the external disk (jbod1 has overflowed)
data = [] # 10MB in total
for i in range(10):
data.append(get_random_string(1024 * 1024)) # 1MB row
node1.query("INSERT INTO mt_with_overflow VALUES {}".format(','.join(["('" + x + "')" for x in data])))
used_disks = get_used_disks_for_table(node1, 'mt_with_overflow')
assert used_disks[-1] == 'external'
node1.query("SYSTEM START MERGES")
node1.query("OPTIMIZE TABLE mt_with_overflow FINAL")
disks_for_merges = node1.query("SELECT disk_name FROM system.parts WHERE table == 'mt_with_overflow' AND level >= 1 ORDER BY modification_time").strip().split('\n')
assert all(disk == 'external' for disk in disks_for_merges)
finally:
node1.query("DROP TABLE IF EXISTS mt_with_overflow")
@pytest.mark.parametrize("name,engine", [
("moving_mt","MergeTree()"),
("moving_replicated_mt","ReplicatedMergeTree('/clickhouse/sometable', '1')",),
])
def test_background_move(start_cluster, name, engine):
try:
node1.query("""
CREATE TABLE {name} (
s1 String
) ENGINE = {engine}
ORDER BY tuple()
SETTINGS storage_policy_name='moving_jbod_with_external'
""".format(name=name, engine=engine))
for i in range(5):
data = [] # 5MB in total
for i in range(5):
data.append(get_random_string(1024 * 1024)) # 1MB row
# small jbod size is 40MB; insert a 5MB batch 5 times so free space drops below the 70% move_factor threshold
node1.query("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
time.sleep(5)
used_disks = get_used_disks_for_table(node1, name)
# Maximum two parts on jbod1
assert sum(1 for x in used_disks if x == 'jbod1') <= 2
# first (oldest) part was moved to external
assert used_disks[0] == 'external'
finally:
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
# Check that all disks from the policy were used
assert used_disks == {'jbod1', 'jbod2', 'external'}
node1.query("drop table node1_mt")
assert node1.query("select distinct disk_name from system.parts where table == 'node1_mt'").strip().split("\n") == ['']
def test_default(start_cluster):
@@ -182,12 +300,12 @@ select name, data_paths, storage_policy from system.tables where name='table_wit
INSERT INTO table_with_storage_policy_default SELECT rand64() FROM numbers(100);
CREATE TABLE table_with_storage_policy_default_explicit (id UInt64) Engine=MergeTree() ORDER BY (id) SETTINGS storage_policy_name='default';
CREATE TABLE table_with_storage_policy_default_disk_with_external (id UInt64) Engine=MergeTree() ORDER BY (id) SETTINGS storage_policy_name='default_disk_with_external';
CREATE TABLE table_with_storage_policy_jbod_with_external (id UInt64) Engine=MergeTree() ORDER BY (id) SETTINGS storage_policy_name='jbod_with_external';
CREATE TABLE table_with_storage_policy_jbod_with_external (id UInt64) Engine=MergeTree() ORDER BY (id) SETTINGS storage_policy_name='jbods_with_external';
CREATE TABLE replicated_table_with_storage_policy_default (id UInt64) Engine=ReplicatedMergeTree('/clickhouse/tables/{database}/{table}', '{replica}') ORDER BY (id);
CREATE TABLE replicated_table_with_storage_policy_default_explicit (id UInt64) Engine=ReplicatedMergeTree('/clickhouse/tables/{database}/{table}', '{replica}') ORDER BY (id) SETTINGS storage_policy_name='default';
CREATE TABLE replicated_table_with_storage_policy_default_disk_with_external (id UInt64) Engine=ReplicatedMergeTree('/clickhouse/tables/{database}/{table}', '{replica}') ORDER BY (id) SETTINGS storage_policy_name='default_disk_with_external';
CREATE TABLE replicated_table_with_storage_policy_jbod_with_external (id UInt64) Engine=ReplicatedMergeTree('/clickhouse/tables/{database}/{table}', '{replica}') ORDER BY (id) SETTINGS storage_policy_name='jbod_with_external';
CREATE TABLE replicated_table_with_storage_policy_jbod_with_external (id UInt64) Engine=ReplicatedMergeTree('/clickhouse/tables/{database}/{table}', '{replica}') ORDER BY (id) SETTINGS storage_policy_name='jbods_with_external';
```