Use disk operation to serialize and deserialize meta files of StorageFileLog

This commit is contained in:
flynn 2022-11-30 10:34:00 +00:00
parent 3eec147906
commit 6a625e4b9b
2 changed files with 38 additions and 48 deletions

View File

@ -1,6 +1,7 @@
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Disks/StoragePolicy.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
@ -17,11 +18,11 @@
#include <Storages/StorageFactory.h>
#include <Storages/StorageMaterializedView.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Common/logger_useful.h>
#include <Common/Exception.h>
#include <Common/Macros.h>
#include <Common/filesystemHelpers.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/logger_useful.h>
#include <sys/stat.h>
@ -64,6 +65,7 @@ StorageFileLog::StorageFileLog(
, metadata_base_path(std::filesystem::path(metadata_base_path_) / "metadata")
, format_name(format_name_)
, log(&Poco::Logger::get("StorageFileLog (" + table_id_.table_name + ")"))
, disk(getStoragePolicy()->getDisks()[0])
, milliseconds_to_wait(filelog_settings->poll_directory_watch_events_backoff_init.totalMilliseconds())
{
StorageInMemoryMetadata storage_metadata;
@ -75,21 +77,14 @@ StorageFileLog::StorageFileLog(
{
if (!attach)
{
std::error_code ec;
std::filesystem::create_directories(metadata_base_path, ec);
if (ec)
if (disk->exists(metadata_base_path))
{
if (ec == std::make_error_code(std::errc::file_exists))
{
throw Exception(ErrorCodes::TABLE_METADATA_ALREADY_EXISTS,
"Metadata files already exist by path: {}, remove them manually if it is intended",
metadata_base_path);
}
else
throw Exception(ErrorCodes::DIRECTORY_DOESNT_EXIST,
"Could not create directory {}, reason: {}", metadata_base_path, ec.message());
throw Exception(
ErrorCodes::TABLE_METADATA_ALREADY_EXISTS,
"Metadata files already exist by path: {}, remove them manually if it is intended",
metadata_base_path);
}
disk->createDirectories(metadata_base_path);
}
loadMetaFiles(attach);
@ -119,17 +114,8 @@ void StorageFileLog::loadMetaFiles(bool attach)
{
const auto & storage = getStorageID();
auto metadata_path_exist = std::filesystem::exists(metadata_base_path);
auto previous_path = std::filesystem::path(getContext()->getPath()) / ".filelog_storage_metadata" / storage.getDatabaseName() / storage.getTableName();
/// For compatibility with the previous path version.
if (std::filesystem::exists(previous_path) && !metadata_path_exist)
{
std::filesystem::copy(previous_path, metadata_base_path, std::filesystem::copy_options::recursive);
std::filesystem::remove_all(previous_path);
}
/// Meta file may lost, log and create directory
else if (!metadata_path_exist)
if (!disk->exists(metadata_base_path))
{
/// Create metadata_base_path directory when store meta data
LOG_ERROR(log, "Metadata files of table {} are lost.", getStorageID().getTableName());
@ -228,70 +214,70 @@ void StorageFileLog::serialize() const
for (const auto & [inode, meta] : file_infos.meta_by_inode)
{
auto full_name = getFullMetaPath(meta.file_name);
if (!std::filesystem::exists(full_name))
if (!disk->exists(full_name))
{
FS::createFile(full_name);
disk->createFile(full_name);
}
else
{
checkOffsetIsValid(full_name, meta.last_writen_position);
}
WriteBufferFromFile out(full_name);
writeIntText(inode, out);
writeChar('\n', out);
writeIntText(meta.last_writen_position, out);
auto out = disk->writeFile(full_name);
writeIntText(inode, *out);
writeChar('\n', *out);
writeIntText(meta.last_writen_position, *out);
}
}
void StorageFileLog::serialize(UInt64 inode, const FileMeta & file_meta) const
{
auto full_name = getFullMetaPath(file_meta.file_name);
if (!std::filesystem::exists(full_name))
if (!disk->exists(full_name))
{
FS::createFile(full_name);
disk->createFile(full_name);
}
else
{
checkOffsetIsValid(full_name, file_meta.last_writen_position);
}
WriteBufferFromFile out(full_name);
writeIntText(inode, out);
writeChar('\n', out);
writeIntText(file_meta.last_writen_position, out);
auto out = disk->writeFile(full_name);
writeIntText(inode, *out);
writeChar('\n', *out);
writeIntText(file_meta.last_writen_position, *out);
}
void StorageFileLog::deserialize()
{
if (!std::filesystem::exists(metadata_base_path))
if (!disk->exists(metadata_base_path))
return;
/// In case of single file (not a watched directory),
/// iterated directory always has one file inside.
for (const auto & dir_entry : std::filesystem::directory_iterator{metadata_base_path})
for (const auto dir_iter = disk->iterateDirectory(metadata_base_path); dir_iter->isValid(); dir_iter->next())
{
if (!dir_entry.is_regular_file())
if (!disk->isFile(dir_iter->name()))
{
throw Exception(
ErrorCodes::BAD_FILE_TYPE,
"The file {} under {} is not a regular file when deserializing meta files",
dir_entry.path().c_str(),
dir_iter->name(),
metadata_base_path);
}
ReadBufferFromFile in(dir_entry.path().c_str());
auto in = disk->readFile(dir_iter->name());
FileMeta meta;
UInt64 inode, last_written_pos;
if (!tryReadIntText(inode, in))
if (!tryReadIntText(inode, *in))
{
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Read meta file {} failed", dir_entry.path().c_str());
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Read meta file {} failed", dir_iter->path());
}
assertChar('\n', in);
if (!tryReadIntText(last_written_pos, in))
assertChar('\n', *in);
if (!tryReadIntText(last_written_pos, *in))
{
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Read meta file {} failed", dir_entry.path().c_str());
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Read meta file {} failed", dir_iter->path());
}
meta.file_name = dir_entry.path().filename();
meta.file_name = dir_iter->name();
meta.last_writen_position = last_written_pos;
file_infos.meta_by_inode.emplace(inode, meta);

View File

@ -1,5 +1,7 @@
#pragma once
#include <Disks/IDisk.h>
#include <Storages/FileLog/Buffer_fwd.h>
#include <Storages/FileLog/FileLogDirectoryWatcher.h>
#include <Storages/FileLog/FileLogSettings.h>
@ -147,6 +149,8 @@ private:
const String format_name;
Poco::Logger * log;
DiskPtr disk;
uint64_t milliseconds_to_wait;
/// In order to avoid data race, using a naive trick to forbid execute two select
@ -198,7 +202,7 @@ private:
void serialize(UInt64 inode, const FileMeta & file_meta) const;
void deserialize();
static void checkOffsetIsValid(const String & full_name, UInt64 offset);
void checkOffsetIsValid(const String & full_name, UInt64 offset) const;
};
}