fix part name parsing in system.detached_parts

This commit is contained in:
Alexander Tokmakov 2021-08-04 17:42:48 +03:00
parent b52250bc0d
commit 23f8b3d07d
6 changed files with 117 additions and 5 deletions

View File

@ -712,7 +712,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
MergeTreeData::DataPart::Checksums & checksums,
ThrottlerPtr throttler)
{
static const String TMP_PREFIX = "tmp_fetch_";
static const String TMP_PREFIX = "tmp-fetch_";
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
/// We will remove the directory if it already exists. Take precautions.
@ -784,7 +784,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
LOG_DEBUG(log, "Downloading Part {} unique id {} metadata onto disk {}.",
part_name, part_id, disk->getName());
static const String TMP_PREFIX = "tmp_fetch_";
static const String TMP_PREFIX = "tmp-fetch_";
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
String part_relative_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;

View File

@ -1322,6 +1322,9 @@ String IMergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix)
{
/// Do not allow underscores in the prefix because they are used as separators.
assert(prefix.find_first_of('_') == String::npos);
assert(prefix.empty() || std::find(DetachedPartInfo::DETACH_REASONS.begin(),
DetachedPartInfo::DETACH_REASONS.end(),
prefix) != DetachedPartInfo::DETACH_REASONS.end());
return "detached/" + getRelativePathForPrefix(prefix);
}

View File

@ -247,13 +247,39 @@ String MergeTreePartInfo::getPartNameV0(DayNum left_date, DayNum right_date) con
return wb.str();
}
/// Known prefixes of detached part directory names, i.e. the <prefix> in <prefix>_<part_name>.
/// tryParseDetachedPartName() checks these before falling back to generic parsing, and
/// getRelativePathForDetachedPart() asserts that a non-empty prefix is one of them.
/// No entry may contain '_': the underscore separates the prefix from the part name,
/// which is why multi-word reasons use '-' (e.g. "broken-on-start", "tmp-fetch").
const std::vector<String> DetachedPartInfo::DETACH_REASONS =
{
"broken",
"unexpected",
"noquorum",
"ignored",
"broken-on-start",
"clone",
"attaching",
"deleting",
"tmp-fetch",
};
bool DetachedPartInfo::tryParseDetachedPartName(const String & dir_name, DetachedPartInfo & part_info,
MergeTreeDataFormatVersion format_version)
{
part_info.dir_name = dir_name;
/// First, try to parse as <part_name>.
// TODO what if tryParsePartName will parse prefix as partition_id? It can happen if dir_name doesn't contain mutation number at the end
/// First, try to find known prefix and parse dir_name as <prefix>_<partname>.
/// Arbitrary strings are not allowed for partition_id, so known_prefix cannot be confused with partition_id.
for (const auto & known_prefix : DETACH_REASONS)
{
if (dir_name.starts_with(known_prefix) && known_prefix.size() < dir_name.size() && dir_name[known_prefix.size()] == '_')
{
part_info.prefix = known_prefix;
String part_name = dir_name.substr(known_prefix.size() + 1);
bool parsed = MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version);
return part_info.valid_name = parsed;
}
}
/// Next, try to parse dir_name as <part_name>.
if (MergeTreePartInfo::tryParsePartName(dir_name, &part_info, format_version))
return part_info.valid_name = true;
@ -263,7 +289,6 @@ bool DetachedPartInfo::tryParseDetachedPartName(const String & dir_name, Detache
if (first_separator == String::npos)
return part_info.valid_name = false;
// TODO what if <prefix> contains '_'?
const auto part_name = dir_name.substr(first_separator + 1,
dir_name.size() - first_separator - 1);
if (!MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version))

View File

@ -2,6 +2,7 @@
#include <limits>
#include <tuple>
#include <vector>
#include <common/types.h>
#include <common/DayNum.h>
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
@ -115,6 +116,10 @@ struct DetachedPartInfo : public MergeTreePartInfo
/// If false, MergeTreePartInfo is in invalid state (directory name was not successfully parsed).
bool valid_name;
static const std::vector<String> DETACH_REASONS;
/// NOTE: It may parse part info incorrectly.
/// For example, if the prefix contains '_' or if DETACH_REASONS doesn't contain the prefix.
static bool tryParseDetachedPartName(const String & dir_name, DetachedPartInfo & part_info, MergeTreeDataFormatVersion format_version);
};

View File

