Merge pull request #61954 from ClickHouse/follow-up-broken-projections

Follow up to #60452
This commit is contained in:
Kseniia Sumarokova 2024-03-28 14:23:28 +01:00 committed by GitHub
commit 0244e0851a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 209 additions and 25 deletions

View File

@ -409,12 +409,13 @@ MutationsInterpreter::MutationsInterpreter(
, available_columns(std::move(available_columns_))
, settings(std::move(settings_))
, select_limits(SelectQueryOptions().analyze(!settings.can_execute).ignoreLimits())
, logger(getLogger("MutationsInterpreter(" + source.getStorage()->getStorageID().getFullTableName() + ")"))
{
auto new_context = Context::createCopy(context_);
if (new_context->getSettingsRef().allow_experimental_analyzer)
{
new_context->setSetting("allow_experimental_analyzer", false);
LOG_DEBUG(&Poco::Logger::get("MutationsInterpreter"), "Will use old analyzer to prepare mutation");
LOG_DEBUG(logger, "Will use old analyzer to prepare mutation");
}
context = std::move(new_context);
@ -997,6 +998,7 @@ void MutationsInterpreter::prepare(bool dry_run)
/// Always rebuild broken projections.
if (source.hasBrokenProjection(projection.name))
{
LOG_DEBUG(logger, "Will rebuild broken projection {}", projection.name);
materialized_projections.insert(projection.name);
continue;
}

View File

@ -175,6 +175,8 @@ private:
Settings settings;
SelectQueryOptions select_limits;
LoggerPtr logger;
/// A sequence of mutation commands is executed as a sequence of stages. Each stage consists of several
/// filters, followed by updating values of some columns. Commands can reuse expressions calculated by the
/// previous commands in the same stage, but at the end of each stage intermediate columns are thrown away

View File

@ -805,8 +805,8 @@ void IMergeTreeDataPart::loadProjections(bool require_columns_checksums, bool ch
throw;
auto message = getCurrentExceptionMessage(true);
LOG_ERROR(&Poco::Logger::get("IMergeTreeDataPart"),
"Cannot load projection {}, will consider it broken. Reason: {}", projection.name, message);
LOG_WARNING(storage.log, "Cannot load projection {}, "
"will consider it broken. Reason: {}", projection.name, message);
has_broken_projection = true;
part->setBrokenReason(message, getCurrentExceptionCode());

View File

@ -386,12 +386,12 @@ ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const St
throw;
PreformattedMessage message;
if (is_broken_projection)
if (is_broken_projection && throw_on_broken_projection)
{
WriteBufferFromOwnString wb;
message = PreformattedMessage::create(
"Part {} has a broken projections. It will be ignored. Broken projections info: {}",
part_name, getCurrentExceptionMessage(false));
part_name, getCurrentExceptionMessage(true));
LOG_DEBUG(log, message);
result.action = ReplicatedCheckResult::DoNothing;
}

View File

@ -285,11 +285,6 @@ static IMergeTreeDataPart::Checksums checkDataPart(
return {};
auto projection_file = name + ".proj";
if (!throw_on_broken_projection && projection->is_broken)
{
projections_on_disk.erase(projection_file);
checksums_txt.remove(projection_file);
}
IMergeTreeDataPart::Checksums projection_checksums;
try
@ -306,13 +301,19 @@ static IMergeTreeDataPart::Checksums checkDataPart(
if (isRetryableException(std::current_exception()))
throw;
is_broken_projection = true;
projections_on_disk.erase(projection_file);
checksums_txt.remove(projection_file);
const auto exception_message = getCurrentExceptionMessage(true);
if (!projection->is_broken)
{
LOG_TEST(log, "Marking projection {} as broken ({})", name, projection_file);
projection->setBrokenReason(getCurrentExceptionMessage(false), getCurrentExceptionCode());
LOG_WARNING(log, "Marking projection {} as broken ({}). Reason: {}",
name, projection_file, exception_message);
projection->setBrokenReason(exception_message, getCurrentExceptionCode());
}
is_broken_projection = true;
if (throw_on_broken_projection)
{
if (!broken_projections_message.empty())
@ -320,12 +321,10 @@ static IMergeTreeDataPart::Checksums checkDataPart(
broken_projections_message += fmt::format(
"Part {} has a broken projection {} (error: {})",
data_part->name, name, getCurrentExceptionMessage(false));
continue;
data_part->name, name, exception_message);
}
projections_on_disk.erase(projection_file);
checksums_txt.remove(projection_file);
continue;
}
checksums_data.files[projection_file] = IMergeTreeDataPart::Checksums::Checksum(

View File

@ -148,7 +148,7 @@ def break_part(node, table, part):
bash(node, f"rm '{part_path}/columns.txt'")
def get_broken_projections_info(node, table):
def get_broken_projections_info(node, table, active=True):
return node.query(
f"""
SELECT parent_name, name, errors.name FROM
@ -158,6 +158,7 @@ def get_broken_projections_info(node, table):
WHERE table='{table}'
AND database=currentDatabase()
AND is_broken = 1
AND active = {active}
) AS parts_info
INNER JOIN system.errors AS errors
ON parts_info.exception_code = errors.code
@ -214,14 +215,21 @@ def random_str(length=6):
return "".join(random.SystemRandom().choice(alphabet) for _ in range(length))
def check(node, table, check_result, expect_broken_part="", expected_error=""):
def check(
node,
table,
check_result,
expect_broken_part="",
expected_error="",
do_check_command=True,
):
if expect_broken_part == "proj1":
assert expected_error in node.query_and_get_error(
f"SELECT c FROM '{table}' WHERE d == 12 ORDER BY c"
f"SELECT c FROM '{table}' WHERE d == 12 ORDER BY c SETTINGS force_optimize_projection_name = 'proj1'"
)
else:
query_id = node.query(
f"SELECT queryID() FROM (SELECT c FROM '{table}' WHERE d == 12 ORDER BY c)"
f"SELECT queryID() FROM (SELECT c FROM '{table}' WHERE d == 12 ORDER BY c SETTINGS force_optimize_projection_name = 'proj1')"
).strip()
for _ in range(10):
node.query("SYSTEM FLUSH LOGS")
@ -247,11 +255,11 @@ def check(node, table, check_result, expect_broken_part="", expected_error=""):
if expect_broken_part == "proj2":
assert expected_error in node.query_and_get_error(
f"SELECT d FROM '{table}' WHERE c == 12 ORDER BY d"
f"SELECT d FROM '{table}' WHERE c == 12 ORDER BY d SETTINGS force_optimize_projection_name = 'proj2'"
)
else:
query_id = node.query(
f"SELECT queryID() FROM (SELECT d FROM '{table}' WHERE c == 12 ORDER BY d)"
f"SELECT queryID() FROM (SELECT d FROM '{table}' WHERE c == 12 ORDER BY d SETTINGS force_optimize_projection_name = 'proj2')"
).strip()
for _ in range(10):
node.query("SYSTEM FLUSH LOGS")
@ -275,6 +283,7 @@ def check(node, table, check_result, expect_broken_part="", expected_error=""):
assert False
assert "proj2" in res
if do_check_command:
assert check_result == int(node.query(f"CHECK TABLE {table}"))
@ -358,7 +367,14 @@ def test_broken_ignored(cluster):
# """)
# )
assert "all_3_3_0" in get_broken_projections_info(node, table_name)
assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_5_1"] == get_parts(
node, table_name
)
assert "all_3_3_0" in get_broken_projections_info(node, table_name, active=False)
assert "all_2_2_0" in get_broken_projections_info(node, table_name, active=True)
# 0 because of all_2_2_0
check(node, table_name, 0)
@ -580,3 +596,168 @@ def test_broken_projections_in_backups_3(cluster):
assert "all_1_1_0\tproj1\tNO_FILE_IN_DATA_PART" == get_broken_projections_info(
node, table_name
)
def test_check_part_thread(cluster):
    # Verify that once a projection has been marked as broken, the replicated
    # part-check thread does NOT flag the whole part as broken: the part must
    # be reported as "looks good" in the server log.
    node = cluster.instances["node"]
    table_name = "check_part_thread_test1"
    create_table(node, table_name, 1)
    insert(node, table_name, 0, 5)
    insert(node, table_name, 5, 5)
    insert(node, table_name, 10, 5)
    insert(node, table_name, 15, 5)
    assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"] == get_parts(
        node, table_name
    )
    # Break data file of projection 'proj2' for part all_2_2_0
    break_projection(node, table_name, "proj2", "all_2_2_0", "data")
    # It will not yet appear in broken projections info.
    assert "proj2" not in get_broken_projections_info(node, table_name)
    # Select now fails with error "File doesn't exist";
    # skip CHECK TABLE here so that only the background part-check thread
    # inspects the part after the projection is marked as broken.
    check(node, table_name, 0, "proj2", "FILE_DOESNT_EXIST", do_check_command=False)
    # Poll the log for up to ~10 seconds: the part-check thread runs
    # asynchronously, so give it time to examine the part.
    good = False
    for _ in range(10):
        # We marked projection as broken, checkPartThread must not complain about the part.
        good = node.contains_in_log(
            f"{table_name} (ReplicatedMergeTreePartCheckThread): Part all_2_2_0 looks good"
        )
        if good:
            break
        time.sleep(1)
    assert good
def test_broken_on_start(cluster):
    # Verify that the "broken" state of a projection survives a server
    # restart and that selects keep working afterwards.
    node = cluster.instances["node"]
    table_name = "test1"
    create_table(node, table_name, 1)
    insert(node, table_name, 0, 5)
    insert(node, table_name, 5, 5)
    insert(node, table_name, 10, 5)
    insert(node, table_name, 15, 5)
    assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"] == get_parts(
        node, table_name
    )
    # Break data file of projection 'proj2' for part all_2_2_0
    break_projection(node, table_name, "proj2", "all_2_2_0", "data")
    # It will not yet appear in broken projections info.
    assert "proj2" not in get_broken_projections_info(node, table_name)
    # Select now fails with error "File doesn't exist"
    # We will mark projection as broken.
    check(node, table_name, 0, "proj2", "FILE_DOESNT_EXIST")
    # Projection 'proj2' from part all_2_2_0 will now appear in broken parts info.
    assert "all_2_2_0\tproj2\tNO_FILE_IN_DATA_PART" in get_broken_projections_info(
        node, table_name
    )
    # Second select works, because projection is now marked as broken.
    check(node, table_name, 0)
    node.restart_clickhouse()
    # The projection is still listed as broken after restart: the broken
    # state is persisted, not recomputed from scratch.
    assert "proj2" in get_broken_projections_info(node, table_name)
    # Select works
    check(node, table_name, 0)
def test_mutation_with_broken_projection(cluster):
    # Verify how ALTER ... DELETE mutations interact with broken projections:
    # a broken projection hardlinked into the mutated part stays broken,
    # while a subsequent mutation that actually rewrites the part rebuilds it.
    node = cluster.instances["node"]
    table_name = "test1"
    create_table(node, table_name, 1)
    insert(node, table_name, 0, 5)
    insert(node, table_name, 5, 5)
    insert(node, table_name, 10, 5)
    insert(node, table_name, 15, 5)
    assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"] == get_parts(
        node, table_name
    )
    check(node, table_name, 1)
    # First mutation: every part gets a new mutation version suffix (_4).
    node.query(
        f"ALTER TABLE {table_name} DELETE WHERE c == 11 SETTINGS mutations_sync = 1"
    )
    assert ["all_0_0_0_4", "all_1_1_0_4", "all_2_2_0_4", "all_3_3_0_4"] == get_parts(
        node, table_name
    )
    assert "" == get_broken_projections_info(node, table_name)
    check(node, table_name, 1)
    # Break data file of projection 'proj2' for part all_2_2_0_4
    break_projection(node, table_name, "proj2", "all_2_2_0_4", "data")
    # It will not yet appear in broken projections info.
    assert "proj2" not in get_broken_projections_info(node, table_name)
    # Select now fails with error "File doesn't exist"
    # We will mark projection as broken.
    check(node, table_name, 0, "proj2", "FILE_DOESNT_EXIST")
    # Projection 'proj2' from part all_2_2_0_4 will now appear in broken parts info.
    assert "all_2_2_0_4\tproj2\tNO_FILE_IN_DATA_PART" in get_broken_projections_info(
        node, table_name
    )
    # Second select works, because projection is now marked as broken.
    check(node, table_name, 0)
    assert "all_2_2_0_4" in get_broken_projections_info(node, table_name)
    node.query(
        f"ALTER TABLE {table_name} DELETE WHERE _part == 'all_0_0_0_4' SETTINGS mutations_sync = 1"
    )
    # All parts changes because this is how alter delete works,
    # but all parts apart from the first have only hardlinks to files in previous part.
    assert ["all_0_0_0_5", "all_1_1_0_5", "all_2_2_0_5", "all_3_3_0_5"] == get_parts(
        node, table_name
    ) or ["all_1_1_0_5", "all_2_2_0_5", "all_3_3_0_5"] == get_parts(node, table_name)
    # Still broken because it was hardlinked.
    broken = get_broken_projections_info(node, table_name)
    # An empty result is also acceptable here: a background merge may have
    # rewritten the part and thereby rebuilt the projection.
    assert (
        "all_2_2_0_5" in broken or "" == broken
    )  # second could be because of a merge.
    if "" == broken:
        check(node, table_name, 1)
    else:
        check(node, table_name, 0)
    # Third mutation touches the data of the broken part itself, so the
    # projection is rebuilt and should no longer be reported as broken.
    node.query(
        f"ALTER TABLE {table_name} DELETE WHERE c == 13 SETTINGS mutations_sync = 1"
    )
    assert ["all_1_1_0_6", "all_2_2_0_6", "all_3_3_0_6"] == get_parts(
        node, table_name
    ) or ["all_0_0_0_6", "all_1_1_0_6", "all_2_2_0_6", "all_3_3_0_6"] == get_parts(
        node, table_name
    )
    # Not broken anymore.
    assert "" == get_broken_projections_info(node, table_name)
    check(node, table_name, 1)