add system.zookeeper_log

This commit is contained in:
Alexander Tokmakov 2021-07-09 17:05:35 +03:00
parent e231f6ce5c
commit 4165ba2a01
21 changed files with 593 additions and 22 deletions

View File

@ -138,15 +138,18 @@ if [[ -n "$WITH_COVERAGE" ]] && [[ "$WITH_COVERAGE" -eq 1 ]]; then
fi
tar -chf /test_output/text_log_dump.tar /var/lib/clickhouse/data/system/text_log ||:
tar -chf /test_output/query_log_dump.tar /var/lib/clickhouse/data/system/query_log ||:
tar -chf /test_output/zookeeper_log_dump.tar /var/lib/clickhouse/data/system/zookeeper_log ||:
tar -chf /test_output/coordination.tar /var/lib/clickhouse/coordination ||:
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server1.log ||:
grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server2.log ||:
grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server1.log ||:
grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server2.log ||:
pigz < /var/log/clickhouse-server/clickhouse-server1.log > /test_output/clickhouse-server1.log.gz ||:
pigz < /var/log/clickhouse-server/clickhouse-server2.log > /test_output/clickhouse-server2.log.gz ||:
mv /var/log/clickhouse-server/stderr1.log /test_output/ ||:
mv /var/log/clickhouse-server/stderr2.log /test_output/ ||:
tar -chf /test_output/zookeeper_log_dump1.tar /var/lib/clickhouse1/data/system/zookeeper_log ||:
tar -chf /test_output/zookeeper_log_dump2.tar /var/lib/clickhouse2/data/system/zookeeper_log ||:
tar -chf /test_output/coordination1.tar /var/lib/clickhouse1/coordination ||:
tar -chf /test_output/coordination2.tar /var/lib/clickhouse2/coordination ||:
fi

View File

@ -33,7 +33,7 @@ static std::string extractFromConfig(
{
DB::ConfigurationPtr bootstrap_configuration(new Poco::Util::XMLConfiguration(config_xml));
zkutil::ZooKeeperPtr zookeeper = std::make_shared<zkutil::ZooKeeper>(
*bootstrap_configuration, "zookeeper");
*bootstrap_configuration, "zookeeper", nullptr);
zkutil::ZooKeeperNodeCache zk_node_cache([&] { return zookeeper; });
config_xml = processor.processConfig(&has_zk_includes, &zk_node_cache);
}

View File

@ -111,7 +111,8 @@ void ZooKeeper::init(const std::string & implementation_, const Strings & hosts_
identity_,
Poco::Timespan(0, session_timeout_ms_ * 1000),
Poco::Timespan(0, ZOOKEEPER_CONNECTION_TIMEOUT_MS * 1000),
Poco::Timespan(0, operation_timeout_ms_ * 1000));
Poco::Timespan(0, operation_timeout_ms_ * 1000),
zk_log);
if (chroot.empty())
LOG_TRACE(log, "Initialized, hosts: {}", fmt::join(hosts, ","));
@ -209,7 +210,8 @@ struct ZooKeeperArgs
std::string implementation;
};
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
: zk_log(std::move(zk_log_))
{
ZooKeeperArgs args(config, config_name);
init(args.implementation, args.hosts, args.identity, args.session_timeout_ms, args.operation_timeout_ms, args.chroot);

View File

@ -25,6 +25,10 @@ namespace CurrentMetrics
extern const Metric EphemeralNode;
}
namespace DB
{
class ZooKeeperLog;
}
namespace zkutil
{
@ -82,7 +86,7 @@ public:
<identity>user:password</identity>
</zookeeper>
*/
ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name);
ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr<DB::ZooKeeperLog> zk_log_);
/// Creates a new session with the same parameters. This method can be used for reconnecting
/// after the session has expired.
@ -298,6 +302,7 @@ private:
std::mutex mutex;
Poco::Logger * log = nullptr;
std::shared_ptr<DB::ZooKeeperLog> zk_log;
};

View File