@ -236,3 +236,82 @@ def test_drop_detached_parts(drop_detached_parts_table):
q("ALTER TABLE test.drop_detached DROP DETACHED PARTITION 1", settings=s)
detached = q("SElECT name FROM system.detached_parts WHERE table='drop_detached' AND database='test' ORDER BY name")
assert TSV(detached) == TSV('0_3_3_0\nattaching_0_6_6_0\ndeleting_0_7_7_0')
def test_system_detached_parts(drop_detached_parts_table):
    """Check how system.detached_parts parses directory names found in detached/.

    Four tables are created with different partition keys (none, int, String, Enum)
    so that partition ids take different forms ('all', numbers, hashes). Their real
    parts are detached, a set of hand-made directories with known and unknown
    prefixes is added next to them, and the parsed columns reported by
    system.detached_parts are compared against the expected rows. Finally every
    partition listed there is attached back and the row counts are verified.
    """
    q("create table sdp_0 (n int, x int) engine=MergeTree order by n")
    q("create table sdp_1 (n int, x int) engine=MergeTree order by n partition by x")
    q("create table sdp_2 (n int, x String) engine=MergeTree order by n partition by x")
    q("create table sdp_3 (n int, x Enum('broken' = 0, 'all' = 1)) engine=MergeTree order by n partition by x")

    for i in range(0, 4):
        q("system stop merges sdp_{}".format(i))
        q("insert into sdp_{} values (0, 0)".format(i))
        q("insert into sdp_{} values (1, 1)".format(i))
        # [:-1] drops the last character of the query output (the trailing newline)
        # so that split('\n') does not yield an empty partition id.
        for p in q("select distinct partition_id from system.parts where table='sdp_{}'".format(i))[:-1].split('\n'):
            q("alter table sdp_{} detach partition id '{}'".format(i, p))

    # Directory names to create by hand in every table's detached/ directory:
    # known prefixes, an unknown prefix, an unparsable name and names with an
    # extra trailing number.
    fake_detached_dirs = [
        'attaching_0_6_6_0',
        'deleting_0_7_7_0',
        'any_other_name',
        'prefix_1_2_2_0_0',
        'ignored_202107_714380_714380_0',
        'broken_202107_714380_714380_123',
        'clone_all_714380_714380_42',
        'clone_all_714380_714380_42_123',
        'broken-on-start_6711e2b2592d86d18fc0f260cf33ef2b_714380_714380_42_123',
    ]

    path_to_detached = path_to_data + 'data/default/sdp_{}/detached/{}'
    for i in range(0, 4):
        for dir_name in fake_detached_dirs:
            instance.exec_in_container(['mkdir', path_to_detached.format(i, dir_name)])

    res = q("select * from system.detached_parts where table like 'sdp_%' order by table, name")
    assert res == \
        "default\tsdp_0\tall\tall_1_1_0\tdefault\t\t1\t1\t0\n" \
        "default\tsdp_0\tall\tall_2_2_0\tdefault\t\t2\t2\t0\n" \
        "default\tsdp_0\t\\N\tany_other_name\tdefault\t\\N\t\\N\t\\N\t\\N\n" \
        "default\tsdp_0\t0\tattaching_0_6_6_0\tdefault\tattaching\t6\t6\t0\n" \
        "default\tsdp_0\t6711e2b2592d86d18fc0f260cf33ef2b\tbroken-on-start_6711e2b2592d86d18fc0f260cf33ef2b_714380_714380_42_123\tdefault\tbroken-on-start\t714380\t714380\t42\n" \
        "default\tsdp_0\t202107\tbroken_202107_714380_714380_123\tdefault\tbroken\t714380\t714380\t123\n" \
        "default\tsdp_0\tall\tclone_all_714380_714380_42\tdefault\tclone\t714380\t714380\t42\n" \
        "default\tsdp_0\tall\tclone_all_714380_714380_42_123\tdefault\tclone\t714380\t714380\t42\n" \
        "default\tsdp_0\t0\tdeleting_0_7_7_0\tdefault\tdeleting\t7\t7\t0\n" \
        "default\tsdp_0\t202107\tignored_202107_714380_714380_0\tdefault\tignored\t714380\t714380\t0\n" \
        "default\tsdp_0\t1\tprefix_1_2_2_0_0\tdefault\tprefix\t2\t2\t0\n" \
        "default\tsdp_1\t0\t0_1_1_0\tdefault\t\t1\t1\t0\n" \
        "default\tsdp_1\t1\t1_2_2_0\tdefault\t\t2\t2\t0\n" \
        "default\tsdp_1\t\\N\tany_other_name\tdefault\t\\N\t\\N\t\\N\t\\N\n" \
        "default\tsdp_1\t0\tattaching_0_6_6_0\tdefault\tattaching\t6\t6\t0\n" \
        "default\tsdp_1\t6711e2b2592d86d18fc0f260cf33ef2b\tbroken-on-start_6711e2b2592d86d18fc0f260cf33ef2b_714380_714380_42_123\tdefault\tbroken-on-start\t714380\t714380\t42\n" \
        "default\tsdp_1\t202107\tbroken_202107_714380_714380_123\tdefault\tbroken\t714380\t714380\t123\n" \
        "default\tsdp_1\tall\tclone_all_714380_714380_42\tdefault\tclone\t714380\t714380\t42\n" \
        "default\tsdp_1\tall\tclone_all_714380_714380_42_123\tdefault\tclone\t714380\t714380\t42\n" \
        "default\tsdp_1\t0\tdeleting_0_7_7_0\tdefault\tdeleting\t7\t7\t0\n" \
        "default\tsdp_1\t202107\tignored_202107_714380_714380_0\tdefault\tignored\t714380\t714380\t0\n" \
        "default\tsdp_1\t1\tprefix_1_2_2_0_0\tdefault\tprefix\t2\t2\t0\n" \
        "default\tsdp_2\t58ed7160db50ea45e1c6aa694c8cbfd1\t58ed7160db50ea45e1c6aa694c8cbfd1_1_1_0\tdefault\t\t1\t1\t0\n" \
        "default\tsdp_2\t6711e2b2592d86d18fc0f260cf33ef2b\t6711e2b2592d86d18fc0f260cf33ef2b_2_2_0\tdefault\t\t2\t2\t0\n" \
        "default\tsdp_2\t\\N\tany_other_name\tdefault\t\\N\t\\N\t\\N\t\\N\n" \
        "default\tsdp_2\t0\tattaching_0_6_6_0\tdefault\tattaching\t6\t6\t0\n" \
        "default\tsdp_2\t6711e2b2592d86d18fc0f260cf33ef2b\tbroken-on-start_6711e2b2592d86d18fc0f260cf33ef2b_714380_714380_42_123\tdefault\tbroken-on-start\t714380\t714380\t42\n" \
        "default\tsdp_2\t202107\tbroken_202107_714380_714380_123\tdefault\tbroken\t714380\t714380\t123\n" \
        "default\tsdp_2\tall\tclone_all_714380_714380_42\tdefault\tclone\t714380\t714380\t42\n" \
        "default\tsdp_2\tall\tclone_all_714380_714380_42_123\tdefault\tclone\t714380\t714380\t42\n" \
        "default\tsdp_2\t0\tdeleting_0_7_7_0\tdefault\tdeleting\t7\t7\t0\n" \
        "default\tsdp_2\t202107\tignored_202107_714380_714380_0\tdefault\tignored\t714380\t714380\t0\n" \
        "default\tsdp_2\t1\tprefix_1_2_2_0_0\tdefault\tprefix\t2\t2\t0\n" \
        "default\tsdp_3\t0\t0_1_1_0\tdefault\t\t1\t1\t0\n" \
        "default\tsdp_3\t1\t1_2_2_0\tdefault\t\t2\t2\t0\n" \
        "default\tsdp_3\t\\N\tany_other_name\tdefault\t\\N\t\\N\t\\N\t\\N\n" \
        "default\tsdp_3\t0\tattaching_0_6_6_0\tdefault\tattaching\t6\t6\t0\n" \
        "default\tsdp_3\t6711e2b2592d86d18fc0f260cf33ef2b\tbroken-on-start_6711e2b2592d86d18fc0f260cf33ef2b_714380_714380_42_123\tdefault\tbroken-on-start\t714380\t714380\t42\n" \
        "default\tsdp_3\t202107\tbroken_202107_714380_714380_123\tdefault\tbroken\t714380\t714380\t123\n" \
        "default\tsdp_3\tall\tclone_all_714380_714380_42\tdefault\tclone\t714380\t714380\t42\n" \
        "default\tsdp_3\tall\tclone_all_714380_714380_42_123\tdefault\tclone\t714380\t714380\t42\n" \
        "default\tsdp_3\t0\tdeleting_0_7_7_0\tdefault\tdeleting\t7\t7\t0\n" \
        "default\tsdp_3\t202107\tignored_202107_714380_714380_0\tdefault\tignored\t714380\t714380\t0\n" \
        "default\tsdp_3\t1\tprefix_1_2_2_0_0\tdefault\tprefix\t2\t2\t0\n"

    # Attach back every partition that system.detached_parts managed to parse.
    for i in range(0, 4):
        for p in q("select distinct partition_id from system.detached_parts where table='sdp_{}' and partition_id is not null".format(i))[:-1].split('\n'):
            q("alter table sdp_{} attach partition id '{}'".format(i, p))

    # ORDER BY is required: without it the row order of a GROUP BY result is not
    # guaranteed, and the comparison against a fixed string could fail spuriously.
    assert q("select n, x, count() from merge('default', 'sdp_') group by n, x order by n, x") == "0\t0\t4\n1\t1\t4\n"