Ignore disappeared projections on start

This commit is contained in:
alesapin 2024-08-07 18:24:03 +02:00
parent e809dbed60
commit ad678cb5a8
4 changed files with 81 additions and 11 deletions

View File

@ -72,6 +72,7 @@ namespace ErrorCodes
extern const int BAD_TTL_FILE; extern const int BAD_TTL_FILE;
extern const int NOT_IMPLEMENTED; extern const int NOT_IMPLEMENTED;
extern const int NO_SUCH_COLUMN_IN_TABLE; extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int FILE_DOESNT_EXIST;
} }
@ -749,8 +750,16 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
/// Probably there is something wrong with files of this part. /// Probably there is something wrong with files of this part.
/// So it can be helpful to add to the error message some information about those files. /// So it can be helpful to add to the error message some information about those files.
String files_in_part; String files_in_part;
for (auto it = getDataPartStorage().iterate(); it->isValid(); it->next()) for (auto it = getDataPartStorage().iterate(); it->isValid(); it->next())
files_in_part += fmt::format("{}{} ({} bytes)", (files_in_part.empty() ? "" : ", "), it->name(), getDataPartStorage().getFileSize(it->name())); {
std::string file_info;
if (!getDataPartStorage().isDirectory(it->name()))
file_info = fmt::format(" ({} bytes)", getDataPartStorage().getFileSize(it->name()));
files_in_part += fmt::format("{}{}{}", (files_in_part.empty() ? "" : ", "), it->name(), file_info);
}
if (!files_in_part.empty()) if (!files_in_part.empty())
e->addMessage("Part contains files: {}", files_in_part); e->addMessage("Part contains files: {}", files_in_part);
if (isEmpty()) if (isEmpty())
@ -2141,7 +2150,27 @@ void IMergeTreeDataPart::checkConsistencyBase() const
} }
} }
checksums.checkSizes(getDataPartStorage()); const auto & data_part_storage = getDataPartStorage();
for (const auto & [filename, checksum] : checksums.files)
{
try
{
checksum.checkSize(data_part_storage, filename);
}
catch (const Exception & ex)
{
/// For projection parts check will mark them broken in loadProjections
if (!parent_part && filename.ends_with(".proj"))
{
std::string projection_name = fs::path(filename).stem();
LOG_INFO(storage.log, "Projection {} doesn't exist on start for part {}, marking it as broken", projection_name, name);
if (hasProjection(projection_name))
markProjectionPartAsBroken(projection_name, ex.message(), ex.code());
}
else
throw;
}
}
} }
else else
{ {

View File

@ -100,12 +100,6 @@ void MergeTreeDataPartChecksums::checkEqual(const MergeTreeDataPartChecksums & r
} }
} }
void MergeTreeDataPartChecksums::checkSizes(const IDataPartStorage & storage) const
{
for (const auto & [name, checksum] : files)
checksum.checkSize(storage, name);
}
UInt64 MergeTreeDataPartChecksums::getTotalSizeOnDisk() const UInt64 MergeTreeDataPartChecksums::getTotalSizeOnDisk() const
{ {
UInt64 res = 0; UInt64 res = 0;

View File

@ -65,9 +65,6 @@ struct MergeTreeDataPartChecksums
static bool isBadChecksumsErrorCode(int code); static bool isBadChecksumsErrorCode(int code);
/// Checks that the directory contains all the needed files of the correct size. Does not check the checksum.
void checkSizes(const IDataPartStorage & storage) const;
/// Returns false if the checksum is too old. /// Returns false if the checksum is too old.
bool read(ReadBuffer & in); bool read(ReadBuffer & in);
/// Assume that header with version (the first line) is read /// Assume that header with version (the first line) is read

View File

@ -4,6 +4,7 @@ import logging
import string import string
import random import random
from helpers.cluster import ClickHouseCluster from helpers.cluster import ClickHouseCluster
from multiprocessing.dummy import Pool
cluster = ClickHouseCluster(__file__) cluster = ClickHouseCluster(__file__)
@ -18,6 +19,12 @@ def cluster():
stay_alive=True, stay_alive=True,
with_zookeeper=True, with_zookeeper=True,
) )
cluster.add_instance(
"node_restart",
main_configs=["config.d/dont_start_broken.xml"],
stay_alive=True,
with_zookeeper=True,
)
logging.info("Starting cluster...") logging.info("Starting cluster...")
cluster.start() cluster.start()
@ -632,6 +639,49 @@ def test_broken_on_start(cluster):
check(node, table_name, 0) check(node, table_name, 0)
def test_disappeared_projection_on_start(cluster):
node = cluster.instances["node_restart"]
table_name = "test_disapperead_projection"
create_table(node, table_name, 1)
node.query(f"SYSTEM STOP MERGES {table_name}")
insert(node, table_name, 0, 5)
insert(node, table_name, 5, 5)
insert(node, table_name, 10, 5)
insert(node, table_name, 15, 5)
assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"] == get_parts(
node, table_name
)
def drop_projection():
node.query(
f"ALTER TABLE {table_name} DROP PROJECTION proj2",
settings={"mutations_sync": "0"},
)
p = Pool(2)
p.apply_async(drop_projection)
for i in range(30):
create_query = node.query(f"SHOW CREATE TABLE {table_name}")
if "proj2" not in create_query:
break
time.sleep(0.5)
assert "proj2" not in create_query
# Remove 'proj2' for part all_2_2_0
break_projection(node, table_name, "proj2", "all_2_2_0", "part")
node.restart_clickhouse()
# proj2 is not broken, it doesn't exist, but ok
check(node, table_name, 0, expect_broken_part="proj2", do_check_command=0)
def test_mutation_with_broken_projection(cluster): def test_mutation_with_broken_projection(cluster):
node = cluster.instances["node"] node = cluster.instances["node"]