@ -537,6 +537,140 @@ void ZooKeeperSessionIDResponse::writeImpl(WriteBuffer & out) const
Coordination::write(server_id, out);
}
/// Appends one log element describing this request to `elems` and fills the fields
/// shared by every request type: xid, watch flag, operation number and path.
/// `request_idx` is set to the position of the appended element inside `elems`.
void ZooKeeperRequest::createLogElements(LogElements & elems) const
{
    /// Index the new element will occupy, i.e. the size of the vector before appending.
    const size_t position = elems.size();

    auto & entry = elems.emplace_back();
    entry.xid = xid;
    entry.has_watch = has_watch;
    entry.op_num = static_cast<uint32_t>(getOpNum());
    entry.path = getPath();
    entry.request_idx = position;
}
/// Logs a create request: the common request fields plus the node data
/// and the ephemeral/sequential flags.
void ZooKeeperCreateRequest::createLogElements(LogElements & elems) const
{
    ZooKeeperRequest::createLogElements(elems);

    /// The base implementation has just appended the element for this request.
    auto & entry = elems.back();
    entry.data = data;
    entry.is_ephemeral = is_ephemeral;
    entry.is_sequential = is_sequential;
}
/// Logs a remove request: the common request fields plus the requested node version.
void ZooKeeperRemoveRequest::createLogElements(LogElements & elems) const
{
    ZooKeeperRequest::createLogElements(elems);
    /// The element appended by the base call is the last one.
    elems.back().version = version;
}
/// Logs a set request: the common request fields plus the new data and the requested node version.
void ZooKeeperSetRequest::createLogElements(LogElements & elems) const
{
    ZooKeeperRequest::createLogElements(elems);

    /// The element appended by the base call is the last one.
    auto & entry = elems.back();
    entry.data = data;
    entry.version = version;
}
/// Logs a check request: the common request fields plus the node version being checked.
void ZooKeeperCheckRequest::createLogElements(LogElements & elems) const
{
    ZooKeeperRequest::createLogElements(elems);
    /// The element appended by the base call is the last one.
    elems.back().version = version;
}
/// Logs a multi request: first an element for the multi request itself (with `requests_size`
/// set to the number of sub-requests), then one element per sub-request, in order.
/// Every sub-request is assigned the xid of the enclosing multi request before it is logged.
/// Throws std::bad_cast if a sub-request is not a ZooKeeperRequest.
void ZooKeeperMultiRequest::createLogElements(LogElements & elems) const
{
    ZooKeeperRequest::createLogElements(elems);
    elems.back().requests_size = requests.size();

    for (const auto & request : requests)
    {
        /// Reference form of dynamic_cast: an unexpected request type results in an exception
        /// instead of a null pointer that would be dereferenced right away.
        auto & req = dynamic_cast<ZooKeeperRequest &>(*request);
        assert(!req.xid || req.xid == xid);
        req.xid = xid;
        req.createLogElements(elems);
    }
}
/// Fills the fields common to all responses (xid, op_num, zxid, error) in `elems[idx]`.
/// In debug builds checks that values already present in the element (e.g. written from the
/// request) agree with the response; a negative response op_num is accepted as is.
void ZooKeeperResponse::fillLogElements(LogElements & elems, size_t idx) const
{
    auto & entry = elems[idx];

    assert(!entry.xid || entry.xid == xid);
    entry.xid = xid;

    const int32_t op_from_response = tryGetOpNum();
    assert(!entry.op_num || entry.op_num == op_from_response || op_from_response < 0);
    entry.op_num = op_from_response;

    entry.zxid = zxid;
    entry.error = static_cast<Int32>(error);
}
/// Logs a watch response: the common response fields plus watch type, watch state and path.
void ZooKeeperWatchResponse::fillLogElements(LogElements & elems, size_t idx) const
{
    ZooKeeperResponse::fillLogElements(elems, idx);

    auto & entry = elems[idx];
    entry.watch_type = type;
    entry.watch_state = state;
    entry.path = path;
}
/// Logs a create response: the common response fields plus the path of the created node.
void ZooKeeperCreateResponse::fillLogElements(LogElements & elems, size_t idx) const
{
    ZooKeeperResponse::fillLogElements(elems, idx);
    elems[idx].path_created = path_created;
}
/// Logs an exists response: the common response fields plus the node stat.
void ZooKeeperExistsResponse::fillLogElements(LogElements & elems, size_t idx) const
{
    ZooKeeperResponse::fillLogElements(elems, idx);
    elems[idx].stat = stat;
}
/// Logs a get response: the common response fields plus the returned data and the node stat.
void ZooKeeperGetResponse::fillLogElements(LogElements & elems, size_t idx) const
{
    ZooKeeperResponse::fillLogElements(elems, idx);

    auto & entry = elems[idx];
    entry.data = data;
    entry.stat = stat;
}
/// Logs a set response: the common response fields plus the node stat.
void ZooKeeperSetResponse::fillLogElements(LogElements & elems, size_t idx) const
{
    ZooKeeperResponse::fillLogElements(elems, idx);
    elems[idx].stat = stat;
}
/// Logs a list response: the common response fields plus the node stat and the child names.
void ZooKeeperListResponse::fillLogElements(LogElements & elems, size_t idx) const
{
    ZooKeeperResponse::fillLogElements(elems, idx);

    auto & entry = elems[idx];
    entry.stat = stat;
    entry.children = names;
}
/// Logs a multi response. `elems[0]` receives the common fields of the multi response itself,
/// `elems[1..]` receive the sub-responses in order. Every sub-response is assigned the xid and
/// zxid of the enclosing multi response before it is logged.
/// Expects `idx == 0` and `elems.size() == responses.size() + 1` (checked in debug builds).
/// Throws std::bad_cast if a sub-response is not a ZooKeeperResponse.
void ZooKeeperMultiResponse::fillLogElements(LogElements & elems, size_t idx) const
{
    assert(idx == 0);
    assert(elems.size() == responses.size() + 1);
    ZooKeeperResponse::fillLogElements(elems, idx);

    for (const auto & response : responses)
    {
        /// Reference form of dynamic_cast: an unexpected response type results in an exception
        /// instead of a null pointer that would be dereferenced right away.
        auto & resp = dynamic_cast<ZooKeeperResponse &>(*response);
        assert(!resp.xid || resp.xid == xid);
        assert(!resp.zxid || resp.zxid == zxid);
        resp.xid = xid;
        resp.zxid = zxid;
        resp.fillLogElements(elems, ++idx);
    }
}
void ZooKeeperRequestFactory::registerRequest(OpNum op_num, Creator creator)
{
if (!op_num_to_request.try_emplace(op_num, creator).second)

View File

@ -2,6 +2,7 @@
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <Interpreters/ZooKeeperLog.h>
#include <boost/noncopyable.hpp>
#include <IO/ReadBuffer.h>
@ -22,6 +23,8 @@
namespace Coordination
{
using LogElements = std::vector<ZooKeeperLogElement>;
struct ZooKeeperResponse : virtual Response
{
XID xid = 0;
@ -32,6 +35,8 @@ struct ZooKeeperResponse : virtual Response
virtual void writeImpl(WriteBuffer &) const = 0;
virtual void write(WriteBuffer & out) const;
virtual OpNum getOpNum() const = 0;
virtual void fillLogElements(LogElements & elems, size_t idx) const;
virtual int32_t tryGetOpNum() const { return static_cast<int32_t>(getOpNum()); }
};
using ZooKeeperResponsePtr = std::shared_ptr<ZooKeeperResponse>;
@ -61,6 +66,8 @@ struct ZooKeeperRequest : virtual Request
virtual ZooKeeperResponsePtr makeResponse() const = 0;
virtual bool isReadRequest() const = 0;
virtual void createLogElements(LogElements & elems) const;
};
using ZooKeeperRequestPtr = std::shared_ptr<ZooKeeperRequest>;
@ -117,6 +124,9 @@ struct ZooKeeperWatchResponse final : WatchResponse, ZooKeeperResponse
{
throw Exception("OpNum for watch response doesn't exist", Error::ZRUNTIMEINCONSISTENCY);
}
void fillLogElements(LogElements & elems, size_t idx) const override;
int32_t tryGetOpNum() const override { return 0; }
};
struct ZooKeeperAuthRequest final : ZooKeeperRequest
@ -184,6 +194,8 @@ struct ZooKeeperCreateRequest final : public CreateRequest, ZooKeeperRequest
size_t bytesSize() const override { return CreateRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
void createLogElements(LogElements & elems) const override;
/// During recovery from log we don't rehash ACLs
bool need_to_hash_acls = true;
};
@ -197,6 +209,8 @@ struct ZooKeeperCreateResponse final : CreateResponse, ZooKeeperResponse
OpNum getOpNum() const override { return OpNum::Create; }
size_t bytesSize() const override { return CreateResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
void fillLogElements(LogElements & elems, size_t idx) const override;
};
struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest
@ -212,6 +226,8 @@ struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest
bool isReadRequest() const override { return false; }
size_t bytesSize() const override { return RemoveRequest::bytesSize() + sizeof(xid); }
void createLogElements(LogElements & elems) const override;
};
struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse
@ -242,6 +258,8 @@ struct ZooKeeperExistsResponse final : ExistsResponse, ZooKeeperResponse
OpNum getOpNum() const override { return OpNum::Exists; }
size_t bytesSize() const override { return ExistsResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
void fillLogElements(LogElements & elems, size_t idx) const override;
};
struct ZooKeeperGetRequest final : GetRequest, ZooKeeperRequest
@ -263,6 +281,8 @@ struct ZooKeeperGetResponse final : GetResponse, ZooKeeperResponse
OpNum getOpNum() const override { return OpNum::Get; }
size_t bytesSize() const override { return GetResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
void fillLogElements(LogElements & elems, size_t idx) const override;
};
struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest
@ -277,6 +297,8 @@ struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest
bool isReadRequest() const override { return false; }
size_t bytesSize() const override { return SetRequest::bytesSize() + sizeof(xid); }
void createLogElements(LogElements & elems) const override;
};
struct ZooKeeperSetResponse final : SetResponse, ZooKeeperResponse
@ -286,6 +308,8 @@ struct ZooKeeperSetResponse final : SetResponse, ZooKeeperResponse
OpNum getOpNum() const override { return OpNum::Set; }
size_t bytesSize() const override { return SetResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
void fillLogElements(LogElements & elems, size_t idx) const override;
};
struct ZooKeeperListRequest : ListRequest, ZooKeeperRequest
@ -311,6 +335,8 @@ struct ZooKeeperListResponse : ListResponse, ZooKeeperResponse
OpNum getOpNum() const override { return OpNum::List; }
size_t bytesSize() const override { return ListResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
void fillLogElements(LogElements & elems, size_t idx) const override;
};
struct ZooKeeperSimpleListResponse final : ZooKeeperListResponse
@ -331,6 +357,8 @@ struct ZooKeeperCheckRequest final : CheckRequest, ZooKeeperRequest
bool isReadRequest() const override { return true; }
size_t bytesSize() const override { return CheckRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
void createLogElements(LogElements & elems) const override;
};
struct ZooKeeperCheckResponse final : CheckResponse, ZooKeeperResponse
@ -409,6 +437,8 @@ struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest
bool isReadRequest() const override;
size_t bytesSize() const override { return MultiRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
void createLogElements(LogElements & elems) const override;
};
struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse
@ -433,6 +463,8 @@ struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse
void writeImpl(WriteBuffer & out) const override;
size_t bytesSize() const override { return MultiResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
void fillLogElements(LogElements & elems, size_t idx) const override;
};
/// Fake internal coordination (keeper) response. Never received from client

