Merge pull request #63976 from ClickHouse/backport/24.3/63202

Backport #63202 to 24.3: fix intersect parts when restart after drop range
robot-ch-test-poll 2024-10-16 16:58:21 +02:00 committed by GitHub
commit 3e29f2d91a
16 changed files with 397 additions and 198 deletions

View File

@ -32,7 +32,7 @@ WHERE name LIKE '%thread_pool%'
```
``` text
┌─name────────────────────────────────────────┬─value─┬─default─┬─changed─┬─description─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬─type───┬─changeable_without_restart─┬─is_obsolete─┐
┌─name──────────────────────────────────────────┬─value─┬─default─┬─changed─┬─description─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬─type───┬─changeable_without_restart─┬─is_obsolete─┐
│ max_thread_pool_size │ 10000 │ 10000 │ 0 │ The maximum number of threads that could be allocated from the OS and used for query execution and background operations. │ UInt64 │ No │ 0 │
│ max_thread_pool_free_size │ 1000 │ 1000 │ 0 │ The maximum number of threads that will always stay in a global thread pool once allocated and remain idle in case of insufficient number of tasks. │ UInt64 │ No │ 0 │
│ thread_pool_queue_size │ 10000 │ 10000 │ 0 │ The maximum number of tasks that will be placed in a queue and wait for execution. │ UInt64 │ No │ 0 │
@ -41,11 +41,12 @@ WHERE name LIKE '%thread_pool%'
│ io_thread_pool_queue_size │ 10000 │ 10000 │ 0 │ Queue size for IO thread pool. │ UInt64 │ No │ 0 │
│ max_active_parts_loading_thread_pool_size │ 64 │ 64 │ 0 │ The number of threads to load active set of data parts (Active ones) at startup. │ UInt64 │ No │ 0 │
│ max_outdated_parts_loading_thread_pool_size │ 32 │ 32 │ 0 │ The number of threads to load inactive set of data parts (Outdated ones) at startup. │ UInt64 │ No │ 0 │
│ max_unexpected_parts_loading_thread_pool_size │ 32 │ 32 │ 0 │ The number of threads to load inactive set of data parts (Unexpected ones) at startup. │ UInt64 │ No │ 0 │
│ max_parts_cleaning_thread_pool_size │ 128 │ 128 │ 0 │ The number of threads for concurrent removal of inactive data parts. │ UInt64 │ No │ 0 │
│ max_backups_io_thread_pool_size │ 1000 │ 1000 │ 0 │ The maximum number of threads that would be used for IO operations for BACKUP queries │ UInt64 │ No │ 0 │
│ max_backups_io_thread_pool_free_size │ 0 │ 0 │ 0 │ Max free size for backups IO thread pool. │ UInt64 │ No │ 0 │
│ backups_io_thread_pool_queue_size │ 0 │ 0 │ 0 │ Queue size for backups IO thread pool. │ UInt64 │ No │ 0 │
└─────────────────────────────────────────────┴───────┴─────────┴─────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────┴────────────────────────────┴─────────────┘
└───────────────────────────────────────────────┴───────┴─────────┴─────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────┴────────────────────────────┴─────────────┘
```

View File

@ -161,6 +161,14 @@ void LocalServer::initialize(Poco::Util::Application & self)
getOutdatedPartsLoadingThreadPool().setMaxTurboThreads(active_parts_loading_threads);
const size_t unexpected_parts_loading_threads = config().getUInt("max_unexpected_parts_loading_thread_pool_size", 32);
getUnexpectedPartsLoadingThreadPool().initialize(
unexpected_parts_loading_threads,
0, // We don't need any threads once all the parts are loaded
unexpected_parts_loading_threads);
getUnexpectedPartsLoadingThreadPool().setMaxTurboThreads(active_parts_loading_threads);
const size_t cleanup_threads = config().getUInt("max_parts_cleaning_thread_pool_size", 128);
getPartsCleaningThreadPool().initialize(
cleanup_threads,

View File

@ -843,6 +843,16 @@ try
server_settings.max_active_parts_loading_thread_pool_size
);
getUnexpectedPartsLoadingThreadPool().initialize(
server_settings.max_unexpected_parts_loading_thread_pool_size,
0, // We don't need any threads once all the parts are loaded
server_settings.max_unexpected_parts_loading_thread_pool_size);
/// It could grow if we need to synchronously wait until all the data parts are loaded.
getUnexpectedPartsLoadingThreadPool().setMaxTurboThreads(
server_settings.max_active_parts_loading_thread_pool_size
);
getPartsCleaningThreadPool().initialize(
server_settings.max_parts_cleaning_thread_pool_size,
0, // We don't need any threads once all the parts are deleted
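Both the server and clickhouse-local wire the new pool the same way: the full thread cap up front, zero threads kept idle once startup loading is done, and `setMaxTurboThreads` so a synchronous waiter can temporarily raise the cap. A minimal stand-alone sketch of that raise-the-cap-while-waiting idea, modeled with a C++20 counting semaphore rather than ClickHouse's actual StaticThreadPool (the names and numbers below are illustrative):

```cpp
#include <cstdio>
#include <semaphore>
#include <thread>
#include <vector>

// Base cap of 8 concurrent loaders; the template argument is only an upper bound.
std::counting_semaphore<1024> slots(8);

void load_part(int i)
{
    slots.acquire();               // take a slot, like a pool thread picking a job
    std::printf("loading part %d\n", i);
    slots.release();
}

int main()
{
    slots.release(56);             // "enableTurboMode": cap goes from 8 to 64

    std::vector<std::jthread> workers;
    for (int i = 0; i < 100; ++i)
        workers.emplace_back(load_part, i);
    // jthreads join on destruction; a real pool would re-acquire the 56 extra
    // permits afterwards, playing the role of "disableTurboMode".
}
```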

View File

@ -177,6 +177,9 @@
M(MergeTreeOutdatedPartsLoaderThreads, "Number of threads in the threadpool for loading Outdated data parts.") \
M(MergeTreeOutdatedPartsLoaderThreadsActive, "Number of active threads in the threadpool for loading Outdated data parts.") \
M(MergeTreeOutdatedPartsLoaderThreadsScheduled, "Number of queued or active jobs in the threadpool for loading Outdated data parts.") \
M(MergeTreeUnexpectedPartsLoaderThreads, "Number of threads in the threadpool for loading Unexpected data parts.") \
M(MergeTreeUnexpectedPartsLoaderThreadsActive, "Number of active threads in the threadpool for loading Unexpected data parts.") \
M(MergeTreeUnexpectedPartsLoaderThreadsScheduled, "Number of queued or active jobs in the threadpool for loading Unexpected data parts.") \
M(MergeTreePartsCleanerThreads, "Number of threads in the MergeTree parts cleaner thread pool.") \
M(MergeTreePartsCleanerThreadsActive, "Number of threads in the MergeTree parts cleaner thread pool running a task.") \
M(MergeTreePartsCleanerThreadsScheduled, "Number of queued or active jobs in the MergeTree parts cleaner thread pool.") \

View File

@ -25,6 +25,7 @@ namespace DB
M(UInt64, io_thread_pool_queue_size, 10000, "Queue size for IO thread pool.", 0) \
M(UInt64, max_active_parts_loading_thread_pool_size, 64, "The number of threads to load active set of data parts (Active ones) at startup.", 0) \
M(UInt64, max_outdated_parts_loading_thread_pool_size, 32, "The number of threads to load inactive set of data parts (Outdated ones) at startup.", 0) \
M(UInt64, max_unexpected_parts_loading_thread_pool_size, 8, "The number of threads to load inactive set of data parts (Unexpected ones) at startup.", 0) \
M(UInt64, max_parts_cleaning_thread_pool_size, 128, "The number of threads for concurrent removal of inactive data parts.", 0) \
M(UInt64, max_mutations_bandwidth_for_server, 0, "The maximum read speed of all mutations on server in bytes per second. Zero means unlimited.", 0) \
M(UInt64, max_merges_bandwidth_for_server, 0, "The maximum read speed of all merges on server in bytes per second. Zero means unlimited.", 0) \
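The settings block above is an X-macro list: each `M(...)` entry is expanded several times, once per use site, so the field, its default, and its documentation stay in one place. A hedged sketch of the technique with stand-in names (ClickHouse's real macro carries more arguments, including the description and flags):

```cpp
#include <cstdint>
#include <cstdio>

// Hypothetical two-entry settings list; each M(type, name, default) line is
// expanded below for two different purposes.
#define APPLY_FOR_SETTINGS_SKETCH(M) \
    M(uint64_t, max_unexpected_parts_loading_thread_pool_size, 8) \
    M(uint64_t, max_parts_cleaning_thread_pool_size, 128)

struct ServerSettingsSketch
{
#define DECLARE_FIELD(TYPE, NAME, DEFAULT) TYPE NAME = DEFAULT;
    APPLY_FOR_SETTINGS_SKETCH(DECLARE_FIELD)
#undef DECLARE_FIELD
};

int main()
{
    ServerSettingsSketch settings;
#define PRINT_SETTING(TYPE, NAME, DEFAULT) \
    std::printf(#NAME " = %llu\n", (unsigned long long) settings.NAME);
    APPLY_FOR_SETTINGS_SKETCH(PRINT_SETTING)
#undef PRINT_SETTING
}
```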

View File

@ -20,6 +20,9 @@ namespace CurrentMetrics
extern const Metric MergeTreeOutdatedPartsLoaderThreads;
extern const Metric MergeTreeOutdatedPartsLoaderThreadsActive;
extern const Metric MergeTreeOutdatedPartsLoaderThreadsScheduled;
extern const Metric MergeTreeUnexpectedPartsLoaderThreads;
extern const Metric MergeTreeUnexpectedPartsLoaderThreadsActive;
extern const Metric MergeTreeUnexpectedPartsLoaderThreadsScheduled;
extern const Metric DatabaseReplicatedCreateTablesThreads;
extern const Metric DatabaseReplicatedCreateTablesThreadsActive;
extern const Metric DatabaseReplicatedCreateTablesThreadsScheduled;
@ -151,6 +154,12 @@ StaticThreadPool & getOutdatedPartsLoadingThreadPool()
return instance;
}
StaticThreadPool & getUnexpectedPartsLoadingThreadPool()
{
static StaticThreadPool instance("MergeTreeUnexpectedPartsLoaderThreadPool", CurrentMetrics::MergeTreeUnexpectedPartsLoaderThreads, CurrentMetrics::MergeTreeUnexpectedPartsLoaderThreadsActive, CurrentMetrics::MergeTreeUnexpectedPartsLoaderThreadsScheduled);
return instance;
}
StaticThreadPool & getDatabaseReplicatedCreateTablesThreadPool()
{
static StaticThreadPool instance("CreateTablesThreadPool", CurrentMetrics::DatabaseReplicatedCreateTablesThreads, CurrentMetrics::DatabaseReplicatedCreateTablesThreadsActive, CurrentMetrics::DatabaseReplicatedCreateTablesThreadsScheduled);

View File

@ -64,6 +64,8 @@ StaticThreadPool & getPartsCleaningThreadPool();
/// the number of threads by calling enableTurboMode() :-)
StaticThreadPool & getOutdatedPartsLoadingThreadPool();
StaticThreadPool & getUnexpectedPartsLoadingThreadPool();
/// ThreadPool used for creating tables in DatabaseReplicated.
StaticThreadPool & getDatabaseReplicatedCreateTablesThreadPool();

View File

@ -1257,6 +1257,33 @@ void IMergeTreeDataPart::appendFilesOfChecksums(Strings & files)
files.push_back("checksums.txt");
}
void IMergeTreeDataPart::loadRowsCountFileForUnexpectedPart()
{
auto read_rows_count = [&]()
{
auto buf = metadata_manager->read("count.txt");
readIntText(rows_count, *buf);
assertEOF(*buf);
};
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || part_type == Type::Compact || parent_part)
{
if (metadata_manager->exists("count.txt"))
{
read_rows_count();
return;
}
}
else
{
if (getDataPartStorage().exists("count.txt"))
{
read_rows_count();
return;
}
}
throw Exception(ErrorCodes::NO_FILE_IN_DATA_PART, "No count.txt in part {}", name);
}
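`loadRowsCountFileForUnexpectedPart` leans on the count.txt convention: the file holds a single decimal row count and nothing else, hence the `assertEOF` after the read. A rough stand-alone equivalent using only the standard library (the real code reads through ClickHouse's own buffer abstractions):

```cpp
#include <cstddef>
#include <fstream>
#include <stdexcept>
#include <string>

// Assumed convention: count.txt contains one decimal row count and nothing else.
size_t readRowsCount(const std::string & path)
{
    std::ifstream in(path);
    size_t rows = 0;
    if (!(in >> rows))
        throw std::runtime_error("No valid row count in " + path);

    char extra;
    if (in >> extra)   // plays the role of assertEOF: trailing bytes mean corruption
        throw std::runtime_error("Unexpected data after row count in " + path);
    return rows;
}
```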
void IMergeTreeDataPart::loadRowsCount()
{
auto read_rows_count = [&]()

View File

@ -183,6 +183,8 @@ public:
void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency);
void appendFilesOfColumnsChecksumsIndexes(Strings & files, bool include_projection = false) const;
void loadRowsCountFileForUnexpectedPart();
String getMarksFileExtension() const { return index_granularity_info.mark_type.getFileExtension(); }
/// Generate the new name for this part according to `new_part_info` and min/max dates from the old name.

View File

@ -1253,6 +1253,46 @@ static constexpr size_t loading_parts_initial_backoff_ms = 100;
static constexpr size_t loading_parts_max_backoff_ms = 5000;
static constexpr size_t loading_parts_max_tries = 3;
void MergeTreeData::loadUnexpectedDataPart(UnexpectedPartLoadState & state)
{
const MergeTreePartInfo & part_info = state.loading_info->info;
const String & part_name = state.loading_info->name;
const DiskPtr & part_disk_ptr = state.loading_info->disk;
LOG_TRACE(log, "Loading unexpected part {} from disk {}", part_name, part_disk_ptr->getName());
LoadPartResult res;
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, part_disk_ptr, 0);
auto data_part_storage = std::make_shared<DataPartStorageOnDiskFull>(single_disk_volume, relative_data_path, part_name);
String part_path = fs::path(relative_data_path) / part_name;
try
{
state.part = getDataPartBuilder(part_name, single_disk_volume, part_name)
.withPartInfo(part_info)
.withPartFormatFromDisk()
.build();
state.part->loadRowsCountFileForUnexpectedPart();
}
catch (...)
{
LOG_DEBUG(log, "Failed to load unexcepted data part {} with exception: {}", part_name, getExceptionMessage(std::current_exception(), false));
if (!state.part)
{
/// Build a fake part and mark it as broken in case of filesystem error.
/// If the error impacts part directory instead of single files,
/// an exception will be thrown during detach and silently ignored.
state.part = getDataPartBuilder(part_name, single_disk_volume, part_name)
.withPartStorageType(MergeTreeDataPartStorageType::Full)
.withPartType(MergeTreeDataPartType::Wide)
.build();
}
state.is_broken = true;
tryLogCurrentException(log, fmt::format("while loading unexpected part {} on path {}", part_name, part_path));
}
}
MergeTreeData::LoadPartResult MergeTreeData::loadDataPart(
const MergeTreePartInfo & part_info,
const String & part_name,
@ -1676,6 +1716,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
auto runner = threadPoolCallbackRunner<void>(getActivePartsLoadingThreadPool().get(), "ActiveParts");
std::vector<PartLoadingTree::PartLoadingInfos> parts_to_load_by_disk(disks.size());
std::vector<PartLoadingTree::PartLoadingInfos> unexpected_parts_to_load_by_disk(disks.size());
std::vector<std::future<void>> disks_futures;
disks_futures.reserve(disks.size());
@ -1687,6 +1728,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
continue;
auto & disk_parts = parts_to_load_by_disk[i];
auto & unexpected_disk_parts = unexpected_parts_to_load_by_disk[i];
disks_futures.push_back(runner([&, disk_ptr]()
{
@ -1698,8 +1740,13 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
continue;
if (auto part_info = MergeTreePartInfo::tryParsePartName(it->name(), format_version))
{
if (expected_parts && !expected_parts->contains(it->name()))
unexpected_disk_parts.emplace_back(*part_info, it->name(), disk_ptr);
else
disk_parts.emplace_back(*part_info, it->name(), disk_ptr);
}
}
}, Priority{0}));
}
@ -1709,6 +1756,9 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
PartLoadingTree::PartLoadingInfos parts_to_load;
for (auto & disk_parts : parts_to_load_by_disk)
std::move(disk_parts.begin(), disk_parts.end(), std::back_inserter(parts_to_load));
PartLoadingTree::PartLoadingInfos unexpected_parts_to_load;
for (auto & disk_parts : unexpected_parts_to_load_by_disk)
std::move(disk_parts.begin(), disk_parts.end(), std::back_inserter(unexpected_parts_to_load));
auto loading_tree = PartLoadingTree::build(std::move(parts_to_load));
@ -1784,7 +1834,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
}
}
if (num_parts == 0)
if (num_parts == 0 && unexpected_parts_to_load.empty())
{
resetObjectColumnsFromActiveParts(part_lock);
LOG_DEBUG(log, "There are no data parts");
@ -1837,6 +1887,36 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
calculateColumnAndSecondaryIndexSizesImpl();
PartLoadingTreeNodes unloaded_parts;
std::vector<UnexpectedPartLoadState> unexpected_unloaded_data_parts;
for (const auto & [info, name, disk] : unexpected_parts_to_load)
{
bool uncovered = true;
for (const auto & part : unexpected_parts_to_load)
{
if (name != part.name && part.info.contains(info))
{
uncovered = false;
break;
}
}
unexpected_unloaded_data_parts.push_back({std::make_shared<PartLoadingTree::Node>(info, name, disk), uncovered, /*is_broken*/ false, /*part*/ nullptr});
}
if (!unexpected_unloaded_data_parts.empty())
{
LOG_DEBUG(log, "Found {} unexpected data parts. They will be loaded asynchronously", unexpected_unloaded_data_parts.size());
{
std::lock_guard lock(unexpected_data_parts_mutex);
unexpected_data_parts = std::move(unexpected_unloaded_data_parts);
unexpected_data_parts_loading_finished = false;
}
unexpected_data_parts_loading_task = getContext()->getSchedulePool().createTask(
"MergeTreeData::loadUnexpectedDataParts",
[this] { loadUnexpectedDataParts(); });
}
loading_tree.traverse(/*recursive=*/ true, [&](const auto & node)
{
if (!node->is_loaded)
@ -1862,6 +1942,55 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
data_parts_loading_finished = true;
}
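The `uncovered` flag computed above comes from a pairwise containment check over the unexpected part infos: a part is covered when another unexpected part in the same partition encloses its block range. A simplified sketch of that containment test (the real `MergeTreePartInfo::contains` also considers merge level and mutation):

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Simplified stand-in for MergeTreePartInfo: "all_2_3_1" would be roughly
// {partition_id = "all", min_block = 2, max_block = 3}.
struct PartInfoSketch
{
    std::string partition_id;
    int64_t min_block = 0;
    int64_t max_block = 0;

    bool contains(const PartInfoSketch & other) const
    {
        return partition_id == other.partition_id
            && min_block <= other.min_block
            && other.max_block <= max_block;
    }
};

int main()
{
    PartInfoSketch big{"all", 1, 4};
    PartInfoSketch little{"all", 2, 3};
    assert(big.contains(little));    // little is covered, so it is not "uncovered"
    assert(!little.contains(big));
}
```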
void MergeTreeData::loadUnexpectedDataParts()
{
{
std::lock_guard lock(unexpected_data_parts_mutex);
if (unexpected_data_parts.empty())
{
unexpected_data_parts_loading_finished = true;
unexpected_data_parts_cv.notify_all();
return;
}
LOG_DEBUG(log, "Loading {} unexpected data parts",
unexpected_data_parts.size());
}
ThreadFuzzer::maybeInjectSleep();
auto runner = threadPoolCallbackRunner<void>(getUnexpectedPartsLoadingThreadPool().get(), "UnexpectedParts");
std::vector<std::future<void>> parts_futures;
for (auto & load_state : unexpected_data_parts)
{
std::lock_guard lock(unexpected_data_parts_mutex);
chassert(!load_state.part);
if (unexpected_data_parts_loading_canceled)
{
waitForAllToFinishAndRethrowFirstError(parts_futures);
return;
}
parts_futures.push_back(runner([&]()
{
loadUnexpectedDataPart(load_state);
chassert(load_state.part);
if (load_state.is_broken)
{
load_state.part->renameToDetached("broken-on-start"); /// detached parts must not have '_' in prefixes
}
}, Priority{}));
}
waitForAllToFinishAndRethrowFirstError(parts_futures);
LOG_DEBUG(log, "Loaded {} unexpected data parts", unexpected_data_parts.size());
{
std::lock_guard lock(unexpected_data_parts_mutex);
unexpected_data_parts_loading_finished = true;
unexpected_data_parts_cv.notify_all();
}
}
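The loader scatters work onto the pool and then joins through `waitForAllToFinishAndRethrowFirstError`, so one broken part cannot leave sibling tasks running unjoined. A minimal stand-in for that helper under the contract its name suggests:

```cpp
#include <exception>
#include <future>
#include <stdexcept>
#include <vector>

// Join every task, remember the first exception, rethrow it only after all stopped.
void waitAllRethrowFirst(std::vector<std::future<void>> & futures)
{
    std::exception_ptr first;
    for (auto & f : futures)
    {
        try { f.get(); }
        catch (...) { if (!first) first = std::current_exception(); }
    }
    if (first)
        std::rethrow_exception(first);
}

int main()
{
    std::vector<std::future<void>> futures;
    futures.push_back(std::async(std::launch::async, [] { /* loads fine */ }));
    futures.push_back(std::async(std::launch::async, [] { throw std::runtime_error("broken part"); }));
    try { waitAllRethrowFirst(futures); }
    catch (const std::exception &) { /* the first failure surfaces here */ }
}
```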
void MergeTreeData::loadOutdatedDataParts(bool is_async)
try
{
@ -1989,17 +2118,55 @@ void MergeTreeData::waitForOutdatedPartsToBeLoaded() const TSA_NO_THREAD_SAFETY_
LOG_TRACE(log, "Finished waiting for outdated data parts to be loaded");
}
void MergeTreeData::startOutdatedDataPartsLoadingTask()
void MergeTreeData::waitForUnexpectedPartsToBeLoaded() const TSA_NO_THREAD_SAFETY_ANALYSIS
{
/// Background tasks are not run if storage is static.
if (isStaticStorage())
return;
/// If waiting is not required, do NOT log and do NOT enable/disable turbo mode to make `waitForUnexpectedPartsToBeLoaded` a lightweight check
{
std::unique_lock lock(unexpected_data_parts_mutex);
if (unexpected_data_parts_loading_canceled)
throw Exception(ErrorCodes::NOT_INITIALIZED, "Loading of unexpected data parts was already canceled");
if (unexpected_data_parts_loading_finished)
return;
}
/// We need to load parts as fast as possible
getUnexpectedPartsLoadingThreadPool().enableTurboMode();
SCOPE_EXIT({
/// Let's lower the number of threads e.g. for later ATTACH queries to behave as usual
getUnexpectedPartsLoadingThreadPool().disableTurboMode();
});
LOG_TRACE(log, "Will wait for unexpected data parts to be loaded");
std::unique_lock lock(unexpected_data_parts_mutex);
unexpected_data_parts_cv.wait(lock, [this]() TSA_NO_THREAD_SAFETY_ANALYSIS
{
return unexpected_data_parts_loading_finished || unexpected_data_parts_loading_canceled;
});
if (unexpected_data_parts_loading_canceled)
throw Exception(ErrorCodes::NOT_INITIALIZED, "Loading of unexpected data parts was canceled");
LOG_TRACE(log, "Finished waiting for unexpected data parts to be loaded");
}
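The wait itself is the classic condition-variable-plus-predicate idiom, with the finished and canceled flags written under the same mutex that guards the wait. A self-contained sketch of the same shape:

```cpp
#include <condition_variable>
#include <mutex>
#include <stdexcept>
#include <thread>

std::mutex mutex;
std::condition_variable cv;
bool finished = false;
bool canceled = false;

void waitLoaded()
{
    std::unique_lock lock(mutex);
    cv.wait(lock, [] { return finished || canceled; });   // immune to spurious wakeups
    if (canceled)
        throw std::runtime_error("loading was canceled");
}

int main()
{
    std::thread waiter(waitLoaded);
    {
        std::lock_guard lock(mutex);   // flag writes happen under the same mutex
        finished = true;
    }
    cv.notify_all();
    waiter.join();
}
```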
void MergeTreeData::startOutdatedAndUnexpectedDataPartsLoadingTask()
{
if (outdated_data_parts_loading_task)
outdated_data_parts_loading_task->activateAndSchedule();
if (unexpected_data_parts_loading_task)
unexpected_data_parts_loading_task->activateAndSchedule();
}
void MergeTreeData::stopOutdatedDataPartsLoadingTask()
void MergeTreeData::stopOutdatedAndUnexpectedDataPartsLoadingTask()
{
if (!outdated_data_parts_loading_task)
return;
if (outdated_data_parts_loading_task)
{
{
std::lock_guard lock(outdated_data_parts_mutex);
outdated_data_parts_loading_canceled = true;
@ -2007,6 +2174,18 @@ void MergeTreeData::stopOutdatedDataPartsLoadingTask()
outdated_data_parts_loading_task->deactivate();
outdated_data_parts_cv.notify_all();
}
if (unexpected_data_parts_loading_task)
{
{
std::lock_guard lock(unexpected_data_parts_mutex);
unexpected_data_parts_loading_canceled = true;
}
unexpected_data_parts_loading_task->deactivate();
unexpected_data_parts_cv.notify_all();
}
}
/// Is the part directory old.
@ -4058,16 +4237,13 @@ void MergeTreeData::outdateUnexpectedPartAndCloneToDetached(const DataPartPtr &
removePartsFromWorkingSet(NO_TRANSACTION_RAW, {part_to_detach}, true, &lock);
}
void MergeTreeData::forcefullyMovePartToDetachedAndRemoveFromMemory(const MergeTreeData::DataPartPtr & part_to_detach, const String & prefix, bool restore_covered)
void MergeTreeData::forcefullyMovePartToDetachedAndRemoveFromMemory(const MergeTreeData::DataPartPtr & part_to_detach, const String & prefix)
{
if (prefix.empty())
LOG_INFO(log, "Renaming {} to {} and forgetting it.", part_to_detach->getDataPartStorage().getPartDirectory(), part_to_detach->name);
else
LOG_INFO(log, "Renaming {} to {}_{} and forgetting it.", part_to_detach->getDataPartStorage().getPartDirectory(), prefix, part_to_detach->name);
if (restore_covered)
waitForOutdatedPartsToBeLoaded();
auto lock = lockParts();
bool removed_active_part = false;
bool restored_active_part = false;
@ -4093,132 +4269,6 @@ void MergeTreeData::forcefullyMovePartToDetachedAndRemoveFromMemory(const MergeT
LOG_TEST(log, "forcefullyMovePartToDetachedAndRemoveFromMemory: removing {} from data_parts_indexes", part->getNameWithState());
data_parts_indexes.erase(it_part);
if (restore_covered && part->info.level == 0 && part->info.mutation == 0)
{
LOG_WARNING(log, "Will not recover parts covered by zero-level part {}", part->name);
return;
}
/// Let's restore some parts covered by unexpected to avoid partial data
if (restore_covered)
{
Strings restored;
Strings error_parts;
auto is_appropriate_state = [] (const DataPartPtr & part_)
{
/// In rare cases, we may have a chain of unexpected parts that cover common source parts, e.g. all_1_2_3, all_1_3_4
/// It may happen as a result of interrupted cloneReplica
bool already_active = part_->getState() == DataPartState::Active;
if (!already_active && part_->getState() != DataPartState::Outdated)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to restore a part {} from unexpected state: {}", part_->name, part_->getState());
return !already_active;
};
auto activate_part = [this, &restored_active_part](auto it)
{
/// It's not clear what to do if we try to activate part that was removed in transaction.
/// It may happen only in ReplicatedMergeTree, so let's simply throw LOGICAL_ERROR for now.
chassert((*it)->version.isRemovalTIDLocked());
if ((*it)->version.removal_tid_lock == Tx::PrehistoricTID.getHash())
(*it)->version.unlockRemovalTID(Tx::PrehistoricTID, TransactionInfoContext{getStorageID(), (*it)->name});
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot activate part {} that was removed by transaction ({})",
(*it)->name, (*it)->version.removal_tid_lock);
addPartContributionToColumnAndSecondaryIndexSizes(*it);
addPartContributionToDataVolume(*it);
modifyPartState(it, DataPartState::Active); /// iterator is not invalidated here
restored_active_part = true;
};
/// ActiveDataPartSet allows restoring most top-level parts instead of unexpected ones.
/// It can be important in case of assigned merges. If unexpected part is result of some
/// finished, but not committed merge then we should restore (at least try to restore)
/// closest ancestors for the unexpected part to be able to execute it.
/// However it's not guaranteed because outdated parts can intersect
ActiveDataPartSet parts_for_replacement(format_version);
auto range = getDataPartsPartitionRange(part->info.partition_id);
DataPartsVector parts_candidates(range.begin(), range.end());
/// In case of intersecting outdated parts we want to add bigger parts (with higher level) first
auto comparator = [] (const DataPartPtr left, const DataPartPtr right) -> bool
{
if (left->info.level < right->info.level)
return true;
else if (left->info.level > right->info.level)
return false;
else
return left->info.mutation < right->info.mutation;
};
std::sort(parts_candidates.begin(), parts_candidates.end(), comparator);
/// From larger to smaller parts
for (const auto & part_candidate_in_partition : parts_candidates | std::views::reverse)
{
if (part->info.contains(part_candidate_in_partition->info)
&& is_appropriate_state(part_candidate_in_partition))
{
String out_reason;
/// Outdated parts can intersect legally (because of DROP_PART); here it's okay, we
/// are trying to do our best to restore covered parts.
auto outcome = parts_for_replacement.tryAddPart(part_candidate_in_partition->info, &out_reason);
if (outcome == ActiveDataPartSet::AddPartOutcome::HasIntersectingPart)
{
error_parts.push_back(part->name);
LOG_ERROR(log, "Failed to restore part {}, because of intersection reason '{}'", part->name, out_reason);
}
}
}
if (parts_for_replacement.size() > 0)
{
std::vector<std::pair<uint64_t, uint64_t>> holes_list;
/// Most of the code below is just to write a pretty message
auto part_infos = parts_for_replacement.getPartInfos();
int64_t current_right_block = part_infos[0].min_block;
for (const auto & top_level_part_to_replace : part_infos)
{
auto data_part_it = data_parts_by_info.find(top_level_part_to_replace);
if (data_part_it == data_parts_by_info.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find part {} in own set", top_level_part_to_replace.getPartNameForLogs());
activate_part(data_part_it);
restored.push_back((*data_part_it)->name);
if (top_level_part_to_replace.min_block - current_right_block > 1)
holes_list.emplace_back(current_right_block, top_level_part_to_replace.min_block);
current_right_block = top_level_part_to_replace.max_block;
}
if (part->info.max_block != current_right_block)
holes_list.emplace_back(current_right_block, part->info.max_block);
for (const String & name : restored)
LOG_INFO(log, "Activated part {} in place of unexpected {}", name, part->name);
if (!error_parts.empty() || !holes_list.empty())
{
std::string error_parts_message, holes_list_message;
if (!error_parts.empty())
error_parts_message = fmt::format(" Parts failed to restore because of intersection: [{}]", fmt::join(error_parts, ", "));
if (!holes_list.empty())
{
if (!error_parts.empty())
holes_list_message = ".";
Strings holes_list_pairs;
for (const auto & [left_side, right_side] : holes_list)
holes_list_pairs.push_back(fmt::format("({}, {})", left_side + 1, right_side - 1));
holes_list_message += fmt::format(" Block ranges failed to restore: [{}]", fmt::join(holes_list_pairs, ", "));
}
LOG_WARNING(log, "The set of parts restored in place of {} looks incomplete. "
"SELECT queries may observe gaps in data until this replica is synchronized with other replicas.{}{}",
part->name, error_parts_message, holes_list_message);
}
}
else
{
LOG_INFO(log, "Don't find any parts for replacement instead of unexpected {}", part->name);
}
}
if (removed_active_part || restored_active_part)
resetObjectColumnsFromActiveParts(lock);
}
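The removed restore_covered block above also computed `holes_list`: after sorting the restorable part infos, it walked them by block range and recorded the gaps that no restored part covers. A compact sketch of that gap detection with inclusive bounds (the original stored open ranges and adjusted them by one only when printing):

```cpp
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

using Range = std::pair<int64_t, int64_t>;   // inclusive (min_block, max_block)

// Given non-empty covering ranges sorted by min_block, report the inclusive
// block intervals up to outer_max that no range covers.
std::vector<Range> findHoles(const std::vector<Range> & covered, int64_t outer_max)
{
    std::vector<Range> holes;
    int64_t current_right = covered.front().first;   // seed like part_infos[0].min_block
    for (const auto & [min_block, max_block] : covered)
    {
        if (min_block - current_right > 1)
            holes.emplace_back(current_right + 1, min_block - 1);
        current_right = max_block;
    }
    if (outer_max > current_right)
        holes.emplace_back(current_right + 1, outer_max);
    return holes;
}

int main()
{
    // Restored parts cover blocks 1..2 and 5..6 of an unexpected part spanning 1..8:
    auto holes = findHoles({{1, 2}, {5, 6}}, 8);
    assert(holes.size() == 2);   // the gaps are 3..4 and 7..8
}
```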

View File

@ -652,10 +652,9 @@ public:
/// Renames the part to detached/<prefix>_<part> and removes it from data_parts,
/// so it will not be deleted in clearOldParts.
/// NOTE: This method is safe to use only for parts which nobody else holds (like on server start or for parts which was not committed).
/// For active parts it's unsafe because this method modifies fields of part (rename) while some other thread can try to read it.
void forcefullyMovePartToDetachedAndRemoveFromMemory(const DataPartPtr & part, const String & prefix = "", bool restore_covered = false);
void forcefullyMovePartToDetachedAndRemoveFromMemory(const DataPartPtr & part, const String & prefix = "");
/// This method should not be here, but async loading of Outdated parts is implemented in MergeTreeData
virtual void forcefullyRemoveBrokenOutdatedPartFromZooKeeperBeforeDetaching(const String & /*part_name*/) {}
@ -1068,6 +1067,7 @@ public:
scope_guard getTemporaryPartDirectoryHolder(const String & part_dir_name) const;
void waitForOutdatedPartsToBeLoaded() const;
void waitForUnexpectedPartsToBeLoaded() const;
bool canUsePolymorphicParts() const;
/// TODO: make enabled by default in the next release if no problems found.
@ -1545,13 +1545,33 @@ protected:
PartLoadingTreeNodes outdated_unloaded_data_parts TSA_GUARDED_BY(outdated_data_parts_mutex);
bool outdated_data_parts_loading_canceled TSA_GUARDED_BY(outdated_data_parts_mutex) = false;
mutable std::mutex unexpected_data_parts_mutex;
mutable std::condition_variable unexpected_data_parts_cv;
struct UnexpectedPartLoadState
{
PartLoadingTree::NodePtr loading_info;
/// Whether the part is not covered by any other unexpected part
bool uncovered = true;
bool is_broken = false;
MutableDataPartPtr part;
};
BackgroundSchedulePool::TaskHolder unexpected_data_parts_loading_task;
std::vector<UnexpectedPartLoadState> unexpected_data_parts;
bool unexpected_data_parts_loading_canceled TSA_GUARDED_BY(unexpected_data_parts_mutex) = false;
void loadUnexpectedDataParts();
void loadUnexpectedDataPart(UnexpectedPartLoadState & state);
/// This has to be "true" by default, because in case of empty table or absence of Outdated parts
/// it is automatically finished.
std::atomic_bool outdated_data_parts_loading_finished = true;
std::atomic_bool unexpected_data_parts_loading_finished = true;
void loadOutdatedDataParts(bool is_async);
void startOutdatedDataPartsLoadingTask();
void stopOutdatedDataPartsLoadingTask();
void startOutdatedAndUnexpectedDataPartsLoadingTask();
void stopOutdatedAndUnexpectedDataPartsLoadingTask();
static void incrementInsertedPartsProfileEvent(MergeTreeDataPartType type);
static void incrementMergedPartsProfileEvent(MergeTreeDataPartType type);

View File

@ -153,7 +153,7 @@ void StorageMergeTree::startup()
{
background_operations_assignee.start();
startBackgroundMovesIfNeeded();
startOutdatedDataPartsLoadingTask();
startOutdatedAndUnexpectedDataPartsLoadingTask();
}
catch (...)
{
@ -179,7 +179,7 @@ void StorageMergeTree::shutdown(bool)
if (shutdown_called.exchange(true))
return;
stopOutdatedDataPartsLoadingTask();
stopOutdatedAndUnexpectedDataPartsLoadingTask();
/// Unlock all waiting mutations
{

View File

@ -1565,18 +1565,12 @@ bool StorageReplicatedMergeTree::checkPartsImpl(bool skip_sanity_checks)
* But actually we can't precisely determine that ALL missing parts
* are covered by this unexpected part. So missing parts will be downloaded.
*/
DataParts unexpected_parts;
/// Intersection of local parts and expected parts
ActiveDataPartSet local_expected_parts_set(format_version);
/// Collect unexpected parts
for (const auto & part : parts)
{
if (expected_parts.contains(part->name))
local_expected_parts_set.add(part->name);
else
unexpected_parts.insert(part); /// these parts we will move to detached with the ignored_ prefix
}
/// Which parts should be taken from other replicas.
@ -1588,18 +1582,15 @@ bool StorageReplicatedMergeTree::checkPartsImpl(bool skip_sanity_checks)
paranoidCheckForCoveredPartsInZooKeeperOnStart(expected_parts_vec, parts_to_fetch);
waitForUnexpectedPartsToBeLoaded();
ActiveDataPartSet set_of_empty_unexpected_parts(format_version);
for (const auto & part : parts)
for (const auto & load_state : unexpected_data_parts)
{
if (part->rows_count || part->getState() != MergeTreeDataPartState::Active || expected_parts.contains(part->name))
if (load_state.is_broken || load_state.part->rows_count || !load_state.uncovered)
continue;
if (incomplete_list_of_outdated_parts)
{
LOG_INFO(log, "Outdated parts are not loaded yet, but we may need them to handle dropped parts. Need retry.");
return false;
}
set_of_empty_unexpected_parts.add(part->name);
set_of_empty_unexpected_parts.add(load_state.part->name);
}
if (auto empty_count = set_of_empty_unexpected_parts.size())
LOG_WARNING(log, "Found {} empty unexpected parts (probably some dropped parts were not cleaned up before restart): [{}]",
@ -1618,33 +1609,35 @@ bool StorageReplicatedMergeTree::checkPartsImpl(bool skip_sanity_checks)
std::unordered_set<String> restorable_unexpected_parts;
UInt64 uncovered_unexpected_parts_rows = 0;
for (const auto & part : unexpected_parts)
for (const auto & load_state : unexpected_data_parts)
{
unexpected_parts_rows += part->rows_count;
if (load_state.is_broken)
continue;
unexpected_parts_rows += load_state.part->rows_count;
/// This part may be covered by some expected part that is active and present locally
/// Probably we just did not remove this part from disk before restart (but removed from ZooKeeper)
String covering_local_part = local_expected_parts_set.getContainingPart(part->name);
String covering_local_part = local_expected_parts_set.getContainingPart(load_state.part->name);
if (!covering_local_part.empty())
{
covered_unexpected_parts.push_back(part->name);
covered_unexpected_parts.push_back(load_state.part->name);
continue;
}
String covering_empty_part = set_of_empty_unexpected_parts.getContainingPart(part->name);
String covering_empty_part = set_of_empty_unexpected_parts.getContainingPart(load_state.part->name);
if (!covering_empty_part.empty())
{
LOG_INFO(log, "Unexpected part {} is covered by empty part {}, assuming it has been dropped just before restart",
part->name, covering_empty_part);
covered_unexpected_parts.push_back(part->name);
load_state.part->name, covering_empty_part);
covered_unexpected_parts.push_back(load_state.part->name);
continue;
}
auto covered_parts = local_expected_parts_set.getPartInfosCoveredBy(part->info);
auto covered_parts = local_expected_parts_set.getPartInfosCoveredBy(load_state.part->info);
if (MergeTreePartInfo::areAllBlockNumbersCovered(part->info, covered_parts))
if (MergeTreePartInfo::areAllBlockNumbersCovered(load_state.part->info, covered_parts))
{
restorable_unexpected_parts.insert(part->name);
restorable_unexpected_parts.insert(load_state.part->name);
continue;
}
@ -1658,13 +1651,13 @@ bool StorageReplicatedMergeTree::checkPartsImpl(bool skip_sanity_checks)
}
/// Part is unexpected and we don't have covering part: it's suspicious
uncovered_unexpected_parts.insert(part->name);
uncovered_unexpected_parts_rows += part->rows_count;
uncovered_unexpected_parts.insert(load_state.part->name);
uncovered_unexpected_parts_rows += load_state.part->rows_count;
if (part->info.level > 0)
if (load_state.part->info.level > 0)
{
++unexpected_parts_nonnew;
unexpected_parts_nonnew_rows += part->rows_count;
unexpected_parts_nonnew_rows += load_state.part->rows_count;
}
}
@ -1690,6 +1683,9 @@ bool StorageReplicatedMergeTree::checkPartsImpl(bool skip_sanity_checks)
UInt64 total_rows_on_filesystem = 0;
for (const auto & part : parts)
total_rows_on_filesystem += part->rows_count;
/// We also need to count the rows of all unexpected data parts.
for (const auto & part : unexpected_data_parts)
total_rows_on_filesystem += part.part->rows_count;
const auto storage_settings_ptr = getSettings();
bool insane = uncovered_unexpected_parts_rows > total_rows_on_filesystem * storage_settings_ptr->replicated_max_ratio_of_wrong_parts;
@ -1731,13 +1727,12 @@ bool StorageReplicatedMergeTree::checkPartsImpl(bool skip_sanity_checks)
/// Add to the queue jobs to pick up the missing parts from other replicas and remove from ZK the information that we have them.
queue.setBrokenPartsToEnqueueFetchesOnLoading(std::move(parts_to_fetch));
/// Remove extra local parts.
for (const DataPartPtr & part : unexpected_parts)
/// Detach all unexpected data parts after the sanity check.
for (auto & part_state : unexpected_data_parts)
{
bool restore_covered = restorable_unexpected_parts.contains(part->name) || uncovered_unexpected_parts.contains(part->name);
LOG_ERROR(log, "Renaming unexpected part {} to ignored_{}{}", part->name, part->name, restore_covered ? ", restoring covered parts" : "");
forcefullyMovePartToDetachedAndRemoveFromMemory(part, "ignored", restore_covered);
part_state.part->renameToDetached("ignored");
}
unexpected_data_parts.clear();
return true;
}
@ -5114,7 +5109,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::fetchExistsPart(
void StorageReplicatedMergeTree::startup()
{
LOG_TRACE(log, "Starting up table");
startOutdatedDataPartsLoadingTask();
startOutdatedAndUnexpectedDataPartsLoadingTask();
if (attach_thread)
{
attach_thread->start();
@ -5317,7 +5312,7 @@ void StorageReplicatedMergeTree::shutdown(bool)
}
session_expired_callback_handler.reset();
stopOutdatedDataPartsLoadingTask();
stopOutdatedAndUnexpectedDataPartsLoadingTask();
partialShutdown();

View File

@ -0,0 +1,71 @@
import pytest
import logging
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node", with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
# This test constructs intersecting parts intentionally. It's not an elegant test.
# TODO(hanfei): write a test which select part 1_1 merging with part 2_2 and drop range.
def test_intersect_parts_when_restart(started_cluster):
node.query(
"""
CREATE TABLE data (
key Int
)
ENGINE = ReplicatedMergeTree('/ch/tables/default/data', 'node')
ORDER BY key;
"""
)
node.query("system stop cleanup data")
node.query("INSERT INTO data values (1)")
node.query("INSERT INTO data values (2)")
node.query("INSERT INTO data values (3)")
node.query("INSERT INTO data values (4)")
node.query("ALTER TABLE data DROP PART 'all_1_1_0'")
node.query("ALTER TABLE data DROP PART 'all_2_2_0'")
node.query("OPTIMIZE TABLE data FINAL")
part_path = node.query(
"SELECT path FROM system.parts WHERE table = 'data' and name = 'all_0_3_1'"
).strip()
assert len(part_path) != 0
node.query("detach table data")
new_path = part_path[:-6] + "1_2_3"
node.exec_in_container(
[
"bash",
"-c",
"cp -r {p} {p1}".format(p=part_path, p1=new_path),
],
privileged=True,
)
# mock empty part
node.exec_in_container(
[
"bash",
"-c",
"echo -n 0 > {p1}/count.txt".format(p1=new_path),
],
privileged=True,
)
node.query("attach table data")
data_size = node.query("SELECT sum(key) FROM data").strip()
assert data_size == "5"

View File

@ -223,4 +223,4 @@ def test_corrupted_unexpected_part_ultimate():
== "1\n"
)
assert node.query("SELECT sum(key) FROM broken_table_3") == "190\n"
assert node.query("SELECT sum(key) FROM broken_table_3") == "145\n"