Do not consider parts broken if only projections are broken

This commit is contained in:
kssenii 2023-11-14 11:35:54 +01:00
parent 90936aa2c7
commit eb7aad0016
21 changed files with 795 additions and 62 deletions

View File

@ -305,6 +305,11 @@ bool MutationsInterpreter::Source::hasProjection(const String & name) const
return part && part->hasProjection(name);
}
bool MutationsInterpreter::Source::hasBrokenProjection(const String & name) const
{
return part && part->hasBrokenProjection(name);
}
bool MutationsInterpreter::Source::isCompactPart() const
{
return part && part->getType() == MergeTreeDataPartType::Compact;
@ -922,6 +927,15 @@ void MutationsInterpreter::prepare(bool dry_run)
materialized_indices.insert(index.name);
}
/// Always rebuild broken projections.
for (const auto & projection : metadata_snapshot->getProjections())
{
if (!source.hasBrokenProjection(projection.name))
continue;
materialized_projections.insert(projection.name);
}
for (const auto & projection : metadata_snapshot->getProjections())
{
if (!source.hasProjection(projection.name))

View File

@ -122,6 +122,7 @@ public:
bool materializeTTLRecalculateOnly() const;
bool hasSecondaryIndex(const String & name) const;
bool hasProjection(const String & name) const;
bool hasBrokenProjection(const String & name) const;
bool isCompactPart() const;
void read(

View File

@ -732,7 +732,23 @@ void IMergeTreeDataPart::loadProjections(bool require_columns_checksums, bool ch
else
{
auto part = getProjectionPartBuilder(projection.name).withPartFormatFromDisk().build();
part->loadColumnsChecksumsIndexes(require_columns_checksums, check_consistency);
try
{
part->loadColumnsChecksumsIndexes(require_columns_checksums, check_consistency);
}
catch (...)
{
if (isRetryableException(std::current_exception()))
throw;
LOG_ERROR(&Poco::Logger::get("IMergeTreeDataPart"),
"Cannot load projection {}, will consider it broken", projection.name);
addBrokenProjectionPart(projection.name, std::move(part), getCurrentExceptionMessage(false), getCurrentExceptionCode());
continue;
}
addProjectionPart(projection.name, std::move(part));
}
}
@ -1129,7 +1145,8 @@ void IMergeTreeDataPart::loadChecksums(bool require)
/// Check the data while we are at it.
LOG_WARNING(storage.log, "Checksums for part {} not found. Will calculate them from data on disk.", name);
checksums = checkDataPart(shared_from_this(), false);
bool noop;
checksums = checkDataPart(shared_from_this(), false, noop, /* is_cancelled */{}, /* throw_on_broken_projection */false);
writeChecksums(checksums, {});
bytes_on_disk = checksums.getTotalSizeOnDisk();
@ -2130,6 +2147,46 @@ std::optional<String> IMergeTreeDataPart::getStreamNameForColumn(
return getStreamNameOrHash(stream_name, extension, storage_);
}
void IMergeTreeDataPart::addBrokenProjectionPart(
const String & projection_name,
std::shared_ptr<IMergeTreeDataPart> projection_part,
const String & message,
int code)
{
projection_part->setBrokenReason(message, code);
bool inserted = broken_projection_parts.emplace(projection_name, projection_part).second;
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Projection part {} in part {} is already added to a broken projection parts list", projection_name, name);
}
void IMergeTreeDataPart::markProjectionPartAsBroken(const String & projection_name, const String & message, int code) const
{
std::lock_guard lock(broken_projections_mutex);
auto it = projection_parts.find(projection_name);
if (it == projection_parts.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no projection part '{}'", projection_name);
it->second->setBrokenReason(message, code);
broken_projection_parts.emplace(projection_name, it->second);
projection_parts.erase(it);
}
void IMergeTreeDataPart::setBrokenReason(const String & message, int code)
{
std::lock_guard lock(broken_projections_mutex);
is_broken = true;
exception = message;
exception_code = code;
}
bool IMergeTreeDataPart::hasBrokenProjection(const String & projection_name) const
{
std::lock_guard lock(broken_projections_mutex);
return broken_projection_parts.contains(projection_name);
}
bool isCompactPart(const MergeTreeDataPartPtr & data_part)
{
return (data_part && data_part->getType() == MergeTreeDataPartType::Compact);

View File

@ -255,6 +255,12 @@ public:
/// Frozen by ALTER TABLE ... FREEZE ... It is used for information purposes in system.parts table.
mutable std::atomic<bool> is_frozen {false};
/// If it is a projection part, it can be broken sometimes.
mutable std::atomic<bool> is_broken {false};
mutable std::string exception;
mutable int exception_code = 0;
mutable std::mutex broken_projections_mutex;
/// Indicates that the part was marked Outdated by PartCheckThread because the part was not committed to ZooKeeper
mutable bool is_unexpected_local_part = false;
@ -405,12 +411,20 @@ public:
const std::map<String, std::shared_ptr<IMergeTreeDataPart>> & getProjectionParts() const { return projection_parts; }
const std::map<String, std::shared_ptr<IMergeTreeDataPart>> & getBrokenProjectionParts() const { return broken_projection_parts; }
MergeTreeDataPartBuilder getProjectionPartBuilder(const String & projection_name, bool is_temp_projection = false);
void addProjectionPart(const String & projection_name, std::shared_ptr<IMergeTreeDataPart> && projection_part);
void addBrokenProjectionPart(const String & projection_name, std::shared_ptr<IMergeTreeDataPart> projection_part, const String & message, int code);
void markProjectionPartAsBroken(const String & projection_name, const String & message, int code) const;
bool hasProjection(const String & projection_name) const { return projection_parts.contains(projection_name); }
bool hasBrokenProjection(const String & projection_name) const;
void loadProjections(bool require_columns_checksums, bool check_consistency, bool if_not_loaded = false);
/// Return set of metadata file names without checksums. For example,
@ -564,7 +578,8 @@ protected:
const IMergeTreeDataPart * parent_part;
String parent_part_name;
std::map<String, std::shared_ptr<IMergeTreeDataPart>> projection_parts;
mutable std::map<String, std::shared_ptr<IMergeTreeDataPart>> projection_parts;
mutable std::map<String, std::shared_ptr<IMergeTreeDataPart>> broken_projection_parts;
mutable PartMetadataManagerPtr metadata_manager;
@ -678,6 +693,8 @@ private:
void incrementStateMetric(MergeTreeDataPartState state) const;
void decrementStateMetric(MergeTreeDataPartState state) const;
void setBrokenReason(const String & message, int code);
/// This ugly flag is needed for debug assertions only
mutable bool part_is_probably_removed_from_disk = false;
};

View File

@ -5737,7 +5737,7 @@ MergeTreeData::getDataPartsVectorForInternalUsage(const DataPartStates & afforda
}
MergeTreeData::ProjectionPartsVector
MergeTreeData::getProjectionPartsVectorForInternalUsage(const DataPartStates & affordable_states, DataPartStateVector * out_states) const
MergeTreeData::getProjectionPartsVectorForInternalUsage(const DataPartStates & affordable_states, bool fill_states) const
{
auto lock = lockParts();
ProjectionPartsVector res;
@ -5749,14 +5749,20 @@ MergeTreeData::getProjectionPartsVectorForInternalUsage(const DataPartStates & a
res.data_parts.push_back(part);
for (const auto & [_, projection_part] : part->getProjectionParts())
res.projection_parts.push_back(projection_part);
for (const auto & [_, projection_part] : part->getBrokenProjectionParts())
res.broken_projection_parts.push_back(projection_part);
}
}
if (out_states != nullptr)
if (fill_states)
{
out_states->resize(res.projection_parts.size());
res.projection_parts_states.resize(res.projection_parts.size());
for (size_t i = 0; i < res.projection_parts.size(); ++i)
(*out_states)[i] = res.projection_parts[i]->getParentPart()->getState();
(res.projection_parts_states)[i] = res.projection_parts[i]->getParentPart()->getState();
res.broken_projection_parts_states.resize(res.broken_projection_parts.size());
for (size_t i = 0; i < res.broken_projection_parts.size(); ++i)
(res.broken_projection_parts_states)[i] = res.broken_projection_parts[i]->getParentPart()->getState();
}
return res;
@ -5809,7 +5815,7 @@ bool MergeTreeData::supportsLightweightDelete() const
return true;
}
MergeTreeData::ProjectionPartsVector MergeTreeData::getAllProjectionPartsVector(MergeTreeData::DataPartStateVector * out_states) const
MergeTreeData::ProjectionPartsVector MergeTreeData::getAllProjectionPartsVector(bool fill_states) const
{
ProjectionPartsVector res;
auto lock = lockParts();
@ -5820,11 +5826,15 @@ MergeTreeData::ProjectionPartsVector MergeTreeData::getAllProjectionPartsVector(
res.projection_parts.push_back(projection_part);
}
if (out_states != nullptr)
if (fill_states)
{
out_states->resize(res.projection_parts.size());
res.projection_parts_states.resize(res.projection_parts.size());
for (size_t i = 0; i < res.projection_parts.size(); ++i)
(*out_states)[i] = res.projection_parts[i]->getParentPart()->getState();
(res.projection_parts_states)[i] = res.projection_parts[i]->getParentPart()->getState();
res.broken_projection_parts_states.resize(res.broken_projection_parts.size());
for (size_t i = 0; i < res.broken_projection_parts.size(); ++i)
(res.broken_projection_parts_states)[i] = res.broken_projection_parts[i]->getParentPart()->getState();
}
return res;
}

View File

@ -468,8 +468,13 @@ public:
struct ProjectionPartsVector
{
DataPartsVector projection_parts;
DataPartsVector data_parts;
DataPartsVector projection_parts;
DataPartStateVector projection_parts_states;
DataPartsVector broken_projection_parts;
DataPartStateVector broken_projection_parts_states;
};
/// Returns a copy of the list so that the caller shouldn't worry about locks.
@ -484,7 +489,7 @@ public:
const DataPartStates & affordable_states, DataPartStateVector * out_states = nullptr) const;
/// Same as above but only returns projection parts
ProjectionPartsVector getProjectionPartsVectorForInternalUsage(
const DataPartStates & affordable_states, DataPartStateVector * out_states = nullptr) const;
const DataPartStates & affordable_states, bool fill_states = false) const;
/// Returns absolutely all parts (and snapshot of their states)
@ -496,7 +501,7 @@ public:
size_t getTotalMarksCount() const;
/// Same as above but only returns projection parts
ProjectionPartsVector getAllProjectionPartsVector(MergeTreeData::DataPartStateVector * out_states = nullptr) const;
ProjectionPartsVector getAllProjectionPartsVector(bool fill_states = false) const;
/// Returns parts in Active state
DataParts getDataPartsForInternalUsage() const;