View File

@ -311,10 +311,12 @@ ZooKeeper::ZooKeeper(
const String & auth_data,
Poco::Timespan session_timeout_,
Poco::Timespan connection_timeout,
Poco::Timespan operation_timeout_)
Poco::Timespan operation_timeout_,
std::shared_ptr<ZooKeeperLog> zk_log_)
: root_path(root_path_),
session_timeout(session_timeout_),
operation_timeout(std::min(operation_timeout_, session_timeout_))
operation_timeout(std::min(operation_timeout_, session_timeout_)),
zk_log(std::move(zk_log_))
{
if (!root_path.empty())
{
@ -578,6 +580,8 @@ void ZooKeeper::sendThread()
info.request->probably_sent = true;
info.request->write(*out);
logOperationIfNeeded(info.request);
/// We sent close request, exit
if (info.request->xid == CLOSE_XID)
break;
@ -747,6 +751,9 @@ void ZooKeeper::receiveEvent()
if (!response)
response = request_info.request->makeResponse();
response->xid = xid;
response->zxid = zxid;
if (err != Error::ZOK)
{
response->error = err;
@ -785,6 +792,8 @@ void ZooKeeper::receiveEvent()
int32_t actual_length = in->count() - count_before_event;
if (length != actual_length)
throw Exception("Response length doesn't match. Expected: " + DB::toString(length) + ", actual: " + DB::toString(actual_length), Error::ZMARSHALLINGERROR);
logOperationIfNeeded(request_info.request, response);
}
catch (...)
{
@ -802,6 +811,8 @@ void ZooKeeper::receiveEvent()
{
if (request_info.callback)
request_info.callback(*response);
logOperationIfNeeded(request_info.request, response);
}
catch (...)
{
@ -879,17 +890,19 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
for (auto & op : operations)
{
RequestInfo & request_info = op.second;
ResponsePtr response = request_info.request->makeResponse();
ZooKeeperResponsePtr response = request_info.request->makeResponse();
response->error = request_info.request->probably_sent
? Error::ZCONNECTIONLOSS
: Error::ZSESSIONEXPIRED;
response->xid = request_info.request->xid;
if (request_info.callback)
{
try
{
request_info.callback(*response);
logOperationIfNeeded(request_info.request, response, true);
}
catch (...)
{
@ -941,13 +954,15 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
{
if (info.callback)
{
ResponsePtr response = info.request->makeResponse();
ZooKeeperResponsePtr response = info.request->makeResponse();
if (response)
{
response->error = Error::ZSESSIONEXPIRED;
response->xid = info.request->xid;
try
{
info.callback(*response);
logOperationIfNeeded(info.request, response, true);
}
catch (...)
{
@ -1189,4 +1204,46 @@ void ZooKeeper::close()
ProfileEvents::increment(ProfileEvents::ZooKeeperClose);
}
/// Adds records about a request and/or its response to `zk_log`; does nothing when `zk_log` is not set.
///
/// request  - the request being logged; may be null, in which case `response` must be set and
///            is expected to have PING_XID or WATCH_XID (checked in debug builds).
/// response - the response, if any; its fields are filled in starting from the first element.
/// finalize - when true, the records get type FINALIZE regardless of request/response presence.
///
/// Record type is SEND when only a request is given and RECEIVE when a response is given.
/// All records produced by one call share the same event time, peer address and session id.
void ZooKeeper::logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response, bool finalize)
{
    if (!zk_log)
        return;

    ZooKeeperLogElement::Type log_type = ZooKeeperLogElement::UNKNOWN;

    /// Microseconds since epoch.
    Decimal64 event_time = std::chrono::duration_cast<std::chrono::microseconds>(
        std::chrono::system_clock::now().time_since_epoch()
    ).count();

    LogElements elems;
    if (request)
    {
        request->createLogElements(elems);
        log_type = ZooKeeperLogElement::SEND;
    }
    else
    {
        assert(response);
        assert(response->xid == PING_XID || response->xid == WATCH_XID);
        /// No request to describe: create a blank element for the response to fill.
        elems.emplace_back();
    }

    if (response)
    {
        response->fillLogElements(elems, 0);
        log_type = ZooKeeperLogElement::RECEIVE;
    }

    if (finalize)
        log_type = ZooKeeperLogElement::FINALIZE;

    /// The peer address is the same for every element, so query the socket once
    /// rather than once per element (a multi request produces many elements).
    const auto peer_address = socket.peerAddress();

    for (auto & elem : elems)
    {
        elem.type = log_type;
        elem.event_time = event_time;
        elem.address = peer_address;
        elem.session_id = session_id;
        zk_log->add(elem);
    }
}
}

View File

@ -80,6 +80,10 @@ namespace CurrentMetrics
extern const Metric ZooKeeperSession;
}
namespace DB
{
class ZooKeeperLog;
}
namespace Coordination
{
@ -110,7 +114,8 @@ public:
const String & auth_data,
Poco::Timespan session_timeout_,
Poco::Timespan connection_timeout,
Poco::Timespan operation_timeout_);
Poco::Timespan operation_timeout_,
std::shared_ptr<ZooKeeperLog> zk_log_);
~ZooKeeper() override;
@ -258,7 +263,10 @@ private:
template <typename T>
void read(T &);
void logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response = nullptr, bool finalize = false);
CurrentMetrics::Increment active_session_metric_increment{CurrentMetrics::ZooKeeperSession};
std::shared_ptr<ZooKeeperLog> zk_log;
};
}

View File

@ -1698,7 +1698,7 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
const auto & config = shared->zookeeper_config ? *shared->zookeeper_config : getConfigRef();
if (!shared->zookeeper)
shared->zookeeper = std::make_shared<zkutil::ZooKeeper>(config, "zookeeper");
shared->zookeeper = std::make_shared<zkutil::ZooKeeper>(config, "zookeeper", getZooKeeperLog());
else if (shared->zookeeper->expired())
shared->zookeeper = shared->zookeeper->startNewSession();
@ -1762,8 +1762,8 @@ zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const
"config.xml",
name);
zookeeper
= shared->auxiliary_zookeepers.emplace(name, std::make_shared<zkutil::ZooKeeper>(config, "auxiliary_zookeepers." + name)).first;
zookeeper = shared->auxiliary_zookeepers.emplace(name,
std::make_shared<zkutil::ZooKeeper>(config, "auxiliary_zookeepers." + name, getZooKeeperLog())).first;
}
else if (zookeeper->second->expired())
zookeeper->second = zookeeper->second->startNewSession();
@ -1777,14 +1777,15 @@ void Context::resetZooKeeper() const
shared->zookeeper.reset();
}
static void reloadZooKeeperIfChangedImpl(const ConfigurationPtr & config, const std::string & config_name, zkutil::ZooKeeperPtr & zk)
static void reloadZooKeeperIfChangedImpl(const ConfigurationPtr & config, const std::string & config_name, zkutil::ZooKeeperPtr & zk,
std::shared_ptr<ZooKeeperLog> zk_log)
{
if (!zk || zk->configChanged(*config, config_name))
{
if (zk)
zk->finalize();
zk = std::make_shared<zkutil::ZooKeeper>(*config, config_name);
zk = std::make_shared<zkutil::ZooKeeper>(*config, config_name, std::move(zk_log));
}
}
@ -1792,7 +1793,7 @@ void Context::reloadZooKeeperIfChanged(const ConfigurationPtr & config) const
{
std::lock_guard lock(shared->zookeeper_mutex);
shared->zookeeper_config = config;
reloadZooKeeperIfChangedImpl(config, "zookeeper", shared->zookeeper);
reloadZooKeeperIfChangedImpl(config, "zookeeper", shared->zookeeper, getZooKeeperLog());
}
void Context::reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & config)
@ -1807,7 +1808,7 @@ void Context::reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr &
it = shared->auxiliary_zookeepers.erase(it);
else
{
reloadZooKeeperIfChangedImpl(config, "auxiliary_zookeepers." + it->first, it->second);
reloadZooKeeperIfChangedImpl(config, "auxiliary_zookeepers." + it->first, it->second, getZooKeeperLog());
++it;
}
}
@ -2090,6 +2091,17 @@ std::shared_ptr<OpenTelemetrySpanLog> Context::getOpenTelemetrySpanLog() const
}
/// Returns the ZooKeeper log from the shared system logs,
/// or an empty pointer when system logs are not set.
std::shared_ptr<ZooKeeperLog> Context::getZooKeeperLog() const
{
    auto guard = getLock();

    if (shared->system_logs)
        return shared->system_logs->zookeeper_log;

    return {};
}
CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double part_size_ratio) const
{
auto lock = getLock();

View File

@ -76,6 +76,7 @@ class TraceLog;
class MetricLog;
class AsynchronousMetricLog;
class OpenTelemetrySpanLog;
class ZooKeeperLog;
struct MergeTreeSettings;
class StorageS3Settings;
class IDatabase;
@ -705,6 +706,7 @@ public:
std::shared_ptr<MetricLog> getMetricLog() const;
std::shared_ptr<AsynchronousMetricLog> getAsynchronousMetricLog() const;
std::shared_ptr<OpenTelemetrySpanLog> getOpenTelemetrySpanLog() const;
std::shared_ptr<ZooKeeperLog> getZooKeeperLog() const;
/// Returns an object used to log operations with parts if it possible.
/// Provide table name to make required checks.

View File

@ -31,6 +31,8 @@
#include <pcg_random.hpp>
#include <common/scope_guard_safe.h>
#include <Interpreters/ZooKeeperLog.h>
namespace fs = std::filesystem;
@ -371,7 +373,7 @@ void DDLWorker::scheduleTasks(bool reinitialized)
}
}
Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, queue_updated_event);
Strings queue_nodes = zookeeper->getChildren(queue_dir, &queue_node_stat, queue_updated_event);
size_t size_before_filtering = queue_nodes.size();
filterAndSortQueueNodes(queue_nodes);
/// The following message is too verbose, but it can be useful to debug mysterious test failures in CI
@ -1136,10 +1138,32 @@ void DDLWorker::runMainThread()
cleanup_event->set();
scheduleTasks(reinitialized);
LOG_DEBUG(log, "Waiting for queue updates");
LOG_DEBUG(log, "Waiting for queue updates (stat: {}, {}, {}, {})",
queue_node_stat.version, queue_node_stat.cversion, queue_node_stat.numChildren, queue_node_stat.pzxid);
/// FIXME It may hang for unknown reason. Timeout is just a hotfix.
constexpr int queue_wait_timeout_ms = 10000;
queue_updated_event->tryWait(queue_wait_timeout_ms);
bool updated = queue_updated_event->tryWait(queue_wait_timeout_ms);
if (!updated)
{
Coordination::Stat new_stat;
tryGetZooKeeper()->get(queue_dir, &new_stat);
bool queue_changed = memcmp(&queue_node_stat, &new_stat, sizeof(Coordination::Stat)) != 0;
bool watch_triggered = queue_updated_event->tryWait(0);
if (queue_changed && !watch_triggered)
{
/// It should never happen.
/// Maybe log message, abort() and system.zookeeper_log will help to debug it and remove timeout (#26036).
LOG_TRACE(
log,
"Queue was not updated (stat: {}, {}, {}, {})",
new_stat.version,
new_stat.cversion,
new_stat.numChildren,
new_stat.pzxid);
context->getZooKeeperLog()->flush();
abort();
}
}
}
catch (const Coordination::Exception & e)
{

View File

@ -3,6 +3,7 @@
#include <Common/CurrentThread.h>
#include <Common/DNSResolver.h>
#include <Common/ThreadPool.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Storages/IStorage_fwd.h>
#include <Parsers/IAST_fwd.h>
#include <Interpreters/Context.h>
@ -125,6 +126,7 @@ protected:
std::optional<String> first_failed_task_name;
std::list<DDLTaskPtr> current_tasks;
Coordination::Stat queue_node_stat;
std::shared_ptr<Poco::Event> queue_updated_event = std::make_shared<Poco::Event>();
std::shared_ptr<Poco::Event> cleanup_event = std::make_shared<Poco::Event>();
std::atomic<bool> initialized = false;

View File

@ -25,6 +25,7 @@
#include <Interpreters/MetricLog.h>
#include <Interpreters/AsynchronousMetricLog.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/ZooKeeperLog.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Access/ContextAccess.h>
#include <Access/AllowedClientHosts.h>
@ -416,7 +417,8 @@ BlockIO InterpreterSystemQuery::execute()
[&] { if (auto text_log = getContext()->getTextLog()) text_log->flush(true); },
[&] { if (auto metric_log = getContext()->getMetricLog()) metric_log->flush(true); },
[&] { if (auto asynchronous_metric_log = getContext()->getAsynchronousMetricLog()) asynchronous_metric_log->flush(true); },
[&] { if (auto opentelemetry_span_log = getContext()->getOpenTelemetrySpanLog()) opentelemetry_span_log->flush(true); }
[&] { if (auto opentelemetry_span_log = getContext()->getOpenTelemetrySpanLog()) opentelemetry_span_log->flush(true); },
[&] { if (auto zookeeper_log = getContext()->getZooKeeperLog()) zookeeper_log->flush(true); }
);
break;
}

