Merge pull request #40592 from azat/inmemory-WAL-cleanup

Remove completely processed WAL files
This commit is contained in:
Anton Popov 2022-09-05 14:15:19 +02:00 committed by GitHub
commit f11b7499d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 145 additions and 45 deletions

View File

@ -1177,14 +1177,10 @@ void MergeTreeData::loadDataPartsFromDisk(
void MergeTreeData::loadDataPartsFromWAL( void MergeTreeData::loadDataPartsFromWAL(
DataPartsVector & /* broken_parts_to_detach */, DataPartsVector & /* broken_parts_to_detach */,
DataPartsVector & duplicate_parts_to_remove, DataPartsVector & duplicate_parts_to_remove,
MutableDataPartsVector & parts_from_wal, MutableDataPartsVector & parts_from_wal)
DataPartsLock & part_lock)
{ {
for (auto & part : parts_from_wal) for (auto & part : parts_from_wal)
{ {
if (getActiveContainingPart(part->info, DataPartState::Active, part_lock))
continue;
part->modification_time = time(nullptr); part->modification_time = time(nullptr);
/// Assume that all parts are Active, covered parts will be detected and marked as Outdated later /// Assume that all parts are Active, covered parts will be detected and marked as Outdated later
part->setState(DataPartState::Active); part->setState(DataPartState::Active);
@ -1212,7 +1208,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
auto metadata_snapshot = getInMemoryMetadataPtr(); auto metadata_snapshot = getInMemoryMetadataPtr();
const auto settings = getSettings(); const auto settings = getSettings();
MutableDataPartsVector parts_from_wal;
Strings part_file_names; Strings part_file_names;
auto disks = getStoragePolicy()->getDisks(); auto disks = getStoragePolicy()->getDisks();
@ -1269,16 +1264,14 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
/// Collect part names by disk. /// Collect part names by disk.
std::map<String, std::vector<std::pair<String, DiskPtr>>> disk_part_map; std::map<String, std::vector<std::pair<String, DiskPtr>>> disk_part_map;
std::map<String, MutableDataPartsVector> disk_wal_part_map;
ThreadPool pool(disks.size()); ThreadPool pool(disks.size());
std::mutex wal_init_lock;
for (const auto & disk_ptr : disks) for (const auto & disk_ptr : disks)
{ {
if (disk_ptr->isBroken()) if (disk_ptr->isBroken())
continue; continue;
auto & disk_parts = disk_part_map[disk_ptr->getName()]; auto & disk_parts = disk_part_map[disk_ptr->getName()];
auto & disk_wal_parts = disk_wal_part_map[disk_ptr->getName()];
pool.scheduleOrThrowOnError([&, disk_ptr]() pool.scheduleOrThrowOnError([&, disk_ptr]()
{ {
@ -1291,34 +1284,11 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME)) if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
disk_parts.emplace_back(std::make_pair(it->name(), disk_ptr)); disk_parts.emplace_back(std::make_pair(it->name(), disk_ptr));
else if (it->name() == MergeTreeWriteAheadLog::DEFAULT_WAL_FILE_NAME && settings->in_memory_parts_enable_wal)
{
std::lock_guard lock(wal_init_lock);
if (write_ahead_log != nullptr)
throw Exception(
"There are multiple WAL files appeared in current storage policy. You need to resolve this manually",
ErrorCodes::CORRUPTED_DATA);
write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext()))
disk_wal_parts.push_back(std::move(part));
}
else if (settings->in_memory_parts_enable_wal)
{
MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name());
for (auto && part : wal.restore(metadata_snapshot, getContext()))
disk_wal_parts.push_back(std::move(part));
}
} }
}); });
} }
pool.wait(); pool.wait();
for (auto & [_, disk_wal_parts] : disk_wal_part_map)
parts_from_wal.insert(
parts_from_wal.end(), std::make_move_iterator(disk_wal_parts.begin()), std::make_move_iterator(disk_wal_parts.end()));
size_t num_parts = 0; size_t num_parts = 0;
std::queue<std::vector<std::pair<String, DiskPtr>>> parts_queue; std::queue<std::vector<std::pair<String, DiskPtr>>> parts_queue;
for (auto & [_, disk_parts] : disk_part_map) for (auto & [_, disk_parts] : disk_part_map)
@ -1332,13 +1302,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
auto part_lock = lockParts(); auto part_lock = lockParts();
data_parts_indexes.clear(); data_parts_indexes.clear();
if (num_parts == 0 && parts_from_wal.empty())
{
resetObjectColumnsFromActiveParts(part_lock);
LOG_DEBUG(log, "There are no data parts");
return;
}
DataPartsVector broken_parts_to_detach; DataPartsVector broken_parts_to_detach;
DataPartsVector duplicate_parts_to_remove; DataPartsVector duplicate_parts_to_remove;
@ -1346,8 +1309,65 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
loadDataPartsFromDisk( loadDataPartsFromDisk(
broken_parts_to_detach, duplicate_parts_to_remove, pool, num_parts, parts_queue, skip_sanity_checks, settings); broken_parts_to_detach, duplicate_parts_to_remove, pool, num_parts, parts_queue, skip_sanity_checks, settings);
if (!parts_from_wal.empty()) if (settings->in_memory_parts_enable_wal)
loadDataPartsFromWAL(broken_parts_to_detach, duplicate_parts_to_remove, parts_from_wal, part_lock); {
std::map<String, MutableDataPartsVector> disk_wal_part_map;
std::mutex wal_init_lock;
for (const auto & disk_ptr : disks)
{
if (disk_ptr->isBroken())
continue;
auto & disk_wal_parts = disk_wal_part_map[disk_ptr->getName()];
pool.scheduleOrThrowOnError([&, disk_ptr]()
{
for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
{
if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
continue;
if (it->name() == MergeTreeWriteAheadLog::DEFAULT_WAL_FILE_NAME)
{
std::lock_guard lock(wal_init_lock);
if (write_ahead_log != nullptr)
throw Exception(
"There are multiple WAL files appeared in current storage policy. You need to resolve this manually",
ErrorCodes::CORRUPTED_DATA);
write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext(), part_lock))
disk_wal_parts.push_back(std::move(part));
}
else
{
MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name());
for (auto && part : wal.restore(metadata_snapshot, getContext(), part_lock))
disk_wal_parts.push_back(std::move(part));
}
}
});
}
pool.wait();
MutableDataPartsVector parts_from_wal;
for (auto & [_, disk_wal_parts] : disk_wal_part_map)
parts_from_wal.insert(
parts_from_wal.end(), std::make_move_iterator(disk_wal_parts.begin()), std::make_move_iterator(disk_wal_parts.end()));
loadDataPartsFromWAL(broken_parts_to_detach, duplicate_parts_to_remove, parts_from_wal);
num_parts += parts_from_wal.size();
}
if (num_parts == 0)
{
resetObjectColumnsFromActiveParts(part_lock);
LOG_DEBUG(log, "There are no data parts");
return;
}
for (auto & part : broken_parts_to_detach) for (auto & part : broken_parts_to_detach)
{ {

View File

@ -1339,8 +1339,7 @@ private:
void loadDataPartsFromWAL( void loadDataPartsFromWAL(
DataPartsVector & broken_parts_to_detach, DataPartsVector & broken_parts_to_detach,
DataPartsVector & duplicate_parts_to_remove, DataPartsVector & duplicate_parts_to_remove,
MutableDataPartsVector & parts_from_wal, MutableDataPartsVector & parts_from_wal);
DataPartsLock & part_lock);
void resetObjectColumnsFromActiveParts(const DataPartsLock & lock); void resetObjectColumnsFromActiveParts(const DataPartsLock & lock);
void updateObjectColumns(const DataPartPtr & part, const DataPartsLock & lock); void updateObjectColumns(const DataPartPtr & part, const DataPartsLock & lock);

View File

@ -3,6 +3,7 @@
#include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h> #include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h> #include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergeTreeDataPartState.h>
#include <IO/MemoryReadWriteBuffer.h> #include <IO/MemoryReadWriteBuffer.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/copyData.h> #include <IO/copyData.h>
@ -122,7 +123,10 @@ void MergeTreeWriteAheadLog::rotate(const std::unique_lock<std::mutex> &)
init(); init();
} }
MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot, ContextPtr context) MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
std::unique_lock<std::mutex> & parts_lock)
{ {
std::unique_lock lock(write_mutex); std::unique_lock lock(write_mutex);
@ -172,6 +176,9 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
part->uuid = metadata.part_uuid; part->uuid = metadata.part_uuid;
block = block_in.read(); block = block_in.read();
if (storage.getActiveContainingPart(part->info, MergeTreeDataPartState::Active, parts_lock))
continue;
} }
else else
{ {
@ -238,6 +245,15 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
std::copy_if(parts.begin(), parts.end(), std::back_inserter(result), std::copy_if(parts.begin(), parts.end(), std::back_inserter(result),
[&dropped_parts](const auto & part) { return dropped_parts.count(part->name) == 0; }); [&dropped_parts](const auto & part) { return dropped_parts.count(part->name) == 0; });
/// All parts in WAL had been already committed into the disk -> clear the WAL
if (result.empty())
{
LOG_DEBUG(log, "WAL file '{}' had been completely processed. Removing.", path);
disk->removeFile(path);
init();
return {};
}
return result; return result;
} }

