Replaced removing checksums.txt with version

with the explicit data loading and verification.

If the function fails, the exception will re-throw upper,
cancelling the fetch in the handling of the replicated log entry,
but this event will also wake the PartCheckThread, which will
issue a re-fetch.
This commit is contained in:
Mike Kot 2021-03-22 18:44:44 +03:00
parent da67e06aa0
commit 9f25ee3719
3 changed files with 12 additions and 18 deletions

View File

@ -373,11 +373,8 @@ void MinimalisticDataPartChecksums::computeTotalChecksums(const MergeTreeDataPar
SipHash hash_of_uncompressed_files_state;
SipHash uncompressed_hash_of_compressed_files_state;
for (const auto & elem : full_checksums_.files)
for (const auto & [name, checksum] : full_checksums_.files)
{
const String & name = elem.first;
const auto & checksum = elem.second;
updateHash(hash_of_all_files_state, name);
hash_of_all_files_state.update(checksum.file_hash);

View File

@ -213,7 +213,7 @@ std::pair<bool, MergeTreeDataPartPtr> ReplicatedMergeTreePartCheckThread::findLo
/// because our checks of local storage and zookeeper are not consistent.
/// If part exists in zookeeper and doesn't exists in local storage definitely require
/// to fetch this part. But if we check local storage first and than check zookeeper
/// some background process can successfully commit part between this checks (both to the local stoarge and zookeeper),
/// some background process can successfully commit part between this checks (both to the local storage and zookeeper),
/// but checker thread will remove part from zookeeper and queue fetch.
bool exists_in_zookeeper = zookeeper->exists(part_path);
@ -234,6 +234,8 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
auto [exists_in_zookeeper, part] = findLocalPart(part_name);
LOG_TRACE(log, "Part {} in zookeeper: {}, locally: {}", part_name, exists_in_zookeeper, part != nullptr);
/// We do not have this or a covering part.
if (!part)
{

View File

@ -1377,23 +1377,14 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::attachPartHelperFo
const VolumePtr volume = std::make_shared<SingleDiskVolume>("volume_" + part_old_name, disk);
MergeTreeData::MutableDataPartPtr part = createPart(part_new_name, part_info, volume, part_path);
/// We don't check consistency as in that case this method may throw if the data is corrupted.
/// The faster way is to load invalid data and just check the checksums -- they won't match.
/// The issue here is that one of data part files may be corrupted (e.g. data.bin), but the
/// pre-calculated checksums.txt is correct, so that could lead to the invalid part being attached.
/// So we explicitly remove it and force recalculation.
disk->removeFileIfExists(part->getFullRelativePath() + "checksums.txt");
/// The try-catch here is for the case when one of the part's mandatory data files is missing, so the
/// internal loading method in the IMergeTreeDataPart throws.
/// We can't rethrow upper as in that case the fetching will not happen.
/// This situation is not thought to happen often, so handling the exception is ok.
try
{
part->loadColumnsChecksumsIndexes(false, false);
part->loadColumnsChecksumsIndexes(true, true);
}
catch (const Exception&) // we don't really want to know what exactly happened
catch (const Exception&)
{
/// This method throws if the part data is corrupted or partly missing. In this case, we simply don't
/// process the part.
continue;
}
@ -3333,12 +3324,16 @@ String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_nam
/// Select replicas in uniformly random order.
std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);
LOG_TRACE(log, "Candidate replicas: {}", replicas.size());
for (const String & replica : replicas)
{
/// We aren't interested in ourself.
if (replica == replica_name)
continue;
LOG_TRACE(log, "Candidate replica: {}", replica);
if (checkReplicaHavePart(replica, part_name) &&
(!active || zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active")))
return replica;