View File

@ -8,6 +8,7 @@
#include <Interpreters/MetricLog.h>
#include <Interpreters/AsynchronousMetricLog.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/ZooKeeperLog.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <common/logger_useful.h>
@ -103,6 +104,7 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
opentelemetry_span_log = createSystemLog<OpenTelemetrySpanLog>(
global_context, "system", "opentelemetry_span_log", config,
"opentelemetry_span_log");
zookeeper_log = createSystemLog<ZooKeeperLog>(global_context, "system", "zookeeper_log", config, "zookeeper_log");
if (query_log)
logs.emplace_back(query_log.get());
@ -122,6 +124,8 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
logs.emplace_back(asynchronous_metric_log.get());
if (opentelemetry_span_log)
logs.emplace_back(opentelemetry_span_log.get());
if (zookeeper_log)
logs.emplace_back(zookeeper_log.get());
try
{

View File

@ -74,6 +74,7 @@ class CrashLog;
class MetricLog;
class AsynchronousMetricLog;
class OpenTelemetrySpanLog;
class ZooKeeperLog;
class ISystemLog
@ -110,6 +111,8 @@ struct SystemLogs
std::shared_ptr<AsynchronousMetricLog> asynchronous_metric_log;
/// OpenTelemetry trace spans.
std::shared_ptr<OpenTelemetrySpanLog> opentelemetry_span_log;
/// Used to log all actions of ZooKeeper client
std::shared_ptr<ZooKeeperLog> zookeeper_log;
std::vector<ISystemLog *> logs;
};

