Merge pull request #41484 from FrankChen021/on_cluster_dll

Add OpenTelemetry support to ON CLUSTER DDL
This commit is contained in:
Alexander Tokmakov 2022-09-27 15:15:52 +03:00 committed by GitHub
commit 3d4a5a493b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 265 additions and 8 deletions

View File

@ -5,7 +5,7 @@
#include <Common/Exception.h>
#include <Common/hex.h>
#include <Core/Settings.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
namespace DB
{
@ -226,6 +226,30 @@ String TracingContext::composeTraceparentHeader() const
static_cast<uint8_t>(trace_flags));
}
void TracingContext::deserialize(ReadBuffer & buf)
{
buf >> this->trace_id
>> "\n"
>> this->span_id
>> "\n"
>> this->tracestate
>> "\n"
>> this->trace_flags
>> "\n";
}
void TracingContext::serialize(WriteBuffer & buf) const
{
buf << this->trace_id
<< "\n"
<< this->span_id
<< "\n"
<< this->tracestate
<< "\n"
<< this->trace_flags
<< "\n";
}
const TracingContextOnThread & CurrentContext()
{
return current_thread_trace_context;

View File

@ -7,6 +7,8 @@ namespace DB
struct Settings;
class OpenTelemetrySpanLog;
class WriteBuffer;
class ReadBuffer;
namespace OpenTelemetry
{
@ -63,6 +65,9 @@ struct TracingContext
{
return trace_id != UUID();
}
void deserialize(ReadBuffer & buf);
void serialize(WriteBuffer & buf) const;
};
/// Tracing context kept on each thread
@ -157,5 +162,16 @@ struct SpanHolder : public Span
}
inline WriteBuffer & operator<<(WriteBuffer & buf, const OpenTelemetry::TracingContext & context)
{
context.serialize(buf);
return buf;
}
inline ReadBuffer & operator>> (ReadBuffer & buf, OpenTelemetry::TracingContext & context)
{
context.deserialize(buf);
return buf;
}
}

View File

@ -8,6 +8,7 @@
#include <Interpreters/executeQuery.h>
#include <Parsers/queryToString.h>
#include <Common/Exception.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
@ -642,6 +643,7 @@ BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, Contex
entry.query = queryToString(query);
entry.initiator = ddl_worker->getCommonHostID();
entry.setSettingsIfRequired(query_context);
entry.tracing_context = OpenTelemetry::CurrentContext();
String node_path = ddl_worker->tryEnqueueAndExecuteEntry(entry, query_context);
Strings hosts_to_wait = getZooKeeper()->getChildren(zookeeper_path + "/replicas");

View File

@ -221,6 +221,10 @@ String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entr
/// NOTE Possibly it would be better to execute initial query on the most up-to-date node,
/// but it requires more complex logic around /try node.
OpenTelemetry::SpanHolder span(__FUNCTION__);
span.addAttribute("clickhouse.cluster", database->getDatabaseName());
entry.tracing_context = OpenTelemetry::CurrentContext();
auto zookeeper = getAndSetZooKeeper();
UInt32 our_log_ptr = getLogPointer();
UInt32 max_log_ptr = parse<UInt32>(zookeeper->get(database->zookeeper_path + "/max_log_ptr"));

View File

@ -50,21 +50,26 @@ bool HostID::isLocalAddress(UInt16 clickhouse_port) const
void DDLLogEntry::assertVersion() const
{
constexpr UInt64 max_version = 2;
if (version == 0 || max_version < version)
if (version == 0
/// NORMALIZE_CREATE_ON_INITIATOR_VERSION does not change the entry format, it uses versioin 2, so there shouldn't be such version
|| version == NORMALIZE_CREATE_ON_INITIATOR_VERSION
|| version > DDL_ENTRY_FORMAT_MAX_VERSION)
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unknown DDLLogEntry format version: {}."
"Maximum supported version is {}", version, max_version);
"Maximum supported version is {}", version, DDL_ENTRY_FORMAT_MAX_VERSION);
}
void DDLLogEntry::setSettingsIfRequired(ContextPtr context)
{
version = context->getSettingsRef().distributed_ddl_entry_format_version;
if (version <= 0 || version > DDL_ENTRY_FORMAT_MAX_VERSION)
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unknown distributed_ddl_entry_format_version: {}."
"Maximum supported version is {}.", version, DDL_ENTRY_FORMAT_MAX_VERSION);
/// NORMALIZE_CREATE_ON_INITIATOR_VERSION does not affect entry format in ZooKeeper
if (version == NORMALIZE_CREATE_ON_INITIATOR_VERSION)
version = SETTINGS_IN_ZK_VERSION;
if (version == SETTINGS_IN_ZK_VERSION)
if (version >= SETTINGS_IN_ZK_VERSION)
settings.emplace(context->getSettingsRef().changes());
}
@ -94,6 +99,9 @@ String DDLLogEntry::toString() const
wb << "settings: " << serializeAST(ast) << "\n";
}
if (version >= OPENTELEMETRY_ENABLED_VERSION)
wb << "tracing: " << this->tracing_context;
return wb.str();
}
@ -106,7 +114,7 @@ void DDLLogEntry::parse(const String & data)
Strings host_id_strings;
rb >> "query: " >> escape >> query >> "\n";
if (version == 1)
if (version == OLDEST_VERSION)
{
rb >> "hosts: " >> host_id_strings >> "\n";
@ -115,9 +123,8 @@ void DDLLogEntry::parse(const String & data)
else
initiator.clear();
}
else if (version == 2)
else if (version >= SETTINGS_IN_ZK_VERSION)
{
if (!rb.eof() && *rb.position() == 'h')
rb >> "hosts: " >> host_id_strings >> "\n";
if (!rb.eof() && *rb.position() == 'i')
@ -134,6 +141,12 @@ void DDLLogEntry::parse(const String & data)
}
}
if (version >= OPENTELEMETRY_ENABLED_VERSION)
{
if (!rb.eof() && *rb.position() == 't')
rb >> "tracing: " >> this->tracing_context;
}
assertEOF(rb);
if (!host_id_strings.empty())

