Replace lost parts with empty parts instead of hacking replication queue

This commit is contained in:
alesapin 2021-06-29 18:14:44 +03:00
parent cdc95fa98a
commit 5822f0ba29
5 changed files with 101 additions and 65 deletions

View File

@ -190,11 +190,9 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPartAndFetchIfPossible(
if (missing_part_search_result == MissingPartSearchResult::LostForever)
{
/// Is it in the replication queue? If there is - delete, because the task can not be processed.
if (!storage.queue.remove(zookeeper, part_name))
if (!storage.createEmptyPartInsteadOfLost(part_name))
{
/// The part was not in our queue.
LOG_WARNING(log, "Missing part {} is not in our queue, this can happen rarely.", part_name);
LOG_WARNING(log, "Cannot create empty part {} instead of lost. Will retry later", part_name);
}
/** This situation is possible if on all the replicas where the part was, it deteriorated.

View File

@ -410,62 +410,6 @@ void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeep
updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed);
}
bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const String & part_name)
{
LogEntryPtr found;
size_t queue_size = 0;
std::optional<time_t> min_unprocessed_insert_time_changed;
std::optional<time_t> max_processed_insert_time_changed;
{
std::unique_lock lock(state_mutex);
bool removed = virtual_parts.remove(part_name);
for (Queue::iterator it = queue.begin(); it != queue.end();)
{
if ((*it)->new_part_name == part_name)
{
found = *it;
if (removed)
{
/// Preserve invariant `virtual_parts` = `current_parts` + `queue`.
/// We remove new_part from virtual parts and add all source parts
/// which present in current_parts.
for (const auto & source_part : found->source_parts)
{
auto part_in_current_parts = current_parts.getContainingPart(source_part);
if (part_in_current_parts == source_part)
virtual_parts.add(source_part, nullptr, log);
}
}
updateStateOnQueueEntryRemoval(
found, /* is_successful = */ false,
min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock);
queue.erase(it++);
queue_size = queue.size();
break;
}
else
++it;
}
}
if (!found)
return false;
notifySubscribers(queue_size);
zookeeper->tryRemove(fs::path(replica_path) / "queue" / found->znode_name);
updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed);
return true;
}
bool ReplicatedMergeTreeQueue::removeFailedQuorumPart(const MergeTreePartInfo & part_info)
{
assert(part_info.level == 0);

View File

@ -281,11 +281,6 @@ public:
*/
void insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry);
/** Delete the action with the specified part (as new_part_name) from the queue.
* Called for unreachable actions in the queue - old lost parts.
*/
bool remove(zkutil::ZooKeeperPtr zookeeper, const String & part_name);
/** Load (initialize) a queue from ZooKeeper (/replicas/me/queue/).
* If queue was not empty load() would not load duplicate records.
* return true, if we update queue.

View File

@ -17,6 +17,7 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/PinnedPartUUIDs.h>
#include <Storages/MergeTree/PartitionPruner.h>
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
@ -7385,4 +7386,100 @@ bool StorageReplicatedMergeTree::checkIfDetachedPartitionExists(const String & p
}
return false;
}
bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(const String & lost_part_name)
{
LOG_INFO(log, "Going to replace lost part {} with empty part", lost_part_name);
auto metadata_snapshot = getInMemoryMetadataPtr();
auto storage_settings = getSettings();
constexpr static auto TMP_PREFIX = "tmp_empty_";
auto new_part_info = MergeTreePartInfo::fromPartName(lost_part_name, format_version);
auto block = metadata_snapshot->getSampleBlock();
DB::IMergeTreeDataPart::TTLInfos move_ttl_infos;
NamesAndTypesList columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
ReservationPtr reservation = reserveSpacePreferringTTLRules(metadata_snapshot, 0, move_ttl_infos, time(nullptr), 0, true);
VolumePtr volume = getStoragePolicy()->getVolume(0);
IMergeTreeDataPart::MinMaxIndex minmax_idx;
minmax_idx.update(block, getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
auto new_data_part = createPart(
lost_part_name,
choosePartType(0, block.rows()),
new_part_info,
createVolumeFromReservation(reservation, volume),
TMP_PREFIX + lost_part_name);
if (storage_settings->assign_part_uuids)
new_data_part->uuid = UUIDHelpers::generateV4();
new_data_part->setColumns(columns);
new_data_part->rows_count = block.rows();
auto parts_in_partition = getDataPartsPartitionRange(new_part_info.partition_id);
if (parts_in_partition.empty())
{
LOG_WARNING(log, "Empty part {} is not created instead of lost part because there is no parts in partition {}, just DROP this partition.", lost_part_name, new_part_info.partition_id);
return false;
}
new_data_part->partition = (*parts_in_partition.begin())->partition;
new_data_part->minmax_idx = std::move(minmax_idx);
new_data_part->is_temp = true;
SyncGuardPtr sync_guard;
if (new_data_part->isStoredOnDisk())
{
/// The name could be non-unique in case of stale files from previous runs.
String full_path = new_data_part->getFullRelativePath();
if (new_data_part->volume->getDisk()->exists(full_path))
{
LOG_WARNING(log, "Removing old temporary directory {}", fullPath(new_data_part->volume->getDisk(), full_path));
new_data_part->volume->getDisk()->removeRecursive(full_path);
}
const auto disk = new_data_part->volume->getDisk();
disk->createDirectories(full_path);
if (getSettings()->fsync_part_directory)
sync_guard = disk->getDirectorySyncGuard(full_path);
}
/// This effectively chooses minimal compression method:
/// either default lz4 or compression method with zero thresholds on absolute and relative part size.
auto compression_codec = getContext()->chooseCompressionCodec(0, 0);
const auto & index_factory = MergeTreeIndexFactory::instance();
MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, index_factory.getMany(metadata_snapshot->getSecondaryIndices()), compression_codec);
bool sync_on_insert = storage_settings->fsync_after_insert;
out.writePrefix();
out.write(block);
out.writeSuffixAndFinalizePart(new_data_part, sync_on_insert);
try
{
MergeTreeData::Transaction transaction(*this);
renameTempPartAndReplace(new_data_part, nullptr, &transaction);
checkPartChecksumsAndCommit(transaction, new_data_part);
}
catch (const Exception & ex)
{
LOG_WARNING(log, "Cannot commit empty part {} with error {}", lost_part_name, ex.displayText());
return false;
}
LOG_INFO(log, "Created empty part {} instead of lost part", lost_part_name);
return true;
}
}

View File

@ -258,6 +258,8 @@ public:
return replicated_sends_throttler;
}
bool createEmptyPartInsteadOfLost(const String & lost_part_name);
private:
std::atomic_bool are_restoring_replica {false};