View File

@ -0,0 +1,130 @@
#include <Interpreters/ZooKeeperLog.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/ProfileEventsExt.h>
#include <Interpreters/QueryLog.h>
#include <Poco/Net/IPAddress.h>
#include <Common/ClickHouseRevision.h>
#include <Common/IPv6ToBinary.h>
namespace DB
{
/// Returns the names and types of the columns of the log table.
/// The order of the entries must stay in sync with the order of inserts in appendToBlock(),
/// which fills the columns positionally.
NamesAndTypesList ZooKeeperLogElement::getNamesAndTypes()
{
/// Enum for the `type` column; UNKNOWN is intentionally not representable.
auto event_type = std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values
{
{"Send", static_cast<Int8>(SEND)},
{"Receive", static_cast<Int8>(RECEIVE)},
{"Finalize", static_cast<Int8>(FINALIZE)},
});
return
{
{"type", std::move(event_type)},
{"event_date", std::make_shared<DataTypeDate>()},
{"event_time", std::make_shared<DataTypeDateTime64>(6)},
{"address", DataTypeFactory::instance().get("IPv6")},
{"port", std::make_shared<DataTypeUInt16>()},
{"session_id", std::make_shared<DataTypeInt64>()},
/// Request fields
{"xid", std::make_shared<DataTypeInt32>()},
{"has_watch", std::make_shared<DataTypeUInt8>()},
{"op_num", std::make_shared<DataTypeInt32>()},
{"path", std::make_shared<DataTypeString>()},
{"data", std::make_shared<DataTypeString>()},
{"is_ephemeral", std::make_shared<DataTypeUInt8>()},
{"is_sequential", std::make_shared<DataTypeUInt8>()},
{"version", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>())},
{"requests_size", std::make_shared<DataTypeUInt32>()},
{"request_idx", std::make_shared<DataTypeUInt32>()},
/// Response fields
{"zxid", std::make_shared<DataTypeInt64>()},
{"error", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>())},
{"watch_type", std::make_shared<DataTypeInt32>()},
{"watch_state", std::make_shared<DataTypeInt32>()},
{"path_created", std::make_shared<DataTypeString>()},
{"stat_czxid", std::make_shared<DataTypeInt64>()},
{"stat_mzxid", std::make_shared<DataTypeInt64>()},
{"stat_pzxid", std::make_shared<DataTypeInt64>()},
{"stat_version", std::make_shared<DataTypeInt32>()},
{"stat_cversion", std::make_shared<DataTypeInt32>()},
{"stat_dataLength", std::make_shared<DataTypeInt32>()},
{"stat_numChildren", std::make_shared<DataTypeInt32>()},
{"children", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
};
}
/// Appends this element as one row to `columns`.
/// Columns are filled positionally, so the order of inserts below must match
/// the order of columns returned by getNamesAndTypes().
/// The element type must already be set (anything but UNKNOWN).
void ZooKeeperLogElement::appendToBlock(MutableColumns & columns) const
{
    assert(type != UNKNOWN);
    size_t i = 0;

    columns[i++]->insert(type);

    /// event_time is in microseconds; the date column is derived from whole seconds.
    auto event_time_seconds = event_time / 1000000;
    columns[i++]->insert(DateLUT::instance().toDayNum(event_time_seconds).toUnderType());
    columns[i++]->insert(event_time);

    /// The binary IPv6 representation is exactly 16 bytes and may contain zero bytes,
    /// so it must be inserted with an explicit length and not as a C string.
    columns[i++]->insertData(IPv6ToBinary(address.host()).data(), 16);
    columns[i++]->insert(address.port());
    columns[i++]->insert(session_id);

    /// Request fields
    columns[i++]->insert(xid);
    columns[i++]->insert(has_watch);
    columns[i++]->insert(op_num);
    columns[i++]->insert(path);

    columns[i++]->insert(data);

    columns[i++]->insert(is_ephemeral);
    columns[i++]->insert(is_sequential);

    /// Nullable column: an empty optional is written as NULL.
    columns[i++]->insert(version ? Field(*version) : Field());

    columns[i++]->insert(requests_size);
    columns[i++]->insert(request_idx);

    /// Response fields
    columns[i++]->insert(zxid);
    /// Nullable column: an empty optional is written as NULL.
    columns[i++]->insert(error ? Field(*error) : Field());

    columns[i++]->insert(watch_type);
    columns[i++]->insert(watch_state);

    columns[i++]->insert(path_created);

    columns[i++]->insert(stat.czxid);
    columns[i++]->insert(stat.mzxid);
    columns[i++]->insert(stat.pzxid);
    columns[i++]->insert(stat.version);
    columns[i++]->insert(stat.cversion);
    columns[i++]->insert(stat.dataLength);
    columns[i++]->insert(stat.numChildren);

    Array children_array;
    for (const auto & c : children)
        children_array.emplace_back(c);
    columns[i++]->insert(children_array);
}
};

