mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 17:41:59 +00:00
Fix ephemeral node removal
This commit is contained in:
parent
ba6ccbab42
commit
9d8b21a04d
@ -233,7 +233,7 @@ struct NuKeeperStorageGetRequest final : public NuKeeperStorageRequest
|
|||||||
struct NuKeeperStorageRemoveRequest final : public NuKeeperStorageRequest
|
struct NuKeeperStorageRemoveRequest final : public NuKeeperStorageRequest
|
||||||
{
|
{
|
||||||
using NuKeeperStorageRequest::NuKeeperStorageRequest;
|
using NuKeeperStorageRequest::NuKeeperStorageRequest;
|
||||||
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & ephemerals, int64_t /*zxid*/, int64_t session_id) const override
|
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & ephemerals, int64_t /*zxid*/, int64_t /*session_id*/) const override
|
||||||
{
|
{
|
||||||
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
|
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
|
||||||
Coordination::ZooKeeperRemoveResponse & response = dynamic_cast<Coordination::ZooKeeperRemoveResponse &>(*response_ptr);
|
Coordination::ZooKeeperRemoveResponse & response = dynamic_cast<Coordination::ZooKeeperRemoveResponse &>(*response_ptr);
|
||||||
@ -257,7 +257,12 @@ struct NuKeeperStorageRemoveRequest final : public NuKeeperStorageRequest
|
|||||||
{
|
{
|
||||||
auto prev_node = it->value;
|
auto prev_node = it->value;
|
||||||
if (prev_node.stat.ephemeralOwner != 0)
|
if (prev_node.stat.ephemeralOwner != 0)
|
||||||
ephemerals[session_id].erase(request.path);
|
{
|
||||||
|
auto ephemerals_it = ephemerals.find(prev_node.stat.ephemeralOwner);
|
||||||
|
ephemerals_it->second.erase(request.path);
|
||||||
|
if (ephemerals_it->second.empty())
|
||||||
|
ephemerals.erase(ephemerals_it);
|
||||||
|
}
|
||||||
|
|
||||||
auto child_basename = getBaseName(it->key);
|
auto child_basename = getBaseName(it->key);
|
||||||
container.updateValue(parentPath(request.path), [&child_basename] (NuKeeperStorage::Node & parent)
|
container.updateValue(parentPath(request.path), [&child_basename] (NuKeeperStorage::Node & parent)
|
||||||
@ -271,10 +276,10 @@ struct NuKeeperStorageRemoveRequest final : public NuKeeperStorageRequest
|
|||||||
|
|
||||||
container.erase(request.path);
|
container.erase(request.path);
|
||||||
|
|
||||||
undo = [prev_node, &container, &ephemerals, session_id, path = request.path, child_basename]
|
undo = [prev_node, &container, &ephemerals, path = request.path, child_basename]
|
||||||
{
|
{
|
||||||
if (prev_node.stat.ephemeralOwner != 0)
|
if (prev_node.stat.ephemeralOwner != 0)
|
||||||
ephemerals[session_id].emplace(path);
|
ephemerals[prev_node.stat.ephemeralOwner].emplace(path);
|
||||||
|
|
||||||
container.insert(path, prev_node);
|
container.insert(path, prev_node);
|
||||||
container.updateValue(parentPath(path), [&child_basename] (NuKeeperStorage::Node & parent)
|
container.updateValue(parentPath(path), [&child_basename] (NuKeeperStorage::Node & parent)
|
||||||
@ -377,7 +382,6 @@ struct NuKeeperStorageSetRequest final : public NuKeeperStorageRequest
|
|||||||
{
|
{
|
||||||
return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CHANGED);
|
return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CHANGED);
|
||||||
}
|
}
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct NuKeeperStorageListRequest final : public NuKeeperStorageRequest
|
struct NuKeeperStorageListRequest final : public NuKeeperStorageRequest
|
||||||
|
@ -1232,6 +1232,37 @@ TEST(CoordinationTest, TestStateMachineAndLogStore)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(CoordinationTest, TestEphemeralNodeRemove)
|
||||||
|
{
|
||||||
|
using namespace Coordination;
|
||||||
|
using namespace DB;
|
||||||
|
|
||||||
|
ChangelogDirTest snapshots("./snapshots");
|
||||||
|
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
|
||||||
|
|
||||||
|
ResponsesQueue queue;
|
||||||
|
SnapshotsQueue snapshots_queue{1};
|
||||||
|
auto state_machine = std::make_shared<NuKeeperStateMachine>(queue, snapshots_queue, "./snapshots", settings);
|
||||||
|
state_machine->init();
|
||||||
|
|
||||||
|
std::shared_ptr<ZooKeeperCreateRequest> request_c = std::make_shared<ZooKeeperCreateRequest>();
|
||||||
|
request_c->path = "/hello";
|
||||||
|
request_c->is_ephemeral = true;
|
||||||
|
auto entry_c = getLogEntryFromZKRequest(0, 1, request_c);
|
||||||
|
state_machine->commit(1, entry_c->get_buf());
|
||||||
|
const auto & storage = state_machine->getStorage();
|
||||||
|
|
||||||
|
EXPECT_EQ(storage.ephemerals.size(), 1);
|
||||||
|
std::shared_ptr<ZooKeeperRemoveRequest> request_d = std::make_shared<ZooKeeperRemoveRequest>();
|
||||||
|
request_d->path = "/hello";
|
||||||
|
/// Delete from other session
|
||||||
|
auto entry_d = getLogEntryFromZKRequest(0, 2, request_d);
|
||||||
|
state_machine->commit(2, entry_d->get_buf());
|
||||||
|
|
||||||
|
EXPECT_EQ(storage.ephemerals.size(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int main(int argc, char ** argv)
|
int main(int argc, char ** argv)
|
||||||
{
|
{
|
||||||
Poco::AutoPtr<Poco::ConsoleChannel> channel(new Poco::ConsoleChannel(std::cerr));
|
Poco::AutoPtr<Poco::ConsoleChannel> channel(new Poco::ConsoleChannel(std::cerr));
|
||||||
|
Loading…
Reference in New Issue
Block a user