DDLLogEntry.parse() to populate query and initiator

This commit is contained in:
bharatnc 2020-12-29 23:56:22 -08:00
parent 9cddd22a7a
commit f0ea07b493
3 changed files with 104 additions and 101 deletions

View File

@ -21,7 +21,6 @@
#include <Interpreters/Context.h>
#include <Access/AccessRightsElement.h>
#include <Access/ContextAccess.h>
#include <Common/DNSResolver.h>
#include <Common/Macros.h>
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
@ -34,7 +33,6 @@
#include <DataTypes/DataTypeString.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Poco/Timestamp.h>
#include <Poco/Net/NetException.h>
#include <common/sleep.h>
#include <common/getFQDNOrHostName.h>
#include <pcg_random.hpp>
@ -62,107 +60,46 @@ namespace ErrorCodes
}
namespace
String DDLLogEntry::toString()
{
WriteBufferFromOwnString wb;
struct HostID
{
String host_name;
UInt16 port;
Strings host_id_strings(hosts.size());
std::transform(hosts.begin(), hosts.end(), host_id_strings.begin(), HostID::applyToString);
HostID() = default;
explicit HostID(const Cluster::Address & address)
: host_name(address.host_name), port(address.port) {}
static HostID fromString(const String & host_port_str)
{
HostID res;
std::tie(res.host_name, res.port) = Cluster::Address::fromString(host_port_str);
return res;
}
String toString() const
{
return Cluster::Address::toString(host_name, port);
}
String readableString() const
{
return host_name + ":" + DB::toString(port);
}
bool isLocalAddress(UInt16 clickhouse_port) const
{
try
{
return DB::isLocalAddress(DNSResolver::instance().resolveAddress(host_name, port), clickhouse_port);
}
catch (const Poco::Net::NetException &)
{
/// Avoid "Host not found" exceptions
return false;
}
}
static String applyToString(const HostID & host_id)
{
return host_id.toString();
}
};
auto version = CURRENT_VERSION;
wb << "version: " << version << "\n";
wb << "query: " << escape << query << "\n";
wb << "hosts: " << host_id_strings << "\n";
wb << "initiator: " << initiator << "\n";
return wb.str();
}
struct DDLLogEntry
void DDLLogEntry::parse(const String & data)
{
String query;
std::vector<HostID> hosts;
String initiator; // optional
ReadBufferFromString rb(data);
static constexpr int CURRENT_VERSION = 1;
int version;
rb >> "version: " >> version >> "\n";
String toString()
{
WriteBufferFromOwnString wb;
if (version != CURRENT_VERSION)
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unknown DDLLogEntry format version: {}", version);
Strings host_id_strings(hosts.size());
std::transform(hosts.begin(), hosts.end(), host_id_strings.begin(), HostID::applyToString);
Strings host_id_strings;
rb >> "query: " >> escape >> query >> "\n";
rb >> "hosts: " >> host_id_strings >> "\n";
auto version = CURRENT_VERSION;
wb << "version: " << version << "\n";
wb << "query: " << escape << query << "\n";
wb << "hosts: " << host_id_strings << "\n";
wb << "initiator: " << initiator << "\n";
if (!rb.eof())
rb >> "initiator: " >> initiator >> "\n";
else
initiator.clear();
return wb.str();
}
assertEOF(rb);
void parse(const String & data)
{
ReadBufferFromString rb(data);
int version;
rb >> "version: " >> version >> "\n";
if (version != CURRENT_VERSION)
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unknown DDLLogEntry format version: {}", version);
Strings host_id_strings;
rb >> "query: " >> escape >> query >> "\n";
rb >> "hosts: " >> host_id_strings >> "\n";
if (!rb.eof())
rb >> "initiator: " >> initiator >> "\n";
else
initiator.clear();
assertEOF(rb);
hosts.resize(host_id_strings.size());
std::transform(host_id_strings.begin(), host_id_strings.end(), hosts.begin(), HostID::fromString);
}
};
hosts.resize(host_id_strings.size());
std::transform(host_id_strings.begin(), host_id_strings.end(), hosts.begin(), HostID::fromString);
}
struct DDLTask

View File

