Remove zkutil namespace from TestKeeperStorage

This commit is contained in:
alesapin 2021-01-21 18:09:48 +03:00
parent 61fe49194b
commit 4aa11b3494
13 changed files with 32 additions and 43 deletions

View File

@ -8,10 +8,10 @@
namespace DB namespace DB
{ {
zkutil::TestKeeperStorage::RequestForSession parseRequest(nuraft::buffer & data) TestKeeperStorage::RequestForSession parseRequest(nuraft::buffer & data)
{ {
ReadBufferFromNuraftBuffer buffer(data); ReadBufferFromNuraftBuffer buffer(data);
zkutil::TestKeeperStorage::RequestForSession request_for_session; TestKeeperStorage::RequestForSession request_for_session;
readIntBinary(request_for_session.session_id, buffer); readIntBinary(request_for_session.session_id, buffer);
int32_t length; int32_t length;
@ -29,7 +29,7 @@ zkutil::TestKeeperStorage::RequestForSession parseRequest(nuraft::buffer & data)
return request_for_session; return request_for_session;
} }
nuraft::ptr<nuraft::buffer> writeResponses(zkutil::TestKeeperStorage::ResponsesForSessions & responses) nuraft::ptr<nuraft::buffer> writeResponses(TestKeeperStorage::ResponsesForSessions & responses)
{ {
WriteBufferFromNuraftBuffer buffer; WriteBufferFromNuraftBuffer buffer;
for (const auto & response_and_session : responses) for (const auto & response_and_session : responses)
@ -52,7 +52,7 @@ nuraft::ptr<nuraft::buffer> NuKeeperStateMachine::commit(const size_t log_idx, n
{ {
LOG_DEBUG(log, "Commiting logidx {}", log_idx); LOG_DEBUG(log, "Commiting logidx {}", log_idx);
auto request_for_session = parseRequest(data); auto request_for_session = parseRequest(data);
zkutil::TestKeeperStorage::ResponsesForSessions responses_for_sessions; TestKeeperStorage::ResponsesForSessions responses_for_sessions;
{ {
std::lock_guard lock(storage_lock); std::lock_guard lock(storage_lock);
responses_for_sessions = storage.processRequest(request_for_session.request, request_for_session.session_id); responses_for_sessions = storage.processRequest(request_for_session.request, request_for_session.session_id);
@ -107,7 +107,7 @@ NuKeeperStateMachine::StorageSnapshotPtr NuKeeperStateMachine::readSnapshot(nura
TestKeeperStorageSerializer serializer; TestKeeperStorageSerializer serializer;
ReadBufferFromNuraftBuffer reader(in); ReadBufferFromNuraftBuffer reader(in);
zkutil::TestKeeperStorage new_storage; TestKeeperStorage new_storage;
serializer.deserialize(new_storage, reader); serializer.deserialize(new_storage, reader);
return std::make_shared<StorageSnapshot>(ss, new_storage); return std::make_shared<StorageSnapshot>(ss, new_storage);
} }

View File

@ -42,7 +42,7 @@ public:
nuraft::ptr<nuraft::buffer> & data_out, nuraft::ptr<nuraft::buffer> & data_out,
bool & is_last_obj) override; bool & is_last_obj) override;
zkutil::TestKeeperStorage & getStorage() TestKeeperStorage & getStorage()
{ {
return storage; return storage;
} }
@ -50,13 +50,13 @@ public:
private: private:
struct StorageSnapshot struct StorageSnapshot
{ {
StorageSnapshot(const nuraft::ptr<nuraft::snapshot> & s, const zkutil::TestKeeperStorage & storage_) StorageSnapshot(const nuraft::ptr<nuraft::snapshot> & s, const TestKeeperStorage & storage_)
: snapshot(s) : snapshot(s)
, storage(storage_) , storage(storage_)
{} {}
nuraft::ptr<nuraft::snapshot> snapshot; nuraft::ptr<nuraft::snapshot> snapshot;
zkutil::TestKeeperStorage storage; TestKeeperStorage storage;
}; };
using StorageSnapshotPtr = std::shared_ptr<StorageSnapshot>; using StorageSnapshotPtr = std::shared_ptr<StorageSnapshot>;
@ -67,7 +67,7 @@ private:
void writeSnapshot(const StorageSnapshotPtr & snapshot, nuraft::ptr<nuraft::buffer> & out) const; void writeSnapshot(const StorageSnapshotPtr & snapshot, nuraft::ptr<nuraft::buffer> & out) const;
zkutil::TestKeeperStorage storage; TestKeeperStorage storage;
/// Mutex for snapshots /// Mutex for snapshots
std::mutex snapshots_lock; std::mutex snapshots_lock;

View File

@ -17,13 +17,6 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS; extern const int BAD_ARGUMENTS;
} }
}
namespace zkutil
{
using namespace DB;
static String parentPath(const String & path) static String parentPath(const String & path)
{ {
auto rslash_pos = path.rfind('/'); auto rslash_pos = path.rfind('/');

View File

@ -8,7 +8,7 @@
#include <unordered_set> #include <unordered_set>
#include <vector> #include <vector>
namespace zkutil namespace DB
{ {
using namespace DB; using namespace DB;

View File

@ -11,10 +11,6 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED; extern const int TIMEOUT_EXCEEDED;
} }
}
namespace zkutil
{
void TestKeeperStorageDispatcher::processingThread() void TestKeeperStorageDispatcher::processingThread()
{ {
setThreadName("TestKeeperSProc"); setThreadName("TestKeeperSProc");

View File

@ -5,7 +5,7 @@
#include <Coordination/TestKeeperStorage.h> #include <Coordination/TestKeeperStorage.h>
#include <functional> #include <functional>
namespace zkutil namespace DB
{ {
using ZooKeeperResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr & response)>; using ZooKeeperResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr & response)>;

View File

@ -8,7 +8,7 @@ namespace DB
namespace namespace
{ {
void writeNode(const zkutil::TestKeeperStorage::Node & node, WriteBuffer & out) void writeNode(const TestKeeperStorage::Node & node, WriteBuffer & out)
{ {
Coordination::write(node.data, out); Coordination::write(node.data, out);
Coordination::write(node.acls, out); Coordination::write(node.acls, out);
@ -18,7 +18,7 @@ namespace
Coordination::write(node.seq_num, out); Coordination::write(node.seq_num, out);
} }
void readNode(zkutil::TestKeeperStorage::Node & node, ReadBuffer & in) void readNode(TestKeeperStorage::Node & node, ReadBuffer & in)
{ {
Coordination::read(node.data, in); Coordination::read(node.data, in);
Coordination::read(node.acls, in); Coordination::read(node.acls, in);
@ -29,7 +29,7 @@ namespace
} }
} }
void TestKeeperStorageSerializer::serialize(const zkutil::TestKeeperStorage & storage, WriteBuffer & out) const void TestKeeperStorageSerializer::serialize(const TestKeeperStorage & storage, WriteBuffer & out) const
{ {
Coordination::write(storage.zxid, out); Coordination::write(storage.zxid, out);
Coordination::write(storage.session_id_counter, out); Coordination::write(storage.session_id_counter, out);
@ -49,7 +49,7 @@ void TestKeeperStorageSerializer::serialize(const zkutil::TestKeeperStorage & st
} }
} }
void TestKeeperStorageSerializer::deserialize(zkutil::TestKeeperStorage & storage, ReadBuffer & in) const void TestKeeperStorageSerializer::deserialize(TestKeeperStorage & storage, ReadBuffer & in) const
{ {
int64_t session_id_counter, zxid; int64_t session_id_counter, zxid;
Coordination::read(zxid, in); Coordination::read(zxid, in);
@ -63,7 +63,7 @@ void TestKeeperStorageSerializer::deserialize(zkutil::TestKeeperStorage & storag
{ {
std::string path; std::string path;
Coordination::read(path, in); Coordination::read(path, in);
zkutil::TestKeeperStorage::Node node; TestKeeperStorage::Node node;
readNode(node, in); readNode(node, in);
storage.container[path] = node; storage.container[path] = node;
} }

View File

@ -9,9 +9,9 @@ namespace DB
class TestKeeperStorageSerializer class TestKeeperStorageSerializer
{ {
public: public:
void serialize(const zkutil::TestKeeperStorage & storage, WriteBuffer & out) const; void serialize(const TestKeeperStorage & storage, WriteBuffer & out) const;
void deserialize(zkutil::TestKeeperStorage & storage, ReadBuffer & in) const; void deserialize(TestKeeperStorage & storage, ReadBuffer & in) const;
}; };
} }

View File

@ -276,9 +276,9 @@ nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, const Coord
return buf.getBuffer(); return buf.getBuffer();
} }
zkutil::TestKeeperStorage::ResponsesForSessions getZooKeeperResponses(nuraft::ptr<nuraft::buffer> & buffer, const Coordination::ZooKeeperRequestPtr & request) DB::TestKeeperStorage::ResponsesForSessions getZooKeeperResponses(nuraft::ptr<nuraft::buffer> & buffer, const Coordination::ZooKeeperRequestPtr & request)
{ {
zkutil::TestKeeperStorage::ResponsesForSessions results; DB::TestKeeperStorage::ResponsesForSessions results;
DB::ReadBufferFromNuraftBuffer buf(buffer); DB::ReadBufferFromNuraftBuffer buf(buffer);
while (!buf.eof()) while (!buf.eof())
{ {
@ -296,28 +296,28 @@ zkutil::TestKeeperStorage::ResponsesForSessions getZooKeeperResponses(nuraft::pt
Coordination::read(err, buf); Coordination::read(err, buf);
auto response = request->makeResponse(); auto response = request->makeResponse();
response->readImpl(buf); response->readImpl(buf);
results.push_back(zkutil::TestKeeperStorage::ResponseForSession{session_id, response}); results.push_back(DB::TestKeeperStorage::ResponseForSession{session_id, response});
} }
return results; return results;
} }
TEST(CoordinationTest, TestStorageSerialization) TEST(CoordinationTest, TestStorageSerialization)
{ {
zkutil::TestKeeperStorage storage; DB::TestKeeperStorage storage;
storage.container["/hello"] = zkutil::TestKeeperStorage::Node{.data="world"}; storage.container["/hello"] = DB::TestKeeperStorage::Node{.data="world"};
storage.container["/hello/somepath"] = zkutil::TestKeeperStorage::Node{.data="somedata"}; storage.container["/hello/somepath"] = DB::TestKeeperStorage::Node{.data="somedata"};
storage.session_id_counter = 5; storage.session_id_counter = 5;
storage.zxid = 156; storage.zxid = 156;
storage.ephemerals[3] = {"/hello", "/"}; storage.ephemerals[3] = {"/hello", "/"};
storage.ephemerals[1] = {"/hello/somepath"}; storage.ephemerals[1] = {"/hello/somepath"};
DB::WriteBufferFromOwnString buffer; DB::WriteBufferFromOwnString buffer;
zkutil::TestKeeperStorageSerializer serializer; DB::TestKeeperStorageSerializer serializer;
serializer.serialize(storage, buffer); serializer.serialize(storage, buffer);
std::string serialized = buffer.str(); std::string serialized = buffer.str();
EXPECT_NE(serialized.size(), 0); EXPECT_NE(serialized.size(), 0);
DB::ReadBufferFromString read(serialized); DB::ReadBufferFromString read(serialized);
zkutil::TestKeeperStorage new_storage; DB::TestKeeperStorage new_storage;
serializer.deserialize(new_storage, read); serializer.deserialize(new_storage, read);
EXPECT_EQ(new_storage.container.size(), 3); EXPECT_EQ(new_storage.container.size(), 3);

0
src/Coordination/ya.make Normal file
View File

View File

@ -306,7 +306,7 @@ struct ContextShared
ConfigurationPtr zookeeper_config; /// Stores zookeeper configs ConfigurationPtr zookeeper_config; /// Stores zookeeper configs
mutable std::mutex test_keeper_storage_dispatcher_mutex; mutable std::mutex test_keeper_storage_dispatcher_mutex;
mutable std::shared_ptr<zkutil::TestKeeperStorageDispatcher> test_keeper_storage_dispatcher; mutable std::shared_ptr<TestKeeperStorageDispatcher> test_keeper_storage_dispatcher;
mutable std::mutex auxiliary_zookeepers_mutex; mutable std::mutex auxiliary_zookeepers_mutex;
mutable std::map<String, zkutil::ZooKeeperPtr> auxiliary_zookeepers; /// Map for auxiliary ZooKeeper clients. mutable std::map<String, zkutil::ZooKeeperPtr> auxiliary_zookeepers; /// Map for auxiliary ZooKeeper clients.
ConfigurationPtr auxiliary_zookeepers_config; /// Stores auxiliary zookeepers configs ConfigurationPtr auxiliary_zookeepers_config; /// Stores auxiliary zookeepers configs
@ -1531,11 +1531,11 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
return shared->zookeeper; return shared->zookeeper;
} }
std::shared_ptr<zkutil::TestKeeperStorageDispatcher> & Context::getTestKeeperStorageDispatcher() const std::shared_ptr<TestKeeperStorageDispatcher> & Context::getTestKeeperStorageDispatcher() const
{ {
std::lock_guard lock(shared->test_keeper_storage_dispatcher_mutex); std::lock_guard lock(shared->test_keeper_storage_dispatcher_mutex);
if (!shared->test_keeper_storage_dispatcher) if (!shared->test_keeper_storage_dispatcher)
shared->test_keeper_storage_dispatcher = std::make_shared<zkutil::TestKeeperStorageDispatcher>(); shared->test_keeper_storage_dispatcher = std::make_shared<TestKeeperStorageDispatcher>();
return shared->test_keeper_storage_dispatcher; return shared->test_keeper_storage_dispatcher;
} }

View File

@ -40,7 +40,6 @@ namespace Poco
namespace zkutil namespace zkutil
{ {
class ZooKeeper; class ZooKeeper;
class TestKeeperStorageDispatcher;
} }
@ -107,6 +106,7 @@ using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>;
using StoragePoliciesMap = std::map<String, StoragePolicyPtr>; using StoragePoliciesMap = std::map<String, StoragePolicyPtr>;
class StoragePolicySelector; class StoragePolicySelector;
using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>; using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>;
class TestKeeperStorageDispatcher;
class IOutputFormat; class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>; using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
@ -513,7 +513,7 @@ public:
std::shared_ptr<zkutil::ZooKeeper> getAuxiliaryZooKeeper(const String & name) const; std::shared_ptr<zkutil::ZooKeeper> getAuxiliaryZooKeeper(const String & name) const;
std::shared_ptr<zkutil::TestKeeperStorageDispatcher> & getTestKeeperStorageDispatcher() const; std::shared_ptr<TestKeeperStorageDispatcher> & getTestKeeperStorageDispatcher() const;
/// Set auxiliary zookeepers configuration at server starting or configuration reloading. /// Set auxiliary zookeepers configuration at server starting or configuration reloading.
void reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & config); void reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & config);

View File

@ -28,7 +28,7 @@ private:
IServer & server; IServer & server;
Poco::Logger * log; Poco::Logger * log;
Context global_context; Context global_context;
std::shared_ptr<zkutil::TestKeeperStorageDispatcher> test_keeper_storage_dispatcher; std::shared_ptr<TestKeeperStorageDispatcher> test_keeper_storage_dispatcher;
Poco::Timespan operation_timeout; Poco::Timespan operation_timeout;
Poco::Timespan session_timeout; Poco::Timespan session_timeout;
int64_t session_id; int64_t session_id;