Backport #66599 to 24.6: Fix dropping named collection in local storage

This commit is contained in:
robot-clickhouse 2024-07-18 13:08:38 +00:00
parent 423ff5b42c
commit 2ad5ba63d9

View File

@ -115,32 +115,32 @@ public:
return elements; return elements;
} }
bool exists(const std::string & path) const override bool exists(const std::string & file_name) const override
{ {
return fs::exists(getPath(path)); return fs::exists(getPath(file_name));
} }
std::string read(const std::string & path) const override std::string read(const std::string & file_name) const override
{ {
ReadBufferFromFile in(getPath(path)); ReadBufferFromFile in(getPath(file_name));
std::string data; std::string data;
readStringUntilEOF(data, in); readStringUntilEOF(data, in);
return data; return data;
} }
void write(const std::string & path, const std::string & data, bool replace) override void write(const std::string & file_name, const std::string & data, bool replace) override
{ {
if (!replace && fs::exists(path)) if (!replace && fs::exists(file_name))
{ {
throw Exception( throw Exception(
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS, ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
"Metadata file {} for named collection already exists", "Metadata file {} for named collection already exists",
path); file_name);
} }
fs::create_directories(root_path); fs::create_directories(root_path);
auto tmp_path = getPath(path + ".tmp"); auto tmp_path = getPath(file_name + ".tmp");
WriteBufferFromFile out(tmp_path, data.size(), O_WRONLY | O_CREAT | O_EXCL); WriteBufferFromFile out(tmp_path, data.size(), O_WRONLY | O_CREAT | O_EXCL);
writeString(data, out); writeString(data, out);
@ -149,28 +149,32 @@ public:
out.sync(); out.sync();
out.close(); out.close();
fs::rename(tmp_path, getPath(path)); fs::rename(tmp_path, getPath(file_name));
} }
void remove(const std::string & path) override void remove(const std::string & file_name) override
{ {
if (!removeIfExists(getPath(path))) if (!removeIfExists(file_name))
{ {
throw Exception( throw Exception(
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST, ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
"Cannot remove `{}`, because it doesn't exist", path); "Cannot remove `{}`, because it doesn't exist", file_name);
} }
} }
bool removeIfExists(const std::string & path) override bool removeIfExists(const std::string & file_name) override
{ {
return fs::remove(getPath(path)); return fs::remove(getPath(file_name));
} }
private: private:
std::string getPath(const std::string & path) const std::string getPath(const std::string & file_name) const
{ {
return fs::path(root_path) / path; const auto file_name_as_path = fs::path(file_name);
if (file_name_as_path.is_absolute())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Filename {} cannot be an absolute path", file_name);
return fs::path(root_path) / file_name_as_path;
} }
/// Delete .tmp files. They could be left undeleted in case of /// Delete .tmp files. They could be left undeleted in case of
@ -263,49 +267,49 @@ public:
return children; return children;
} }
bool exists(const std::string & path) const override bool exists(const std::string & file_name) const override
{ {
return getClient()->exists(getPath(path)); return getClient()->exists(getPath(file_name));
} }
std::string read(const std::string & path) const override std::string read(const std::string & file_name) const override
{ {
return getClient()->get(getPath(path)); return getClient()->get(getPath(file_name));
} }
void write(const std::string & path, const std::string & data, bool replace) override void write(const std::string & file_name, const std::string & data, bool replace) override
{ {
if (replace) if (replace)
{ {
getClient()->createOrUpdate(getPath(path), data, zkutil::CreateMode::Persistent); getClient()->createOrUpdate(getPath(file_name), data, zkutil::CreateMode::Persistent);
} }
else else
{ {
auto code = getClient()->tryCreate(getPath(path), data, zkutil::CreateMode::Persistent); auto code = getClient()->tryCreate(getPath(file_name), data, zkutil::CreateMode::Persistent);
if (code == Coordination::Error::ZNODEEXISTS) if (code == Coordination::Error::ZNODEEXISTS)
{ {
throw Exception( throw Exception(
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS, ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
"Metadata file {} for named collection already exists", "Metadata file {} for named collection already exists",
path); file_name);
} }
} }
} }
void remove(const std::string & path) override void remove(const std::string & file_name) override
{ {
getClient()->remove(getPath(path)); getClient()->remove(getPath(file_name));
} }
bool removeIfExists(const std::string & path) override bool removeIfExists(const std::string & file_name) override
{ {
auto code = getClient()->tryRemove(getPath(path)); auto code = getClient()->tryRemove(getPath(file_name));
if (code == Coordination::Error::ZOK) if (code == Coordination::Error::ZOK)
return true; return true;
if (code == Coordination::Error::ZNONODE) if (code == Coordination::Error::ZNONODE)
return false; return false;
throw Coordination::Exception::fromPath(code, getPath(path)); throw Coordination::Exception::fromPath(code, getPath(file_name));
} }
private: private:
@ -319,9 +323,13 @@ private:
return zookeeper_client; return zookeeper_client;
} }
std::string getPath(const std::string & path) const std::string getPath(const std::string & file_name) const
{ {
return fs::path(root_path) / path; const auto file_name_as_path = fs::path(file_name);
if (file_name_as_path.is_absolute())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Filename {} cannot be an absolute path", file_name);
return fs::path(root_path) / file_name_as_path;
} }
}; };