View File

@ -54,6 +54,8 @@ struct MergeTreeDataPartChecksums
bool has(const String & file_name) const { return files.find(file_name) != files.end(); }
bool remove(const String & file_name) { return files.erase(file_name); }
bool empty() const { return files.empty(); }
/// Checks that the set of columns and their checksums are the same. If not, throws an exception.

View File

@ -513,7 +513,9 @@ static std::set<ProjectionDescriptionRawPtr> getProjectionsToRecalculate(
{
bool need_recalculate =
materialized_projections.contains(projection.name)
|| (!is_full_part_storage && source_part->hasProjection(projection.name));
|| (!is_full_part_storage
&& (source_part->hasProjection(projection.name)
|| source_part->hasBrokenProjection(projection.name)));
if (need_recalculate)
projections_to_recalc.insert(&projection);
@ -1367,7 +1369,9 @@ private:
bool need_recalculate =
ctx->materialized_projections.contains(projection.name)
|| (!is_full_part_storage && ctx->source_part->hasProjection(projection.name));
|| (!is_full_part_storage
&& (ctx->source_part->hasProjection(projection.name)
|| ctx->source_part->hasBrokenProjection(projection.name)));
if (need_recalculate)
{

View File

@ -274,7 +274,7 @@ std::pair<bool, MergeTreeDataPartPtr> ReplicatedMergeTreePartCheckThread::findLo
return std::make_pair(exists_in_zookeeper, part);
}
ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const String & part_name)
ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const String & part_name, bool throw_on_broken_projection)
{
ReplicatedCheckResult result;
auto [exists_in_zookeeper, part] = findLocalPart(part_name);
@ -341,6 +341,7 @@ ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const St
/// before the ReplicatedMergeTreePartHeader was introduced.
String part_path = storage.replica_path + "/parts/" + part_name;
String part_znode = zookeeper->get(part_path);
bool is_broken_projection = false;
try
{
@ -362,8 +363,10 @@ ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const St
checkDataPart(
part,
true,
[this] { return need_stop.load(); });
/* require_checksums */true,
is_broken_projection,
[this] { return need_stop.load(); },
throw_on_broken_projection);
if (need_stop)
{
@ -384,12 +387,22 @@ ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const St
tryLogCurrentException(log, __PRETTY_FUNCTION__);
auto message = PreformattedMessage::create("Part {} looks broken. Removing it and will try to fetch.", part_name);
LOG_ERROR(log, message);
PreformattedMessage message;
if (is_broken_projection)
{
message = PreformattedMessage::create("Part {} has a broken projection. It will be ignored.", part_name);
LOG_DEBUG(log, message);
result.action = ReplicatedCheckResult::DoNothing;
}
else
{
message = PreformattedMessage::create("Part {} looks broken. Removing it and will try to fetch.", part_name);
LOG_ERROR(log, message);
result.action = ReplicatedCheckResult::TryFetchMissing;
}
/// Part is broken, let's try to find it and fetch.
result.status = {part_name, false, message};
result.action = ReplicatedCheckResult::TryFetchMissing;
return result;
}
@ -419,12 +432,12 @@ ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const St
}
CheckResult ReplicatedMergeTreePartCheckThread::checkPartAndFix(const String & part_name, std::optional<time_t> * recheck_after)
CheckResult ReplicatedMergeTreePartCheckThread::checkPartAndFix(const String & part_name, std::optional<time_t> * recheck_after, bool throw_on_broken_projection)
{
LOG_INFO(log, "Checking part {}", part_name);
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecks);
ReplicatedCheckResult result = checkPartImpl(part_name);
ReplicatedCheckResult result = checkPartImpl(part_name, throw_on_broken_projection);
switch (result.action)
{
case ReplicatedCheckResult::None: UNREACHABLE();
@ -577,7 +590,7 @@ void ReplicatedMergeTreePartCheckThread::run()
}
std::optional<time_t> recheck_after;
checkPartAndFix(selected->name, &recheck_after);
checkPartAndFix(selected->name, &recheck_after, /* throw_on_broken_projection */false);
if (need_stop)
return;

