Merge pull request #8104 from excitoon-favorites/test_move_after_merge

FIxed behavior with ALTER MOVE ran immediately after merge finish moves superpart of specified
This commit is contained in:
alesapin 2019-12-10 17:15:08 +03:00 committed by GitHub
commit 67f1ff1c8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 73 additions and 8 deletions

View File

@ -34,6 +34,7 @@ namespace ErrorCodes
extern const int CANNOT_STATVFS;
extern const int NOT_ENOUGH_SPACE;
extern const int NOT_IMPLEMENTED;
extern const int NO_SUCH_DATA_PART;
extern const int SYSTEM_ERROR;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;

View File

@ -2574,7 +2574,6 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String &
return getActiveContainingPart(part_info);
}
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartition(MergeTreeData::DataPartState state, const String & partition_id)
{
DataPartStateAndPartitionID state_with_partition{state, partition_id};
@ -2740,8 +2739,9 @@ void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String &
DataPartsVector parts;
if (moving_part)
{
parts.push_back(getActiveContainingPart(partition_id));
if (!parts.back())
auto part_info = MergeTreePartInfo::fromPartName(partition_id, format_version);
parts.push_back(getActiveContainingPart(part_info));
if (!parts.back() || parts.back()->name != part_info.getPartName())
throw Exception("Part " + partition_id + " is not exists or not active", ErrorCodes::NO_SUCH_DATA_PART);
}
else
@ -2756,6 +2756,9 @@ void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String &
return part_ptr->disk->getName() == disk->getName();
}), parts.end());
if (parts.empty())
throw Exception("Nothing to move", ErrorCodes::NO_SUCH_DATA_PART);
if (parts.empty())
{
String no_parts_to_move_message;
@ -2768,7 +2771,7 @@ void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String &
}
if (!movePartsToSpace(parts, std::static_pointer_cast<const DiskSpace::Space>(disk)))
throw Exception("Cannot move parts because moves are manually disabled.", ErrorCodes::ABORTED);
throw Exception("Cannot move parts because moves are manually disabled", ErrorCodes::ABORTED);
}
@ -2784,18 +2787,21 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
DataPartsVector parts;
if (moving_part)
{
parts.push_back(getActiveContainingPart(partition_id));
if (!parts.back())
auto part_info = MergeTreePartInfo::fromPartName(partition_id, format_version);
parts.emplace_back(getActiveContainingPart(part_info));
if (!parts.back() || parts.back()->name != part_info.getPartName())
throw Exception("Part " + partition_id + " is not exists or not active", ErrorCodes::NO_SUCH_DATA_PART);
}
else
parts = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
auto volume = storage_policy->getVolumeByName(name);
if (!volume)
throw Exception("Volume " + name + " does not exists on policy " + storage_policy->getName(), ErrorCodes::UNKNOWN_DISK);
if (parts.empty())
throw Exception("Nothing to move", ErrorCodes::NO_SUCH_DATA_PART);
parts.erase(std::remove_if(parts.begin(), parts.end(), [&](auto part_ptr)
{
for (const auto & disk : volume->disks)
@ -2820,7 +2826,7 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
}
if (!movePartsToSpace(parts, std::static_pointer_cast<const DiskSpace::Space>(volume)))
throw Exception("Cannot move parts because moves are manually disabled.", ErrorCodes::ABORTED);
throw Exception("Cannot move parts because moves are manually disabled", ErrorCodes::ABORTED);
}

View File

@ -1108,3 +1108,61 @@ def test_kill_while_insert(start_cluster):
finally:
"""Don't drop table afterwards to not shadow assertion."""
def test_move_while_merge(start_cluster):
try:
name = "test_move_while_merge"
node1.query("""
CREATE TABLE {name} (
n Int64
) ENGINE = MergeTree
ORDER BY sleep(2)
SETTINGS storage_policy='small_jbod_with_external'
""".format(name=name))
node1.query("INSERT INTO {name} VALUES (1)".format(name=name))
node1.query("INSERT INTO {name} VALUES (2)".format(name=name))
parts = node1.query("SELECT name FROM system.parts WHERE table = '{name}' AND active = 1".format(name=name)).splitlines()
assert len(parts) == 2
def optimize():
node1.query("OPTIMIZE TABLE {name}".format(name=name))
optimize = threading.Thread(target=optimize)
optimize.start()
time.sleep(0.5)
with pytest.raises(QueryRuntimeException):
node1.query("ALTER TABLE {name} MOVE PART '{part}' TO DISK 'external'".format(name=name, part=parts[0]))
exiting = False
no_exception = {}
def alter():
while not exiting:
try:
node1.query("ALTER TABLE {name} MOVE PART '{part}' TO DISK 'external'".format(name=name, part=parts[0]))
no_exception['missing'] = 'exception'
break
except QueryRuntimeException:
""""""
alter_thread = threading.Thread(target=alter)
alter_thread.start()
optimize.join()
time.sleep(0.5)
exiting = True
alter_thread.join()
assert len(no_exception) == 0
assert node1.query("SELECT count() FROM {name}".format(name=name)).splitlines() == ["2"]
finally:
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))