View File

@ -2,6 +2,7 @@
#include <Core/Types.h>
#include <Interpreters/Cluster.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/ZooKeeper/Types.h>
#include <filesystem>
@ -69,12 +70,18 @@ struct DDLLogEntry
static constexpr const UInt64 OLDEST_VERSION = 1;
static constexpr const UInt64 SETTINGS_IN_ZK_VERSION = 2;
static constexpr const UInt64 NORMALIZE_CREATE_ON_INITIATOR_VERSION = 3;
static constexpr const UInt64 OPENTELEMETRY_ENABLED_VERSION = 4;
/// Add new version here
/// Remember to update the value below once new version is added
static constexpr const UInt64 DDL_ENTRY_FORMAT_MAX_VERSION = 4;
UInt64 version = 1;
String query;
std::vector<HostID> hosts;
String initiator; // optional
std::optional<SettingsChanges> settings;
OpenTelemetry::TracingContext tracing_context;
void setSettingsIfRequired(ContextPtr context);
String toString() const;

View File

@ -19,6 +19,7 @@
#include <Interpreters/executeQuery.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/setThreadName.h>
#include <Common/randomSeed.h>
#include <Common/ZooKeeper/ZooKeeper.h>
@ -515,6 +516,11 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
LOG_DEBUG(log, "Processing task {} ({})", task.entry_name, task.entry.query);
chassert(!task.completely_processed);
/// Setup tracing context on current thread for current DDL
OpenTelemetry::TracingContextHolder tracing_ctx_holder(__PRETTY_FUNCTION__ ,
task.entry.tracing_context,
this->context->getOpenTelemetrySpanLog());
String active_node_path = task.getActiveNodePath();
String finished_node_path = task.getFinishedNodePath();

View File

@ -55,6 +55,8 @@ bool isSupportedAlterType(int type)
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context, const DDLQueryOnClusterParams & params)
{
OpenTelemetry::SpanHolder span(__FUNCTION__);
if (context->getCurrentTransaction() && context->getSettingsRef().throw_on_unsupported_query_inside_transaction)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "ON CLUSTER queries inside transactions are not supported");
@ -88,6 +90,8 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context,
cluster = context->getCluster(query->cluster);
}
span.addAttribute("clickhouse.cluster", query->cluster);
/// TODO: support per-cluster grant
context->checkAccess(AccessType::CLUSTER);
@ -164,6 +168,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context,
entry.query = queryToString(query_ptr);
entry.initiator = ddl_worker.getCommonHostID();
entry.setSettingsIfRequired(context);
entry.tracing_context = OpenTelemetry::CurrentContext();
String node_path = ddl_worker.enqueueQuery(entry);
return getDistributedDDLStatus(node_path, entry, context);

View File

@ -865,6 +865,12 @@ class TestCase:
stdout=PIPE,
universal_newlines=True,
).communicate()[0]
if diff.startswith("Binary files "):
diff += "Content of stdout:\n===================\n"
file = open(self.stdout_file, "r")
diff += str(file.read())
file.close()
diff += "==================="
description += f"\n{diff}\n"
if debug_log:
description += "\n"

View File

@ -0,0 +1,15 @@
===ddl_format_version 3====
1
1
1
1
===ddl_format_version 4====
1
1
1
1
===exception====
1
1
1
1

View File