View File

@ -65,9 +65,9 @@ public:
size_t size() const;
/// Check part by name
CheckResult checkPartAndFix(const String & part_name, std::optional<time_t> * recheck_after = nullptr);
CheckResult checkPartAndFix(const String & part_name, std::optional<time_t> * recheck_after = nullptr, bool throw_on_broken_projection = true);
ReplicatedCheckResult checkPartImpl(const String & part_name);
ReplicatedCheckResult checkPartImpl(const String & part_name, bool throw_on_broken_projection);
std::unique_lock<std::mutex> pausePartsCheck();

View File

@ -111,7 +111,9 @@ static IMergeTreeDataPart::Checksums checkDataPart(
const NameSet & files_without_checksums,
const ReadSettings & read_settings,
bool require_checksums,
std::function<bool()> is_cancelled)
std::function<bool()> is_cancelled,
bool & is_broken_projection,
bool throw_on_broken_projection)
{
/** Responsibility:
* - read list of columns from columns.txt;
@ -120,6 +122,7 @@ static IMergeTreeDataPart::Checksums checkDataPart(
*/
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedChecks};
Poco::Logger * log = &Poco::Logger::get("checkDataPart");
NamesAndTypesList columns_txt;
@ -269,23 +272,68 @@ static IMergeTreeDataPart::Checksums checkDataPart(
}
}
for (const auto & [name, projection] : data_part->getProjectionParts())
auto check_projection = [&](const String & name, std::shared_ptr<IMergeTreeDataPart> projection)
{
if (is_cancelled())
return {};
auto projection_file = name + ".proj";
auto projection_checksums = checkDataPart(
projection, *data_part_storage.getProjection(projection_file),
projection->getColumns(), projection->getType(),
projection->getFileNamesWithoutChecksums(),
read_settings, require_checksums, is_cancelled);
if (!throw_on_broken_projection && projection->is_broken)
{
projections_on_disk.erase(projection_file);
checksums_txt.remove(projection_file);
return;
}
IMergeTreeDataPart::Checksums projection_checksums;
try
{
bool noop;
projection_checksums = checkDataPart(
projection, *data_part_storage.getProjection(projection_file),
projection->getColumns(), projection->getType(),
projection->getFileNamesWithoutChecksums(),
read_settings, require_checksums, is_cancelled, noop, /* throw_on_broken_projection */false);
}
catch (...)
{
if (isRetryableException(std::current_exception()))
throw;
LOG_TEST(log, "Marking projection {} as broken ({})", name, projection_file);
if (!data_part->hasBrokenProjection(name))
data_part->markProjectionPartAsBroken(name, getCurrentExceptionMessage(false), getCurrentExceptionCode());
is_broken_projection = true;
if (throw_on_broken_projection)
throw;
projections_on_disk.erase(projection_file);
checksums_txt.remove(projection_file);
return;
}
checksums_data.files[projection_file] = IMergeTreeDataPart::Checksums::Checksum(
projection_checksums.getTotalSizeOnDisk(),
projection_checksums.getTotalChecksumUInt128());
projections_on_disk.erase(projection_file);
};
auto broken_projection_parts = data_part->getBrokenProjectionParts(); /// Iterate over copy
for (const auto & [name, projection] : broken_projection_parts)
{
if (is_cancelled())
return {};
else
check_projection(name, projection);
}
auto projection_parts = data_part->getProjectionParts(); /// Iterate over copy
for (const auto & [name, projection] : projection_parts)
{
if (is_cancelled())
return {};
else
check_projection(name, projection);
}
if (require_checksums && !projections_on_disk.empty())
@ -315,7 +363,9 @@ IMergeTreeDataPart::Checksums checkDataPartInMemory(const DataPartInMemoryPtr &
IMergeTreeDataPart::Checksums checkDataPart(
MergeTreeData::DataPartPtr data_part,
bool require_checksums,
std::function<bool()> is_cancelled)
bool & is_broken_projection,
std::function<bool()> is_cancelled,
bool throw_on_broken_projection)
{
if (auto part_in_memory = asInMemoryPart(data_part))
return checkDataPartInMemory(part_in_memory);
@ -357,7 +407,9 @@ IMergeTreeDataPart::Checksums checkDataPart(
data_part->getFileNamesWithoutChecksums(),
read_settings,
require_checksums,
is_cancelled);
is_cancelled,
is_broken_projection,
throw_on_broken_projection);
};
try
@ -371,7 +423,9 @@ IMergeTreeDataPart::Checksums checkDataPart(
data_part->getFileNamesWithoutChecksums(),
read_settings,
require_checksums,
is_cancelled);
is_cancelled,
is_broken_projection,
throw_on_broken_projection);
}
catch (...)
{

View File

@ -10,7 +10,9 @@ namespace DB
IMergeTreeDataPart::Checksums checkDataPart(
MergeTreeData::DataPartPtr data_part,
bool require_checksums,
std::function<bool()> is_cancelled = []{ return false; });
bool & is_broken_projection,
std::function<bool()> is_cancelled = []{ return false; },
bool throw_on_broken_projection = false);
bool isNotEnoughMemoryErrorCode(int code);
bool isRetryableException(const std::exception_ptr exception_ptr);