View File

@ -0,0 +1,76 @@
#pragma once
#include <Core/NamesAndAliases.h>
#include <Interpreters/SystemLog.h>
#include <Interpreters/ClientInfo.h>
#include <Common/ZooKeeper/IKeeper.h>
namespace DB
{
/// A single record of ZooKeeperLog: describes one request and/or response of the ZooKeeper client.
/// Fields that do not apply to a particular operation keep their default values.
struct ZooKeeperLogElement
{
/// Kind of the record. UNKNOWN is only a placeholder for a not yet filled element:
/// appendToBlock() asserts that it has been replaced.
enum Type
{
UNKNOWN = 0,
SEND = 1,
RECEIVE = 2,
FINALIZE = 3
};
Type type = UNKNOWN;
/// Written to a DateTime64(6) column; the event date is derived from it as event_time / 1000000.
Decimal64 event_time = 0;
/// Host and port are written to separate columns (address, port).
Poco::Net::SocketAddress address;
Int64 session_id = 0;
/// Common request info
Int32 xid = 0;
bool has_watch = false;
Int32 op_num = 0;
String path;
/// create, set
String data;
/// create
bool is_ephemeral = false;
bool is_sequential = false;
/// remove, check, set
/// NOTE(review): the optional is initialized with 0, so the Nullable `version` column
/// is never NULL unless the optional is explicitly reset - confirm this is intended.
std::optional<Int32> version = 0;
/// multi
/// requests_size: number of sub-requests; request_idx: position of the element within one logged request.
UInt32 requests_size = 0;
UInt32 request_idx = 0;
/// Common response info
Int64 zxid = 0;
/// Empty until a response is logged; written as NULL in that case.
std::optional<Int32> error;
/// watch
Int32 watch_type = 0;
Int32 watch_state = 0;
/// create
String path_created;
/// exists, get, set, list
Coordination::Stat stat = {};
/// list
Strings children;
static std::string name() { return "ZooKeeperLog"; }
/// Column names and types; the order matches the inserts in appendToBlock().
static NamesAndTypesList getNamesAndTypes();
/// No alias columns.
static NamesAndAliases getNamesAndAliases() { return {}; }
/// Appends this element as one row to `columns`.
void appendToBlock(MutableColumns & columns) const;
};
/// System log whose records are ZooKeeperLogElement; adds nothing on top of SystemLog.
class ZooKeeperLog : public SystemLog<ZooKeeperLogElement>
{
/// Inherit the constructors of the base class.
using SystemLog<ZooKeeperLogElement>::SystemLog;
};
}

