mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Fix incorrect order of operations in disk transactions
(cherry picked from commit 0d3904b788
)
This commit is contained in:
parent
7c721578c7
commit
9a89154352
@ -8,6 +8,11 @@ DiskDecorator::DiskDecorator(const DiskPtr & delegate_) : delegate(delegate_)
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DiskTransactionPtr DiskDecorator::createTransaction()
|
||||||
|
{
|
||||||
|
return delegate->createTransaction();
|
||||||
|
}
|
||||||
|
|
||||||
const String & DiskDecorator::getName() const
|
const String & DiskDecorator::getName() const
|
||||||
{
|
{
|
||||||
return delegate->getName();
|
return delegate->getName();
|
||||||
|
@ -12,6 +12,8 @@ class DiskDecorator : public IDisk
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
explicit DiskDecorator(const DiskPtr & delegate_);
|
explicit DiskDecorator(const DiskPtr & delegate_);
|
||||||
|
|
||||||
|
DiskTransactionPtr createTransaction() override;
|
||||||
const String & getName() const override;
|
const String & getName() const override;
|
||||||
ReservationPtr reserve(UInt64 bytes) override;
|
ReservationPtr reserve(UInt64 bytes) override;
|
||||||
~DiskDecorator() override = default;
|
~DiskDecorator() override = default;
|
||||||
|
@ -108,7 +108,6 @@ public:
|
|||||||
|
|
||||||
/// Create hardlink from `src_path` to `dst_path`.
|
/// Create hardlink from `src_path` to `dst_path`.
|
||||||
virtual void createHardLink(const std::string & src_path, const std::string & dst_path) = 0;
|
virtual void createHardLink(const std::string & src_path, const std::string & dst_path) = 0;
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
using DiskTransactionPtr = std::shared_ptr<IDiskTransaction>;
|
using DiskTransactionPtr = std::shared_ptr<IDiskTransaction>;
|
||||||
|
@ -3,6 +3,7 @@
|
|||||||
#include <Common/checkStackSize.h>
|
#include <Common/checkStackSize.h>
|
||||||
#include <Common/getRandomASCIIString.h>
|
#include <Common/getRandomASCIIString.h>
|
||||||
#include <ranges>
|
#include <ranges>
|
||||||
|
#include <Common/logger_useful.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
@ -277,6 +278,8 @@ struct WriteFileObjectStorageOperation final : public IDiskObjectStorageOperatio
|
|||||||
{
|
{
|
||||||
std::string path;
|
std::string path;
|
||||||
std::string blob_path;
|
std::string blob_path;
|
||||||
|
size_t size;
|
||||||
|
std::function<void(MetadataTransactionPtr)> on_execute;
|
||||||
|
|
||||||
WriteFileObjectStorageOperation(
|
WriteFileObjectStorageOperation(
|
||||||
IObjectStorage & object_storage_,
|
IObjectStorage & object_storage_,
|
||||||
@ -288,9 +291,14 @@ struct WriteFileObjectStorageOperation final : public IDiskObjectStorageOperatio
|
|||||||
, blob_path(blob_path_)
|
, blob_path(blob_path_)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
void execute(MetadataTransactionPtr) override
|
void setOnExecute(std::function<void(MetadataTransactionPtr)> && on_execute_)
|
||||||
{
|
{
|
||||||
|
on_execute = on_execute_;
|
||||||
|
}
|
||||||
|
|
||||||
|
void execute(MetadataTransactionPtr tx) override
|
||||||
|
{
|
||||||
|
on_execute(tx);
|
||||||
}
|
}
|
||||||
|
|
||||||
void undo() override
|
void undo() override
|
||||||
@ -368,6 +376,7 @@ void DiskObjectStorageTransaction::createDirectory(const std::string & path)
|
|||||||
|
|
||||||
void DiskObjectStorageTransaction::createDirectories(const std::string & path)
|
void DiskObjectStorageTransaction::createDirectories(const std::string & path)
|
||||||
{
|
{
|
||||||
|
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "CREATE DIRECTORIES TRANSACTION FOR PATH {}", path);
|
||||||
operations_to_execute.emplace_back(
|
operations_to_execute.emplace_back(
|
||||||
std::make_unique<PureMetadataObjectStorageOperation>(object_storage, metadata_storage, [path](MetadataTransactionPtr tx)
|
std::make_unique<PureMetadataObjectStorageOperation>(object_storage, metadata_storage, [path](MetadataTransactionPtr tx)
|
||||||
{
|
{
|
||||||
@ -477,6 +486,14 @@ String revisionToString(UInt64 revision)
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::string DiskObjectStorageTransaction::getUniqueId(const std::string & path) const
|
||||||
|
{
|
||||||
|
auto it = unique_ids.find(path);
|
||||||
|
if (it != unique_ids.end())
|
||||||
|
return it->second;
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile( /// NOLINT
|
std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile( /// NOLINT
|
||||||
const std::string & path,
|
const std::string & path,
|
||||||
size_t buf_size,
|
size_t buf_size,
|
||||||
@ -497,21 +514,41 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
|
|||||||
blob_name = "r" + revisionToString(revision) + "-file-" + blob_name;
|
blob_name = "r" + revisionToString(revision) + "-file-" + blob_name;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
unique_ids[path] = blob_name;
|
||||||
|
|
||||||
auto blob_path = fs::path(remote_fs_root_path) / blob_name;
|
auto blob_path = fs::path(remote_fs_root_path) / blob_name;
|
||||||
|
|
||||||
|
auto write_operation = std::make_unique<WriteFileObjectStorageOperation>(object_storage, metadata_storage, path, blob_path);
|
||||||
|
std::function<void(size_t count)> create_metadata_callback;
|
||||||
|
|
||||||
auto create_metadata_callback = [tx = shared_from_this(), mode, path, blob_name, autocommit] (size_t count)
|
if (autocommit)
|
||||||
{
|
{
|
||||||
if (mode == WriteMode::Rewrite)
|
create_metadata_callback = [tx = shared_from_this(), mode, path, blob_name] (size_t count)
|
||||||
tx->metadata_transaction->createMetadataFile(path, blob_name, count);
|
{
|
||||||
else
|
if (mode == WriteMode::Rewrite)
|
||||||
tx->metadata_transaction->addBlobToMetadata(path, blob_name, count);
|
tx->metadata_transaction->createMetadataFile(path, blob_name, count);
|
||||||
|
else
|
||||||
|
tx->metadata_transaction->addBlobToMetadata(path, blob_name, count);
|
||||||
|
|
||||||
if (autocommit)
|
|
||||||
tx->metadata_transaction->commit();
|
tx->metadata_transaction->commit();
|
||||||
};
|
};
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
create_metadata_callback = [write_op = write_operation.get(), mode, path, blob_name] (size_t count)
|
||||||
|
{
|
||||||
|
write_op->setOnExecute([mode, path, blob_name, count](MetadataTransactionPtr tx)
|
||||||
|
{
|
||||||
|
if (mode == WriteMode::Rewrite)
|
||||||
|
tx->createMetadataFile(path, blob_name, count);
|
||||||
|
else
|
||||||
|
tx->addBlobToMetadata(path, blob_name, count);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
operations_to_execute.emplace_back(std::make_unique<WriteFileObjectStorageOperation>(object_storage, metadata_storage, path, blob_path));
|
}
|
||||||
|
|
||||||
|
operations_to_execute.emplace_back(std::move(write_operation));
|
||||||
|
|
||||||
/// We always use mode Rewrite because we simulate append using metadata and different files
|
/// We always use mode Rewrite because we simulate append using metadata and different files
|
||||||
return object_storage.writeObject(
|
return object_storage.writeObject(
|
||||||
|
@ -56,6 +56,7 @@ private:
|
|||||||
/// TODO we can get rid of this params
|
/// TODO we can get rid of this params
|
||||||
const std::string & remote_fs_root_path;
|
const std::string & remote_fs_root_path;
|
||||||
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper;
|
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper;
|
||||||
|
std::unordered_map<std::string, std::string> unique_ids;
|
||||||
|
|
||||||
DiskObjectStorageOperations operations_to_execute;
|
DiskObjectStorageOperations operations_to_execute;
|
||||||
public:
|
public:
|
||||||
|
Loading…
Reference in New Issue
Block a user