mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-19 22:22:00 +00:00
Remove using namespace from header
Signed-off-by: Frank Chen <frank.chen021@outlook.com>
This commit is contained in:
parent
221a71f801
commit
bb00dcc19b
@ -484,13 +484,13 @@ void Connection::sendQuery(
|
|||||||
bool with_pending_data,
|
bool with_pending_data,
|
||||||
std::function<void(const Progress &)>)
|
std::function<void(const Progress &)>)
|
||||||
{
|
{
|
||||||
SpanHolder span("Connection::sendQuery()");
|
OpenTelemetry::SpanHolder span("Connection::sendQuery()");
|
||||||
span.addAttribute("clickhouse.query_id", query_id_);
|
span.addAttribute("clickhouse.query_id", query_id_);
|
||||||
span.addAttribute("clickhouse.query", query);
|
span.addAttribute("clickhouse.query", query);
|
||||||
span.addAttribute("target", [this] () { return this->getHost() + ":" + std::to_string(this->getPort()); });
|
span.addAttribute("target", [this] () { return this->getHost() + ":" + std::to_string(this->getPort()); });
|
||||||
|
|
||||||
ClientInfo new_client_info;
|
ClientInfo new_client_info;
|
||||||
const auto ¤t_trace_context = TracingContextOnThread::current();
|
const auto ¤t_trace_context = OpenTelemetry::TracingContextOnThread::current();
|
||||||
if (client_info && current_trace_context.isTraceEnabled())
|
if (client_info && current_trace_context.isTraceEnabled())
|
||||||
{
|
{
|
||||||
// use current span as the parent of remote span
|
// use current span as the parent of remote span
|
||||||
|
@ -36,6 +36,14 @@ void Span::addAttribute(std::string_view name, std::string_view value)
|
|||||||
this->attributes.push_back(Tuple{name, value});
|
this->attributes.push_back(Tuple{name, value});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Span::addAttributeIfNotEmpty(std::string_view name, std::string_view value)
|
||||||
|
{
|
||||||
|
if (!this->isTraceEnabled() || name.empty() || value.empty())
|
||||||
|
return;
|
||||||
|
|
||||||
|
this->attributes.push_back(Tuple{name, value});
|
||||||
|
}
|
||||||
|
|
||||||
void Span::addAttribute(std::string_view name, std::function<String()> value_supplier)
|
void Span::addAttribute(std::string_view name, std::function<String()> value_supplier)
|
||||||
{
|
{
|
||||||
if (!this->isTraceEnabled() || !value_supplier)
|
if (!this->isTraceEnabled() || !value_supplier)
|
||||||
|
@ -24,6 +24,7 @@ struct Span
|
|||||||
void addAttribute(std::string_view name, UInt64 value);
|
void addAttribute(std::string_view name, UInt64 value);
|
||||||
void addAttributeIfNotZero(std::string_view name, UInt64 value);
|
void addAttributeIfNotZero(std::string_view name, UInt64 value);
|
||||||
void addAttribute(std::string_view name, std::string_view value);
|
void addAttribute(std::string_view name, std::string_view value);
|
||||||
|
void addAttributeIfNotEmpty(std::string_view name, std::string_view value);
|
||||||
void addAttribute(std::string_view name, std::function<String()> value_supplier);
|
void addAttribute(std::string_view name, std::function<String()> value_supplier);
|
||||||
|
|
||||||
/// Following two methods are declared as noexcept to make sure they're exception safe
|
/// Following two methods are declared as noexcept to make sure they're exception safe
|
||||||
@ -155,7 +156,5 @@ struct SpanHolder : public Span
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
using namespace OpenTelemetry;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -152,7 +152,7 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::opti
|
|||||||
|
|
||||||
// this scheduleImpl is called in the parent thread,
|
// this scheduleImpl is called in the parent thread,
|
||||||
// the tracing context on this thread is used as parent context for the sub-thread that runs the job
|
// the tracing context on this thread is used as parent context for the sub-thread that runs the job
|
||||||
const auto ¤t_thread_context = DB::TracingContextOnThread::current();
|
const auto ¤t_thread_context = DB::OpenTelemetry::TracingContextOnThread::current();
|
||||||
jobs.emplace(std::move(job), priority, current_thread_context);
|
jobs.emplace(std::move(job), priority, current_thread_context);
|
||||||
++scheduled_jobs;
|
++scheduled_jobs;
|
||||||
new_job_or_shutdown.notify_one();
|
new_job_or_shutdown.notify_one();
|
||||||
@ -255,7 +255,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
|
|||||||
bool need_shutdown = false;
|
bool need_shutdown = false;
|
||||||
|
|
||||||
/// A copy of parent trace context
|
/// A copy of parent trace context
|
||||||
DB::TracingContextOnThread parent_thead_trace_context;
|
DB::OpenTelemetry::TracingContextOnThread parent_thead_trace_context;
|
||||||
|
|
||||||
{
|
{
|
||||||
std::unique_lock lock(mutex);
|
std::unique_lock lock(mutex);
|
||||||
@ -267,7 +267,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
|
|||||||
/// boost::priority_queue does not provide interface for getting non-const reference to an element
|
/// boost::priority_queue does not provide interface for getting non-const reference to an element
|
||||||
/// to prevent us from modifying its priority. We have to use const_cast to force move semantics on JobWithPriority::job.
|
/// to prevent us from modifying its priority. We have to use const_cast to force move semantics on JobWithPriority::job.
|
||||||
job = std::move(const_cast<Job &>(jobs.top().job));
|
job = std::move(const_cast<Job &>(jobs.top().job));
|
||||||
parent_thead_trace_context = std::move(const_cast<DB::TracingContextOnThread &>(jobs.top().thread_trace_context));
|
parent_thead_trace_context = std::move(const_cast<DB::OpenTelemetry::TracingContextOnThread &>(jobs.top().thread_trace_context));
|
||||||
jobs.pop();
|
jobs.pop();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -281,7 +281,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
|
|||||||
if (!need_shutdown)
|
if (!need_shutdown)
|
||||||
{
|
{
|
||||||
/// Set up tracing context for this thread by its parent context
|
/// Set up tracing context for this thread by its parent context
|
||||||
DB::TracingContextHolder thread_trace_context("ThreadPool::worker()", parent_thead_trace_context);
|
DB::OpenTelemetry::TracingContextHolder thread_trace_context("ThreadPool::worker()", parent_thead_trace_context);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -63,7 +63,7 @@ public:
|
|||||||
Decimal64 initial_query_start_time_microseconds{};
|
Decimal64 initial_query_start_time_microseconds{};
|
||||||
|
|
||||||
/// OpenTelemetry trace context we received from client, or which we are going to send to server.
|
/// OpenTelemetry trace context we received from client, or which we are going to send to server.
|
||||||
TracingContext client_trace_context;
|
OpenTelemetry::TracingContext client_trace_context;
|
||||||
|
|
||||||
/// All below are parameters related to initial query.
|
/// All below are parameters related to initial query.
|
||||||
|
|
||||||
|
@ -7,11 +7,11 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
struct OpenTelemetrySpanLogElement : public Span
|
struct OpenTelemetrySpanLogElement : public OpenTelemetry::Span
|
||||||
{
|
{
|
||||||
OpenTelemetrySpanLogElement() = default;
|
OpenTelemetrySpanLogElement() = default;
|
||||||
OpenTelemetrySpanLogElement(const Span & span)
|
OpenTelemetrySpanLogElement(const OpenTelemetry::Span & span)
|
||||||
: Span(span) {}
|
: OpenTelemetry::Span(span) {}
|
||||||
|
|
||||||
static std::string name() { return "OpenTelemetrySpanLog"; }
|
static std::string name() { return "OpenTelemetrySpanLog"; }
|
||||||
static NamesAndTypesList getNamesAndTypes();
|
static NamesAndTypesList getNamesAndTypes();
|
||||||
|
@ -233,7 +233,7 @@ inline UInt64 time_in_seconds(std::chrono::time_point<std::chrono::system_clock>
|
|||||||
return std::chrono::duration_cast<std::chrono::seconds>(timepoint.time_since_epoch()).count();
|
return std::chrono::duration_cast<std::chrono::seconds>(timepoint.time_since_epoch()).count();
|
||||||
}
|
}
|
||||||
|
|
||||||
static void onExceptionBeforeStart(const String & query_for_logging, ContextPtr context, UInt64 current_time_us, ASTPtr ast, const std::shared_ptr<SpanHolder> & query_span)
|
static void onExceptionBeforeStart(const String & query_for_logging, ContextPtr context, UInt64 current_time_us, ASTPtr ast, const std::shared_ptr<OpenTelemetry::SpanHolder> & query_span)
|
||||||
{
|
{
|
||||||
/// Exception before the query execution.
|
/// Exception before the query execution.
|
||||||
if (auto quota = context->getQuota())
|
if (auto quota = context->getQuota())
|
||||||
@ -345,7 +345,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
|||||||
QueryProcessingStage::Enum stage,
|
QueryProcessingStage::Enum stage,
|
||||||
ReadBuffer * istr)
|
ReadBuffer * istr)
|
||||||
{
|
{
|
||||||
std::shared_ptr<SpanHolder> query_span = std::make_shared<SpanHolder>("query");
|
std::shared_ptr<OpenTelemetry::SpanHolder> query_span = std::make_shared<OpenTelemetry::SpanHolder>("query");
|
||||||
|
|
||||||
const auto current_time = std::chrono::system_clock::now();
|
const auto current_time = std::chrono::system_clock::now();
|
||||||
|
|
||||||
@ -667,12 +667,12 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
|||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
std::unique_ptr<SpanHolder> span;
|
std::unique_ptr<OpenTelemetry::SpanHolder> span;
|
||||||
if (query_span->isTraceEnabled())
|
if (query_span->isTraceEnabled())
|
||||||
{
|
{
|
||||||
auto * raw_interpreter_ptr = interpreter.get();
|
auto * raw_interpreter_ptr = interpreter.get();
|
||||||
std::string class_name(demangle(typeid(*raw_interpreter_ptr).name()));
|
std::string class_name(demangle(typeid(*raw_interpreter_ptr).name()));
|
||||||
span = std::make_unique<SpanHolder>(class_name + "::execute()");
|
span = std::make_unique<OpenTelemetry::SpanHolder>(class_name + "::execute()");
|
||||||
}
|
}
|
||||||
res = interpreter->execute();
|
res = interpreter->execute();
|
||||||
}
|
}
|
||||||
@ -930,8 +930,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
|||||||
|
|
||||||
query_span->addAttribute("db.statement", elem.query);
|
query_span->addAttribute("db.statement", elem.query);
|
||||||
query_span->addAttribute("clickhouse.query_id", elem.client_info.current_query_id);
|
query_span->addAttribute("clickhouse.query_id", elem.client_info.current_query_id);
|
||||||
query_span->addAttribute("clickhouse.tracestate", TracingContextOnThread::current().tracestate);
|
|
||||||
query_span->addAttribute("clickhouse.query_status", "QueryFinish");
|
query_span->addAttribute("clickhouse.query_status", "QueryFinish");
|
||||||
|
query_span->addAttributeIfNotEmpty("clickhouse.tracestate", OpenTelemetry::TracingContextOnThread::current().tracestate);
|
||||||
query_span->addAttributeIfNotZero("clickhouse.read_rows", elem.read_rows);
|
query_span->addAttributeIfNotZero("clickhouse.read_rows", elem.read_rows);
|
||||||
query_span->addAttributeIfNotZero("clickhouse.read_bytes", elem.read_bytes);
|
query_span->addAttributeIfNotZero("clickhouse.read_bytes", elem.read_bytes);
|
||||||
query_span->addAttributeIfNotZero("clickhouse.written_rows", info.written_rows);
|
query_span->addAttributeIfNotZero("clickhouse.written_rows", info.written_rows);
|
||||||
|
@ -71,11 +71,11 @@ static void executeJob(ExecutingGraph::Node * node, ReadProgressCallback * read_
|
|||||||
|
|
||||||
bool ExecutionThreadContext::executeTask()
|
bool ExecutionThreadContext::executeTask()
|
||||||
{
|
{
|
||||||
std::unique_ptr<SpanHolder> span;
|
std::unique_ptr<OpenTelemetry::SpanHolder> span;
|
||||||
|
|
||||||
if (trace_processors)
|
if (trace_processors)
|
||||||
{
|
{
|
||||||
span = std::make_unique<SpanHolder>("ExecutionThreadContext::executeTask() " + node->processor->getName());
|
span = std::make_unique<OpenTelemetry::SpanHolder>("ExecutionThreadContext::executeTask() " + node->processor->getName());
|
||||||
span->addAttribute("thread_number", thread_number);
|
span->addAttribute("thread_number", thread_number);
|
||||||
}
|
}
|
||||||
std::optional<Stopwatch> execution_time_watch;
|
std::optional<Stopwatch> execution_time_watch;
|
||||||
|
@ -662,7 +662,7 @@ namespace
|
|||||||
std::optional<Session> session;
|
std::optional<Session> session;
|
||||||
ContextMutablePtr query_context;
|
ContextMutablePtr query_context;
|
||||||
std::optional<CurrentThread::QueryScope> query_scope;
|
std::optional<CurrentThread::QueryScope> query_scope;
|
||||||
TracingContextHolderPtr thread_trace_context;
|
OpenTelemetry::TracingContextHolderPtr thread_trace_context;
|
||||||
String query_text;
|
String query_text;
|
||||||
ASTPtr ast;
|
ASTPtr ast;
|
||||||
ASTInsertQuery * insert_query = nullptr;
|
ASTInsertQuery * insert_query = nullptr;
|
||||||
@ -842,10 +842,10 @@ namespace
|
|||||||
query_scope.emplace(query_context);
|
query_scope.emplace(query_context);
|
||||||
|
|
||||||
/// Set up tracing context for this query on current thread
|
/// Set up tracing context for this query on current thread
|
||||||
thread_trace_context = std::make_unique<TracingContextHolder>("GRPCServer",
|
thread_trace_context = std::make_unique<OpenTelemetry::TracingContextHolder>("GRPCServer",
|
||||||
query_context->getClientInfo().client_trace_context,
|
query_context->getClientInfo().client_trace_context,
|
||||||
query_context->getSettingsRef(),
|
query_context->getSettingsRef(),
|
||||||
query_context->getOpenTelemetrySpanLog());
|
query_context->getOpenTelemetrySpanLog());
|
||||||
|
|
||||||
/// Prepare for sending exceptions and logs.
|
/// Prepare for sending exceptions and logs.
|
||||||
const Settings & settings = query_context->getSettingsRef();
|
const Settings & settings = query_context->getSettingsRef();
|
||||||
|
@ -933,7 +933,7 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
|
|||||||
/// In case of exception, send stack trace to client.
|
/// In case of exception, send stack trace to client.
|
||||||
bool with_stacktrace = false;
|
bool with_stacktrace = false;
|
||||||
|
|
||||||
TracingContextHolderPtr thread_trace_context;
|
OpenTelemetry::TracingContextHolderPtr thread_trace_context;
|
||||||
SCOPE_EXIT({
|
SCOPE_EXIT({
|
||||||
// make sure the response status is recorded
|
// make sure the response status is recorded
|
||||||
if (thread_trace_context)
|
if (thread_trace_context)
|
||||||
@ -963,10 +963,10 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
|
|||||||
|
|
||||||
// Setup tracing context for this thread
|
// Setup tracing context for this thread
|
||||||
auto context = session->sessionOrGlobalContext();
|
auto context = session->sessionOrGlobalContext();
|
||||||
thread_trace_context = std::make_unique<TracingContextHolder>("HTTPHandler",
|
thread_trace_context = std::make_unique<OpenTelemetry::TracingContextHolder>("HTTPHandler",
|
||||||
client_info.client_trace_context,
|
client_info.client_trace_context,
|
||||||
context->getSettingsRef(),
|
context->getSettingsRef(),
|
||||||
context->getOpenTelemetrySpanLog());
|
context->getOpenTelemetrySpanLog());
|
||||||
thread_trace_context->root_span.addAttribute("clickhouse.uri", request.getURI());
|
thread_trace_context->root_span.addAttribute("clickhouse.uri", request.getURI());
|
||||||
|
|
||||||
response.setContentType("text/plain; charset=UTF-8");
|
response.setContentType("text/plain; charset=UTF-8");
|
||||||
|
@ -219,7 +219,7 @@ void TCPHandler::runImpl()
|
|||||||
|
|
||||||
/// Initialized later.
|
/// Initialized later.
|
||||||
std::optional<CurrentThread::QueryScope> query_scope;
|
std::optional<CurrentThread::QueryScope> query_scope;
|
||||||
TracingContextHolderPtr thread_trace_context;
|
OpenTelemetry::TracingContextHolderPtr thread_trace_context;
|
||||||
|
|
||||||
/** An exception during the execution of request (it must be sent over the network to the client).
|
/** An exception during the execution of request (it must be sent over the network to the client).
|
||||||
* The client will be able to accept it, if it did not happen while sending another packet and the client has not disconnected yet.
|
* The client will be able to accept it, if it did not happen while sending another packet and the client has not disconnected yet.
|
||||||
@ -246,10 +246,10 @@ void TCPHandler::runImpl()
|
|||||||
continue;
|
continue;
|
||||||
|
|
||||||
/// Set up tracing context for this query on current thread
|
/// Set up tracing context for this query on current thread
|
||||||
thread_trace_context = std::make_unique<TracingContextHolder>("TCPHandler",
|
thread_trace_context = std::make_unique<OpenTelemetry::TracingContextHolder>("TCPHandler",
|
||||||
query_context->getClientInfo().client_trace_context,
|
query_context->getClientInfo().client_trace_context,
|
||||||
query_context->getSettingsRef(),
|
query_context->getSettingsRef(),
|
||||||
query_context->getOpenTelemetrySpanLog());
|
query_context->getOpenTelemetrySpanLog());
|
||||||
|
|
||||||
query_scope.emplace(query_context);
|
query_scope.emplace(query_context);
|
||||||
|
|
||||||
|
@ -609,7 +609,7 @@ bool StorageDistributedDirectoryMonitor::processFiles(const std::map<UInt64, std
|
|||||||
|
|
||||||
void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path)
|
void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path)
|
||||||
{
|
{
|
||||||
TracingContextHolderPtr thread_trace_context;
|
OpenTelemetry::TracingContextHolderPtr thread_trace_context;
|
||||||
|
|
||||||
Stopwatch watch;
|
Stopwatch watch;
|
||||||
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.getContext()->getSettingsRef());
|
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.getContext()->getSettingsRef());
|
||||||
@ -629,7 +629,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
|
|||||||
formatReadableQuantity(distributed_header.rows),
|
formatReadableQuantity(distributed_header.rows),
|
||||||
formatReadableSizeWithBinarySuffix(distributed_header.bytes));
|
formatReadableSizeWithBinarySuffix(distributed_header.bytes));
|
||||||
|
|
||||||
thread_trace_context = std::make_unique<TracingContextHolder>(__PRETTY_FUNCTION__,
|
thread_trace_context = std::make_unique<OpenTelemetry::TracingContextHolder>(__PRETTY_FUNCTION__,
|
||||||
distributed_header.client_info.client_trace_context,
|
distributed_header.client_info.client_trace_context,
|
||||||
this->storage.getContext()->getOpenTelemetrySpanLog());
|
this->storage.getContext()->getOpenTelemetrySpanLog());
|
||||||
|
|
||||||
@ -870,7 +870,7 @@ private:
|
|||||||
ReadBufferFromFile in(file_path->second);
|
ReadBufferFromFile in(file_path->second);
|
||||||
const auto & distributed_header = readDistributedHeader(in, parent.log);
|
const auto & distributed_header = readDistributedHeader(in, parent.log);
|
||||||
|
|
||||||
TracingContextHolder thread_trace_context(__PRETTY_FUNCTION__,
|
OpenTelemetry::TracingContextHolder thread_trace_context(__PRETTY_FUNCTION__,
|
||||||
distributed_header.client_info.client_trace_context,
|
distributed_header.client_info.client_trace_context,
|
||||||
parent.storage.getContext()->getOpenTelemetrySpanLog());
|
parent.storage.getContext()->getOpenTelemetrySpanLog());
|
||||||
|
|
||||||
@ -909,7 +909,7 @@ private:
|
|||||||
const auto & distributed_header = readDistributedHeader(in, parent.log);
|
const auto & distributed_header = readDistributedHeader(in, parent.log);
|
||||||
|
|
||||||
// this function is called in a separated thread, so we set up the trace context from the file
|
// this function is called in a separated thread, so we set up the trace context from the file
|
||||||
TracingContextHolder thread_trace_context(__PRETTY_FUNCTION__,
|
OpenTelemetry::TracingContextHolder thread_trace_context(__PRETTY_FUNCTION__,
|
||||||
distributed_header.client_info.client_trace_context,
|
distributed_header.client_info.client_trace_context,
|
||||||
parent.storage.getContext()->getOpenTelemetrySpanLog());
|
parent.storage.getContext()->getOpenTelemetrySpanLog());
|
||||||
|
|
||||||
|
@ -336,7 +336,7 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
|
|||||||
if (rows == 0)
|
if (rows == 0)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
SpanHolder span(__PRETTY_FUNCTION__);
|
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
|
||||||
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
|
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
|
||||||
span.addAttribute("clickhouse.written_rows", rows);
|
span.addAttribute("clickhouse.written_rows", rows);
|
||||||
|
|
||||||
@ -419,7 +419,7 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
|
|||||||
|
|
||||||
void DistributedSink::writeSync(const Block & block)
|
void DistributedSink::writeSync(const Block & block)
|
||||||
{
|
{
|
||||||
SpanHolder span(__PRETTY_FUNCTION__);
|
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
|
||||||
|
|
||||||
const Settings & settings = context->getSettingsRef();
|
const Settings & settings = context->getSettingsRef();
|
||||||
const auto & shards_info = cluster->getShardsInfo();
|
const auto & shards_info = cluster->getShardsInfo();
|
||||||
@ -610,7 +610,7 @@ void DistributedSink::writeSplitAsync(const Block & block)
|
|||||||
|
|
||||||
void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
|
void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
|
||||||
{
|
{
|
||||||
SpanHolder span("DistributedSink::writeAsyncImpl()");
|
OpenTelemetry::SpanHolder span("DistributedSink::writeAsyncImpl()");
|
||||||
|
|
||||||
const auto & shard_info = cluster->getShardsInfo()[shard_id];
|
const auto & shard_info = cluster->getShardsInfo()[shard_id];
|
||||||
const auto & settings = context->getSettingsRef();
|
const auto & settings = context->getSettingsRef();
|
||||||
@ -652,7 +652,7 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
|
|||||||
|
|
||||||
void DistributedSink::writeToLocal(const Block & block, size_t repeats)
|
void DistributedSink::writeToLocal(const Block & block, size_t repeats)
|
||||||
{
|
{
|
||||||
SpanHolder span(__PRETTY_FUNCTION__);
|
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
|
||||||
span.addAttribute("db.statement", this->query_string);
|
span.addAttribute("db.statement", this->query_string);
|
||||||
|
|
||||||
InterpreterInsertQuery interp(query_ast, context, allow_materialized);
|
InterpreterInsertQuery interp(query_ast, context, allow_materialized);
|
||||||
@ -668,7 +668,7 @@ void DistributedSink::writeToLocal(const Block & block, size_t repeats)
|
|||||||
|
|
||||||
void DistributedSink::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
|
void DistributedSink::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
|
||||||
{
|
{
|
||||||
SpanHolder span(__PRETTY_FUNCTION__);
|
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
|
||||||
|
|
||||||
const auto & settings = context->getSettingsRef();
|
const auto & settings = context->getSettingsRef();
|
||||||
const auto & distributed_settings = storage.getDistributedSettingsRef();
|
const auto & distributed_settings = storage.getDistributedSettingsRef();
|
||||||
@ -737,11 +737,11 @@ void DistributedSink::writeToShard(const Block & block, const std::vector<std::s
|
|||||||
writeStringBinary(query_string, header_buf);
|
writeStringBinary(query_string, header_buf);
|
||||||
context->getSettingsRef().write(header_buf);
|
context->getSettingsRef().write(header_buf);
|
||||||
|
|
||||||
if (TracingContextOnThread::current().isTraceEnabled())
|
if (OpenTelemetry::TracingContextOnThread::current().isTraceEnabled())
|
||||||
{
|
{
|
||||||
// if the distributed tracing is enabled, use the trace context in current thread as parent of next span
|
// if the distributed tracing is enabled, use the trace context in current thread as parent of next span
|
||||||
auto client_info = context->getClientInfo();
|
auto client_info = context->getClientInfo();
|
||||||
client_info.client_trace_context = TracingContextOnThread::current();
|
client_info.client_trace_context = OpenTelemetry::TracingContextOnThread::current();
|
||||||
client_info.write(header_buf, DBMS_TCP_PROTOCOL_VERSION);
|
client_info.write(header_buf, DBMS_TCP_PROTOCOL_VERSION);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -102,7 +102,7 @@ namespace
|
|||||||
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers(headers_.begin(), headers_.end());
|
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers(headers_.begin(), headers_.end());
|
||||||
|
|
||||||
// Propagate OpenTelemetry trace context, if any, downstream.
|
// Propagate OpenTelemetry trace context, if any, downstream.
|
||||||
const auto ¤t_trace_context = TracingContextOnThread::current();
|
const auto ¤t_trace_context = OpenTelemetry::TracingContextOnThread::current();
|
||||||
if (current_trace_context.isTraceEnabled())
|
if (current_trace_context.isTraceEnabled())
|
||||||
{
|
{
|
||||||
headers.emplace_back("traceparent", current_trace_context.composeTraceparentHeader());
|
headers.emplace_back("traceparent", current_trace_context.composeTraceparentHeader());
|
||||||
|
Loading…
Reference in New Issue
Block a user