View File

@ -0,0 +1,7 @@
<yandex>
<!-- Settings of the zookeeper_log system log: target database and table, and the flush interval in milliseconds. -->
<zookeeper_log>
<database>system</database>
<table>zookeeper_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</zookeeper_log>
</yandex>

View File

@ -34,6 +34,7 @@ ln -sf $SRC_PATH/config.d/logging_no_rotate.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/tcp_with_proxy.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/top_level_domains_lists.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/top_level_domains_path.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/zookeeper_log.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/users.d/log_queries.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/readonly.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/access_management.xml $DEST_SERVER_PATH/users.d/

View File

@ -0,0 +1,40 @@
log
Receive 0 0 /test/01158/default/rmt/log 0 0 0 0 0 0 4 3 0 0 0 0
Send 0 1 /test/01158/default/rmt/log 0 0 0 0 4 \N 0 0 0 0 0 0
Receive 0 1 /test/01158/default/rmt/log 0 0 0 0 4 0 0 0 /test/01158/default/rmt/log 0 0 0 0
Send 0 1 /test/01158/default/rmt/log/log- 0 1 0 0 1 \N 0 0 0 0 0 0
Receive 0 1 /test/01158/default/rmt/log/log- 0 1 0 0 1 0 0 0 /test/01158/default/rmt/log/log-0000000000 0 0 0 0
parts
Send 0 14 0 0 0 5 0 \N 0 0 0 0 0 0
Send 0 1 /test/01158/default/rmt/log/log- 0 1 0 0 1 \N 0 0 0 0 0 0
Send 0 2 /test/01158/default/rmt/block_numbers/all/block-0000000000 0 0 -1 0 2 \N 0 0 0 0 0 0
Send 0 2 /test/01158/default/rmt/temp/abandonable_lock-0000000000 0 0 -1 0 3 \N 0 0 0 0 0 0
Send 0 1 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0 4 \N 0 0 0 0 0 0
Send 0 1 /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 0 0 5 \N 0 0 0 0 0 0
Receive 0 14 0 0 0 5 0 0 0 0 0 0 0 0
Receive 0 1 /test/01158/default/rmt/log/log- 0 1 0 0 1 0 0 0 /test/01158/default/rmt/log/log-0000000000 0 0 0 0
Receive 0 2 /test/01158/default/rmt/block_numbers/all/block-0000000000 0 0 -1 0 2 0 0 0 0 0 0 0
Receive 0 2 /test/01158/default/rmt/temp/abandonable_lock-0000000000 0 0 -1 0 3 0 0 0 0 0 0 0
Receive 0 1 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0 4 0 0 0 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0
Receive 0 1 /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 0 0 5 0 0 0 /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 0 0
Send 0 3 /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 0 0 0 \N 0 0 0 0 0 0
Receive 0 3 /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 0 0 0 0 0 0 0 0 96 0
blocks
Send 0 14 0 0 0 3 0 \N 0 0 0 0 0 0
Send 0 1 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0 1 \N 0 0 0 0 0 0
Send 0 2 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 2 \N 0 0 0 0 0 0
Send 0 1 /test/01158/default/rmt/temp/abandonable_lock- 1 1 0 0 3 \N 0 0 0 0 0 0
Receive 0 14 0 0 0 3 0 0 0 0 0 0 0 0
Receive 0 1 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0 1 0 0 0 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0
Receive 0 2 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 2 0 0 0 0 0 0 0
Receive 0 1 /test/01158/default/rmt/temp/abandonable_lock- 1 1 0 0 3 0 0 0 /test/01158/default/rmt/temp/abandonable_lock-0000000000 0 0 0 0
Send 0 14 0 0 0 3 0 \N 0 0 0 0 0 0
Send 0 1 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0 1 \N 0 0 0 0 0 0
Send 0 2 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 2 \N 0 0 0 0 0 0
Send 0 1 /test/01158/default/rmt/temp/abandonable_lock- 1 1 0 0 3 \N 0 0 0 0 0 0
Receive 0 14 0 0 0 3 0 -110 0 0 0 0 0 0
Receive 0 -1 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0 1 -110 0 0 0 0 0 0
Receive 0 -1 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 2 -2 0 0 0 0 0 0
Receive 0 -1 /test/01158/default/rmt/temp/abandonable_lock- 1 1 0 0 3 -2 0 0 0 0 0 0
Send 0 4 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0 0 \N 0 0 0 0 0 0
Receive 0 4 /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0 0 0 0 0 0 0 9 0

