mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Merge pull request #26129 from ClickHouse/system_zookeeper_log
Add system.zookeeper_log table
This commit is contained in:
commit
1277db4435
@ -143,6 +143,7 @@ if [[ -n "$WITH_COVERAGE" ]] && [[ "$WITH_COVERAGE" -eq 1 ]]; then
|
||||
fi
|
||||
tar -chf /test_output/text_log_dump.tar /var/lib/clickhouse/data/system/text_log ||:
|
||||
tar -chf /test_output/query_log_dump.tar /var/lib/clickhouse/data/system/query_log ||:
|
||||
tar -chf /test_output/zookeeper_log_dump.tar /var/lib/clickhouse/data/system/zookeeper_log ||:
|
||||
tar -chf /test_output/coordination.tar /var/lib/clickhouse/coordination ||:
|
||||
|
||||
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
|
||||
@ -152,6 +153,8 @@ if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]
|
||||
pigz < /var/log/clickhouse-server/clickhouse-server2.log > /test_output/clickhouse-server2.log.gz ||:
|
||||
mv /var/log/clickhouse-server/stderr1.log /test_output/ ||:
|
||||
mv /var/log/clickhouse-server/stderr2.log /test_output/ ||:
|
||||
tar -chf /test_output/zookeeper_log_dump1.tar /var/lib/clickhouse1/data/system/zookeeper_log ||:
|
||||
tar -chf /test_output/zookeeper_log_dump2.tar /var/lib/clickhouse2/data/system/zookeeper_log ||:
|
||||
tar -chf /test_output/coordination1.tar /var/lib/clickhouse1/coordination ||:
|
||||
tar -chf /test_output/coordination2.tar /var/lib/clickhouse2/coordination ||:
|
||||
fi
|
||||
|
@ -33,7 +33,7 @@ static std::string extractFromConfig(
|
||||
{
|
||||
DB::ConfigurationPtr bootstrap_configuration(new Poco::Util::XMLConfiguration(config_xml));
|
||||
zkutil::ZooKeeperPtr zookeeper = std::make_shared<zkutil::ZooKeeper>(
|
||||
*bootstrap_configuration, "zookeeper");
|
||||
*bootstrap_configuration, "zookeeper", nullptr);
|
||||
zkutil::ZooKeeperNodeCache zk_node_cache([&] { return zookeeper; });
|
||||
config_xml = processor.processConfig(&has_zk_includes, &zk_node_cache);
|
||||
}
|
||||
|
@ -111,7 +111,8 @@ void ZooKeeper::init(const std::string & implementation_, const Strings & hosts_
|
||||
identity_,
|
||||
Poco::Timespan(0, session_timeout_ms_ * 1000),
|
||||
Poco::Timespan(0, ZOOKEEPER_CONNECTION_TIMEOUT_MS * 1000),
|
||||
Poco::Timespan(0, operation_timeout_ms_ * 1000));
|
||||
Poco::Timespan(0, operation_timeout_ms_ * 1000),
|
||||
zk_log);
|
||||
|
||||
if (chroot.empty())
|
||||
LOG_TRACE(log, "Initialized, hosts: {}", fmt::join(hosts, ","));
|
||||
@ -209,7 +210,8 @@ struct ZooKeeperArgs
|
||||
std::string implementation;
|
||||
};
|
||||
|
||||
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
|
||||
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
|
||||
: zk_log(std::move(zk_log_))
|
||||
{
|
||||
ZooKeeperArgs args(config, config_name);
|
||||
init(args.implementation, args.hosts, args.identity, args.session_timeout_ms, args.operation_timeout_ms, args.chroot);
|
||||
|
@ -25,6 +25,10 @@ namespace CurrentMetrics
|
||||
extern const Metric EphemeralNode;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class ZooKeeperLog;
|
||||
}
|
||||
|
||||
namespace zkutil
|
||||
{
|
||||
@ -82,7 +86,7 @@ public:
|
||||
<identity>user:password</identity>
|
||||
</zookeeper>
|
||||
*/
|
||||
ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name);
|
||||
ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr<DB::ZooKeeperLog> zk_log_);
|
||||
|
||||
/// Creates a new session with the same parameters. This method can be used for reconnecting
|
||||
/// after the session has expired.
|
||||
@ -298,6 +302,7 @@ private:
|
||||
std::mutex mutex;
|
||||
|
||||
Poco::Logger * log = nullptr;
|
||||
std::shared_ptr<DB::ZooKeeperLog> zk_log;
|
||||
};
|
||||
|
||||
|
||||
|
@ -537,6 +537,139 @@ void ZooKeeperSessionIDResponse::writeImpl(WriteBuffer & out) const
|
||||
Coordination::write(server_id, out);
|
||||
}
|
||||
|
||||
|
||||
void ZooKeeperRequest::createLogElements(LogElements & elems) const
|
||||
{
|
||||
elems.emplace_back();
|
||||
auto & elem = elems.back();
|
||||
elem.xid = xid;
|
||||
elem.has_watch = has_watch;
|
||||
elem.op_num = static_cast<uint32_t>(getOpNum());
|
||||
elem.path = getPath();
|
||||
elem.request_idx = elems.size() - 1;
|
||||
}
|
||||
|
||||
|
||||
void ZooKeeperCreateRequest::createLogElements(LogElements & elems) const
|
||||
{
|
||||
ZooKeeperRequest::createLogElements(elems);
|
||||
auto & elem = elems.back();
|
||||
elem.data = data;
|
||||
elem.is_ephemeral = is_ephemeral;
|
||||
elem.is_sequential = is_sequential;
|
||||
}
|
||||
|
||||
void ZooKeeperRemoveRequest::createLogElements(LogElements & elems) const
|
||||
{
|
||||
ZooKeeperRequest::createLogElements(elems);
|
||||
auto & elem = elems.back();
|
||||
elem.version = version;
|
||||
}
|
||||
|
||||
void ZooKeeperSetRequest::createLogElements(LogElements & elems) const
|
||||
{
|
||||
ZooKeeperRequest::createLogElements(elems);
|
||||
auto & elem = elems.back();
|
||||
elem.data = data;
|
||||
elem.version = version;
|
||||
}
|
||||
|
||||
void ZooKeeperCheckRequest::createLogElements(LogElements & elems) const
|
||||
{
|
||||
ZooKeeperRequest::createLogElements(elems);
|
||||
auto & elem = elems.back();
|
||||
elem.version = version;
|
||||
}
|
||||
|
||||
void ZooKeeperMultiRequest::createLogElements(LogElements & elems) const
|
||||
{
|
||||
ZooKeeperRequest::createLogElements(elems);
|
||||
elems.back().requests_size = requests.size();
|
||||
for (const auto & request : requests)
|
||||
{
|
||||
auto & req = dynamic_cast<ZooKeeperRequest &>(*request);
|
||||
assert(!req.xid || req.xid == xid);
|
||||
req.createLogElements(elems);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void ZooKeeperResponse::fillLogElements(LogElements & elems, size_t idx) const
|
||||
{
|
||||
auto & elem = elems[idx];
|
||||
assert(!elem.xid || elem.xid == xid);
|
||||
elem.xid = xid;
|
||||
int32_t response_op = tryGetOpNum();
|
||||
assert(!elem.op_num || elem.op_num == response_op || response_op < 0);
|
||||
elem.op_num = response_op;
|
||||
|
||||
elem.zxid = zxid;
|
||||
elem.error = static_cast<Int32>(error);
|
||||
}
|
||||
|
||||
void ZooKeeperWatchResponse::fillLogElements(LogElements & elems, size_t idx) const
|
||||
{
|
||||
ZooKeeperResponse::fillLogElements(elems, idx);
|
||||
auto & elem = elems[idx];
|
||||
elem.watch_type = type;
|
||||
elem.watch_state = state;
|
||||
elem.path = path;
|
||||
}
|
||||
|
||||
void ZooKeeperCreateResponse::fillLogElements(LogElements & elems, size_t idx) const
|
||||
{
|
||||
ZooKeeperResponse::fillLogElements(elems, idx);
|
||||
auto & elem = elems[idx];
|
||||
elem.path_created = path_created;
|
||||
}
|
||||
|
||||
void ZooKeeperExistsResponse::fillLogElements(LogElements & elems, size_t idx) const
|
||||
{
|
||||
ZooKeeperResponse::fillLogElements(elems, idx);
|
||||
auto & elem = elems[idx];
|
||||
elem.stat = stat;
|
||||
}
|
||||
|
||||
void ZooKeeperGetResponse::fillLogElements(LogElements & elems, size_t idx) const
|
||||
{
|
||||
ZooKeeperResponse::fillLogElements(elems, idx);
|
||||
auto & elem = elems[idx];
|
||||
elem.data = data;
|
||||
elem.stat = stat;
|
||||
}
|
||||
|
||||
void ZooKeeperSetResponse::fillLogElements(LogElements & elems, size_t idx) const
|
||||
{
|
||||
ZooKeeperResponse::fillLogElements(elems, idx);
|
||||
auto & elem = elems[idx];
|
||||
elem.stat = stat;
|
||||
}
|
||||
|
||||
void ZooKeeperListResponse::fillLogElements(LogElements & elems, size_t idx) const
|
||||
{
|
||||
ZooKeeperResponse::fillLogElements(elems, idx);
|
||||
auto & elem = elems[idx];
|
||||
elem.stat = stat;
|
||||
elem.children = names;
|
||||
}
|
||||
|
||||
void ZooKeeperMultiResponse::fillLogElements(LogElements & elems, size_t idx) const
|
||||
{
|
||||
assert(idx == 0);
|
||||
assert(elems.size() == responses.size() + 1);
|
||||
ZooKeeperResponse::fillLogElements(elems, idx);
|
||||
for (const auto & response : responses)
|
||||
{
|
||||
auto & resp = dynamic_cast<ZooKeeperResponse &>(*response);
|
||||
assert(!resp.xid || resp.xid == xid);
|
||||
assert(!resp.zxid || resp.zxid == zxid);
|
||||
resp.xid = xid;
|
||||
resp.zxid = zxid;
|
||||
resp.fillLogElements(elems, ++idx);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void ZooKeeperRequestFactory::registerRequest(OpNum op_num, Creator creator)
|
||||
{
|
||||
if (!op_num_to_request.try_emplace(op_num, creator).second)
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include <Common/ZooKeeper/IKeeper.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperConstants.h>
|
||||
#include <Interpreters/ZooKeeperLog.h>
|
||||
|
||||
#include <boost/noncopyable.hpp>
|
||||
#include <IO/ReadBuffer.h>
|
||||
@ -22,6 +23,8 @@
|
||||
namespace Coordination
|
||||
{
|
||||
|
||||
using LogElements = std::vector<ZooKeeperLogElement>;
|
||||
|
||||
struct ZooKeeperResponse : virtual Response
|
||||
{
|
||||
XID xid = 0;
|
||||
@ -32,6 +35,8 @@ struct ZooKeeperResponse : virtual Response
|
||||
virtual void writeImpl(WriteBuffer &) const = 0;
|
||||
virtual void write(WriteBuffer & out) const;
|
||||
virtual OpNum getOpNum() const = 0;
|
||||
virtual void fillLogElements(LogElements & elems, size_t idx) const;
|
||||
virtual int32_t tryGetOpNum() const { return static_cast<int32_t>(getOpNum()); }
|
||||
};
|
||||
|
||||
using ZooKeeperResponsePtr = std::shared_ptr<ZooKeeperResponse>;
|
||||
@ -63,6 +68,8 @@ struct ZooKeeperRequest : virtual Request
|
||||
|
||||
virtual ZooKeeperResponsePtr makeResponse() const = 0;
|
||||
virtual bool isReadRequest() const = 0;
|
||||
|
||||
virtual void createLogElements(LogElements & elems) const;
|
||||
};
|
||||
|
||||
using ZooKeeperRequestPtr = std::shared_ptr<ZooKeeperRequest>;
|
||||
@ -119,6 +126,9 @@ struct ZooKeeperWatchResponse final : WatchResponse, ZooKeeperResponse
|
||||
{
|
||||
throw Exception("OpNum for watch response doesn't exist", Error::ZRUNTIMEINCONSISTENCY);
|
||||
}
|
||||
|
||||
void fillLogElements(LogElements & elems, size_t idx) const override;
|
||||
int32_t tryGetOpNum() const override { return 0; }
|
||||
};
|
||||
|
||||
struct ZooKeeperAuthRequest final : ZooKeeperRequest
|
||||
@ -188,6 +198,8 @@ struct ZooKeeperCreateRequest final : public CreateRequest, ZooKeeperRequest
|
||||
bool isReadRequest() const override { return false; }
|
||||
|
||||
size_t bytesSize() const override { return CreateRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
|
||||
|
||||
void createLogElements(LogElements & elems) const override;
|
||||
};
|
||||
|
||||
struct ZooKeeperCreateResponse final : CreateResponse, ZooKeeperResponse
|
||||
@ -199,6 +211,8 @@ struct ZooKeeperCreateResponse final : CreateResponse, ZooKeeperResponse
|
||||
OpNum getOpNum() const override { return OpNum::Create; }
|
||||
|
||||
size_t bytesSize() const override { return CreateResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
|
||||
|
||||
void fillLogElements(LogElements & elems, size_t idx) const override;
|
||||
};
|
||||
|
||||
struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest
|
||||
@ -214,6 +228,8 @@ struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest
|
||||
bool isReadRequest() const override { return false; }
|
||||
|
||||
size_t bytesSize() const override { return RemoveRequest::bytesSize() + sizeof(xid); }
|
||||
|
||||
void createLogElements(LogElements & elems) const override;
|
||||
};
|
||||
|
||||
struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse
|
||||
@ -244,6 +260,8 @@ struct ZooKeeperExistsResponse final : ExistsResponse, ZooKeeperResponse
|
||||
OpNum getOpNum() const override { return OpNum::Exists; }
|
||||
|
||||
size_t bytesSize() const override { return ExistsResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
|
||||
|
||||
void fillLogElements(LogElements & elems, size_t idx) const override;
|
||||
};
|
||||
|
||||
struct ZooKeeperGetRequest final : GetRequest, ZooKeeperRequest
|
||||
@ -265,6 +283,8 @@ struct ZooKeeperGetResponse final : GetResponse, ZooKeeperResponse
|
||||
OpNum getOpNum() const override { return OpNum::Get; }
|
||||
|
||||
size_t bytesSize() const override { return GetResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
|
||||
|
||||
void fillLogElements(LogElements & elems, size_t idx) const override;
|
||||
};
|
||||
|
||||
struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest
|
||||
@ -279,6 +299,8 @@ struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest
|
||||
bool isReadRequest() const override { return false; }
|
||||
|
||||
size_t bytesSize() const override { return SetRequest::bytesSize() + sizeof(xid); }
|
||||
|
||||
void createLogElements(LogElements & elems) const override;
|
||||
};
|
||||
|
||||
struct ZooKeeperSetResponse final : SetResponse, ZooKeeperResponse
|
||||
@ -288,6 +310,8 @@ struct ZooKeeperSetResponse final : SetResponse, ZooKeeperResponse
|
||||
OpNum getOpNum() const override { return OpNum::Set; }
|
||||
|
||||
size_t bytesSize() const override { return SetResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
|
||||
|
||||
void fillLogElements(LogElements & elems, size_t idx) const override;
|
||||
};
|
||||
|
||||
struct ZooKeeperListRequest : ListRequest, ZooKeeperRequest
|
||||
@ -313,6 +337,8 @@ struct ZooKeeperListResponse : ListResponse, ZooKeeperResponse
|
||||
OpNum getOpNum() const override { return OpNum::List; }
|
||||
|
||||
size_t bytesSize() const override { return ListResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
|
||||
|
||||
void fillLogElements(LogElements & elems, size_t idx) const override;
|
||||
};
|
||||
|
||||
struct ZooKeeperSimpleListResponse final : ZooKeeperListResponse
|
||||
@ -333,6 +359,8 @@ struct ZooKeeperCheckRequest final : CheckRequest, ZooKeeperRequest
|
||||
bool isReadRequest() const override { return true; }
|
||||
|
||||
size_t bytesSize() const override { return CheckRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
|
||||
|
||||
void createLogElements(LogElements & elems) const override;
|
||||
};
|
||||
|
||||
struct ZooKeeperCheckResponse final : CheckResponse, ZooKeeperResponse
|
||||
@ -409,6 +437,8 @@ struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest
|
||||
bool isReadRequest() const override;
|
||||
|
||||
size_t bytesSize() const override { return MultiRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
|
||||
|
||||
void createLogElements(LogElements & elems) const override;
|
||||
};
|
||||
|
||||
struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse
|
||||
@ -433,6 +463,8 @@ struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse
|
||||
void writeImpl(WriteBuffer & out) const override;
|
||||
|
||||
size_t bytesSize() const override { return MultiResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
|
||||
|
||||
void fillLogElements(LogElements & elems, size_t idx) const override;
|
||||
};
|
||||
|
||||
/// Fake internal coordination (keeper) response. Never received from client
|
||||
|
@ -311,10 +311,12 @@ ZooKeeper::ZooKeeper(
|
||||
const String & auth_data,
|
||||
Poco::Timespan session_timeout_,
|
||||
Poco::Timespan connection_timeout,
|
||||
Poco::Timespan operation_timeout_)
|
||||
Poco::Timespan operation_timeout_,
|
||||
std::shared_ptr<ZooKeeperLog> zk_log_)
|
||||
: root_path(root_path_),
|
||||
session_timeout(session_timeout_),
|
||||
operation_timeout(std::min(operation_timeout_, session_timeout_))
|
||||
operation_timeout(std::min(operation_timeout_, session_timeout_)),
|
||||
zk_log(std::move(zk_log_))
|
||||
{
|
||||
if (!root_path.empty())
|
||||
{
|
||||
@ -578,6 +580,8 @@ void ZooKeeper::sendThread()
|
||||
info.request->probably_sent = true;
|
||||
info.request->write(*out);
|
||||
|
||||
logOperationIfNeeded(info.request);
|
||||
|
||||
/// We sent close request, exit
|
||||
if (info.request->xid == CLOSE_XID)
|
||||
break;
|
||||
@ -747,6 +751,9 @@ void ZooKeeper::receiveEvent()
|
||||
if (!response)
|
||||
response = request_info.request->makeResponse();
|
||||
|
||||
response->xid = xid;
|
||||
response->zxid = zxid;
|
||||
|
||||
if (err != Error::ZOK)
|
||||
{
|
||||
response->error = err;
|
||||
@ -785,6 +792,8 @@ void ZooKeeper::receiveEvent()
|
||||
int32_t actual_length = in->count() - count_before_event;
|
||||
if (length != actual_length)
|
||||
throw Exception("Response length doesn't match. Expected: " + DB::toString(length) + ", actual: " + DB::toString(actual_length), Error::ZMARSHALLINGERROR);
|
||||
|
||||
logOperationIfNeeded(request_info.request, response); //-V614
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -802,6 +811,8 @@ void ZooKeeper::receiveEvent()
|
||||
{
|
||||
if (request_info.callback)
|
||||
request_info.callback(*response);
|
||||
|
||||
logOperationIfNeeded(request_info.request, response);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -880,17 +891,19 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
|
||||
for (auto & op : operations)
|
||||
{
|
||||
RequestInfo & request_info = op.second;
|
||||
ResponsePtr response = request_info.request->makeResponse();
|
||||
ZooKeeperResponsePtr response = request_info.request->makeResponse();
|
||||
|
||||
response->error = request_info.request->probably_sent
|
||||
? Error::ZCONNECTIONLOSS
|
||||
: Error::ZSESSIONEXPIRED;
|
||||
response->xid = request_info.request->xid;
|
||||
|
||||
if (request_info.callback)
|
||||
{
|
||||
try
|
||||
{
|
||||
request_info.callback(*response);
|
||||
logOperationIfNeeded(request_info.request, response, true);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -942,13 +955,15 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
|
||||
{
|
||||
if (info.callback)
|
||||
{
|
||||
ResponsePtr response = info.request->makeResponse();
|
||||
ZooKeeperResponsePtr response = info.request->makeResponse();
|
||||
if (response)
|
||||
{
|
||||
response->error = Error::ZSESSIONEXPIRED;
|
||||
response->xid = info.request->xid;
|
||||
try
|
||||
{
|
||||
info.callback(*response);
|
||||
logOperationIfNeeded(info.request, response, true);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -993,6 +1008,12 @@ void ZooKeeper::pushRequest(RequestInfo && info)
|
||||
throw Exception("xid equal to close_xid", Error::ZSESSIONEXPIRED);
|
||||
if (info.request->xid < 0)
|
||||
throw Exception("XID overflow", Error::ZSESSIONEXPIRED);
|
||||
|
||||
if (auto * multi_request = dynamic_cast<ZooKeeperMultiRequest *>(info.request.get()))
|
||||
{
|
||||
for (auto & request : multi_request->requests)
|
||||
dynamic_cast<ZooKeeperRequest &>(*request).xid = multi_request->xid;
|
||||
}
|
||||
}
|
||||
|
||||
/// We must serialize 'pushRequest' and 'finalize' (from sendThread, receiveThread) calls
|
||||
@ -1190,4 +1211,46 @@ void ZooKeeper::close()
|
||||
ProfileEvents::increment(ProfileEvents::ZooKeeperClose);
|
||||
}
|
||||
|
||||
|
||||
void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response, bool finalize)
|
||||
{
|
||||
if (!zk_log)
|
||||
return;
|
||||
|
||||
ZooKeeperLogElement::Type log_type = ZooKeeperLogElement::UNKNOWN;
|
||||
Decimal64 event_time = std::chrono::duration_cast<std::chrono::microseconds>(
|
||||
std::chrono::system_clock::now().time_since_epoch()
|
||||
).count();
|
||||
LogElements elems;
|
||||
if (request)
|
||||
{
|
||||
request->createLogElements(elems);
|
||||
log_type = ZooKeeperLogElement::REQUEST;
|
||||
}
|
||||
else
|
||||
{
|
||||
assert(response);
|
||||
assert(response->xid == PING_XID || response->xid == WATCH_XID);
|
||||
elems.emplace_back();
|
||||
}
|
||||
|
||||
if (response)
|
||||
{
|
||||
response->fillLogElements(elems, 0);
|
||||
log_type = ZooKeeperLogElement::RESPONSE;
|
||||
}
|
||||
|
||||
if (finalize)
|
||||
log_type = ZooKeeperLogElement::FINALIZE;
|
||||
|
||||
for (auto & elem : elems)
|
||||
{
|
||||
elem.type = log_type;
|
||||
elem.event_time = event_time;
|
||||
elem.address = socket.peerAddress();
|
||||
elem.session_id = session_id;
|
||||
zk_log->add(elem);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -80,6 +80,10 @@ namespace CurrentMetrics
|
||||
extern const Metric ZooKeeperSession;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class ZooKeeperLog;
|
||||
}
|
||||
|
||||
namespace Coordination
|
||||
{
|
||||
@ -110,7 +114,8 @@ public:
|
||||
const String & auth_data,
|
||||
Poco::Timespan session_timeout_,
|
||||
Poco::Timespan connection_timeout,
|
||||
Poco::Timespan operation_timeout_);
|
||||
Poco::Timespan operation_timeout_,
|
||||
std::shared_ptr<ZooKeeperLog> zk_log_);
|
||||
|
||||
~ZooKeeper() override;
|
||||
|
||||
@ -258,7 +263,10 @@ private:
|
||||
template <typename T>
|
||||
void read(T &);
|
||||
|
||||
void logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response = nullptr, bool finalize = false);
|
||||
|
||||
CurrentMetrics::Increment active_session_metric_increment{CurrentMetrics::ZooKeeperSession};
|
||||
std::shared_ptr<ZooKeeperLog> zk_log;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ int main(int argc, char ** argv)
|
||||
|
||||
DB::ConfigProcessor processor(argv[1], false, true);
|
||||
auto config = processor.loadConfig().configuration;
|
||||
zkutil::ZooKeeper zk(*config, "zookeeper");
|
||||
zkutil::ZooKeeper zk(*config, "zookeeper", nullptr);
|
||||
zkutil::EventPtr watch = std::make_shared<Poco::Event>();
|
||||
|
||||
/// NOTE: setting watches in multiple threads because doing it in a single thread is too slow.
|
||||
|
@ -40,7 +40,7 @@ try
|
||||
}
|
||||
|
||||
|
||||
ZooKeeper zk(nodes, {}, {}, {}, {5, 0}, {0, 50000}, {0, 50000});
|
||||
ZooKeeper zk(nodes, {}, {}, {}, {5, 0}, {0, 50000}, {0, 50000}, nullptr);
|
||||
|
||||
Poco::Event event(true);
|
||||
|
||||
|
@ -5,7 +5,7 @@
|
||||
int main()
|
||||
try
|
||||
{
|
||||
Coordination::ZooKeeper zookeeper({Coordination::ZooKeeper::Node{Poco::Net::SocketAddress{"localhost:2181"}, false}}, "", "", "", {30, 0}, {0, 50000}, {0, 50000});
|
||||
Coordination::ZooKeeper zookeeper({Coordination::ZooKeeper::Node{Poco::Net::SocketAddress{"localhost:2181"}, false}}, "", "", "", {30, 0}, {0, 50000}, {0, 50000}, nullptr);
|
||||
|
||||
zookeeper.create("/test", "hello", false, false, {}, [](const Coordination::CreateResponse & response)
|
||||
{
|
||||
|
@ -31,10 +31,6 @@ SRCS(
|
||||
MySQL/PacketsProtocolText.cpp
|
||||
MySQL/PacketsReplication.cpp
|
||||
NamesAndTypes.cpp
|
||||
PostgreSQL/Connection.cpp
|
||||
PostgreSQL/PoolWithFailover.cpp
|
||||
PostgreSQL/Utils.cpp
|
||||
PostgreSQL/insertPostgreSQLValue.cpp
|
||||
PostgreSQLProtocol.cpp
|
||||
QueryProcessingStage.cpp
|
||||
Settings.cpp
|
||||
|
@ -1745,7 +1745,7 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
|
||||
|
||||
const auto & config = shared->zookeeper_config ? *shared->zookeeper_config : getConfigRef();
|
||||
if (!shared->zookeeper)
|
||||
shared->zookeeper = std::make_shared<zkutil::ZooKeeper>(config, "zookeeper");
|
||||
shared->zookeeper = std::make_shared<zkutil::ZooKeeper>(config, "zookeeper", getZooKeeperLog());
|
||||
else if (shared->zookeeper->expired())
|
||||
shared->zookeeper = shared->zookeeper->startNewSession();
|
||||
|
||||
@ -1809,8 +1809,8 @@ zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const
|
||||
"config.xml",
|
||||
name);
|
||||
|
||||
zookeeper
|
||||
= shared->auxiliary_zookeepers.emplace(name, std::make_shared<zkutil::ZooKeeper>(config, "auxiliary_zookeepers." + name)).first;
|
||||
zookeeper = shared->auxiliary_zookeepers.emplace(name,
|
||||
std::make_shared<zkutil::ZooKeeper>(config, "auxiliary_zookeepers." + name, getZooKeeperLog())).first;
|
||||
}
|
||||
else if (zookeeper->second->expired())
|
||||
zookeeper->second = zookeeper->second->startNewSession();
|
||||
@ -1824,14 +1824,15 @@ void Context::resetZooKeeper() const
|
||||
shared->zookeeper.reset();
|
||||
}
|
||||
|
||||
static void reloadZooKeeperIfChangedImpl(const ConfigurationPtr & config, const std::string & config_name, zkutil::ZooKeeperPtr & zk)
|
||||
static void reloadZooKeeperIfChangedImpl(const ConfigurationPtr & config, const std::string & config_name, zkutil::ZooKeeperPtr & zk,
|
||||
std::shared_ptr<ZooKeeperLog> zk_log)
|
||||
{
|
||||
if (!zk || zk->configChanged(*config, config_name))
|
||||
{
|
||||
if (zk)
|
||||
zk->finalize();
|
||||
|
||||
zk = std::make_shared<zkutil::ZooKeeper>(*config, config_name);
|
||||
zk = std::make_shared<zkutil::ZooKeeper>(*config, config_name, std::move(zk_log));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1839,7 +1840,7 @@ void Context::reloadZooKeeperIfChanged(const ConfigurationPtr & config) const
|
||||
{
|
||||
std::lock_guard lock(shared->zookeeper_mutex);
|
||||
shared->zookeeper_config = config;
|
||||
reloadZooKeeperIfChangedImpl(config, "zookeeper", shared->zookeeper);
|
||||
reloadZooKeeperIfChangedImpl(config, "zookeeper", shared->zookeeper, getZooKeeperLog());
|
||||
}
|
||||
|
||||
void Context::reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & config)
|
||||
@ -1854,7 +1855,7 @@ void Context::reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr &
|
||||
it = shared->auxiliary_zookeepers.erase(it);
|
||||
else
|
||||
{
|
||||
reloadZooKeeperIfChangedImpl(config, "auxiliary_zookeepers." + it->first, it->second);
|
||||
reloadZooKeeperIfChangedImpl(config, "auxiliary_zookeepers." + it->first, it->second, getZooKeeperLog());
|
||||
++it;
|
||||
}
|
||||
}
|
||||
@ -2137,6 +2138,17 @@ std::shared_ptr<OpenTelemetrySpanLog> Context::getOpenTelemetrySpanLog() const
|
||||
}
|
||||
|
||||
|
||||
std::shared_ptr<ZooKeeperLog> Context::getZooKeeperLog() const
|
||||
{
|
||||
auto lock = getLock();
|
||||
|
||||
if (!shared->system_logs)
|
||||
return {};
|
||||
|
||||
return shared->system_logs->zookeeper_log;
|
||||
}
|
||||
|
||||
|
||||
CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double part_size_ratio) const
|
||||
{
|
||||
auto lock = getLock();
|
||||
|
@ -76,6 +76,7 @@ class TraceLog;
|
||||
class MetricLog;
|
||||
class AsynchronousMetricLog;
|
||||
class OpenTelemetrySpanLog;
|
||||
class ZooKeeperLog;
|
||||
struct MergeTreeSettings;
|
||||
class StorageS3Settings;
|
||||
class IDatabase;
|
||||
@ -714,6 +715,7 @@ public:
|
||||
std::shared_ptr<MetricLog> getMetricLog() const;
|
||||
std::shared_ptr<AsynchronousMetricLog> getAsynchronousMetricLog() const;
|
||||
std::shared_ptr<OpenTelemetrySpanLog> getOpenTelemetrySpanLog() const;
|
||||
std::shared_ptr<ZooKeeperLog> getZooKeeperLog() const;
|
||||
|
||||
/// Returns an object used to log operations with parts if it possible.
|
||||
/// Provide table name to make required checks.
|
||||
|
@ -31,6 +31,8 @@
|
||||
#include <pcg_random.hpp>
|
||||
#include <common/scope_guard_safe.h>
|
||||
|
||||
#include <Interpreters/ZooKeeperLog.h>
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
|
||||
@ -371,7 +373,7 @@ void DDLWorker::scheduleTasks(bool reinitialized)
|
||||
}
|
||||
}
|
||||
|
||||
Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, queue_updated_event);
|
||||
Strings queue_nodes = zookeeper->getChildren(queue_dir, &queue_node_stat, queue_updated_event);
|
||||
size_t size_before_filtering = queue_nodes.size();
|
||||
filterAndSortQueueNodes(queue_nodes);
|
||||
/// The following message is too verbose, but it can be useful too debug mysterious test failures in CI
|
||||
@ -1136,10 +1138,32 @@ void DDLWorker::runMainThread()
|
||||
cleanup_event->set();
|
||||
scheduleTasks(reinitialized);
|
||||
|
||||
LOG_DEBUG(log, "Waiting for queue updates");
|
||||
LOG_DEBUG(log, "Waiting for queue updates (stat: {}, {}, {}, {})",
|
||||
queue_node_stat.version, queue_node_stat.cversion, queue_node_stat.numChildren, queue_node_stat.pzxid);
|
||||
/// FIXME It may hang for unknown reason. Timeout is just a hotfix.
|
||||
constexpr int queue_wait_timeout_ms = 10000;
|
||||
queue_updated_event->tryWait(queue_wait_timeout_ms);
|
||||
bool updated = queue_updated_event->tryWait(queue_wait_timeout_ms);
|
||||
if (!updated)
|
||||
{
|
||||
Coordination::Stat new_stat;
|
||||
tryGetZooKeeper()->get(queue_dir, &new_stat);
|
||||
bool queue_changed = memcmp(&queue_node_stat, &new_stat, sizeof(Coordination::Stat)) != 0;
|
||||
bool watch_triggered = queue_updated_event->tryWait(0);
|
||||
if (queue_changed && !watch_triggered)
|
||||
{
|
||||
/// It should never happen.
|
||||
/// Maybe log message, abort() and system.zookeeper_log will help to debug it and remove timeout (#26036).
|
||||
LOG_TRACE(
|
||||
log,
|
||||
"Queue was not updated (stat: {}, {}, {}, {})",
|
||||
new_stat.version,
|
||||
new_stat.cversion,
|
||||
new_stat.numChildren,
|
||||
new_stat.pzxid);
|
||||
context->getZooKeeperLog()->flush();
|
||||
abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (const Coordination::Exception & e)
|
||||
{
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/DNSResolver.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/ZooKeeper/IKeeper.h>
|
||||
#include <Storages/IStorage_fwd.h>
|
||||
#include <Parsers/IAST_fwd.h>
|
||||
#include <Interpreters/Context.h>
|
||||
@ -125,6 +126,7 @@ protected:
|
||||
std::optional<String> first_failed_task_name;
|
||||
std::list<DDLTaskPtr> current_tasks;
|
||||
|
||||
Coordination::Stat queue_node_stat;
|
||||
std::shared_ptr<Poco::Event> queue_updated_event = std::make_shared<Poco::Event>();
|
||||
std::shared_ptr<Poco::Event> cleanup_event = std::make_shared<Poco::Event>();
|
||||
std::atomic<bool> initialized = false;
|
||||
|
@ -25,6 +25,7 @@
|
||||
#include <Interpreters/MetricLog.h>
|
||||
#include <Interpreters/AsynchronousMetricLog.h>
|
||||
#include <Interpreters/OpenTelemetrySpanLog.h>
|
||||
#include <Interpreters/ZooKeeperLog.h>
|
||||
#include <Interpreters/JIT/CompiledExpressionCache.h>
|
||||
#include <Access/ContextAccess.h>
|
||||
#include <Access/AllowedClientHosts.h>
|
||||
@ -416,7 +417,8 @@ BlockIO InterpreterSystemQuery::execute()
|
||||
[&] { if (auto text_log = getContext()->getTextLog()) text_log->flush(true); },
|
||||
[&] { if (auto metric_log = getContext()->getMetricLog()) metric_log->flush(true); },
|
||||
[&] { if (auto asynchronous_metric_log = getContext()->getAsynchronousMetricLog()) asynchronous_metric_log->flush(true); },
|
||||
[&] { if (auto opentelemetry_span_log = getContext()->getOpenTelemetrySpanLog()) opentelemetry_span_log->flush(true); }
|
||||
[&] { if (auto opentelemetry_span_log = getContext()->getOpenTelemetrySpanLog()) opentelemetry_span_log->flush(true); },
|
||||
[&] { if (auto zookeeper_log = getContext()->getZooKeeperLog()) zookeeper_log->flush(true); }
|
||||
);
|
||||
break;
|
||||
}
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <Interpreters/MetricLog.h>
|
||||
#include <Interpreters/AsynchronousMetricLog.h>
|
||||
#include <Interpreters/OpenTelemetrySpanLog.h>
|
||||
#include <Interpreters/ZooKeeperLog.h>
|
||||
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <common/logger_useful.h>
|
||||
@ -103,6 +104,7 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
|
||||
opentelemetry_span_log = createSystemLog<OpenTelemetrySpanLog>(
|
||||
global_context, "system", "opentelemetry_span_log", config,
|
||||
"opentelemetry_span_log");
|
||||
zookeeper_log = createSystemLog<ZooKeeperLog>(global_context, "system", "zookeeper_log", config, "zookeeper_log");
|
||||
|
||||
if (query_log)
|
||||
logs.emplace_back(query_log.get());
|
||||
@ -122,6 +124,8 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
|
||||
logs.emplace_back(asynchronous_metric_log.get());
|
||||
if (opentelemetry_span_log)
|
||||
logs.emplace_back(opentelemetry_span_log.get());
|
||||
if (zookeeper_log)
|
||||
logs.emplace_back(zookeeper_log.get());
|
||||
|
||||
try
|
||||
{
|
||||
|
@ -74,6 +74,7 @@ class CrashLog;
|
||||
class MetricLog;
|
||||
class AsynchronousMetricLog;
|
||||
class OpenTelemetrySpanLog;
|
||||
class ZooKeeperLog;
|
||||
|
||||
|
||||
class ISystemLog
|
||||
@ -110,6 +111,8 @@ struct SystemLogs
|
||||
std::shared_ptr<AsynchronousMetricLog> asynchronous_metric_log;
|
||||
/// OpenTelemetry trace spans.
|
||||
std::shared_ptr<OpenTelemetrySpanLog> opentelemetry_span_log;
|
||||
/// Used to log all actions of ZooKeeper client
|
||||
std::shared_ptr<ZooKeeperLog> zookeeper_log;
|
||||
|
||||
std::vector<ISystemLog *> logs;
|
||||
};
|
||||
|
202
src/Interpreters/ZooKeeperLog.cpp
Normal file
202
src/Interpreters/ZooKeeperLog.cpp
Normal file
@ -0,0 +1,202 @@
|
||||
#include <Interpreters/ZooKeeperLog.h>
|
||||
#include <Columns/ColumnArray.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <DataTypes/DataTypeDateTime64.h>
|
||||
#include <DataTypes/DataTypeDate.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <DataTypes/DataTypeEnum.h>
|
||||
#include <DataTypes/DataTypeFactory.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <Interpreters/ProfileEventsExt.h>
|
||||
#include <Interpreters/QueryLog.h>
|
||||
#include <Poco/Net/IPAddress.h>
|
||||
#include <Common/IPv6ToBinary.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperConstants.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
NamesAndTypesList ZooKeeperLogElement::getNamesAndTypes()
|
||||
{
|
||||
auto type_enum = std::make_shared<DataTypeEnum8>(
|
||||
DataTypeEnum8::Values
|
||||
{
|
||||
{"Request", static_cast<Int8>(REQUEST)},
|
||||
{"Response", static_cast<Int8>(RESPONSE)},
|
||||
{"Finalize", static_cast<Int8>(FINALIZE)},
|
||||
});
|
||||
|
||||
auto op_num_enum = std::make_shared<DataTypeEnum16>(
|
||||
DataTypeEnum16::Values
|
||||
{
|
||||
{"Watch", 0},
|
||||
{"Close", static_cast<Int16>(Coordination::OpNum::Close)},
|
||||
{"Error", static_cast<Int16>(Coordination::OpNum::Error)},
|
||||
{"Create", static_cast<Int16>(Coordination::OpNum::Create)},
|
||||
{"Remove", static_cast<Int16>(Coordination::OpNum::Remove)},
|
||||
{"Exists", static_cast<Int16>(Coordination::OpNum::Exists)},
|
||||
{"Get", static_cast<Int16>(Coordination::OpNum::Get)},
|
||||
{"Set", static_cast<Int16>(Coordination::OpNum::Set)},
|
||||
{"GetACL", static_cast<Int16>(Coordination::OpNum::GetACL)},
|
||||
{"SetACL", static_cast<Int16>(Coordination::OpNum::SetACL)},
|
||||
{"SimpleList", static_cast<Int16>(Coordination::OpNum::SimpleList)},
|
||||
{"Sync", static_cast<Int16>(Coordination::OpNum::Sync)},
|
||||
{"Heartbeat", static_cast<Int16>(Coordination::OpNum::Heartbeat)},
|
||||
{"List", static_cast<Int16>(Coordination::OpNum::List)},
|
||||
{"Check", static_cast<Int16>(Coordination::OpNum::Check)},
|
||||
{"Multi", static_cast<Int16>(Coordination::OpNum::Multi)},
|
||||
{"Auth", static_cast<Int16>(Coordination::OpNum::Auth)},
|
||||
{"SessionID", static_cast<Int16>(Coordination::OpNum::SessionID)},
|
||||
});
|
||||
|
||||
auto error_enum = std::make_shared<DataTypeEnum8>(
|
||||
DataTypeEnum8::Values
|
||||
{
|
||||
{"ZOK", static_cast<Int8>(Coordination::Error::ZOK)},
|
||||
|
||||
{"ZSYSTEMERROR", static_cast<Int8>(Coordination::Error::ZSYSTEMERROR)},
|
||||
{"ZRUNTIMEINCONSISTENCY", static_cast<Int8>(Coordination::Error::ZRUNTIMEINCONSISTENCY)},
|
||||
{"ZDATAINCONSISTENCY", static_cast<Int8>(Coordination::Error::ZDATAINCONSISTENCY)},
|
||||
{"ZCONNECTIONLOSS", static_cast<Int8>(Coordination::Error::ZCONNECTIONLOSS)},
|
||||
{"ZMARSHALLINGERROR", static_cast<Int8>(Coordination::Error::ZMARSHALLINGERROR)},
|
||||
{"ZUNIMPLEMENTED", static_cast<Int8>(Coordination::Error::ZUNIMPLEMENTED)},
|
||||
{"ZOPERATIONTIMEOUT", static_cast<Int8>(Coordination::Error::ZOPERATIONTIMEOUT)},
|
||||
{"ZBADARGUMENTS", static_cast<Int8>(Coordination::Error::ZBADARGUMENTS)},
|
||||
{"ZINVALIDSTATE", static_cast<Int8>(Coordination::Error::ZINVALIDSTATE)},
|
||||
|
||||
{"ZAPIERROR", static_cast<Int8>(Coordination::Error::ZAPIERROR)},
|
||||
{"ZNONODE", static_cast<Int8>(Coordination::Error::ZNONODE)},
|
||||
{"ZNOAUTH", static_cast<Int8>(Coordination::Error::ZNOAUTH)},
|
||||
{"ZBADVERSION", static_cast<Int8>(Coordination::Error::ZBADVERSION)},
|
||||
{"ZNOCHILDRENFOREPHEMERALS", static_cast<Int8>(Coordination::Error::ZNOCHILDRENFOREPHEMERALS)},
|
||||
{"ZNODEEXISTS", static_cast<Int8>(Coordination::Error::ZNODEEXISTS)},
|
||||
{"ZNOTEMPTY", static_cast<Int8>(Coordination::Error::ZNOTEMPTY)},
|
||||
{"ZSESSIONEXPIRED", static_cast<Int8>(Coordination::Error::ZSESSIONEXPIRED)},
|
||||
{"ZINVALIDCALLBACK", static_cast<Int8>(Coordination::Error::ZINVALIDCALLBACK)},
|
||||
{"ZINVALIDACL", static_cast<Int8>(Coordination::Error::ZINVALIDACL)},
|
||||
{"ZAUTHFAILED", static_cast<Int8>(Coordination::Error::ZAUTHFAILED)},
|
||||
{"ZCLOSING", static_cast<Int8>(Coordination::Error::ZCLOSING)},
|
||||
{"ZNOTHING", static_cast<Int8>(Coordination::Error::ZNOTHING)},
|
||||
{"ZSESSIONMOVED", static_cast<Int8>(Coordination::Error::ZSESSIONMOVED)},
|
||||
});
|
||||
|
||||
auto watch_type_enum = std::make_shared<DataTypeEnum8>(
|
||||
DataTypeEnum8::Values
|
||||
{
|
||||
{"CREATED", static_cast<Int8>(Coordination::Event::CREATED)},
|
||||
{"DELETED", static_cast<Int8>(Coordination::Event::DELETED)},
|
||||
{"CHANGED", static_cast<Int8>(Coordination::Event::CHANGED)},
|
||||
{"CHILD", static_cast<Int8>(Coordination::Event::CHILD)},
|
||||
{"SESSION", static_cast<Int8>(Coordination::Event::SESSION)},
|
||||
{"NOTWATCHING", static_cast<Int8>(Coordination::Event::NOTWATCHING)},
|
||||
});
|
||||
|
||||
auto watch_state_enum = std::make_shared<DataTypeEnum16>(
|
||||
DataTypeEnum16::Values
|
||||
{
|
||||
{"EXPIRED_SESSION", static_cast<Int16>(Coordination::State::EXPIRED_SESSION)},
|
||||
{"AUTH_FAILED", static_cast<Int16>(Coordination::State::AUTH_FAILED)},
|
||||
{"CONNECTING", static_cast<Int16>(Coordination::State::CONNECTING)},
|
||||
{"ASSOCIATING", static_cast<Int16>(Coordination::State::ASSOCIATING)},
|
||||
{"CONNECTED", static_cast<Int16>(Coordination::State::CONNECTED)},
|
||||
{"NOTCONNECTED", static_cast<Int16>(Coordination::State::NOTCONNECTED)},
|
||||
});
|
||||
|
||||
return
|
||||
{
|
||||
{"type", std::move(type_enum)},
|
||||
{"event_date", std::make_shared<DataTypeDate>()},
|
||||
{"event_time", std::make_shared<DataTypeDateTime64>(6)},
|
||||
{"address", DataTypeFactory::instance().get("IPv6")},
|
||||
{"port", std::make_shared<DataTypeUInt16>()},
|
||||
{"session_id", std::make_shared<DataTypeInt64>()},
|
||||
|
||||
{"xid", std::make_shared<DataTypeInt32>()},
|
||||
{"has_watch", std::make_shared<DataTypeUInt8>()},
|
||||
{"op_num", op_num_enum},
|
||||
{"path", std::make_shared<DataTypeString>()},
|
||||
|
||||
{"data", std::make_shared<DataTypeString>()},
|
||||
|
||||
{"is_ephemeral", std::make_shared<DataTypeUInt8>()},
|
||||
{"is_sequential", std::make_shared<DataTypeUInt8>()},
|
||||
|
||||
{"version", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>())},
|
||||
|
||||
{"requests_size", std::make_shared<DataTypeUInt32>()},
|
||||
{"request_idx", std::make_shared<DataTypeUInt32>()},
|
||||
|
||||
{"zxid", std::make_shared<DataTypeInt64>()},
|
||||
{"error", std::make_shared<DataTypeNullable>(error_enum)},
|
||||
|
||||
{"watch_type", std::make_shared<DataTypeNullable>(watch_type_enum)},
|
||||
{"watch_state", std::make_shared<DataTypeNullable>(watch_state_enum)},
|
||||
|
||||
{"path_created", std::make_shared<DataTypeString>()},
|
||||
|
||||
{"stat_czxid", std::make_shared<DataTypeInt64>()},
|
||||
{"stat_mzxid", std::make_shared<DataTypeInt64>()},
|
||||
{"stat_pzxid", std::make_shared<DataTypeInt64>()},
|
||||
{"stat_version", std::make_shared<DataTypeInt32>()},
|
||||
{"stat_cversion", std::make_shared<DataTypeInt32>()},
|
||||
{"stat_dataLength", std::make_shared<DataTypeInt32>()},
|
||||
{"stat_numChildren", std::make_shared<DataTypeInt32>()},
|
||||
|
||||
{"children", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
|
||||
};
|
||||
}
|
||||
|
||||
void ZooKeeperLogElement::appendToBlock(MutableColumns & columns) const
|
||||
{
|
||||
assert(type != UNKNOWN);
|
||||
size_t i = 0;
|
||||
|
||||
columns[i++]->insert(type);
|
||||
auto event_time_seconds = event_time / 1000000;
|
||||
columns[i++]->insert(DateLUT::instance().toDayNum(event_time_seconds).toUnderType());
|
||||
columns[i++]->insert(event_time);
|
||||
columns[i++]->insert(IPv6ToBinary(address.host()).data());
|
||||
columns[i++]->insert(address.port());
|
||||
columns[i++]->insert(session_id);
|
||||
|
||||
columns[i++]->insert(xid);
|
||||
columns[i++]->insert(has_watch);
|
||||
columns[i++]->insert(op_num);
|
||||
columns[i++]->insert(path);
|
||||
|
||||
columns[i++]->insert(data);
|
||||
|
||||
columns[i++]->insert(is_ephemeral);
|
||||
columns[i++]->insert(is_sequential);
|
||||
|
||||
columns[i++]->insert(version ? Field(*version) : Field());
|
||||
|
||||
columns[i++]->insert(requests_size);
|
||||
columns[i++]->insert(request_idx);
|
||||
|
||||
columns[i++]->insert(zxid);
|
||||
columns[i++]->insert(error ? Field(*error) : Field());
|
||||
|
||||
columns[i++]->insert(watch_type ? Field(*watch_type) : Field());
|
||||
columns[i++]->insert(watch_state ? Field(*watch_state) : Field());
|
||||
|
||||
columns[i++]->insert(path_created);
|
||||
|
||||
columns[i++]->insert(stat.czxid);
|
||||
columns[i++]->insert(stat.mzxid);
|
||||
columns[i++]->insert(stat.pzxid);
|
||||
columns[i++]->insert(stat.version);
|
||||
columns[i++]->insert(stat.cversion);
|
||||
columns[i++]->insert(stat.dataLength);
|
||||
columns[i++]->insert(stat.numChildren);
|
||||
|
||||
Array children_array;
|
||||
for (const auto & c : children)
|
||||
children_array.emplace_back(c);
|
||||
columns[i++]->insert(children_array);
|
||||
}
|
||||
|
||||
};
|
76
src/Interpreters/ZooKeeperLog.h
Normal file
76
src/Interpreters/ZooKeeperLog.h
Normal file
@ -0,0 +1,76 @@
|
||||
#pragma once
|
||||
|
||||
#include <Core/NamesAndAliases.h>
|
||||
#include <Interpreters/SystemLog.h>
|
||||
#include <Interpreters/ClientInfo.h>
|
||||
#include <Common/ZooKeeper/IKeeper.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct ZooKeeperLogElement
|
||||
{
|
||||
enum Type
|
||||
{
|
||||
UNKNOWN = 0,
|
||||
REQUEST = 1,
|
||||
RESPONSE = 2,
|
||||
FINALIZE = 3
|
||||
};
|
||||
|
||||
Type type = UNKNOWN;
|
||||
Decimal64 event_time = 0;
|
||||
Poco::Net::SocketAddress address;
|
||||
Int64 session_id = 0;
|
||||
|
||||
/// Common request info
|
||||
Int32 xid = 0;
|
||||
bool has_watch = false;
|
||||
Int32 op_num = 0;
|
||||
String path;
|
||||
|
||||
/// create, set
|
||||
String data;
|
||||
|
||||
/// create
|
||||
bool is_ephemeral = false;
|
||||
bool is_sequential = false;
|
||||
|
||||
/// remove, check, set
|
||||
std::optional<Int32> version;
|
||||
|
||||
/// multi
|
||||
UInt32 requests_size = 0;
|
||||
UInt32 request_idx = 0;
|
||||
|
||||
/// Common response info
|
||||
Int64 zxid = 0;
|
||||
std::optional<Int32> error;
|
||||
|
||||
/// watch
|
||||
std::optional<Int32> watch_type;
|
||||
std::optional<Int32> watch_state;
|
||||
|
||||
/// create
|
||||
String path_created;
|
||||
|
||||
/// exists, get, set, list
|
||||
Coordination::Stat stat = {};
|
||||
|
||||
/// list
|
||||
Strings children;
|
||||
|
||||
|
||||
static std::string name() { return "ZooKeeperLog"; }
|
||||
static NamesAndTypesList getNamesAndTypes();
|
||||
static NamesAndAliases getNamesAndAliases() { return {}; }
|
||||
void appendToBlock(MutableColumns & columns) const;
|
||||
};
|
||||
|
||||
class ZooKeeperLog : public SystemLog<ZooKeeperLogElement>
|
||||
{
|
||||
using SystemLog<ZooKeeperLogElement>::SystemLog;
|
||||
};
|
||||
|
||||
}
|
@ -155,6 +155,7 @@ SRCS(
|
||||
TreeOptimizer.cpp
|
||||
TreeRewriter.cpp
|
||||
WindowDescription.cpp
|
||||
ZooKeeperLog.cpp
|
||||
addMissingDefaults.cpp
|
||||
addTypeConversionToAST.cpp
|
||||
castColumn.cpp
|
||||
|
@ -26,7 +26,7 @@ try
|
||||
auto config = processor.loadConfig().configuration;
|
||||
String root_path = argv[2];
|
||||
|
||||
zkutil::ZooKeeper zk(*config, "zookeeper");
|
||||
zkutil::ZooKeeper zk(*config, "zookeeper", nullptr);
|
||||
|
||||
String temp_path = root_path + "/temp";
|
||||
String blocks_path = root_path + "/block_numbers";
|
||||
|
@ -29,7 +29,7 @@ try
|
||||
auto config = processor.loadConfig().configuration;
|
||||
String zookeeper_path = argv[2];
|
||||
|
||||
auto zookeeper = std::make_shared<zkutil::ZooKeeper>(*config, "zookeeper");
|
||||
auto zookeeper = std::make_shared<zkutil::ZooKeeper>(*config, "zookeeper", nullptr);
|
||||
|
||||
std::unordered_map<String, std::set<Int64>> current_inserts;
|
||||
|
||||
|
7
tests/config/config.d/zookeeper_log.xml
Normal file
7
tests/config/config.d/zookeeper_log.xml
Normal file
@ -0,0 +1,7 @@
|
||||
<yandex>
|
||||
<zookeeper_log>
|
||||
<database>system</database>
|
||||
<table>zookeeper_log</table>
|
||||
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
|
||||
</zookeeper_log>
|
||||
</yandex>
|
@ -34,6 +34,7 @@ ln -sf $SRC_PATH/config.d/logging_no_rotate.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/tcp_with_proxy.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/top_level_domains_lists.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/top_level_domains_path.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/zookeeper_log.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/users.d/log_queries.xml $DEST_SERVER_PATH/users.d/
|
||||
ln -sf $SRC_PATH/users.d/readonly.xml $DEST_SERVER_PATH/users.d/
|
||||
ln -sf $SRC_PATH/users.d/access_management.xml $DEST_SERVER_PATH/users.d/
|
||||
|
40
tests/queries/0_stateless/01158_zookeeper_log.reference
Normal file
40
tests/queries/0_stateless/01158_zookeeper_log.reference
Normal file
@ -0,0 +1,40 @@
|
||||
log
|
||||
Response 0 Watch /test/01158/default/rmt/log 0 0 \N 0 0 ZOK CHILD CONNECTED 0 0 0 0
|
||||
Request 0 Create /test/01158/default/rmt/log 0 0 \N 0 4 \N \N \N 0 0 0 0
|
||||
Response 0 Create /test/01158/default/rmt/log 0 0 \N 0 4 ZOK \N \N /test/01158/default/rmt/log 0 0 0 0
|
||||
Request 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 \N \N \N 0 0 0 0
|
||||
Response 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 ZOK \N \N /test/01158/default/rmt/log/log-0000000000 0 0 0 0
|
||||
parts
|
||||
Request 0 Multi 0 0 \N 5 0 \N \N \N 0 0 0 0
|
||||
Request 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 \N \N \N 0 0 0 0
|
||||
Request 0 Remove /test/01158/default/rmt/block_numbers/all/block-0000000000 0 0 -1 0 2 \N \N \N 0 0 0 0
|
||||
Request 0 Remove /test/01158/default/rmt/temp/abandonable_lock-0000000000 0 0 -1 0 3 \N \N \N 0 0 0 0
|
||||
Request 0 Create /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 4 \N \N \N 0 0 0 0
|
||||
Request 0 Create /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 \N 0 5 \N \N \N 0 0 0 0
|
||||
Response 0 Multi 0 0 \N 5 0 ZOK \N \N 0 0 0 0
|
||||
Response 0 Create /test/01158/default/rmt/log/log- 0 1 \N 0 1 ZOK \N \N /test/01158/default/rmt/log/log-0000000000 0 0 0 0
|
||||
Response 0 Remove /test/01158/default/rmt/block_numbers/all/block-0000000000 0 0 -1 0 2 ZOK \N \N 0 0 0 0
|
||||
Response 0 Remove /test/01158/default/rmt/temp/abandonable_lock-0000000000 0 0 -1 0 3 ZOK \N \N 0 0 0 0
|
||||
Response 0 Create /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 4 ZOK \N \N /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0
|
||||
Response 0 Create /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 \N 0 5 ZOK \N \N /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 0 0
|
||||
Request 0 Exists /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 \N 0 0 \N \N \N 0 0 0 0
|
||||
Response 0 Exists /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 \N 0 0 ZOK \N \N 0 0 96 0
|
||||
blocks
|
||||
Request 0 Multi 0 0 \N 3 0 \N \N \N 0 0 0 0
|
||||
Request 0 Create /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 1 \N \N \N 0 0 0 0
|
||||
Request 0 Remove /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 2 \N \N \N 0 0 0 0
|
||||
Request 0 Create /test/01158/default/rmt/temp/abandonable_lock- 1 1 \N 0 3 \N \N \N 0 0 0 0
|
||||
Response 0 Multi 0 0 \N 3 0 ZOK \N \N 0 0 0 0
|
||||
Response 0 Create /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 1 ZOK \N \N /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0
|
||||
Response 0 Remove /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 2 ZOK \N \N 0 0 0 0
|
||||
Response 0 Create /test/01158/default/rmt/temp/abandonable_lock- 1 1 \N 0 3 ZOK \N \N /test/01158/default/rmt/temp/abandonable_lock-0000000000 0 0 0 0
|
||||
Request 0 Multi 0 0 \N 3 0 \N \N \N 0 0 0 0
|
||||
Request 0 Create /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 1 \N \N \N 0 0 0 0
|
||||
Request 0 Remove /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 2 \N \N \N 0 0 0 0
|
||||
Request 0 Create /test/01158/default/rmt/temp/abandonable_lock- 1 1 \N 0 3 \N \N \N 0 0 0 0
|
||||
Response 0 Multi 0 0 \N 3 0 ZNODEEXISTS \N \N 0 0 0 0
|
||||
Response 0 Error /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 1 ZNODEEXISTS \N \N 0 0 0 0
|
||||
Response 0 Error /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 2 ZRUNTIMEINCONSISTENCY \N \N 0 0 0 0
|
||||
Response 0 Error /test/01158/default/rmt/temp/abandonable_lock- 1 1 \N 0 3 ZRUNTIMEINCONSISTENCY \N \N 0 0 0 0
|
||||
Request 0 Get /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 0 \N \N \N 0 0 0 0
|
||||
Response 0 Get /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 0 ZOK \N \N 0 0 9 0
|
28
tests/queries/0_stateless/01158_zookeeper_log.sql
Normal file
28
tests/queries/0_stateless/01158_zookeeper_log.sql
Normal file
@ -0,0 +1,28 @@
|
||||
drop table if exists rmt;
|
||||
create table rmt (n int) engine=ReplicatedMergeTree('/test/01158/{database}/rmt', '1') order by n;
|
||||
system sync replica rmt;
|
||||
insert into rmt values (1);
|
||||
insert into rmt values (1);
|
||||
system flush logs;
|
||||
|
||||
select 'log';
|
||||
select type, has_watch, op_num, path, is_ephemeral, is_sequential, version, requests_size, request_idx, error, watch_type,
|
||||
watch_state, path_created, stat_version, stat_cversion, stat_dataLength, stat_numChildren
|
||||
from system.zookeeper_log where path like '/test/01158/' || currentDatabase() || '/rmt/log%' and op_num not in (3, 4, 12)
|
||||
order by xid, type, request_idx;
|
||||
|
||||
select 'parts';
|
||||
select type, has_watch, op_num, path, is_ephemeral, is_sequential, version, requests_size, request_idx, error, watch_type,
|
||||
watch_state, path_created, stat_version, stat_cversion, stat_dataLength, stat_numChildren
|
||||
from system.zookeeper_log
|
||||
where (session_id, xid) in (select session_id, xid from system.zookeeper_log where path='/test/01158/' || currentDatabase() || '/rmt/replicas/1/parts/all_0_0_0')
|
||||
order by xid, type, request_idx;
|
||||
|
||||
select 'blocks';
|
||||
select type, has_watch, op_num, path, is_ephemeral, is_sequential, version, requests_size, request_idx, error, watch_type,
|
||||
watch_state, path_created, stat_version, stat_cversion, stat_dataLength, stat_numChildren
|
||||
from system.zookeeper_log
|
||||
where (session_id, xid) in (select session_id, xid from system.zookeeper_log where path like '/test/01158/' || currentDatabase() || '/rmt/blocks%' and op_num not in (1, 12))
|
||||
order by xid, type, request_idx;
|
||||
|
||||
drop table rmt;
|
@ -137,6 +137,7 @@
|
||||
"01532_execute_merges_on_single_replica",
|
||||
"00652_replicated_mutations_default_database_zookeeper",
|
||||
"00620_optimize_on_nonleader_replica_zookeeper",
|
||||
"01158_zookeeper_log",
|
||||
/// grep -c
|
||||
"01018_ddl_dictionaries_bad_queries",
|
||||
"00908_bloom_filter_index",
|
||||
@ -182,7 +183,8 @@
|
||||
],
|
||||
"polymorphic-parts": [
|
||||
"01508_partition_pruning_long", /// bug, shoud be fixed
|
||||
"01482_move_to_prewhere_and_cast" /// bug, shoud be fixed
|
||||
"01482_move_to_prewhere_and_cast", /// bug, shoud be fixed
|
||||
"01158_zookeeper_log"
|
||||
],
|
||||
"parallel":
|
||||
[
|
||||
|
Loading…
Reference in New Issue
Block a user