Speed-up parts removal #6372

Alexey Milovidov 2019-08-11 22:14:42 +03:00
parent cce3ab08bb
commit ed7db76c94
6 changed files with 77 additions and 12 deletions

dbms/src/Storages/MergeTree/MergeTreeData.cpp

@@ -1064,17 +1064,58 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa
void MergeTreeData::clearOldPartsFromFilesystem()
{
-    auto parts_to_remove = grabOldParts();
-    for (const DataPartPtr & part : parts_to_remove)
-    {
-        LOG_DEBUG(log, "Removing part from filesystem " << part->name);
-        part->remove();
-    }
+    DataPartsVector parts_to_remove = grabOldParts();
+    clearPartsFromFilesystem(parts_to_remove);
+    removePartsFinally(parts_to_remove);
}
+
+void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_remove)
+{
+    if (parts_to_remove.size() > 1 && settings.max_part_removal_threads > 1 && parts_to_remove.size() > settings.concurrent_part_removal_threshold)
+    {
+        /// Parallel parts removal.
+
+        size_t num_threads = std::min(size_t(settings.max_part_removal_threads), parts_to_remove.size());
+        std::mutex mutex;
+        ThreadPool pool(num_threads);
+        DataPartsVector parts_to_process = parts_to_remove;
+
+        /// NOTE: Under heavy system load you may get "Cannot schedule a task" from ThreadPool.
+        for (size_t i = 0; i < num_threads; ++i)
+        {
+            pool.schedule([&]
+            {
+                for (auto & part : parts_to_process)
+                {
+                    /// Take out a part to remove.
+                    DataPartPtr part_to_remove;
+                    {
+                        std::lock_guard lock(mutex);
+
+                        if (!part)
+                            continue;
+
+                        std::swap(part_to_remove, part);
+                    }
+
+                    LOG_DEBUG(log, "Removing part from filesystem " << part_to_remove->name);
+                    part_to_remove->remove();
+                }
+            });
+        }
+
+        pool.wait();
+    }
+    else
+    {
+        for (const DataPartPtr & part : parts_to_remove)
+        {
+            LOG_DEBUG(log, "Removing part from filesystem " << part->name);
+            part->remove();
+        }
+    }
+}
void MergeTreeData::setPath(const String & new_full_path)
{
    if (Poco::File{new_full_path}.exists())
@@ -1094,6 +1135,8 @@ void MergeTreeData::dropAllData()
    LOG_TRACE(log, "dropAllData: removing data from memory.");

+    DataPartsVector all_parts(data_parts_by_info.begin(), data_parts_by_info.end());
+
    data_parts_indexes.clear();
    column_sizes.clear();
@@ -1102,8 +1145,7 @@ void MergeTreeData::dropAllData()
    LOG_TRACE(log, "dropAllData: removing data from filesystem.");

    /// Removing of each data part before recursive removal of directory is to speed-up removal, because there will be less number of syscalls.
-    for (DataPartPtr part : data_parts_by_info) /// a copy intended
-        part->remove();
+    clearPartsFromFilesystem(all_parts);

    Poco::File(full_path).remove(true);
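
Review note: the parallel branch in clearPartsFromFilesystem above hands out work with a claim-under-mutex scheme. Every worker scans the same shared vector; under a short lock it swaps a non-null entry out (leaving nullptr behind as a "taken" marker), then does the slow remove() outside the lock. Below is a minimal standalone sketch of that pattern using plain std::thread and illustrative names; it is not ClickHouse's ThreadPool API.

    #include <atomic>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <thread>
    #include <vector>

    int main()
    {
        using ItemPtr = std::shared_ptr<int>;

        /// Stand-in for the shared copy of parts_to_process.
        std::vector<ItemPtr> items;
        for (int i = 0; i < 100; ++i)
            items.push_back(std::make_shared<int>(i));

        std::mutex mutex;
        std::atomic<long> sum{0};
        std::vector<std::thread> workers;

        for (size_t t = 0; t < 4; ++t)
        {
            workers.emplace_back([&]
            {
                for (auto & slot : items)
                {
                    ItemPtr claimed;
                    {
                        std::lock_guard lock(mutex);
                        if (!slot)
                            continue;             /// Already taken by another worker.
                        std::swap(claimed, slot); /// Claim it; leave nullptr behind.
                    }
                    /// The expensive work (part->remove() in the real code)
                    /// happens outside the critical section.
                    sum += *claimed;
                }
            });
        }

        for (auto & w : workers)
            w.join();

        std::cout << sum << '\n'; /// 4950: each item was processed exactly once.
    }

The design avoids a separate work queue: the vector itself is the queue, the lock is held only for a pointer swap, and each part is removed exactly once no matter how the threads interleave.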

dbms/src/Storages/MergeTree/MergeTreeData.h

@@ -477,6 +477,7 @@ public:
    /// Delete irrelevant parts from memory and disk.
    void clearOldPartsFromFilesystem();
+    void clearPartsFromFilesystem(const DataPartsVector & parts);

    /// Delete all directories which names begin with "tmp"
    /// Set non-negative parameter value to override MergeTreeSettings temporary_directories_lifetime

dbms/src/Storages/MergeTree/MergeTreeSettings.h

@@ -80,7 +80,9 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
    M(SettingUInt64, index_granularity_bytes, 10 * 1024 * 1024, "Approximate amount of bytes in single granule (0 - disabled).") \
    M(SettingInt64, merge_with_ttl_timeout, 3600 * 24, "Minimal time in seconds, when merge with TTL can be repeated.") \
    M(SettingBool, write_final_mark, 1, "Write final mark after end of column (0 - disabled, do nothing if index_granularity_bytes=0)") \
-    M(SettingBool, enable_mixed_granularity_parts, 0, "Enable parts with adaptive and non adaptive granularity")
+    M(SettingBool, enable_mixed_granularity_parts, 0, "Enable parts with adaptive and non adaptive granularity") \
+    M(SettingMaxThreads, max_part_removal_threads, 0, "The number of threads for concurrent removal of inactive data parts. One is usually enough, but in 'Google Compute Environment SSD Persistent Disks' file removal (unlink) operation is extraordinarily slow and you probably have to increase this number (recommended is up to 16).") \
+    M(SettingUInt64, concurrent_part_removal_threshold, 100, "Activate concurrent part removal (see 'max_part_removal_threads') only if the number of inactive data parts is at least this.") \
DECLARE_SETTINGS_COLLECTION(LIST_OF_MERGE_TREE_SETTINGS)
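
Review note: the two new settings only take effect together. With the defaults (max_part_removal_threads = 0, i.e. auto-detected, and concurrent_part_removal_threshold = 100), removal stays sequential until enough inactive parts pile up. A hypothetical helper restating the gating condition from clearPartsFromFilesystem, assuming the MaxThreads value has already been resolved to a concrete number:

    #include <algorithm>
    #include <cstddef>

    /// Hypothetical helper mirroring the condition in
    /// MergeTreeData::clearPartsFromFilesystem (illustration only).
    size_t removal_threads(size_t num_parts, size_t max_part_removal_threads,
                           size_t concurrent_part_removal_threshold)
    {
        bool parallel = num_parts > 1
            && max_part_removal_threads > 1
            && num_parts > concurrent_part_removal_threshold;

        if (!parallel)
            return 1; /// Remove parts one by one in the calling thread.

        /// Never spawn more threads than there are parts to remove.
        return std::min(max_part_removal_threads, num_parts);
    }

For example, removal_threads(1000, 16, 100) yields 16, while removal_threads(50, 16, 100) yields 1, because 50 parts are below the threshold.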

dbms/src/Storages/StorageMergeTree.cpp

@@ -761,7 +761,7 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::backgroundTask()
        else
            return BackgroundProcessingPoolTaskResult::ERROR;
    }
-    catch (Exception & e)
+    catch (const Exception & e)
    {
        if (e.code() == ErrorCodes::ABORTED)
        {

New test .reference file (expected output)

@@ -0,0 +1,2 @@
+1000 499500
+1000 499500

New test .sql file

@@ -0,0 +1,18 @@
+DROP TABLE IF EXISTS mt;
+
+CREATE TABLE mt (x UInt64) ENGINE = MergeTree ORDER BY x SETTINGS max_part_removal_threads = 16, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, old_parts_lifetime = 1, parts_to_delay_insert = 100000, parts_to_throw_insert = 100000;
+
+SYSTEM STOP MERGES;
+
+SET max_block_size = 1, min_insert_block_size_rows = 0, min_insert_block_size_bytes = 0;
+INSERT INTO mt SELECT * FROM numbers(1000);
+SET max_block_size = 65536;
+
+SELECT count(), sum(x) FROM mt;
+
+SYSTEM START MERGES;
+OPTIMIZE TABLE mt FINAL;
+
+SELECT count(), sum(x) FROM mt;
+
+DROP TABLE mt;
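
Review note: while merges are stopped, the test forces roughly one part per row (max_block_size = 1 with the minimum insert block sizes zeroed), so after OPTIMIZE TABLE mt FINAL the background cleaner has on the order of 1000 inactive parts to retire quickly (old_parts_lifetime = 1, cleanup_delay_period = 1), exercising max_part_removal_threads = 16. Both SELECTs must print the same line: count 1000 and sum 0 + 1 + ... + 999 = 999 * 1000 / 2 = 499500, matching the reference file. A trivial check of that arithmetic:

    #include <cassert>

    int main()
    {
        long n = 1000;              /// numbers(1000) yields 0 .. 999.
        long sum = (n - 1) * n / 2; /// Gauss' formula for 0 + 1 + ... + 999.
        assert(sum == 499500);
    }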