@ -1,12 +1,15 @@
#pragma once
#include <DataStreams/BlockIO.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <DataStreams/BlockIO.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadPool.h>
#include <common/logger_useful.h>
#include <Storages/IStorage.h>
#include <Poco/Net/NetException.h>
#include <Common/CurrentThread.h>
#include <Common/DNSResolver.h>
#include <Common/ThreadPool.h>
#include <Common/isLocalAddress.h>
#include <common/logger_useful.h>
#include <atomic>
#include <chrono>
@ -16,24 +19,80 @@
namespace zkutil
{
class ZooKeeper;
class ZooKeeper;
}
namespace DB
{
class Context;
class ASTAlterQuery;
class AccessRightsElements;
struct DDLLogEntry;
struct HostID
{
String host_name;
UInt16 port;
HostID() = default;
explicit HostID(const Cluster::Address & address) : host_name(address.host_name), port(address.port) { }
static HostID fromString(const String & host_port_str)
{
HostID res;
std::tie(res.host_name, res.port) = Cluster::Address::fromString(host_port_str);
return res;
}
String toString() const { return Cluster::Address::toString(host_name, port); }
String readableString() const { return host_name + ":" + DB::toString(port); }
bool isLocalAddress(UInt16 clickhouse_port) const
{
try
{
return DB::isLocalAddress(DNSResolver::instance().resolveAddress(host_name, port), clickhouse_port);
}
catch (const Poco::Net::NetException &)
{
/// Avoid "Host not found" exceptions
return false;
}
}
static String applyToString(const HostID & host_id) { return host_id.toString(); }
};
struct DDLLogEntry
{
String query;
std::vector<HostID> hosts;
String initiator; // optional
static constexpr int CURRENT_VERSION = 1;
public:
String toString();
void parse(const String & data);
};
struct DDLTask;
using DDLTaskPtr = std::unique_ptr<DDLTask>;
/// Pushes distributed DDL query to the queue
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context);
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context, const AccessRightsElements & query_requires_access, bool query_requires_grant_option = false);
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context, AccessRightsElements && query_requires_access, bool query_requires_grant_option = false);
BlockIO executeDDLQueryOnCluster(
const ASTPtr & query_ptr,
const Context & context,
const AccessRightsElements & query_requires_access,
bool query_requires_grant_option = false);
BlockIO executeDDLQueryOnCluster(
const ASTPtr & query_ptr,
const Context & context,
AccessRightsElements && query_requires_access,
bool query_requires_grant_option = false);
class DDLWorker

View File

@ -6,6 +6,7 @@
#include <Columns/ColumnArray.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/DDLWorker.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime.h>
@ -92,7 +93,8 @@ NamesAndTypesList StorageSystemDDLWorkerQueue::getNamesAndTypes()
{"port", std::make_shared<DataTypeUInt16>()},
{"status", std::make_shared<DataTypeEnum8>(getStatusEnumsAndValues())},
{"cluster", std::make_shared<DataTypeString>()},
{"value", std::make_shared<DataTypeString>()},
{"query", std::make_shared<DataTypeString>()},
{"initiator", std::make_shared<DataTypeString>()},
{"query_start_time", std::make_shared<DataTypeDateTime>()},
{"query_finish_time", std::make_shared<DataTypeDateTime>()},
{"query_duration_ms", std::make_shared<DataTypeUInt64>()},
@ -160,6 +162,7 @@ static String extractPath(const ASTPtr & query, const Context & context)
return extractPathImpl(*select.where(), res, context) ? res : "";
}
void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const
{
String cluster_name = extractPath(query_info.query, context);
@ -276,7 +279,11 @@ void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const C
auto query_start_time = res.stat.mtime;
res_columns[i++]->insert(res.data); // value
DDLLogEntry entry;
entry.parse(res.data);
res_columns[i++]->insert(entry.query); // query
res_columns[i++]->insert(entry.initiator); // initiator
res_columns[i++]->insert(UInt64(query_start_time / 1000)); // query_start_time
res_columns[i++]->insert(UInt64(query_finish_time / 1000)); // query_finish_time
res_columns[i++]->insert(UInt64(query_finish_time - query_start_time)); // query_duration_ms