View File

@ -0,0 +1,27 @@
-- Checks the contents of system.zookeeper_log for the requests made under the ZooKeeper path of a replicated table.
drop table if exists rmt;
create table rmt (n int) engine=ReplicatedMergeTree('/test/01158/{database}/rmt', '1') order by n;
-- The same row is inserted twice.
-- NOTE(review): presumably the second insert is deduplicated, which exercises the failing request path - confirm.
insert into rmt values (1);
insert into rmt values (1);
-- Flush system logs before reading system.zookeeper_log.
system flush logs;
-- Section 'log': records whose path starts with the table's .../rmt/log, skipping op_num 3, 4 and 12.
-- NOTE(review): 3, 4 and 12 are presumably read-only operations (exists/get/list) - confirm the op codes.
select 'log';
select type, has_watch, op_num, path, is_ephemeral, is_sequential, version, requests_size, request_idx, error, watch_type,
watch_state, path_created, stat_version, stat_cversion, stat_dataLength, stat_numChildren
from system.zookeeper_log where path like '/test/01158/' || currentDatabase() || '/rmt/log%' and op_num not in (3, 4, 12)
order by xid, type, request_idx;
-- Section 'parts': all records sharing (session_id, xid) with any record whose path is the part node
-- .../rmt/replicas/1/parts/all_0_0_0, so records with other paths under the same xid are listed too.
select 'parts';
select type, has_watch, op_num, path, is_ephemeral, is_sequential, version, requests_size, request_idx, error, watch_type,
watch_state, path_created, stat_version, stat_cversion, stat_dataLength, stat_numChildren
from system.zookeeper_log
where (session_id, xid) in (select session_id, xid from system.zookeeper_log where path='/test/01158/' || currentDatabase() || '/rmt/replicas/1/parts/all_0_0_0')
order by xid, type, request_idx;
-- Section 'blocks': all records sharing (session_id, xid) with a record whose path starts with .../rmt/blocks
-- and whose op_num is neither 1 nor 12.
select 'blocks';
select type, has_watch, op_num, path, is_ephemeral, is_sequential, version, requests_size, request_idx, error, watch_type,
watch_state, path_created, stat_version, stat_cversion, stat_dataLength, stat_numChildren
from system.zookeeper_log
where (session_id, xid) in (select session_id, xid from system.zookeeper_log where path like '/test/01158/' || currentDatabase() || '/rmt/blocks%' and op_num not in (1, 12))
order by xid, type, request_idx;
drop table rmt;