Merge pull request #66956 from ClickHouse/backport/24.6/66599

Backport #66599 to 24.6: Fix dropping named collection in local storage (manually created)
This commit is contained in:
robot-clickhouse 2024-07-23 16:48:11 +02:00 committed by GitHub
commit e99f190ab9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -115,32 +115,32 @@ public:
return elements;
}
bool exists(const std::string & path) const override
bool exists(const std::string & file_name) const override
{
return fs::exists(getPath(path));
return fs::exists(getPath(file_name));
}
std::string read(const std::string & path) const override
std::string read(const std::string & file_name) const override
{
ReadBufferFromFile in(getPath(path));
ReadBufferFromFile in(getPath(file_name));
std::string data;
readStringUntilEOF(data, in);
return data;
}
void write(const std::string & path, const std::string & data, bool replace) override
void write(const std::string & file_name, const std::string & data, bool replace) override
{
if (!replace && fs::exists(path))
if (!replace && fs::exists(file_name))
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
"Metadata file {} for named collection already exists",
path);
file_name);
}
fs::create_directories(root_path);
auto tmp_path = getPath(path + ".tmp");
auto tmp_path = getPath(file_name + ".tmp");
WriteBufferFromFile out(tmp_path, data.size(), O_WRONLY | O_CREAT | O_EXCL);
writeString(data, out);
@ -149,28 +149,32 @@ public:
out.sync();
out.close();
fs::rename(tmp_path, getPath(path));
fs::rename(tmp_path, getPath(file_name));
}
void remove(const std::string & path) override
void remove(const std::string & file_name) override
{
if (!removeIfExists(getPath(path)))
if (!removeIfExists(file_name))
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
"Cannot remove `{}`, because it doesn't exist", path);
"Cannot remove `{}`, because it doesn't exist", file_name);
}
}
bool removeIfExists(const std::string & path) override
bool removeIfExists(const std::string & file_name) override
{
return fs::remove(getPath(path));
return fs::remove(getPath(file_name));
}
private:
std::string getPath(const std::string & path) const
std::string getPath(const std::string & file_name) const
{
return fs::path(root_path) / path;
const auto file_name_as_path = fs::path(file_name);
if (file_name_as_path.is_absolute())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Filename {} cannot be an absolute path", file_name);
return fs::path(root_path) / file_name_as_path;
}
/// Delete .tmp files. They could be left undeleted in case of
@ -263,49 +267,49 @@ public:
return children;
}
bool exists(const std::string & path) const override
bool exists(const std::string & file_name) const override
{
return getClient()->exists(getPath(path));
return getClient()->exists(getPath(file_name));
}
std::string read(const std::string & path) const override
std::string read(const std::string & file_name) const override
{
return getClient()->get(getPath(path));
return getClient()->get(getPath(file_name));
}
void write(const std::string & path, const std::string & data, bool replace) override
void write(const std::string & file_name, const std::string & data, bool replace) override
{
if (replace)
{
getClient()->createOrUpdate(getPath(path), data, zkutil::CreateMode::Persistent);
getClient()->createOrUpdate(getPath(file_name), data, zkutil::CreateMode::Persistent);
}
else
{
auto code = getClient()->tryCreate(getPath(path), data, zkutil::CreateMode::Persistent);
auto code = getClient()->tryCreate(getPath(file_name), data, zkutil::CreateMode::Persistent);
if (code == Coordination::Error::ZNODEEXISTS)
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
"Metadata file {} for named collection already exists",
path);
file_name);
}
}
}
void remove(const std::string & path) override
void remove(const std::string & file_name) override
{
getClient()->remove(getPath(path));
getClient()->remove(getPath(file_name));
}
bool removeIfExists(const std::string & path) override
bool removeIfExists(const std::string & file_name) override
{
auto code = getClient()->tryRemove(getPath(path));
auto code = getClient()->tryRemove(getPath(file_name));
if (code == Coordination::Error::ZOK)
return true;
if (code == Coordination::Error::ZNONODE)
return false;
throw Coordination::Exception::fromPath(code, getPath(path));
throw Coordination::Exception::fromPath(code, getPath(file_name));
}
private:
@ -319,9 +323,13 @@ private:
return zookeeper_client;
}
std::string getPath(const std::string & path) const
std::string getPath(const std::string & file_name) const
{
return fs::path(root_path) / path;
const auto file_name_as_path = fs::path(file_name);
if (file_name_as_path.is_absolute())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Filename {} cannot be an absolute path", file_name);
return fs::path(root_path) / file_name_as_path;
}
};