mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-29 02:52:13 +00:00
Merge branch 'master' of https://github.com/yandex/ClickHouse
This commit is contained in:
commit
5ee7aade1a
@ -1,8 +1,5 @@
|
|||||||
<yandex>
|
<yandex>
|
||||||
<!-- <zookeeper>
|
<zookeeper>
|
||||||
<node>
|
<implementation>testkeeper</implementation>
|
||||||
<host>localhost</host>
|
</zookeeper>
|
||||||
<port>2181</port>
|
|
||||||
</node>
|
|
||||||
</zookeeper>-->
|
|
||||||
</yandex>
|
</yandex>
|
||||||
|
@ -19,6 +19,7 @@ namespace ErrorCodes
|
|||||||
extern const int STD_EXCEPTION;
|
extern const int STD_EXCEPTION;
|
||||||
extern const int UNKNOWN_EXCEPTION;
|
extern const int UNKNOWN_EXCEPTION;
|
||||||
extern const int CANNOT_TRUNCATE_FILE;
|
extern const int CANNOT_TRUNCATE_FILE;
|
||||||
|
extern const int NOT_IMPLEMENTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string errnoToString(int code, int e)
|
std::string errnoToString(int code, int e)
|
||||||
|
727
dbms/src/Common/ZooKeeper/TestKeeper.cpp
Normal file
727
dbms/src/Common/ZooKeeper/TestKeeper.cpp
Normal file
@ -0,0 +1,727 @@
|
|||||||
|
#include <Common/ZooKeeper/TestKeeper.h>
|
||||||
|
#include <Common/setThreadName.h>
|
||||||
|
#include <Common/StringUtils/StringUtils.h>
|
||||||
|
#include <Core/Types.h>
|
||||||
|
|
||||||
|
#include <sstream>
|
||||||
|
#include <iomanip>
|
||||||
|
|
||||||
|
|
||||||
|
namespace Coordination
|
||||||
|
{
|
||||||
|
|
||||||
|
static String parentPath(const String & path)
|
||||||
|
{
|
||||||
|
auto rslash_pos = path.rfind('/');
|
||||||
|
if (rslash_pos > 0)
|
||||||
|
return path.substr(0, rslash_pos);
|
||||||
|
return "/";
|
||||||
|
}
|
||||||
|
|
||||||
|
static String baseName(const String & path)
|
||||||
|
{
|
||||||
|
auto rslash_pos = path.rfind('/');
|
||||||
|
return path.substr(rslash_pos + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
struct TestKeeperRequest : virtual Request
|
||||||
|
{
|
||||||
|
virtual bool isMutable() const { return false; }
|
||||||
|
virtual ResponsePtr createResponse() const = 0;
|
||||||
|
virtual ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const = 0;
|
||||||
|
virtual void processWatches(TestKeeper::Watches & /*watches*/, TestKeeper::Watches & /*list_watches*/) const {}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
static void processWatchesImpl(const String & path, TestKeeper::Watches & watches, TestKeeper::Watches & list_watches)
|
||||||
|
{
|
||||||
|
WatchResponse watch_response;
|
||||||
|
watch_response.path = path;
|
||||||
|
|
||||||
|
auto it = watches.find(watch_response.path);
|
||||||
|
if (it != watches.end())
|
||||||
|
{
|
||||||
|
for (auto & callback : it->second)
|
||||||
|
if (callback)
|
||||||
|
callback(watch_response);
|
||||||
|
|
||||||
|
watches.erase(it);
|
||||||
|
}
|
||||||
|
|
||||||
|
WatchResponse watch_list_response;
|
||||||
|
watch_list_response.path = parentPath(path);
|
||||||
|
|
||||||
|
it = list_watches.find(watch_list_response.path);
|
||||||
|
if (it != list_watches.end())
|
||||||
|
{
|
||||||
|
for (auto & callback : it->second)
|
||||||
|
if (callback)
|
||||||
|
callback(watch_list_response);
|
||||||
|
|
||||||
|
list_watches.erase(it);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
struct TestKeeperCreateRequest final : CreateRequest, TestKeeperRequest
|
||||||
|
{
|
||||||
|
TestKeeperCreateRequest() {}
|
||||||
|
TestKeeperCreateRequest(const CreateRequest & base) : CreateRequest(base) {}
|
||||||
|
ResponsePtr createResponse() const override;
|
||||||
|
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
|
||||||
|
|
||||||
|
void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override
|
||||||
|
{
|
||||||
|
processWatchesImpl(getPath(), node_watches, list_watches);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct TestKeeperRemoveRequest final : RemoveRequest, TestKeeperRequest
|
||||||
|
{
|
||||||
|
TestKeeperRemoveRequest() {}
|
||||||
|
TestKeeperRemoveRequest(const RemoveRequest & base) : RemoveRequest(base) {}
|
||||||
|
bool isMutable() const override { return true; }
|
||||||
|
ResponsePtr createResponse() const override;
|
||||||
|
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
|
||||||
|
|
||||||
|
void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override
|
||||||
|
{
|
||||||
|
processWatchesImpl(getPath(), node_watches, list_watches);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct TestKeeperExistsRequest final : ExistsRequest, TestKeeperRequest
|
||||||
|
{
|
||||||
|
ResponsePtr createResponse() const override;
|
||||||
|
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct TestKeeperGetRequest final : GetRequest, TestKeeperRequest
|
||||||
|
{
|
||||||
|
TestKeeperGetRequest() {}
|
||||||
|
ResponsePtr createResponse() const override;
|
||||||
|
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct TestKeeperSetRequest final : SetRequest, TestKeeperRequest
|
||||||
|
{
|
||||||
|
TestKeeperSetRequest() {}
|
||||||
|
TestKeeperSetRequest(const SetRequest & base) : SetRequest(base) {}
|
||||||
|
bool isMutable() const override { return true; }
|
||||||
|
ResponsePtr createResponse() const override;
|
||||||
|
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
|
||||||
|
|
||||||
|
void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override
|
||||||
|
{
|
||||||
|
processWatchesImpl(getPath(), node_watches, list_watches);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct TestKeeperListRequest final : ListRequest, TestKeeperRequest
|
||||||
|
{
|
||||||
|
ResponsePtr createResponse() const override;
|
||||||
|
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct TestKeeperCheckRequest final : CheckRequest, TestKeeperRequest
|
||||||
|
{
|
||||||
|
TestKeeperCheckRequest() {}
|
||||||
|
TestKeeperCheckRequest(const CheckRequest & base) : CheckRequest(base) {}
|
||||||
|
ResponsePtr createResponse() const override;
|
||||||
|
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest
|
||||||
|
{
|
||||||
|
TestKeeperMultiRequest(const Requests & generic_requests)
|
||||||
|
{
|
||||||
|
requests.reserve(generic_requests.size());
|
||||||
|
|
||||||
|
for (const auto & generic_request : generic_requests)
|
||||||
|
{
|
||||||
|
if (auto * concrete_request_create = dynamic_cast<const CreateRequest *>(generic_request.get()))
|
||||||
|
{
|
||||||
|
auto create = std::make_shared<TestKeeperCreateRequest>(*concrete_request_create);
|
||||||
|
requests.push_back(create);
|
||||||
|
}
|
||||||
|
else if (auto * concrete_request_remove = dynamic_cast<const RemoveRequest *>(generic_request.get()))
|
||||||
|
{
|
||||||
|
requests.push_back(std::make_shared<TestKeeperRemoveRequest>(*concrete_request_remove));
|
||||||
|
}
|
||||||
|
else if (auto * concrete_request_set = dynamic_cast<const SetRequest *>(generic_request.get()))
|
||||||
|
{
|
||||||
|
requests.push_back(std::make_shared<TestKeeperSetRequest>(*concrete_request_set));
|
||||||
|
}
|
||||||
|
else if (auto * concrete_request_check = dynamic_cast<const CheckRequest *>(generic_request.get()))
|
||||||
|
{
|
||||||
|
requests.push_back(std::make_shared<TestKeeperCheckRequest>(*concrete_request_check));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
throw Exception("Illegal command as part of multi ZooKeeper request", ZBADARGUMENTS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override
|
||||||
|
{
|
||||||
|
for (const auto & generic_request : requests)
|
||||||
|
dynamic_cast<const TestKeeperRequest &>(*generic_request).processWatches(node_watches, list_watches);
|
||||||
|
}
|
||||||
|
|
||||||
|
ResponsePtr createResponse() const override;
|
||||||
|
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
ResponsePtr TestKeeperCreateRequest::process(TestKeeper::Container & container, int64_t zxid) const
|
||||||
|
{
|
||||||
|
CreateResponse response;
|
||||||
|
if (container.count(path))
|
||||||
|
{
|
||||||
|
response.error = Error::ZNODEEXISTS;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
auto it = container.find(parentPath(path));
|
||||||
|
|
||||||
|
if (it == container.end())
|
||||||
|
{
|
||||||
|
response.error = Error::ZNONODE;
|
||||||
|
}
|
||||||
|
else if (it->second.is_ephemeral)
|
||||||
|
{
|
||||||
|
response.error = Error::ZNOCHILDRENFOREPHEMERALS;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
TestKeeper::Node created_node;
|
||||||
|
created_node.seq_num = 0;
|
||||||
|
created_node.stat.czxid = zxid;
|
||||||
|
created_node.stat.mzxid = zxid;
|
||||||
|
created_node.stat.ctime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
|
||||||
|
created_node.stat.mtime = created_node.stat.ctime;
|
||||||
|
created_node.stat.numChildren = 0;
|
||||||
|
created_node.stat.dataLength = data.length();
|
||||||
|
created_node.data = data;
|
||||||
|
created_node.is_ephemeral = is_ephemeral;
|
||||||
|
created_node.is_sequental = is_sequential;
|
||||||
|
std::string path_created = path;
|
||||||
|
|
||||||
|
if (is_sequential)
|
||||||
|
{
|
||||||
|
auto seq_num = it->second.seq_num;
|
||||||
|
++it->second.seq_num;
|
||||||
|
|
||||||
|
std::stringstream seq_num_str;
|
||||||
|
seq_num_str << std::setw(10) << std::setfill('0') << seq_num;
|
||||||
|
|
||||||
|
path_created += seq_num_str.str();
|
||||||
|
}
|
||||||
|
|
||||||
|
response.path_created = path_created;
|
||||||
|
container.emplace(std::move(path_created), std::move(created_node));
|
||||||
|
|
||||||
|
++it->second.stat.cversion;
|
||||||
|
++it->second.stat.numChildren;
|
||||||
|
|
||||||
|
response.error = Error::ZOK;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return std::make_shared<CreateResponse>(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
ResponsePtr TestKeeperRemoveRequest::process(TestKeeper::Container & container, int64_t) const
|
||||||
|
{
|
||||||
|
RemoveResponse response;
|
||||||
|
|
||||||
|
auto it = container.find(path);
|
||||||
|
if (it == container.end())
|
||||||
|
{
|
||||||
|
response.error = Error::ZNONODE;
|
||||||
|
}
|
||||||
|
else if (version != -1 && version != it->second.stat.version)
|
||||||
|
{
|
||||||
|
response.error = Error::ZBADVERSION;
|
||||||
|
}
|
||||||
|
else if (it->second.stat.numChildren)
|
||||||
|
{
|
||||||
|
response.error = Error::ZNOTEMPTY;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
container.erase(it);
|
||||||
|
auto & parent = container.at(parentPath(path));
|
||||||
|
--parent.stat.numChildren;
|
||||||
|
++parent.stat.cversion;
|
||||||
|
response.error = Error::ZOK;
|
||||||
|
}
|
||||||
|
|
||||||
|
return std::make_shared<RemoveResponse>(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
ResponsePtr TestKeeperExistsRequest::process(TestKeeper::Container & container, int64_t) const
|
||||||
|
{
|
||||||
|
ExistsResponse response;
|
||||||
|
|
||||||
|
auto it = container.find(path);
|
||||||
|
if (it != container.end())
|
||||||
|
{
|
||||||
|
response.stat = it->second.stat;
|
||||||
|
response.error = Error::ZOK;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
response.error = Error::ZNONODE;
|
||||||
|
}
|
||||||
|
|
||||||
|
return std::make_shared<ExistsResponse>(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
ResponsePtr TestKeeperGetRequest::process(TestKeeper::Container & container, int64_t) const
|
||||||
|
{
|
||||||
|
GetResponse response;
|
||||||
|
|
||||||
|
auto it = container.find(path);
|
||||||
|
if (it == container.end())
|
||||||
|
{
|
||||||
|
response.error = Error::ZNONODE;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
response.stat = it->second.stat;
|
||||||
|
response.data = it->second.data;
|
||||||
|
response.error = Error::ZOK;
|
||||||
|
}
|
||||||
|
|
||||||
|
return std::make_shared<GetResponse>(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
ResponsePtr TestKeeperSetRequest::process(TestKeeper::Container & container, int64_t zxid) const
|
||||||
|
{
|
||||||
|
SetResponse response;
|
||||||
|
|
||||||
|
auto it = container.find(path);
|
||||||
|
if (it == container.end())
|
||||||
|
{
|
||||||
|
response.error = Error::ZNONODE;
|
||||||
|
}
|
||||||
|
else if (version == -1 || version == it->second.stat.version)
|
||||||
|
{
|
||||||
|
it->second.data = data;
|
||||||
|
++it->second.stat.version;
|
||||||
|
it->second.stat.mzxid = zxid;
|
||||||
|
it->second.stat.mtime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
|
||||||
|
it->second.data = data;
|
||||||
|
++container.at(parentPath(path)).stat.cversion;
|
||||||
|
response.stat = it->second.stat;
|
||||||
|
response.error = Error::ZOK;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
response.error = Error::ZBADVERSION;
|
||||||
|
}
|
||||||
|
|
||||||
|
return std::make_shared<SetResponse>(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
ResponsePtr TestKeeperListRequest::process(TestKeeper::Container & container, int64_t) const
|
||||||
|
{
|
||||||
|
ListResponse response;
|
||||||
|
|
||||||
|
auto it = container.find(path);
|
||||||
|
if (it == container.end())
|
||||||
|
{
|
||||||
|
response.error = Error::ZNONODE;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
auto path_prefix = path;
|
||||||
|
if (path_prefix.empty())
|
||||||
|
throw Exception("Logical error: path cannot be empty", ZSESSIONEXPIRED);
|
||||||
|
|
||||||
|
if (path_prefix.back() != '/')
|
||||||
|
path_prefix += '/';
|
||||||
|
|
||||||
|
/// Fairly inefficient.
|
||||||
|
for (auto child_it = container.upper_bound(path_prefix); child_it != container.end() && startsWith(child_it->first, path_prefix); ++child_it)
|
||||||
|
if (parentPath(child_it->first) == path)
|
||||||
|
response.names.emplace_back(baseName(child_it->first));
|
||||||
|
|
||||||
|
response.stat = it->second.stat;
|
||||||
|
response.error = Error::ZOK;
|
||||||
|
}
|
||||||
|
|
||||||
|
return std::make_shared<ListResponse>(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
ResponsePtr TestKeeperCheckRequest::process(TestKeeper::Container & container, int64_t) const
|
||||||
|
{
|
||||||
|
CheckResponse response;
|
||||||
|
auto it = container.find(path);
|
||||||
|
if (it == container.end())
|
||||||
|
{
|
||||||
|
response.error = Error::ZNONODE;
|
||||||
|
}
|
||||||
|
else if (version != -1 && version != it->second.stat.version)
|
||||||
|
{
|
||||||
|
response.error = Error::ZBADVERSION;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
response.error = Error::ZOK;
|
||||||
|
}
|
||||||
|
|
||||||
|
return std::make_shared<CheckResponse>(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
ResponsePtr TestKeeperMultiRequest::process(TestKeeper::Container & container, int64_t zxid) const
|
||||||
|
{
|
||||||
|
MultiResponse response;
|
||||||
|
response.responses.reserve(requests.size());
|
||||||
|
|
||||||
|
/// Fairly inefficient.
|
||||||
|
auto container_copy = container;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
for (const auto & request : requests)
|
||||||
|
{
|
||||||
|
const TestKeeperRequest & concrete_request = dynamic_cast<const TestKeeperRequest &>(*request);
|
||||||
|
auto cur_response = concrete_request.process(container, zxid);
|
||||||
|
response.responses.emplace_back(cur_response);
|
||||||
|
if (cur_response->error != Error::ZOK)
|
||||||
|
{
|
||||||
|
response.error = cur_response->error;
|
||||||
|
container = container_copy;
|
||||||
|
return std::make_shared<MultiResponse>(response);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
response.error = Error::ZOK;
|
||||||
|
return std::make_shared<MultiResponse>(response);
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
container = container_copy;
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ResponsePtr TestKeeperCreateRequest::createResponse() const { return std::make_shared<CreateResponse>(); }
|
||||||
|
ResponsePtr TestKeeperRemoveRequest::createResponse() const { return std::make_shared<RemoveResponse>(); }
|
||||||
|
ResponsePtr TestKeeperExistsRequest::createResponse() const { return std::make_shared<ExistsResponse>(); }
|
||||||
|
ResponsePtr TestKeeperGetRequest::createResponse() const { return std::make_shared<GetResponse>(); }
|
||||||
|
ResponsePtr TestKeeperSetRequest::createResponse() const { return std::make_shared<SetResponse>(); }
|
||||||
|
ResponsePtr TestKeeperListRequest::createResponse() const { return std::make_shared<ListResponse>(); }
|
||||||
|
ResponsePtr TestKeeperCheckRequest::createResponse() const { return std::make_shared<CheckResponse>(); }
|
||||||
|
ResponsePtr TestKeeperMultiRequest::createResponse() const { return std::make_shared<MultiResponse>(); }
|
||||||
|
|
||||||
|
|
||||||
|
TestKeeper::TestKeeper(const String & root_path_, Poco::Timespan operation_timeout)
|
||||||
|
: root_path(root_path_), operation_timeout(operation_timeout)
|
||||||
|
{
|
||||||
|
container.emplace("/", Node());
|
||||||
|
|
||||||
|
if (!root_path.empty())
|
||||||
|
{
|
||||||
|
if (root_path.back() == '/')
|
||||||
|
root_path.pop_back();
|
||||||
|
}
|
||||||
|
|
||||||
|
processing_thread = ThreadFromGlobalPool([this] { processingThread(); });
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
TestKeeper::~TestKeeper()
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
finalize();
|
||||||
|
if (processing_thread.joinable())
|
||||||
|
processing_thread.join();
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void TestKeeper::processingThread()
|
||||||
|
{
|
||||||
|
setThreadName("TestKeeperProc");
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
while (!expired)
|
||||||
|
{
|
||||||
|
RequestInfo info;
|
||||||
|
|
||||||
|
UInt64 max_wait = UInt64(operation_timeout.totalMilliseconds());
|
||||||
|
if (requests_queue.tryPop(info, max_wait))
|
||||||
|
{
|
||||||
|
if (expired)
|
||||||
|
break;
|
||||||
|
|
||||||
|
if (info.watch)
|
||||||
|
{
|
||||||
|
auto & watches_type = dynamic_cast<const ListRequest *>(info.request.get())
|
||||||
|
? list_watches
|
||||||
|
: watches;
|
||||||
|
|
||||||
|
watches_type[info.request->getPath()].emplace_back(std::move(info.watch));
|
||||||
|
}
|
||||||
|
|
||||||
|
++zxid;
|
||||||
|
|
||||||
|
info.request->addRootPath(root_path);
|
||||||
|
ResponsePtr response = info.request->process(container, zxid);
|
||||||
|
if (response->error == Error::ZOK)
|
||||||
|
info.request->processWatches(watches, list_watches);
|
||||||
|
|
||||||
|
response->removeRootPath(root_path);
|
||||||
|
if (info.callback)
|
||||||
|
info.callback(*response);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||||
|
finalize();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void TestKeeper::finalize()
|
||||||
|
{
|
||||||
|
{
|
||||||
|
std::lock_guard lock(push_request_mutex);
|
||||||
|
|
||||||
|
if (expired)
|
||||||
|
return;
|
||||||
|
expired = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
processing_thread.join();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
{
|
||||||
|
for (auto & path_watch : watches)
|
||||||
|
{
|
||||||
|
WatchResponse response;
|
||||||
|
response.type = SESSION;
|
||||||
|
response.state = EXPIRED_SESSION;
|
||||||
|
response.error = ZSESSIONEXPIRED;
|
||||||
|
|
||||||
|
for (auto & callback : path_watch.second)
|
||||||
|
{
|
||||||
|
if (callback)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
callback(response);
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
watches.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
RequestInfo info;
|
||||||
|
while (requests_queue.tryPop(info))
|
||||||
|
{
|
||||||
|
if (info.callback)
|
||||||
|
{
|
||||||
|
ResponsePtr response = info.request->createResponse();
|
||||||
|
response->error = ZSESSIONEXPIRED;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
info.callback(*response);
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (info.watch)
|
||||||
|
{
|
||||||
|
WatchResponse response;
|
||||||
|
response.type = SESSION;
|
||||||
|
response.state = EXPIRED_SESSION;
|
||||||
|
response.error = ZSESSIONEXPIRED;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
info.watch(response);
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void TestKeeper::pushRequest(RequestInfo && info)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
info.time = clock::now();
|
||||||
|
|
||||||
|
/// We must serialize 'pushRequest' and 'finalize' (from processingThread) calls
|
||||||
|
/// to avoid forgotten operations in the queue when session is expired.
|
||||||
|
/// Invariant: when expired, no new operations will be pushed to the queue in 'pushRequest'
|
||||||
|
/// and the queue will be drained in 'finalize'.
|
||||||
|
std::lock_guard lock(push_request_mutex);
|
||||||
|
|
||||||
|
if (expired)
|
||||||
|
throw Exception("Session expired", ZSESSIONEXPIRED);
|
||||||
|
|
||||||
|
if (!requests_queue.tryPush(std::move(info), operation_timeout.totalMilliseconds()))
|
||||||
|
throw Exception("Cannot push request to queue within operation timeout", ZOPERATIONTIMEOUT);
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
finalize();
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void TestKeeper::create(
|
||||||
|
const String & path,
|
||||||
|
const String & data,
|
||||||
|
bool is_ephemeral,
|
||||||
|
bool is_sequential,
|
||||||
|
const ACLs &,
|
||||||
|
CreateCallback callback)
|
||||||
|
{
|
||||||
|
TestKeeperCreateRequest request;
|
||||||
|
request.path = path;
|
||||||
|
request.data = data;
|
||||||
|
request.is_ephemeral = is_ephemeral;
|
||||||
|
request.is_sequential = is_sequential;
|
||||||
|
|
||||||
|
RequestInfo request_info;
|
||||||
|
request_info.request = std::make_shared<TestKeeperCreateRequest>(std::move(request));
|
||||||
|
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const CreateResponse &>(response)); };
|
||||||
|
pushRequest(std::move(request_info));
|
||||||
|
}
|
||||||
|
|
||||||
|
void TestKeeper::remove(
|
||||||
|
const String & path,
|
||||||
|
int32_t version,
|
||||||
|
RemoveCallback callback)
|
||||||
|
{
|
||||||
|
TestKeeperRemoveRequest request;
|
||||||
|
request.path = path;
|
||||||
|
request.version = version;
|
||||||
|
|
||||||
|
RequestInfo request_info;
|
||||||
|
request_info.request = std::make_shared<TestKeeperRemoveRequest>(std::move(request));
|
||||||
|
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const RemoveResponse &>(response)); };
|
||||||
|
pushRequest(std::move(request_info));
|
||||||
|
}
|
||||||
|
|
||||||
|
void TestKeeper::exists(
|
||||||
|
const String & path,
|
||||||
|
ExistsCallback callback,
|
||||||
|
WatchCallback watch)
|
||||||
|
{
|
||||||
|
TestKeeperExistsRequest request;
|
||||||
|
request.path = path;
|
||||||
|
|
||||||
|
RequestInfo request_info;
|
||||||
|
request_info.request = std::make_shared<TestKeeperExistsRequest>(std::move(request));
|
||||||
|
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const ExistsResponse &>(response)); };
|
||||||
|
request_info.watch = watch;
|
||||||
|
pushRequest(std::move(request_info));
|
||||||
|
}
|
||||||
|
|
||||||
|
void TestKeeper::get(
|
||||||
|
const String & path,
|
||||||
|
GetCallback callback,
|
||||||
|
WatchCallback watch)
|
||||||
|
{
|
||||||
|
TestKeeperGetRequest request;
|
||||||
|
request.path = path;
|
||||||
|
|
||||||
|
RequestInfo request_info;
|
||||||
|
request_info.request = std::make_shared<TestKeeperGetRequest>(std::move(request));
|
||||||
|
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const GetResponse &>(response)); };
|
||||||
|
request_info.watch = watch;
|
||||||
|
pushRequest(std::move(request_info));
|
||||||
|
}
|
||||||
|
|
||||||
|
void TestKeeper::set(
|
||||||
|
const String & path,
|
||||||
|
const String & data,
|
||||||
|
int32_t version,
|
||||||
|
SetCallback callback)
|
||||||
|
{
|
||||||
|
TestKeeperSetRequest request;
|
||||||
|
request.path = path;
|
||||||
|
request.data = data;
|
||||||
|
request.version = version;
|
||||||
|
|
||||||
|
RequestInfo request_info;
|
||||||
|
request_info.request = std::make_shared<TestKeeperSetRequest>(std::move(request));
|
||||||
|
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const SetResponse &>(response)); };
|
||||||
|
pushRequest(std::move(request_info));
|
||||||
|
}
|
||||||
|
|
||||||
|
void TestKeeper::list(
|
||||||
|
const String & path,
|
||||||
|
ListCallback callback,
|
||||||
|
WatchCallback watch)
|
||||||
|
{
|
||||||
|
TestKeeperListRequest request;
|
||||||
|
request.path = path;
|
||||||
|
|
||||||
|
RequestInfo request_info;
|
||||||
|
request_info.request = std::make_shared<TestKeeperListRequest>(std::move(request));
|
||||||
|
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const ListResponse &>(response)); };
|
||||||
|
request_info.watch = watch;
|
||||||
|
pushRequest(std::move(request_info));
|
||||||
|
}
|
||||||
|
|
||||||
|
void TestKeeper::check(
|
||||||
|
const String & path,
|
||||||
|
int32_t version,
|
||||||
|
CheckCallback callback)
|
||||||
|
{
|
||||||
|
TestKeeperCheckRequest request;
|
||||||
|
request.path = path;
|
||||||
|
request.version = version;
|
||||||
|
|
||||||
|
RequestInfo request_info;
|
||||||
|
request_info.request = std::make_shared<TestKeeperCheckRequest>(std::move(request));
|
||||||
|
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const CheckResponse &>(response)); };
|
||||||
|
pushRequest(std::move(request_info));
|
||||||
|
}
|
||||||
|
|
||||||
|
void TestKeeper::multi(
|
||||||
|
const Requests & requests,
|
||||||
|
MultiCallback callback)
|
||||||
|
{
|
||||||
|
TestKeeperMultiRequest request(requests);
|
||||||
|
|
||||||
|
RequestInfo request_info;
|
||||||
|
request_info.request = std::make_shared<TestKeeperMultiRequest>(std::move(request));
|
||||||
|
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const MultiResponse &>(response)); };
|
||||||
|
pushRequest(std::move(request_info));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
143
dbms/src/Common/ZooKeeper/TestKeeper.h
Normal file
143
dbms/src/Common/ZooKeeper/TestKeeper.h
Normal file
@ -0,0 +1,143 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <mutex>
|
||||||
|
#include <map>
|
||||||
|
#include <atomic>
|
||||||
|
#include <thread>
|
||||||
|
#include <chrono>
|
||||||
|
|
||||||
|
#include <Poco/Timespan.h>
|
||||||
|
#include <Common/ZooKeeper/IKeeper.h>
|
||||||
|
#include <Common/ThreadPool.h>
|
||||||
|
#include <Common/ConcurrentBoundedQueue.h>
|
||||||
|
|
||||||
|
|
||||||
|
namespace Coordination
|
||||||
|
{
|
||||||
|
|
||||||
|
struct TestKeeperRequest;
|
||||||
|
using TestKeeperRequestPtr = std::shared_ptr<TestKeeperRequest>;
|
||||||
|
|
||||||
|
|
||||||
|
/** Looks like ZooKeeper but stores all data in memory of server process.
|
||||||
|
* All data is not shared between different servers and is lost after server restart.
|
||||||
|
*
|
||||||
|
* The only purpose is to more simple testing for interaction with ZooKeeper within a single server.
|
||||||
|
* This still makes sense, because multiple replicas of a single table can be created on a single server,
|
||||||
|
* and it is used to test replication logic.
|
||||||
|
*
|
||||||
|
* Does not support ACLs. Does not support NULL node values.
|
||||||
|
*
|
||||||
|
* NOTE: You can add various failure modes for better testing.
|
||||||
|
*/
|
||||||
|
class TestKeeper : public IKeeper
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
TestKeeper(const String & root_path, Poco::Timespan operation_timeout);
|
||||||
|
~TestKeeper() override;
|
||||||
|
|
||||||
|
bool isExpired() const override { return expired; }
|
||||||
|
int64_t getSessionID() const override { return 0; }
|
||||||
|
|
||||||
|
|
||||||
|
void create(
|
||||||
|
const String & path,
|
||||||
|
const String & data,
|
||||||
|
bool is_ephemeral,
|
||||||
|
bool is_sequential,
|
||||||
|
const ACLs & acls,
|
||||||
|
CreateCallback callback) override;
|
||||||
|
|
||||||
|
void remove(
|
||||||
|
const String & path,
|
||||||
|
int32_t version,
|
||||||
|
RemoveCallback callback) override;
|
||||||
|
|
||||||
|
void exists(
|
||||||
|
const String & path,
|
||||||
|
ExistsCallback callback,
|
||||||
|
WatchCallback watch) override;
|
||||||
|
|
||||||
|
void get(
|
||||||
|
const String & path,
|
||||||
|
GetCallback callback,
|
||||||
|
WatchCallback watch) override;
|
||||||
|
|
||||||
|
void set(
|
||||||
|
const String & path,
|
||||||
|
const String & data,
|
||||||
|
int32_t version,
|
||||||
|
SetCallback callback) override;
|
||||||
|
|
||||||
|
void list(
|
||||||
|
const String & path,
|
||||||
|
ListCallback callback,
|
||||||
|
WatchCallback watch) override;
|
||||||
|
|
||||||
|
void check(
|
||||||
|
const String & path,
|
||||||
|
int32_t version,
|
||||||
|
CheckCallback callback) override;
|
||||||
|
|
||||||
|
void multi(
|
||||||
|
const Requests & requests,
|
||||||
|
MultiCallback callback) override;
|
||||||
|
|
||||||
|
|
||||||
|
struct Node
|
||||||
|
{
|
||||||
|
String data;
|
||||||
|
ACLs acls;
|
||||||
|
bool is_ephemeral = false;
|
||||||
|
bool is_sequental = false;
|
||||||
|
Stat stat{};
|
||||||
|
int32_t seq_num = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
using Container = std::map<std::string, Node>;
|
||||||
|
|
||||||
|
using WatchCallbacks = std::vector<WatchCallback>;
|
||||||
|
using Watches = std::map<String /* path, relative of root_path */, WatchCallbacks>;
|
||||||
|
|
||||||
|
private:
|
||||||
|
using clock = std::chrono::steady_clock;
|
||||||
|
|
||||||
|
struct RequestInfo
|
||||||
|
{
|
||||||
|
TestKeeperRequestPtr request;
|
||||||
|
ResponseCallback callback;
|
||||||
|
WatchCallback watch;
|
||||||
|
clock::time_point time;
|
||||||
|
};
|
||||||
|
|
||||||
|
Container container;
|
||||||
|
|
||||||
|
String root_path;
|
||||||
|
ACLs default_acls;
|
||||||
|
|
||||||
|
Poco::Timespan operation_timeout;
|
||||||
|
|
||||||
|
std::mutex push_request_mutex;
|
||||||
|
std::atomic<bool> expired{false};
|
||||||
|
|
||||||
|
int64_t zxid = 0;
|
||||||
|
|
||||||
|
Watches watches;
|
||||||
|
Watches list_watches; /// Watches for 'list' request (watches on children).
|
||||||
|
|
||||||
|
void createWatchCallBack(const String & path);
|
||||||
|
|
||||||
|
using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;
|
||||||
|
RequestsQueue requests_queue{1};
|
||||||
|
|
||||||
|
void pushRequest(RequestInfo && request);
|
||||||
|
|
||||||
|
void finalize();
|
||||||
|
|
||||||
|
ThreadFromGlobalPool processing_thread;
|
||||||
|
|
||||||
|
void processingThread();
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -1,6 +1,7 @@
|
|||||||
#include "ZooKeeper.h"
|
#include "ZooKeeper.h"
|
||||||
#include "ZooKeeperImpl.h"
|
#include "ZooKeeperImpl.h"
|
||||||
#include "KeeperException.h"
|
#include "KeeperException.h"
|
||||||
|
#include "TestKeeper.h"
|
||||||
|
|
||||||
#include <random>
|
#include <random>
|
||||||
#include <pcg_random.hpp>
|
#include <pcg_random.hpp>
|
||||||
@ -24,6 +25,7 @@ namespace DB
|
|||||||
namespace ErrorCodes
|
namespace ErrorCodes
|
||||||
{
|
{
|
||||||
extern const int LOGICAL_ERROR;
|
extern const int LOGICAL_ERROR;
|
||||||
|
extern const int NOT_IMPLEMENTED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -44,7 +46,7 @@ static void check(int32_t code, const std::string & path)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void ZooKeeper::init(const std::string & hosts_, const std::string & identity_,
|
void ZooKeeper::init(const std::string & implementation, const std::string & hosts_, const std::string & identity_,
|
||||||
int32_t session_timeout_ms_, int32_t operation_timeout_ms_, const std::string & chroot_)
|
int32_t session_timeout_ms_, int32_t operation_timeout_ms_, const std::string & chroot_)
|
||||||
{
|
{
|
||||||
log = &Logger::get("ZooKeeper");
|
log = &Logger::get("ZooKeeper");
|
||||||
@ -54,6 +56,8 @@ void ZooKeeper::init(const std::string & hosts_, const std::string & identity_,
|
|||||||
operation_timeout_ms = operation_timeout_ms_;
|
operation_timeout_ms = operation_timeout_ms_;
|
||||||
chroot = chroot_;
|
chroot = chroot_;
|
||||||
|
|
||||||
|
if (implementation == "zookeeper")
|
||||||
|
{
|
||||||
if (hosts.empty())
|
if (hosts.empty())
|
||||||
throw KeeperException("No addresses passed to ZooKeeper constructor.", Coordination::ZBADARGUMENTS);
|
throw KeeperException("No addresses passed to ZooKeeper constructor.", Coordination::ZBADARGUMENTS);
|
||||||
|
|
||||||
@ -87,15 +91,26 @@ void ZooKeeper::init(const std::string & hosts_, const std::string & identity_,
|
|||||||
Poco::Timespan(0, operation_timeout_ms_ * 1000));
|
Poco::Timespan(0, operation_timeout_ms_ * 1000));
|
||||||
|
|
||||||
LOG_TRACE(log, "initialized, hosts: " << hosts << (chroot.empty() ? "" : ", chroot: " + chroot));
|
LOG_TRACE(log, "initialized, hosts: " << hosts << (chroot.empty() ? "" : ", chroot: " + chroot));
|
||||||
|
}
|
||||||
|
else if (implementation == "testkeeper")
|
||||||
|
{
|
||||||
|
impl = std::make_unique<Coordination::TestKeeper>(
|
||||||
|
chroot,
|
||||||
|
Poco::Timespan(0, operation_timeout_ms_ * 1000));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
throw DB::Exception("Unknown implementation of coordination service: " + implementation, DB::ErrorCodes::NOT_IMPLEMENTED);
|
||||||
|
}
|
||||||
|
|
||||||
if (!chroot.empty() && !exists("/"))
|
if (!chroot.empty() && !exists("/"))
|
||||||
throw KeeperException("Zookeeper root doesn't exist. You should create root node " + chroot + " before start.", Coordination::ZNONODE);
|
throw KeeperException("Zookeeper root doesn't exist. You should create root node " + chroot + " before start.", Coordination::ZNONODE);
|
||||||
}
|
}
|
||||||
|
|
||||||
ZooKeeper::ZooKeeper(const std::string & hosts, const std::string & identity,
|
ZooKeeper::ZooKeeper(const std::string & hosts, const std::string & identity, int32_t session_timeout_ms,
|
||||||
int32_t session_timeout_ms, int32_t operation_timeout_ms, const std::string & chroot)
|
int32_t operation_timeout_ms, const std::string & chroot, const std::string & implementation)
|
||||||
{
|
{
|
||||||
init(hosts, identity, session_timeout_ms, operation_timeout_ms, chroot);
|
init(implementation, hosts, identity, session_timeout_ms, operation_timeout_ms, chroot);
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ZooKeeperArgs
|
struct ZooKeeperArgs
|
||||||
@ -109,6 +124,7 @@ struct ZooKeeperArgs
|
|||||||
|
|
||||||
session_timeout_ms = DEFAULT_SESSION_TIMEOUT;
|
session_timeout_ms = DEFAULT_SESSION_TIMEOUT;
|
||||||
operation_timeout_ms = DEFAULT_OPERATION_TIMEOUT;
|
operation_timeout_ms = DEFAULT_OPERATION_TIMEOUT;
|
||||||
|
implementation = "zookeeper";
|
||||||
for (const auto & key : keys)
|
for (const auto & key : keys)
|
||||||
{
|
{
|
||||||
if (startsWith(key, "node"))
|
if (startsWith(key, "node"))
|
||||||
@ -134,6 +150,10 @@ struct ZooKeeperArgs
|
|||||||
{
|
{
|
||||||
chroot = config.getString(config_name + "." + key);
|
chroot = config.getString(config_name + "." + key);
|
||||||
}
|
}
|
||||||
|
else if (key == "implementation")
|
||||||
|
{
|
||||||
|
implementation = config.getString(config_name + "." + key);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
throw KeeperException(std::string("Unknown key ") + key + " in config file", Coordination::ZBADARGUMENTS);
|
throw KeeperException(std::string("Unknown key ") + key + " in config file", Coordination::ZBADARGUMENTS);
|
||||||
}
|
}
|
||||||
@ -163,12 +183,13 @@ struct ZooKeeperArgs
|
|||||||
int session_timeout_ms;
|
int session_timeout_ms;
|
||||||
int operation_timeout_ms;
|
int operation_timeout_ms;
|
||||||
std::string chroot;
|
std::string chroot;
|
||||||
|
std::string implementation;
|
||||||
};
|
};
|
||||||
|
|
||||||
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
|
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
|
||||||
{
|
{
|
||||||
ZooKeeperArgs args(config, config_name);
|
ZooKeeperArgs args(config, config_name);
|
||||||
init(args.hosts, args.identity, args.session_timeout_ms, args.operation_timeout_ms, args.chroot);
|
init(args.implementation, args.hosts, args.identity, args.session_timeout_ms, args.operation_timeout_ms, args.chroot);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -931,5 +952,4 @@ Coordination::RequestPtr makeCheckRequest(const std::string & path, int version)
|
|||||||
request->version = version;
|
request->version = version;
|
||||||
return request;
|
return request;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -55,7 +55,8 @@ public:
|
|||||||
ZooKeeper(const std::string & hosts, const std::string & identity = "",
|
ZooKeeper(const std::string & hosts, const std::string & identity = "",
|
||||||
int32_t session_timeout_ms = DEFAULT_SESSION_TIMEOUT,
|
int32_t session_timeout_ms = DEFAULT_SESSION_TIMEOUT,
|
||||||
int32_t operation_timeout_ms = DEFAULT_OPERATION_TIMEOUT,
|
int32_t operation_timeout_ms = DEFAULT_OPERATION_TIMEOUT,
|
||||||
const std::string & chroot = "");
|
const std::string & chroot = "",
|
||||||
|
const std::string & implementation = "zookeeper");
|
||||||
|
|
||||||
/** Config of the form:
|
/** Config of the form:
|
||||||
<zookeeper>
|
<zookeeper>
|
||||||
@ -235,7 +236,7 @@ public:
|
|||||||
private:
|
private:
|
||||||
friend class EphemeralNodeHolder;
|
friend class EphemeralNodeHolder;
|
||||||
|
|
||||||
void init(const std::string & hosts_, const std::string & identity_,
|
void init(const std::string & implementation_, const std::string & hosts_, const std::string & identity_,
|
||||||
int32_t session_timeout_ms_, int32_t operation_timeout_ms_, const std::string & chroot_);
|
int32_t session_timeout_ms_, int32_t operation_timeout_ms_, const std::string & chroot_);
|
||||||
|
|
||||||
void removeChildrenRecursive(const std::string & path);
|
void removeChildrenRecursive(const std::string & path);
|
||||||
@ -320,5 +321,4 @@ private:
|
|||||||
};
|
};
|
||||||
|
|
||||||
using EphemeralNodeHolderPtr = EphemeralNodeHolder::Ptr;
|
using EphemeralNodeHolderPtr = EphemeralNodeHolder::Ptr;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,16 +1,5 @@
|
|||||||
<yandex>
|
<yandex>
|
||||||
<zookeeper>
|
<zookeeper>
|
||||||
<node>
|
<implementation>testkeeper</implementation>
|
||||||
<host>localhost</host>
|
|
||||||
<port>2181</port>
|
|
||||||
</node>
|
|
||||||
<node>
|
|
||||||
<host>yandex.ru</host>
|
|
||||||
<port>2181</port>
|
|
||||||
</node>
|
|
||||||
<node>
|
|
||||||
<host>111.0.1.2</host>
|
|
||||||
<port>2181</port>
|
|
||||||
</node>
|
|
||||||
</zookeeper>
|
</zookeeper>
|
||||||
</yandex>
|
</yandex>
|
||||||
|
@ -79,6 +79,7 @@ def test_mysql_client(mysql_client, server_address):
|
|||||||
-e "INSERT INTO table1 VALUES (0), (1), (5);"
|
-e "INSERT INTO table1 VALUES (0), (1), (5);"
|
||||||
-e "INSERT INTO table1 VALUES (0), (1), (5);"
|
-e "INSERT INTO table1 VALUES (0), (1), (5);"
|
||||||
-e "SELECT * FROM table1 ORDER BY a;"
|
-e "SELECT * FROM table1 ORDER BY a;"
|
||||||
|
-e "DROP DATABASE x;"
|
||||||
'''.format(host=server_address, port=server_port), demux=True)
|
'''.format(host=server_address, port=server_port), demux=True)
|
||||||
|
|
||||||
assert stdout == 'a\n0\n0\n1\n1\n5\n5\n'
|
assert stdout == 'a\n0\n0\n1\n1\n5\n5\n'
|
||||||
@ -108,9 +109,10 @@ def test_python_client(server_address):
|
|||||||
|
|
||||||
assert exc_info.value.args == (81, "Database system2 doesn't exist")
|
assert exc_info.value.args == (81, "Database system2 doesn't exist")
|
||||||
|
|
||||||
client.select_db('x')
|
|
||||||
cursor = client.cursor(pymysql.cursors.DictCursor)
|
cursor = client.cursor(pymysql.cursors.DictCursor)
|
||||||
cursor.execute("TRUNCATE TABLE table1")
|
cursor.execute('CREATE DATABASE x')
|
||||||
|
client.select_db('x')
|
||||||
|
cursor.execute("CREATE TABLE table1 (a UInt32) ENGINE = Memory")
|
||||||
cursor.execute("INSERT INTO table1 VALUES (1), (3)")
|
cursor.execute("INSERT INTO table1 VALUES (1), (3)")
|
||||||
cursor.execute("INSERT INTO table1 VALUES (1), (4)")
|
cursor.execute("INSERT INTO table1 VALUES (1), (4)")
|
||||||
cursor.execute("SELECT * FROM table1 ORDER BY a")
|
cursor.execute("SELECT * FROM table1 ORDER BY a")
|
||||||
|
@ -1,9 +1,5 @@
|
|||||||
<test>
|
<test>
|
||||||
<name>Benchmark</name>
|
<name>columns_hashing</name>
|
||||||
|
|
||||||
<tags>
|
|
||||||
<tag>columns_hashing</tag>
|
|
||||||
</tags>
|
|
||||||
|
|
||||||
<preconditions>
|
<preconditions>
|
||||||
<table_exists>hits_100m_single</table_exists>
|
<table_exists>hits_100m_single</table_exists>
|
||||||
@ -25,12 +21,12 @@
|
|||||||
|
|
||||||
<!--
|
<!--
|
||||||
<query><![CDATA[select count() from hits_100m_single any left join hits_100m_single using (UserID, RegionID)]]></query>
|
<query><![CDATA[select count() from hits_100m_single any left join hits_100m_single using (UserID, RegionID)]]></query>
|
||||||
<query><![CDATA[select count() from hits_100m_single any left join hits_100m_single using (UserID)]]></query>
|
<query><![CDATA[select count() from hits_100m_single any left join hits_100m_single using UserID]]></query>
|
||||||
<query><![CDATA[select count() from hits_100m_single any left join hits_100m_single using URL where URL != '']]></query>
|
<query><![CDATA[select count() from hits_100m_single any left join hits_100m_single using URL where URL != '']]></query>
|
||||||
<query><![CDATA[select count() from hits_1000m_single any left join hits_1000m_single using MobilePhoneModel where MobilePhoneModel != '']]></query>
|
<query><![CDATA[select count() from hits_1000m_single any left join hits_1000m_single using MobilePhoneModel where MobilePhoneModel != '']]></query>
|
||||||
<query><![CDATA[select count() from hits_100m_single any left join hits_100m_single using (MobilePhoneModel, UserID) where MobilePhoneModel != '']]></query>
|
<query><![CDATA[select count() from hits_100m_single any left join hits_100m_single using (MobilePhoneModel, UserID) where MobilePhoneModel != '']]></query>
|
||||||
|
|
||||||
<query><![CDATA[select count() from (select count() from hits_1000m_single group by (UserID))]]></query>
|
<query><![CDATA[select count() from (select count() from hits_1000m_single group by UserID)]]></query>
|
||||||
<query><![CDATA[select count() from (select count() from hits_100m_single group by (UserID, RegionID))]]></query>
|
<query><![CDATA[select count() from (select count() from hits_100m_single group by (UserID, RegionID))]]></query>
|
||||||
<query><![CDATA[select count() from (select count() from hits_100m_single where URL != '' group by URL)]]></query>
|
<query><![CDATA[select count() from (select count() from hits_100m_single where URL != '' group by URL)]]></query>
|
||||||
<query><![CDATA[select count() from (select count() from hits_1000m_single where MobilePhoneModel != '' group by MobilePhoneModel)]]></query>
|
<query><![CDATA[select count() from (select count() from hits_1000m_single where MobilePhoneModel != '' group by MobilePhoneModel)]]></query>
|
||||||
|
@ -11,6 +11,10 @@
|
|||||||
|
|
||||||
<type>loop</type>
|
<type>loop</type>
|
||||||
|
|
||||||
|
<settings>
|
||||||
|
<max_memory_usage>20000000000</max_memory_usage>
|
||||||
|
</settings>
|
||||||
|
|
||||||
<stop_conditions>
|
<stop_conditions>
|
||||||
<all_of>
|
<all_of>
|
||||||
<iterations>5</iterations>
|
<iterations>5</iterations>
|
||||||
@ -21,6 +25,7 @@
|
|||||||
<total_time_ms>60000</total_time_ms>
|
<total_time_ms>60000</total_time_ms>
|
||||||
</any_of>
|
</any_of>
|
||||||
</stop_conditions>
|
</stop_conditions>
|
||||||
|
|
||||||
<query>SELECT DISTINCT URL,Title, ngramDistance(Title, URL) AS distance FROM hits_100m_single ORDER BY distance ASC LIMIT 50</query>
|
<query>SELECT DISTINCT URL,Title, ngramDistance(Title, URL) AS distance FROM hits_100m_single ORDER BY distance ASC LIMIT 50</query>
|
||||||
<query>SELECT DISTINCT SearchPhrase,Title, ngramDistance(Title, SearchPhrase) AS distance FROM hits_100m_single ORDER BY distance ASC LIMIT 50</query>
|
<query>SELECT DISTINCT SearchPhrase,Title, ngramDistance(Title, SearchPhrase) AS distance FROM hits_100m_single ORDER BY distance ASC LIMIT 50</query>
|
||||||
<query>SELECT DISTINCT Title, ngramDistance(Title, 'what is love') AS distance FROM hits_100m_single ORDER BY distance ASC LIMIT 50</query>
|
<query>SELECT DISTINCT Title, ngramDistance(Title, 'what is love') AS distance FROM hits_100m_single ORDER BY distance ASC LIMIT 50</query>
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
<name>number_formatting_formats</name>
|
<name>number_formatting_formats</name>
|
||||||
<type>loop</type>
|
<type>loop</type>
|
||||||
|
|
||||||
<create_query>CREATE TABLE IF NOT EXISTS table_{format} (x UInt64) ENGINE = File({format})</create_query>
|
<create_query>CREATE TABLE IF NOT EXISTS table_{format} (x UInt64) ENGINE = File(`{format}`)</create_query>
|
||||||
|
|
||||||
<stop_conditions>
|
<stop_conditions>
|
||||||
<all_of>
|
<all_of>
|
||||||
|
Loading…
Reference in New Issue
Block a user