View File

@ -2242,11 +2242,12 @@ std::optional<CheckResult> StorageMergeTree::checkDataNext(DataValidationTasksPt
{
/// If the checksums file is not present, calculate the checksums and write them to disk.
static constexpr auto checksums_path = "checksums.txt";
bool noop;
if (part->isStoredOnDisk() && !part->getDataPartStorage().exists(checksums_path))
{
try
{
auto calculated_checksums = checkDataPart(part, false);
auto calculated_checksums = checkDataPart(part, false, noop, /* is_cancelled */{}, /* throw_on_broken_projection */true);
calculated_checksums.checkEqual(part->checksums, true);
auto & part_mutable = const_cast<IMergeTreeDataPart &>(*part);
@ -2267,7 +2268,7 @@ std::optional<CheckResult> StorageMergeTree::checkDataNext(DataValidationTasksPt
{
try
{
checkDataPart(part, true);
checkDataPart(part, true, noop, /* is_cancelled */{}, /* throw_on_broken_projection */true);
return CheckResult(part->name, true, "");
}
catch (...)

View File

@ -8690,12 +8690,11 @@ IStorage::DataValidationTasksPtr StorageReplicatedMergeTree::getCheckTaskList(
std::optional<CheckResult> StorageReplicatedMergeTree::checkDataNext(DataValidationTasksPtr & check_task_list)
{
if (auto part = assert_cast<DataValidationTasks *>(check_task_list.get())->next())
{
try
{
return CheckResult(part_check_thread.checkPartAndFix(part->name));
return part_check_thread.checkPartAndFix(part->name, /* recheck_after */nullptr, /* throw_on_broken_projection */true);
}
catch (const Exception & ex)
{

View File

@ -63,7 +63,7 @@ Pipe StorageSystemDisks::read(
for (const auto & [disk_name, disk_ptr] : context->getDisksMap())
{
col_name->insert(disk_name);
col_path->insert(disk_ptr->getPath());
col_path->insert(fs::absolute(disk_ptr->getPath()).string());
col_free->insert(disk_ptr->getAvailableSpace().value_or(std::numeric_limits<UInt64>::max()));
col_total->insert(disk_ptr->getTotalSpace().value_or(std::numeric_limits<UInt64>::max()));
col_unreserved->insert(disk_ptr->getUnreservedSpace().value_or(std::numeric_limits<UInt64>::max()));

View File

@ -64,7 +64,7 @@ StoragesInfo::getParts(MergeTreeData::DataPartStateVector & state, bool has_stat
}
MergeTreeData::ProjectionPartsVector
StoragesInfo::getProjectionParts(MergeTreeData::DataPartStateVector & state, bool has_state_column) const
StoragesInfo::getProjectionParts(bool fill_states, bool has_state_column) const
{
if (data->getInMemoryMetadataPtr()->projections.empty())
return {};
@ -74,12 +74,12 @@ StoragesInfo::getProjectionParts(MergeTreeData::DataPartStateVector & state, boo
{
/// If has_state_column is requested, return all states.
if (!has_state_column)
return data->getProjectionPartsVectorForInternalUsage({State::Active, State::Outdated}, &state);
return data->getProjectionPartsVectorForInternalUsage({State::Active, State::Outdated}, fill_states);
return data->getAllProjectionPartsVector(&state);
return data->getAllProjectionPartsVector(fill_states);
}
return data->getProjectionPartsVectorForInternalUsage({State::Active}, &state);
return data->getProjectionPartsVectorForInternalUsage({State::Active}, fill_states);
}
StoragesInfoStream::StoragesInfoStream(const SelectQueryInfo & query_info, ContextPtr context)

View File

@ -25,7 +25,7 @@ struct StoragesInfo
explicit operator bool() const { return storage != nullptr; }
MergeTreeData::DataPartsVector getParts(MergeTreeData::DataPartStateVector & state, bool has_state_column) const;
MergeTreeData::ProjectionPartsVector getProjectionParts(MergeTreeData::DataPartStateVector & state, bool has_state_column) const;
MergeTreeData::ProjectionPartsVector getProjectionParts(bool fill_states, bool has_state_column) const;
};
/** A helper class that enumerates the storages that match given query. */

View File

@ -83,7 +83,11 @@ StorageSystemProjectionParts::StorageSystemProjectionParts(const StorageID & tab
{"rows_where_ttl_info.expression", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"rows_where_ttl_info.min", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
{"rows_where_ttl_info.max", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())}
{"rows_where_ttl_info.max", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
{"is_broken", std::make_shared<DataTypeUInt8>()},
{"exception_code", std::make_shared<DataTypeInt32>()},
{"exception", std::make_shared<DataTypeString>()},
}
)
{
@ -93,15 +97,14 @@ void StorageSystemProjectionParts::processNextStorage(
ContextPtr, MutableColumns & columns, std::vector<UInt8> & columns_mask, const StoragesInfo & info, bool has_state_column)
{
using State = MergeTreeDataPartState;
MergeTreeData::DataPartStateVector all_parts_state;
MergeTreeData::ProjectionPartsVector all_parts = info.getProjectionParts(all_parts_state, has_state_column);
for (size_t part_number = 0; part_number < all_parts.projection_parts.size(); ++part_number)
MergeTreeData::ProjectionPartsVector all_parts = info.getProjectionParts(true, has_state_column);
auto fill_part_info = [&](size_t part_number, const MergeTreeData::DataPartsVector & parts, const MergeTreeData::DataPartStateVector & states)
{
const auto & part = all_parts.projection_parts[part_number];
const auto & part = parts[part_number];
const auto * parent_part = part->getParentPart();
chassert(parent_part);
auto part_state = all_parts_state[part_number];
auto part_state = states[part_number];
ColumnSize columns_size = part->getTotalColumnsSize();
ColumnSize parent_columns_size = parent_part->getTotalColumnsSize();
@ -278,10 +281,43 @@ void StorageSystemProjectionParts::processNextStorage(
add_ttl_info_map(part->ttl_infos.group_by_ttl);
add_ttl_info_map(part->ttl_infos.rows_where_ttl);
{
if (columns_mask[src_index++])
columns[res_index++]->insert(part->is_broken.load(std::memory_order_relaxed));
if (part->is_broken)
{
std::lock_guard lock(part->broken_projections_mutex);
if (columns_mask[src_index++])
columns[res_index++]->insert(part->exception_code);
if (columns_mask[src_index++])
columns[res_index++]->insert(part->exception);
}
else
{
if (columns_mask[src_index++])
columns[res_index++]->insertDefault();
if (columns_mask[src_index++])
columns[res_index++]->insertDefault();
}
}
/// _state column should be the latest.
/// Do not use part->getState*, it can be changed from different thread
if (has_state_column)
columns[res_index++]->insert(IMergeTreeDataPart::stateString(part_state));
};
for (size_t part_number = 0; part_number < all_parts.projection_parts.size(); ++part_number)
{
auto part = all_parts.projection_parts[part_number];
fill_part_info(part_number, all_parts.projection_parts, all_parts.projection_parts_states);
}
for (size_t part_number = 0; part_number < all_parts.broken_projection_parts.size(); ++part_number)
{
auto part = all_parts.broken_projection_parts[part_number];
fill_part_info(part_number, all_parts.broken_projection_parts, all_parts.broken_projection_parts_states);
}
}

View File

@ -103,15 +103,14 @@ void StorageSystemProjectionPartsColumns::processNextStorage(
}
/// Go through the list of projection parts.
MergeTreeData::DataPartStateVector all_parts_state;
MergeTreeData::ProjectionPartsVector all_parts = info.getProjectionParts(all_parts_state, has_state_column);
for (size_t part_number = 0; part_number < all_parts.projection_parts.size(); ++part_number)
MergeTreeData::ProjectionPartsVector all_parts = info.getProjectionParts(true, has_state_column);
auto fill_part_info = [&](size_t part_number, const MergeTreeData::DataPartsVector & parts, const MergeTreeData::DataPartStateVector & states)
{
const auto & part = all_parts.projection_parts[part_number];
const auto & part = parts[part_number];
const auto * parent_part = part->getParentPart();
chassert(parent_part);
auto part_state = all_parts_state[part_number];
auto part_state = states[part_number];
auto columns_size = part->getTotalColumnsSize();
auto parent_columns_size = parent_part->getTotalColumnsSize();
@ -260,6 +259,18 @@ void StorageSystemProjectionPartsColumns::processNextStorage(
if (has_state_column)
columns[res_index++]->insert(part->stateString());
}
};
for (size_t part_number = 0; part_number < all_parts.projection_parts.size(); ++part_number)
{
auto part = all_parts.projection_parts[part_number];
fill_part_info(part_number, all_parts.projection_parts, all_parts.projection_parts_states);
}
for (size_t part_number = 0; part_number < all_parts.broken_projection_parts.size(); ++part_number)
{
auto part = all_parts.broken_projection_parts[part_number];
fill_part_info(part_number, all_parts.broken_projection_parts, all_parts.broken_projection_parts_states);
}
}

View File

@ -0,0 +1,224 @@
insert new part
insert new part
insert new part
insert new part
system.parts
all_0_0_0 1 ['proj','proj_2']
all_1_1_0 1 ['proj','proj_2']
all_2_2_0 1 ['proj','proj_2']
all_3_3_0 1 ['proj','proj_2']
select from projection 'proj'
16
12
used projections
SELECT c FROM test WHERE d == 12 OR d == 16; ['default.test.proj']
select from projection 'proj_2'
12
16
used projections
SELECT d FROM test WHERE c == 12 OR c == 16; ['default.test.proj_2']
check table
1
0
broke metadata of part 'proj' (parent part: all_2_2_0)
system.parts
all_0_0_0 1 ['proj','proj_2']
all_1_1_0 1 ['proj','proj_2']
all_2_2_0 1 ['proj','proj_2']
all_3_3_0 1 ['proj','proj_2']
select from projection 'proj'
16
12
used projections
SELECT c FROM test WHERE d == 12 OR d == 16; ['default.test.proj']
select from projection 'proj_2'
16
12
used projections
SELECT d FROM test WHERE c == 12 OR c == 16; ['default.test.proj_2']
check table
0
broken projections info
all_2_2_0 proj FILE_DOESNT_EXIST
check table full
all_0_0_0 1
all_1_1_0 1
all_3_3_0 1
all_2_2_0 0 Part all_2_2_0 has a broken projection. It will be ignored.
0
broke data of part 'proj_2' (parent part: all_2_2_0)
broken projections info
all_2_2_0 proj FILE_DOESNT_EXIST
system.parts
all_0_0_0 1 ['proj','proj_2']
all_1_1_0 1 ['proj','proj_2']
all_2_2_0 1 ['proj_2']
all_3_3_0 1 ['proj','proj_2']
select from projection 'proj'
16
12
used projections
SELECT c FROM test WHERE d == 12 OR d == 16; ['default.test.proj']
select from projection 'proj_2'
FILE_DOESNT_EXIST
check table
0
broken projections info
all_2_2_0 proj FILE_DOESNT_EXIST
all_2_2_0 proj_2 NO_FILE_IN_DATA_PART
system.parts
all_0_0_0 1 ['proj','proj_2']
all_1_1_0 1 ['proj','proj_2']
all_2_2_0 1 []
all_3_3_0 1 ['proj','proj_2']
select from projection 'proj'
16
12
used projections
SELECT c FROM test WHERE d == 12 OR d == 16; ['default.test.proj']
select from projection 'proj_2'
16
12
used projections
SELECT d FROM test WHERE c == 12 OR c == 16; ['default.test.proj_2']
check table
0
0
broke data of part 'proj_2' (parent part: all_3_3_0)
broken projections info
all_2_2_0 proj FILE_DOESNT_EXIST
all_2_2_0 proj_2 NO_FILE_IN_DATA_PART
insert new part
insert new part
optimize
0
broken projections info
all_2_2_0 proj FILE_DOESNT_EXIST
all_2_2_0 proj_2 NO_FILE_IN_DATA_PART
all_3_3_0 proj_2 NO_FILE_IN_DATA_PART
system.parts
all_0_0_0 1 ['proj','proj_2']
all_1_1_0 1 ['proj','proj_2']
all_2_2_0 1 []
all_3_3_0 0 ['proj']
all_3_5_1 1 ['proj']
all_4_4_0 0 ['proj','proj_2']
all_5_5_0 0 ['proj','proj_2']
select from projection 'proj'
16
12
used projections
SELECT c FROM test WHERE d == 12 OR d == 16; ['default.test.proj']
select from projection 'proj_2'
16
12
used projections
SELECT d FROM test WHERE c == 12 OR c == 16; ['default.test.proj_2']
check table
0
0
broke metadata of part 'proj' (parent part: all_1_1_0)
Detach - Attach
broken projections info
all_1_1_0 proj NO_FILE_IN_DATA_PART
all_2_2_0 proj NO_FILE_IN_DATA_PART
all_2_2_0 proj_2 FILE_DOESNT_EXIST
all_3_3_0 proj_2 FILE_DOESNT_EXIST
0
broke data of part 'proj_2' (parent part: all_1_1_0)
Detach - Attach
broken projections info
all_1_1_0 proj NO_FILE_IN_DATA_PART
all_1_1_0 proj_2 FILE_DOESNT_EXIST
all_2_2_0 proj NO_FILE_IN_DATA_PART
all_2_2_0 proj_2 FILE_DOESNT_EXIST
all_3_3_0 proj_2 FILE_DOESNT_EXIST
system.parts
all_0_0_0 1 ['proj','proj_2']
all_1_1_0 1 []
all_2_2_0 1 []
all_3_3_0 0 ['proj']
all_3_5_1 1 ['proj']
all_4_4_0 0 ['proj','proj_2']
all_5_5_0 0 ['proj','proj_2']
select from projection 'proj'
16
12
used projections
SELECT c FROM test WHERE d == 12 OR d == 16; ['default.test.proj']
select from projection 'proj_2'
16
12
used projections
SELECT d FROM test WHERE c == 12 OR c == 16; ['default.test.proj_2']
check table
0
check table full
all_3_5_1 1
all_0_0_0 1
all_1_1_0 0 Part all_1_1_0 has a broken projection. It will be ignored.
all_2_2_0 0 Part all_2_2_0 has a broken projection. It will be ignored.
materialize projection proj
check table full
all_3_5_1_6 1
all_0_0_0_6 1
all_2_2_0_6 1
all_1_1_0_6 1
system.parts
all_0_0_0 0 ['proj','proj_2']
all_0_0_0_6 1 ['proj','proj_2']
all_1_1_0 0 []
all_1_1_0_6 1 ['proj','proj_2']
all_2_2_0 0 []
all_2_2_0_6 1 ['proj','proj_2']
all_3_3_0 0 ['proj']
all_3_5_1 0 ['proj']
all_3_5_1_6 1 ['proj']
all_4_4_0 0 ['proj','proj_2']
all_5_5_0 0 ['proj','proj_2']
select from projection 'proj'
16
12
used projections
SELECT c FROM test WHERE d == 12 OR d == 16; ['default.test.proj']
select from projection 'proj_2'
12
16
used projections
SELECT d FROM test WHERE c == 12 OR c == 16; ['default.test.proj_2']
check table
1
materialize projection proj_2
check table full
all_3_5_1_7 1
all_0_0_0_7 1
all_2_2_0_7 1
all_1_1_0_7 1
system.parts
all_0_0_0 0 ['proj','proj_2']
all_0_0_0_6 0 ['proj','proj_2']
all_0_0_0_7 1 ['proj','proj_2']
all_1_1_0 0 []
all_1_1_0_6 0 ['proj','proj_2']
all_1_1_0_7 1 ['proj','proj_2']
all_2_2_0 0 []
all_2_2_0_6 0 ['proj','proj_2']
all_2_2_0_7 1 ['proj','proj_2']
all_3_3_0 0 ['proj']
all_3_5_1 0 ['proj']
all_3_5_1_6 0 ['proj']
all_3_5_1_7 1 ['proj','proj_2']
all_4_4_0 0 ['proj','proj_2']
all_5_5_0 0 ['proj','proj_2']
select from projection 'proj'
12
16
used projections
SELECT c FROM test WHERE d == 12 OR d == 16; ['default.test.proj']
select from projection 'proj_2'
16
12
used projections
SELECT d FROM test WHERE c == 12 OR c == 16; ['default.test.proj_2']
check table
1

View File

@ -0,0 +1,283 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -nm -q "
DROP TABLE IF EXISTS test SYNC;
CREATE TABLE test
(
a String,
b String,
c Int32,
d Int32,
e Int32,
PROJECTION proj
(
SELECT c ORDER BY d
),
PROJECTION proj_2
(
SELECT d ORDER BY c
)
)
ENGINE = ReplicatedMergeTree('/test2/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/', '1') PRIMARY KEY (a)
SETTINGS min_bytes_for_wide_part = 0,
max_parts_to_merge_at_once=3,
enable_vertical_merge_algorithm=1,
vertical_merge_algorithm_min_rows_to_activate = 1,
vertical_merge_algorithm_min_columns_to_activate = 1,
vertical_merge_algorithm_min_columns_to_activate = 1;
"
table_uuid=$($CLICKHOUSE_CLIENT -q "SELECT uuid FROM system.tables WHERE table='test' and database=currentDatabase()")
function random()
{
cat /dev/urandom | LC_ALL=C tr -dc 'a-zA-Z' | fold -w ${1:-8} | head -n 1
}
function insert()
{
offset=$1
size=$2
echo 'insert new part'
$CLICKHOUSE_CLIENT -q "INSERT INTO test SELECT number, number, number, number, number%2 FROM numbers($offset, $size);"
}
function break_projection()
{
part_name=$1
parent_name=$2
break_type=$3
read -r disk_name part_path <<< $($CLICKHOUSE_CLIENT -nm -q "
SELECT disk_name, path
FROM system.projection_parts
WHERE table='test'
AND database=currentDatabase()
AND active=1
AND part_name='$part_name'
AND parent_name='$parent_name'
LIMIT 1;
")
path=$($CLICKHOUSE_CLIENT -q "SELECT path FROM system.disks WHERE name='$disk_name'")
# make sure path is absolute
$CLICKHOUSE_CLIENT -q "select throwIf(substring('$path', 1, 1) != '/', 'Path is relative: $path')" || exit
if [ "$break_type" = "data" ]
then
rm "$path/$part_path/d.bin"
rm "$path/$part_path/c.bin"
echo "broke data of part '$part_name' (parent part: $parent_name)"
else
rm "$path/$part_path/columns.txt"
echo "broke metadata of part '$part_name' (parent part: $parent_name)"
fi
}
function broken_projections_info()
{
echo 'broken projections info'
$CLICKHOUSE_CLIENT -q "
SELECT parent_name, name, errors.name FROM
(
SELECT parent_name, name, exception_code
FROM system.projection_parts
WHERE table='test'
AND database=currentDatabase()
AND is_broken = 1
) AS parts_info
INNER JOIN system.errors AS errors
ON parts_info.exception_code = errors.code
ORDER BY parent_name, name
"
}
function check()
{
expect_broken_part=""
expected_error=""
if [ $# -ne 0 ]; then
expect_broken_part=$1
expected_error=$2
fi
echo 'system.parts'
$CLICKHOUSE_CLIENT -q "
SELECT name, active, projections
FROM system.parts
WHERE table='test' AND database=currentDatabase()
ORDER BY name;"
echo "select from projection 'proj'"
query_id=$(random 8)
if [ "$expect_broken_part" = "proj" ]
then
$CLICKHOUSE_CLIENT --optimize_use_projections 1 --query_id $query_id -q "SELECT c FROM test WHERE d == 12;" 2>&1 | grep -o $expected_error
else
$CLICKHOUSE_CLIENT --optimize_use_projections 1 --query_id $query_id -q "SELECT c FROM test WHERE d == 12 OR d == 16;"
echo 'used projections'
$CLICKHOUSE_CLIENT -nm -q "
SYSTEM FLUSH LOGS;
SELECT query, projections FROM system.query_log WHERE query_id='$query_id' and type='QueryFinish'
"
fi
echo "select from projection 'proj_2'"
query_id=$(random 8)
if [ "$expect_broken_part" = "proj_2" ]
then
$CLICKHOUSE_CLIENT --optimize_use_projections 1 --query_id $query_id -q "SELECT d FROM test WHERE c == 12;" 2>&1 | grep -o $expected_error
else
$CLICKHOUSE_CLIENT --optimize_use_projections 1 --query_id $query_id -q "SELECT d FROM test WHERE c == 12 OR c == 16;"
echo 'used projections'
$CLICKHOUSE_CLIENT -nm -q "
SYSTEM FLUSH LOGS;
SELECT query, projections FROM system.query_log WHERE query_id='$query_id' and type='QueryFinish'
"
fi
echo 'check table'
$CLICKHOUSE_CLIENT -q "CHECK TABLE test"
}
function optimize_no_wait()
{
echo 'optimize'
$CLICKHOUSE_CLIENT -nm -q "OPTIMIZE TABLE test SETTINGS alter_sync=0;"
}
function reattach()
{
echo 'Detach - Attach'
$CLICKHOUSE_CLIENT -nm -q "
DETACH TABLE test;
ATTACH TABLE test;
"
}
function materialize_projection
{
projection=$1
echo "materialize projection $projection"
$CLICKHOUSE_CLIENT -q "ALTER TABLE test MATERIALIZE PROJECTION $projection SETTINGS mutations_sync=2"
}
function check_table_full()
{
echo 'check table full'
$CLICKHOUSE_CLIENT -q "CHECK TABLE test SETTINGS check_query_single_value_result = 0"
}
insert 0 5
insert 5 5
insert 10 5
insert 15 5
check
# Break metadata file of projection 'proj'
break_projection proj all_2_2_0 metadata
# Do select and after "check table" query.
# Select works because it does not read columns.txt.
check
# Projection 'proj' from part all_2_2_0 will now appear in broken parts info
# because it was marked broken during "check table" query.
# TODO: try to mark it during select as well
broken_projections_info
# Check table query will also show a list of parts which have broken projections.
check_table_full
# Break data file of projection 'proj_2' for part all_2_2_0
break_projection proj_2 all_2_2_0 data
# It will not yet appear in broken projections info.
broken_projections_info
# Select now fails with error "File doesn't exist"
check "proj_2" "FILE_DOESNT_EXIST"
# Projection 'proj_2' from part all_2_2_0 will now appear in broken parts info.
broken_projections_info
# Second select works, because projection is now marked as broken.
check
# Break data file of projection 'proj_2' for part all_3_3_0
break_projection proj_2 all_3_3_0 data
# It will not yet appear in broken projections info.
broken_projections_info
insert 20 5
insert 25 5
# Part all_3_3_0 has 'proj' and 'proj_2' projections, but 'proj_2' is broken and server does NOT know it yet.
# Parts all_4_4_0 and all_5_5_0 have both non-broken projections.
# So a merge will be create for future part all_3_5_1.
# During merge it will fail to read from 'proj_2' of part all_3_3_0 and proj_2 will be marked broken.
# Merge will be retried and on second attempt it will succeed.
# The result part all_3_5_1 will have only 1 projection - 'proj', because
# it will skip 'proj_2' as it will see that one part does not have it anymore in the set of valid projections.
optimize_no_wait
sleep 2
$CLICKHOUSE_CLIENT -nm -q "
SYSTEM FLUSH LOGS;
SELECT count() FROM system.text_log
WHERE level='Error'
AND logger_name='MergeTreeBackgroundExecutor'
AND message like 'Exception while executing background task {$table_uuid:all_3_5_1}%Cannot open file%proj_2.proj/c.bin%'
"
# Projection 'proj_2' from part all_2_2_0 will now appear in broken parts info.
broken_projections_info
check
break_projection proj all_1_1_0 metadata
reattach
broken_projections_info
break_projection proj_2 all_1_1_0 data
reattach
broken_projections_info
check
check_table_full
materialize_projection proj
check_table_full
check
materialize_projection proj_2
check_table_full
check
$CLICKHOUSE_CLIENT -nm -q "
DROP TABLE test;
"