Trying to fix WAL log

This commit is contained in:
alesapin 2022-06-05 01:05:15 +02:00
parent 083b15fea0
commit a2704bc541
5 changed files with 57 additions and 24 deletions

View File

@ -189,6 +189,9 @@ void DiskObjectStorage::moveFile(const String & from_path, const String & to_pat
if (exists(to_path))
throw Exception("File already exists: " + to_path, ErrorCodes::FILE_ALREADY_EXISTS);
if (!exists(from_path))
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "File {} doesn't exist, cannot move", to_path);
if (should_send_metadata)
{
auto revision = metadata_helper->revision_counter + 1;
@ -215,10 +218,13 @@ void DiskObjectStorage::replaceFile(const String & from_path, const String & to_
{
if (exists(to_path))
{
const String tmp_path = to_path + ".old";
moveFile(to_path, tmp_path);
moveFile(from_path, to_path);
removeFile(tmp_path);
auto blobs = metadata_storage->getRemotePaths(to_path);
auto tx = metadata_storage->createTransaction();
metadata_storage->replaceFile(from_path, to_path, tx);
tx->commit();
removeFromRemoteFS(blobs);
}
else
moveFile(from_path, to_path);

View File

@ -13,8 +13,6 @@ namespace ErrorCodes
{
extern const int FS_METADATA_ERROR;
extern const int LOGICAL_ERROR;
extern const int CANNOT_OPEN_FILE;
extern const int FILE_DOESNT_EXIST;
}
@ -79,28 +77,26 @@ class UnlinkFileOperation final : public IMetadataOperation
{
std::string path;
IDisk & disk;
std::string temp_filepath;
std::string prev_data;
public:
UnlinkFileOperation(const std::string & path_, IDisk & disk_)
: path(path_)
, disk(disk_)
, temp_filepath(getTempFileName())
{
}
void execute() override
{
disk.moveFile(path, temp_filepath);
auto buf = disk.readFile(path);
readStringUntilEOF(prev_data, *buf);
disk.removeFile(path);
}
void undo() override
{
disk.moveFile(temp_filepath, path);
}
void finalize() override
{
disk.removeFileIfExists(temp_filepath);
auto buf = disk.writeFile(path);
writeString(prev_data, *buf);
buf->finalize();
}
};
@ -175,7 +171,17 @@ public:
void execute() override
{
disk.removeDirectory(path);
try
{
disk.removeDirectory(path);
}
catch (...)
{
std::vector<std::string> files;
disk.listFiles(path, files);
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "GOT FIlES {}", fmt::join(files, ", "));
throw;
}
}
void undo() override
@ -573,6 +579,7 @@ void MetadataStorageFromDisk::createHardLink(const std::string & path_from, cons
auto metadata = readMetadata(path_from);
metadata->incrementRefCount();
writeMetadataToFile(path_from, transaction, metadata->serializeToString());
transaction->addOperation(std::make_unique<CreateHardlinkOperation>(path_from, path_to, *disk));
@ -681,15 +688,12 @@ uint32_t MetadataStorageFromDisk::unlinkAndGetHardlinkCount(const std::string &
{
auto metadata = readMetadata(path);
uint32_t ref_count = metadata->getRefCount();
if (ref_count == 0)
{
unlinkFile(path, transaction);
}
else
if (ref_count != 0)
{
metadata->decrementRefCount();
writeMetadataToFile(path, transaction, metadata->serializeToString());
}
unlinkFile(path, transaction);
return ref_count;
}

View File

@ -1957,6 +1957,7 @@ void MergeTreeData::dropAllData()
data_parts_indexes.clear();
column_sizes.clear();
write_ahead_log->shutdown();
/// Tables in atomic databases have UUID and stored in persistent locations.
/// No need to drop caches (that are keyed by filesystem path) because collision is not possible.

View File

@ -46,9 +46,14 @@ MergeTreeWriteAheadLog::MergeTreeWriteAheadLog(
MergeTreeWriteAheadLog::~MergeTreeWriteAheadLog()
{
std::unique_lock lock(write_mutex);
if (sync_scheduled)
sync_cv.wait(lock, [this] { return !sync_scheduled; });
try
{
shutdown();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void MergeTreeWriteAheadLog::init()
@ -252,6 +257,21 @@ void MergeTreeWriteAheadLog::sync(std::unique_lock<std::mutex> & lock)
sync_cv.wait(lock, [this] { return !sync_scheduled; });
}
void MergeTreeWriteAheadLog::shutdown()
{
std::unique_lock lock(write_mutex);
if (shutted_down)
return;
if (sync_scheduled)
sync_cv.wait(lock, [this] { return !sync_scheduled; });
shutted_down = true;
sync_task->deactivate();
out->finalize();
out.reset();
}
std::optional<MergeTreeWriteAheadLog::MinMaxBlockNumber>
MergeTreeWriteAheadLog::tryParseMinMaxBlockNumber(const String & filename)
{

View File

@ -66,6 +66,7 @@ public:
using MinMaxBlockNumber = std::pair<Int64, Int64>;
static std::optional<MinMaxBlockNumber> tryParseMinMaxBlockNumber(const String & filename);
void shutdown();
private:
void init();
@ -89,6 +90,7 @@ private:
size_t bytes_at_last_sync = 0;
bool sync_scheduled = false;
bool shutted_down = false;
mutable std::mutex write_mutex;