@ -0,0 +1,159 @@
#!/usr/bin/env bash
# Tags: zookeeper
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# The test cases in this file cover DDLs running on both Replicated database engine and non-Replicated database engine.
# Since the processing flow is a little bit different from each other, in order to share same reference file,
# we compare the expected result and actual result by ourselves. See check_span method below for more detail.
# This function takes following arguments:
# $1 - OpenTelemetry Trace Id
# $2 - Query
# $3 - Query Settings
function execute_query()
{
# Some queries are supposed to fail, use -f to suppress error messages
echo $2 | ${CLICKHOUSE_CURL_COMMAND} -q -s --max-time 180 \
-X POST \
-H "traceparent: 00-$1-5150000000000515-01" \
-H "tracestate: a\nb cd" \
"${CLICKHOUSE_URL}&${3}" \
--data @-
}
# This function takes following argument:
# $1 - expected
# $2 - OpenTelemetry Trace Id
# $3 - operation_name pattern
# $4 - extra condition
function check_span()
{
if [ -n "$4" ]; then
extra_condition=" AND ${4}"
else
extra_condition=""
fi
ret=$(${CLICKHOUSE_CLIENT} -nq "
SYSTEM FLUSH LOGS;
SELECT count()
FROM system.opentelemetry_span_log
WHERE finish_date >= yesterday()
AND lower(hex(trace_id)) = '${2}'
AND operation_name like '${3}'
${extra_condition};")
if [ $ret = $1 ]; then
echo 1
else
echo "[operation_name like '${3}' ${extra_condition}]=$ret, expected: ${1}"
# echo the span logs to help analyze
${CLICKHOUSE_CLIENT} -q "
SELECT operation_name, attribute
FROM system.opentelemetry_span_log
WHERE finish_date >= yesterday()
AND lower(hex(trace_id)) ='${2}'
ORDER BY start_time_us
Format PrettyCompact
"
fi
}
#
# Set up
#
${CLICKHOUSE_CLIENT} -q "
DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.ddl_test_for_opentelemetry;
"
# Support Replicated database engine
cluster_name=$($CLICKHOUSE_CLIENT -q "select if(engine = 'Replicated', name, 'test_shard_localhost') from system.databases where name='$CLICKHOUSE_DATABASE'")
#
# Only format_version 4 enables the tracing
#
for ddl_version in 3 4; do
# Echo a separator so that the reference file is more clear for reading
echo "===ddl_format_version ${ddl_version}===="
trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))");
execute_query $trace_id "CREATE TABLE ${CLICKHOUSE_DATABASE}.ddl_test_for_opentelemetry ON CLUSTER ${cluster_name} (id UInt64) Engine=MergeTree ORDER BY id" "distributed_ddl_output_mode=none&distributed_ddl_entry_format_version=${ddl_version}"
check_span 1 $trace_id "HTTPHandler"
if [ $cluster_name = "test_shard_localhost" ]; then
check_span 1 $trace_id "%executeDDLQueryOnCluster%" "attribute['clickhouse.cluster']='${cluster_name}'"
else
check_span 1 $trace_id "%tryEnqueueAndExecuteEntry%" "attribute['clickhouse.cluster']='${cluster_name}'"
fi
if [ $cluster_name = "test_shard_localhost" ]; then
# The tracing is only enabled when entry format version is 4
if [ $ddl_version = "4" ]; then
expected=1
else
expected=0
fi
else
# For Replicated database engine, the tracing is always enabled because it calls DDLWorker::processTask directly
expected=1
fi
check_span $expected $trace_id "%DDLWorker::processTask%"
# For queries that tracing are enabled(format version is 4 or Replicated database engine), there should be two 'query' spans,
# one is for the HTTPHandler, the other is for the DDL executing in DDLWorker.
#
# For other format, there should be only one 'query' span
if [ $cluster_name = "test_shard_localhost" ]; then
if [ $ddl_version = "4" ]; then
expected=2
else
expected=1
fi
else
expected=2
fi
check_span $expected $trace_id "query"
# Remove table
# Under Replicated database engine, the DDL is executed as ON CLUSTER DDL, so distributed_ddl_output_mode is needed to supress output
${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode none -q "
DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.ddl_test_for_opentelemetry;
"
done
#
# an exceptional case, DROP a non-exist table
#
# Echo a separator so that the reference file is more clear for reading
echo "===exception===="
trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))");
execute_query $trace_id "DROP TABLE ${CLICKHOUSE_DATABASE}.ddl_test_for_opentelemetry_non_exist ON CLUSTER ${cluster_name}" "distributed_ddl_output_mode=none&distributed_ddl_entry_format_version=4" 2>&1| grep -Fv "UNKNOWN_TABLE"
check_span 1 $trace_id "HTTPHandler"
if [ $cluster_name = "test_shard_localhost" ]; then
expected=1
else
# For Replicated database it will fail on initiator before enqueueing distributed DDL
expected=0
fi
check_span $expected $trace_id "%executeDDLQueryOnCluster%" "attribute['clickhouse.cluster']='${cluster_name}'"
check_span $expected $trace_id "%DDLWorker::processTask%"
if [ $cluster_name = "test_shard_localhost" ]; then
# There should be two 'query' spans, one is for the HTTPHandler, the other is for the DDL executing in DDLWorker.
# Both of these two spans contain exception
expected=2
else
# For Replicated database, there should only one query span
expected=1
fi
# We don't case about the exact value of exception_code, just check it's there.
check_span $expected $trace_id "query" "attribute['clickhouse.exception_code']<>''"