This commit is contained in:
alesapin 2023-03-16 21:48:47 +01:00
parent 3de905bb7c
commit 3731567b6a
4 changed files with 60 additions and 4 deletions

View File

@ -221,6 +221,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
if (!zero_copy_lock) if (!zero_copy_lock)
{ {
LOG_DEBUG(log, "Merge of part {} started by some other replica, will wait it and fetch merged part", entry.new_part_name); LOG_DEBUG(log, "Merge of part {} started by some other replica, will wait it and fetch merged part", entry.new_part_name);
storage.addZeroCopyLock(entry.new_part_name, disk);
/// Don't check for missing part -- it's missing because other replica still not /// Don't check for missing part -- it's missing because other replica still not
/// finished merge. /// finished merge.
return PrepareResult{ return PrepareResult{

View File

@ -30,7 +30,9 @@ public:
UInt64 getPriority() override { return priority; } UInt64 getPriority() override { return priority; }
private: private:
ReplicatedMergeMutateTaskBase::PrepareResult prepare() override; ReplicatedMergeMutateTaskBase::PrepareResult prepare() override;
bool finalize(ReplicatedMergeMutateTaskBase::PartLogWriter write_part_log) override; bool finalize(ReplicatedMergeMutateTaskBase::PartLogWriter write_part_log) override;
bool executeInnerTask() override bool executeInnerTask() override

View File

@ -8550,7 +8550,6 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
return best_replica; return best_replica;
} }
Strings StorageReplicatedMergeTree::getZeroCopyPartPath( Strings StorageReplicatedMergeTree::getZeroCopyPartPath(
const MergeTreeSettings & settings, const std::string & disk_type, const String & table_uuid, const MergeTreeSettings & settings, const std::string & disk_type, const String & table_uuid,
const String & part_name, const String & zookeeper_path_old) const String & part_name, const String & zookeeper_path_old)
@ -8570,19 +8569,62 @@ Strings StorageReplicatedMergeTree::getZeroCopyPartPath(
return res; return res;
} }
void StorageReplicatedMergeTree::addZeroCopyLock(const String & part_name, const DiskPtr & disk)
{
auto path = getZeroCopyPartPath(part_name, disk);
if (path)
{
auto zookeeper = getZooKeeper();
auto lock_path = fs::path(*path) / "part_exclusive_lock";
std::shared_ptr<std::atomic<bool>> flag = std::make_shared<std::atomic<bool>>(true);
{
std::lock_guard lock(existing_zero_copy_locks_mutex);
existing_zero_copy_locks.emplace(lock_path, {"", flag});
}
std::string replica;
bool exists = zookeeper->tryGet(lock_path, replica, [flag] (const WatchResponse &)
{
*flag = false;
});
if (exists)
{
std::lock_guard lock(existing_zero_copy_locks_mutex);
existing_zero_copy_locks[lock_path].replica = replica;
}
}
}
bool StorageReplicatedMergeTree::checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk, String & lock_replica) bool StorageReplicatedMergeTree::checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk, String & lock_replica)
{ {
auto path = getZeroCopyPartPath(part_name, disk); auto path = getZeroCopyPartPath(part_name, disk);
if (path) if (path)
{ {
/// FIXME
auto lock_path = fs::path(*path) / "part_exclusive_lock"; auto lock_path = fs::path(*path) / "part_exclusive_lock";
if (getZooKeeper()->tryGet(lock_path, lock_replica))
std::lock_guard lock(existing_zero_copy_locks_mutex);
if (auto it = existing_zero_copy_locks.find(lock_path); it != existing_zero_copy_locks.end())
{ {
lock_replica = it->second;
if (*it->second.exists)
return true; return true;
} }
} }
{
std::lock_guard lock(existing_zero_copy_locks_mutex);
/// cleanup
for (auto it = existing_zero_copy_locks.begin(); it != existing_zero_copy_locks.end())
{
if (*it->second.exists)
++it;
else
it = existing_zero_copy_locks.erase(it);
}
}
return false; return false;
} }

View File

@ -482,6 +482,16 @@ private:
std::mutex last_broken_disks_mutex; std::mutex last_broken_disks_mutex;
std::set<String> last_broken_disks; std::set<String> last_broken_disks;
std::mutex existing_zero_copy_locks_mutex;
struct ZeroCopyLockDescription
{
std::string replica;
std::shared_ptr<std::atomic_flag> exists;
};
std::unordered_map<String, ZeroCopyLockDescription> existing_zero_copy_locks;
static std::optional<QueryPipeline> distributedWriteFromClusterStorage(const std::shared_ptr<IStorageCluster> & src_storage_cluster, const ASTInsertQuery & query, ContextPtr context); static std::optional<QueryPipeline> distributedWriteFromClusterStorage(const std::shared_ptr<IStorageCluster> & src_storage_cluster, const ASTInsertQuery & query, ContextPtr context);
template <class Func> template <class Func>
@ -862,6 +872,7 @@ private:
void createTableSharedID() const; void createTableSharedID() const;
bool checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk, String & lock_replica); bool checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk, String & lock_replica);
void addZeroCopyLock(const String & part_name, const DiskPtr & disk);
std::optional<String> getZeroCopyPartPath(const String & part_name, const DiskPtr & disk); std::optional<String> getZeroCopyPartPath(const String & part_name, const DiskPtr & disk);