View File

@ -62,7 +62,10 @@ public:
void addPart(DataPartInMemoryPtr & part); void addPart(DataPartInMemoryPtr & part);
void dropPart(const String & part_name); void dropPart(const String & part_name);
std::vector<MergeTreeMutableDataPartPtr> restore(const StorageMetadataPtr & metadata_snapshot, ContextPtr context); std::vector<MergeTreeMutableDataPartPtr> restore(
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
std::unique_lock<std::mutex> & parts_lock);
using MinMaxBlockNumber = std::pair<Int64, Int64>; using MinMaxBlockNumber = std::pair<Int64, Int64>;
static std::optional<MinMaxBlockNumber> tryParseMinMaxBlockNumber(const String & filename); static std::optional<MinMaxBlockNumber> tryParseMinMaxBlockNumber(const String & filename);

View File

@ -0,0 +1,35 @@
-- { echo }
DROP TABLE IF EXISTS in_memory;
CREATE TABLE in_memory (a UInt32) ENGINE = MergeTree ORDER BY a SETTINGS min_rows_for_compact_part = 1000, min_bytes_for_wide_part = 10485760;
INSERT INTO in_memory VALUES (1);
INSERT INTO in_memory VALUES (2);
SELECT name, active, part_type FROM system.parts WHERE database = currentDatabase() AND table = 'in_memory';
all_1_1_0 1 InMemory
all_2_2_0 1 InMemory
SELECT * FROM in_memory ORDER BY a;
1
2
-- no WAL remove since parts are still in use
DETACH TABLE in_memory;
ATTACH TABLE in_memory;
SELECT name, active, part_type FROM system.parts WHERE database = currentDatabase() AND table = 'in_memory';
all_1_1_0 1 InMemory
all_2_2_0 1 InMemory
SELECT * FROM in_memory ORDER BY a;
1
2
-- WAL should be removed, since on disk part covers all parts in WAL
OPTIMIZE TABLE in_memory;
DETACH TABLE in_memory;
ATTACH TABLE in_memory;
SELECT name, active, part_type FROM system.parts WHERE database = currentDatabase() AND table = 'in_memory';
all_1_2_1 1 Compact
-- check that the WAL will be reinitialized after remove
INSERT INTO in_memory VALUES (3);
DETACH TABLE in_memory;
ATTACH TABLE in_memory;
SELECT * FROM in_memory ORDER BY a;
1
2
3

