Add system stop moves query

alesapin 2019-09-03 17:50:49 +03:00
parent 48004e9b7b
commit 52442cf549
10 changed files with 113 additions and 4 deletions
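
This change wires a new ActionLocks::PartsMove block type through the SYSTEM query machinery so that part moves (both the background mover and manual ALTER TABLE ... MOVE PART queries) can be paused and resumed. A minimal usage sketch, not part of the diff below; it assumes the clickhouse_driver Python package, a server running this patch on localhost, and a placeholder table name:

# Sketch only: exercising the new SYSTEM STOP/START MOVES statements.
# Assumptions: clickhouse_driver is installed, a server with this patch runs
# on localhost, and "my_table" is a placeholder MergeTree table.
from clickhouse_driver import Client

client = Client("localhost")

client.execute("SYSTEM STOP MOVES")               # block part moves server-wide
client.execute("SYSTEM STOP MOVES my_table")      # or only for a single table
# While moves are blocked, ALTER TABLE ... MOVE PART ... TO VOLUME fails and the
# background moving task returns early (see the isCancelled() check in
# StorageReplicatedMergeTree::movingPartsTask further down).
client.execute("SYSTEM START MOVES my_table")
client.execute("SYSTEM START MOVES")              # re-enable moves everywhere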

View File

@@ -15,6 +15,7 @@ namespace ActionLocks
extern const StorageActionBlockType ReplicationQueue = 4;
extern const StorageActionBlockType DistributedSend = 5;
extern const StorageActionBlockType PartsTTLMerge = 6;
extern const StorageActionBlockType PartsMove = 7;
}

View File

@@ -49,6 +49,7 @@ namespace ActionLocks
extern StorageActionBlockType ReplicationQueue;
extern StorageActionBlockType DistributedSend;
extern StorageActionBlockType PartsTTLMerge;
extern StorageActionBlockType PartsMove;
}
@@ -189,6 +190,12 @@ BlockIO InterpreterSystemQuery::execute()
case Type::START_TTL_MERGES:
startStopAction(context, query, ActionLocks::PartsTTLMerge, true);
break;
case Type::STOP_MOVES:
startStopAction(context, query, ActionLocks::PartsMove, false);
break;
case Type::START_MOVES:
startStopAction(context, query, ActionLocks::PartsMove, true);
break;
case Type::STOP_FETCHES:
startStopAction(context, query, ActionLocks::PartsFetch, false);
break;

View File

@@ -59,6 +59,10 @@ const char * ASTSystemQuery::typeToString(Type type)
return "STOP TTL MERGES";
case Type::START_TTL_MERGES:
return "START TTL MERGES";
case Type::STOP_MOVES:
return "STOP MOVES";
case Type::START_MOVES:
return "START MOVES";
case Type::STOP_FETCHES:
return "STOP FETCHES";
case Type::START_FETCHES:
@@ -106,6 +110,8 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
|| type == Type::START_MERGES
|| type == Type::STOP_TTL_MERGES
|| type == Type::START_TTL_MERGES
|| type == Type::STOP_MOVES
|| type == Type::START_MOVES
|| type == Type::STOP_FETCHES
|| type == Type::START_FETCHES
|| type == Type::STOP_REPLICATED_SENDS

View File

@@ -37,6 +37,8 @@ public:
START_TTL_MERGES,
STOP_FETCHES,
START_FETCHES,
STOP_MOVES,
START_MOVES,
STOP_REPLICATED_SENDS,
START_REPLICATED_SENDS,
STOP_REPLICATION_QUEUES,

View File

@@ -58,6 +58,8 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::START_MERGES:
case Type::STOP_TTL_MERGES:
case Type::START_TTL_MERGES:
case Type::STOP_MOVES:
case Type::START_MOVES:
case Type::STOP_FETCHES:
case Type::START_FETCHES:
case Type::STOP_REPLICATED_SENDS:

View File

@@ -96,6 +96,7 @@ bool MergeTreePartsMover::selectPartsToMove(
auto space_information = disk->getSpaceInformation();
UInt64 total_space_with_factor = space_information.getTotalSpace() * policy->getMoveFactor();
/// Do not take into account reserved space
if (total_space_with_factor > space_information.getAvailableSpace())
need_to_move.emplace(disk, total_space_with_factor - space_information.getAvailableSpace());
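
The check above can be read as: if a disk's available space falls below getTotalSpace() * move_factor, record how many bytes must leave that disk to get back to the threshold (the preceding comment notes that reserved space is not taken into account). A worked example of the arithmetic in Python; all numbers are invented:

# Illustration of the threshold check above; every value is made up.
total_space = 100 * 1024 ** 3       # disk->getSpaceInformation().getTotalSpace()
available_space = 50 * 1024 ** 3    # ...getAvailableSpace()
move_factor = 0.7                   # policy->getMoveFactor()

total_space_with_factor = total_space * move_factor        # 70 GiB threshold
if total_space_with_factor > available_space:              # only 50 GiB free
    # the mover has to relocate roughly this many bytes off the disk
    need_to_move = total_space_with_factor - available_space   # 20 GiB
    print(int(need_to_move))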

View File

@@ -45,6 +45,7 @@ namespace ActionLocks
{
extern const StorageActionBlockType PartsMerge;
extern const StorageActionBlockType PartsTTLMerge;
extern const StorageActionBlockType PartsMove;
}
@@ -1254,7 +1255,9 @@ ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type)
if (action_type == ActionLocks::PartsMerge)
return merger_mutator.merges_blocker.cancel();
else if (action_type == ActionLocks::PartsTTLMerge)
return merger_mutator.ttl_merges_blocker.cancel();
else if (action_type == ActionLocks::PartsMove)
return parts_mover.moves_blocker.cancel();
return {};
}

View File

@@ -120,6 +120,7 @@ namespace ActionLocks
extern const StorageActionBlockType PartsSend;
extern const StorageActionBlockType ReplicationQueue;
extern const StorageActionBlockType PartsTTLMerge;
extern const StorageActionBlockType PartsMove;
}
@@ -2148,6 +2149,9 @@ BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask()
BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::movingPartsTask()
{
if (parts_mover.moves_blocker.isCancelled())
return BackgroundProcessingPoolTaskResult::ERROR;
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
try
@@ -5231,6 +5235,9 @@ ActionLock StorageReplicatedMergeTree::getActionLock(StorageActionBlockType acti
if (action_type == ActionLocks::ReplicationQueue)
return queue.actions_blocker.cancel();
if (action_type == ActionLocks::PartsMove)
return parts_mover.moves_blocker.cancel();
return {};
}

View File

@@ -46,7 +46,7 @@
</volumes>
</jbods_with_external>
<!-- Move all parts from jbod1 if more than 50% is used -->
<!-- Move all parts from jbod1 if more than 70% is used -->
<moving_jbod_with_external>
<volumes>
<main>

View File

@@ -38,7 +38,7 @@ def get_random_string(length):
return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(length))
def get_used_disks_for_table(node, table_name):
return node.query("select disk_name from system.parts where table == '{}' order by modification_time".format(table_name)).strip().split('\n')
return node.query("select disk_name from system.parts where table == '{}' and active=1 order by modification_time".format(table_name)).strip().split('\n')
@pytest.mark.parametrize("name,engine", [
("mt_on_jbod","MergeTree()"),
@@ -167,7 +167,7 @@ def test_background_move(start_cluster, name, engine):
data = [] # 5MB in total
for i in range(5):
data.append(get_random_string(1024 * 1024)) # 1MB row
# small jbod size is 40MB, so let's insert a 5MB batch 2 times (less than 70%)
# small jbod size is 40MB, so let's insert a 5MB batch 6 times
node1.query("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
@@ -193,6 +193,86 @@ def test_background_move(start_cluster, name, engine):
finally:
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
@pytest.mark.parametrize("name,engine", [
("stopped_moving_mt","MergeTree()"),
("stopped_moving_replicated_mt","ReplicatedMergeTree('/clickhouse/stopped_moving_replicated_mt', '1')",),
])
def test_start_stop_moves(start_cluster, name, engine):
try:
node1.query("""
CREATE TABLE {name} (
s1 String
) ENGINE = {engine}
ORDER BY tuple()
SETTINGS storage_policy_name='moving_jbod_with_external'
""".format(name=name, engine=engine))
node1.query("INSERT INTO {} VALUES ('HELLO')".format(name))
node1.query("INSERT INTO {} VALUES ('WORLD')".format(name))
used_disks = get_used_disks_for_table(node1, name)
assert all(d == "jbod1" for d in used_disks), "All writes should go to jbod1"
first_part = node1.query("SELECT name FROM system.parts WHERE table = '{}' ORDER BY modification_time LIMIT 1".format(name)).strip()
node1.query("SYSTEM STOP MOVES")
with pytest.raises(QueryRuntimeException):
node1.query("ALTER TABLE {} MOVE PART '{}' TO VOLUME 'external'".format(name, first_part))
used_disks = get_used_disks_for_table(node1, name)
assert all(d == "jbod1" for d in used_disks), "Blocked moves should not move anything"
node1.query("SYSTEM START MOVES")
node1.query("ALTER TABLE {} MOVE PART '{}' TO VOLUME 'external'".format(name, first_part))
disk = node1.query("SELECT disk_name FROM system.parts WHERE table = '{}' and name = '{}'".format(name, first_part)).strip()
assert disk == "external"
node1.query("TRUNCATE TABLE {}".format(name))
node1.query("SYSTEM STOP MOVES {}".format(name))
node1.query("SYSTEM STOP MERGES {}".format(name))
for i in range(5):
data = [] # 5MB in total
for i in range(5):
data.append(get_random_string(1024 * 1024)) # 1MB row
# jbod size is 40MB, so let's insert a 5MB batch 7 times
node1.query("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
used_disks = get_used_disks_for_table(node1, name)
retry = 5
i = 0
while not sum(1 for x in used_disks if x == 'jbod1') <= 2 and i < retry:
time.sleep(0.1)
used_disks = get_used_disks_for_table(node1, name)
i += 1
# first (oldest) part doesn't move anywhere
assert used_disks[0] == 'jbod1'
node1.query("SYSTEM START MOVES {}".format(name))
node1.query("SYSTEM START MERGES {}".format(name))
# wait some time until the background backoff finishes
retry = 30
i = 0
while not sum(1 for x in used_disks if x == 'jbod1') <= 2 and i < retry:
time.sleep(1)
used_disks = get_used_disks_for_table(node1, name)
i += 1
assert sum(1 for x in used_disks if x == 'jbod1') <= 2
# first (oldest) part moved to external
assert used_disks[0] == 'external'
finally:
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
def get_path_for_part_from_part_log(node, table, part_name):
node.query("SYSTEM FLUSH LOGS")