View File

@ -0,0 +1,27 @@
-- { echo }
DROP TABLE IF EXISTS in_memory;
CREATE TABLE in_memory (a UInt32) ENGINE = MergeTree ORDER BY a SETTINGS min_rows_for_compact_part = 1000, min_bytes_for_wide_part = 10485760;
INSERT INTO in_memory VALUES (1);
INSERT INTO in_memory VALUES (2);
SELECT name, active, part_type FROM system.parts WHERE database = currentDatabase() AND table = 'in_memory';
SELECT * FROM in_memory ORDER BY a;
-- no WAL remove since parts are still in use
DETACH TABLE in_memory;
ATTACH TABLE in_memory;
SELECT name, active, part_type FROM system.parts WHERE database = currentDatabase() AND table = 'in_memory';
SELECT * FROM in_memory ORDER BY a;
-- WAL should be removed, since on disk part covers all parts in WAL
OPTIMIZE TABLE in_memory;
DETACH TABLE in_memory;
ATTACH TABLE in_memory;
SELECT name, active, part_type FROM system.parts WHERE database = currentDatabase() AND table = 'in_memory';
-- check that the WAL will be reinitialized after remove
INSERT INTO in_memory VALUES (3);
DETACH TABLE in_memory;
ATTACH TABLE in_memory;
SELECT * FROM in_memory ORDER BY a;