Merge pull request #41034 from FrankChen021/distributed

Improve the observability of INSERT on distributed table
This commit is contained in:
Robert Schulze 2022-09-13 20:55:11 +02:00 committed by GitHub
commit e8e6dddc76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 188 additions and 41 deletions

View File

@ -264,6 +264,18 @@ protected:
}
};
/// Schedule jobs/tasks on global thread pool without implicit passing tracing context on current thread to underlying worker as parent tracing context.
///
/// If you implement your own job/task scheduling upon global thread pool or schedules a long time running job in a infinite loop way,
/// you need to use class, or you need to use ThreadFromGlobalPool below.
///
/// See the comments of ThreadPool below to know how it works.
using ThreadFromGlobalPoolNoTracingContextPropagation = ThreadFromGlobalPoolImpl<false>;
/// An alias of thread that execute jobs/tasks on global thread pool by implicit passing tracing context on current thread to underlying worker as parent tracing context.
/// If jobs/tasks are directly scheduled by using APIs of this class, you need to use this class or you need to use class above.
using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl<true>;
/// Recommended thread pool for the case when multiple thread pools are created and destroyed.
///
/// The template parameter of ThreadFromGlobalPool is set to false to disable tracing context propagation to underlying worker.
@ -274,9 +286,6 @@ protected:
/// which means the tracing context initialized at underlying worker level won't be delete for a very long time.
/// This would cause wrong context for further jobs scheduled in ThreadPool.
///
/// To make sure the tracing context are correctly propagated, we explicitly disable context propagation(including initialization and de-initialization) at underlying worker level.
/// To make sure the tracing context is correctly propagated, we explicitly disable context propagation(including initialization and de-initialization) at underlying worker level.
///
using ThreadPool = ThreadPoolImpl<ThreadFromGlobalPoolImpl<false>>;
/// An alias for user code to execute a job in the global thread pool
using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl<true>;
using ThreadPool = ThreadPoolImpl<ThreadFromGlobalPoolNoTracingContextPropagation>;

View File

@ -149,9 +149,9 @@ BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Met
threads.resize(size_);
for (auto & thread : threads)
thread = ThreadFromGlobalPool([this] { threadFunction(); });
thread = ThreadFromGlobalPoolNoTracingContextPropagation([this] { threadFunction(); });
delayed_thread = ThreadFromGlobalPool([this] { delayExecutionThreadFunction(); });
delayed_thread = ThreadFromGlobalPoolNoTracingContextPropagation([this] { delayExecutionThreadFunction(); });
}
@ -168,7 +168,7 @@ void BackgroundSchedulePool::increaseThreadsCount(size_t new_threads_count)
threads.resize(new_threads_count);
for (size_t i = old_threads_count; i < new_threads_count; ++i)
threads[i] = ThreadFromGlobalPool([this] { threadFunction(); });
threads[i] = ThreadFromGlobalPoolNoTracingContextPropagation([this] { threadFunction(); });
}

View File

@ -57,7 +57,9 @@ public:
~BackgroundSchedulePool();
private:
using Threads = std::vector<ThreadFromGlobalPool>;
/// BackgroundSchedulePool schedules a task on its own task queue, there's no need to construct/restore tracing context on this level.
/// This is also how ThreadPool class treats the tracing context. See ThreadPool for more information.
using Threads = std::vector<ThreadFromGlobalPoolNoTracingContextPropagation>;
void threadFunction();
void delayExecutionThreadFunction();
@ -83,7 +85,7 @@ private:
std::condition_variable delayed_tasks_cond_var;
std::mutex delayed_tasks_mutex;
/// Thread waiting for next delayed task.
ThreadFromGlobalPool delayed_thread;
ThreadFromGlobalPoolNoTracingContextPropagation delayed_thread;
/// Tasks ordered by scheduled time.
DelayedTasks delayed_tasks;

View File

@ -140,6 +140,11 @@ namespace
size_t rows = 0;
size_t bytes = 0;
UInt32 shard_num = 0;
std::string cluster;
std::string distributed_table;
std::string remote_table;
/// dumpStructure() of the header -- obsolete
std::string block_header_string;
Block block_header;
@ -195,6 +200,14 @@ namespace
in.getFileName(), distributed_header.revision, DBMS_TCP_PROTOCOL_VERSION);
}
if (header_buf.hasPendingData())
{
readVarUInt(distributed_header.shard_num, header_buf);
readStringBinary(distributed_header.cluster, header_buf);
readStringBinary(distributed_header.distributed_table, header_buf);
readStringBinary(distributed_header.remote_table, header_buf);
}
/// Add handling new data here, for example:
///
/// if (header_buf.hasPendingData())
@ -621,18 +634,23 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
ReadBufferFromFile in(file_path);
const auto & distributed_header = readDistributedHeader(in, log);
auto connection = pool->get(timeouts, &distributed_header.insert_settings);
thread_trace_context = std::make_unique<OpenTelemetry::TracingContextHolder>(__PRETTY_FUNCTION__,
distributed_header.client_info.client_trace_context,
this->storage.getContext()->getOpenTelemetrySpanLog());
thread_trace_context->root_span.addAttribute("clickhouse.shard_num", distributed_header.shard_num);
thread_trace_context->root_span.addAttribute("clickhouse.cluster", distributed_header.cluster);
thread_trace_context->root_span.addAttribute("clickhouse.distributed", distributed_header.distributed_table);
thread_trace_context->root_span.addAttribute("clickhouse.remote", distributed_header.remote_table);
thread_trace_context->root_span.addAttribute("clickhouse.rows", distributed_header.rows);
thread_trace_context->root_span.addAttribute("clickhouse.bytes", distributed_header.bytes);
auto connection = pool->get(timeouts, &distributed_header.insert_settings);
LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)",
file_path,
connection->getDescription(),
formatReadableQuantity(distributed_header.rows),
formatReadableSizeWithBinarySuffix(distributed_header.bytes));
thread_trace_context = std::make_unique<OpenTelemetry::TracingContextHolder>(__PRETTY_FUNCTION__,
distributed_header.client_info.client_trace_context,
this->storage.getContext()->getOpenTelemetrySpanLog());
RemoteInserter remote{*connection, timeouts,
distributed_header.insert_query,
distributed_header.insert_settings,

View File

@ -171,7 +171,6 @@ void DistributedSink::writeAsync(const Block & block)
}
else
{
if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1))
return writeSplitAsync(block);
@ -291,6 +290,8 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
auto thread_group = CurrentThread::getGroup();
return [this, thread_group, &job, &current_block, num_shards]()
{
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
setThreadName("DistrOutStrProc");
@ -331,15 +332,19 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
const Block & shard_block = (num_shards > 1) ? job.current_shard_block : current_block;
const Settings & settings = context->getSettingsRef();
/// Do not initiate INSERT for empty block.
size_t rows = shard_block.rows();
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
span.addAttribute("clickhouse.cluster", this->storage.cluster_name);
span.addAttribute("clickhouse.distributed", this->storage.getStorageID().getFullNameNotQuoted());
span.addAttribute("clickhouse.remote", [this]() { return storage.remote_database + "." + storage.remote_table; });
span.addAttribute("clickhouse.rows", rows);
span.addAttribute("clickhouse.bytes", [&shard_block]() { return toString(shard_block.bytes()); });
/// Do not initiate INSERT for empty block.
if (rows == 0)
return;
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
span.addAttribute("clickhouse.written_rows", rows);
if (!job.is_local_job || !settings.prefer_localhost_replica)
{
if (!job.executor)
@ -610,20 +615,15 @@ void DistributedSink::writeSplitAsync(const Block & block)
void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
{
OpenTelemetry::SpanHolder span("DistributedSink::writeAsyncImpl()");
const auto & shard_info = cluster->getShardsInfo()[shard_id];
const auto & settings = context->getSettingsRef();
Block block_to_send = removeSuperfluousColumns(block);
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
span.addAttribute("clickhouse.written_rows", block.rows());
if (shard_info.hasInternalReplication())
{
if (shard_info.isLocal() && settings.prefer_localhost_replica)
/// Prefer insert into current instance directly
writeToLocal(block_to_send, shard_info.getLocalNodeCount());
writeToLocal(shard_info, block_to_send, shard_info.getLocalNodeCount());
else
{
const auto & path = shard_info.insertPathForInternalReplication(
@ -631,13 +631,13 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
settings.use_compact_format_in_distributed_parts_names);
if (path.empty())
throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
writeToShard(block_to_send, {path});
writeToShard(shard_info, block_to_send, {path});
}
}
else
{
if (shard_info.isLocal() && settings.prefer_localhost_replica)
writeToLocal(block_to_send, shard_info.getLocalNodeCount());
writeToLocal(shard_info, block_to_send, shard_info.getLocalNodeCount());
std::vector<std::string> dir_names;
for (const auto & address : cluster->getShardsAddresses()[shard_id])
@ -645,16 +645,23 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
dir_names.push_back(address.toFullString(settings.use_compact_format_in_distributed_parts_names));
if (!dir_names.empty())
writeToShard(block_to_send, dir_names);
writeToShard(shard_info, block_to_send, dir_names);
}
}
void DistributedSink::writeToLocal(const Block & block, size_t repeats)
void DistributedSink::writeToLocal(const Cluster::ShardInfo & shard_info, const Block & block, size_t repeats)
{
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
span.addAttribute("db.statement", this->query_string);
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
span.addAttribute("clickhouse.cluster", this->storage.cluster_name);
span.addAttribute("clickhouse.distributed", this->storage.getStorageID().getFullNameNotQuoted());
span.addAttribute("clickhouse.remote", [this]() { return storage.remote_database + "." + storage.remote_table; });
span.addAttribute("clickhouse.rows", [&block]() { return toString(block.rows()); });
span.addAttribute("clickhouse.bytes", [&block]() { return toString(block.bytes()); });
try
{
InterpreterInsertQuery interp(query_ast, context, allow_materialized);
auto block_io = interp.execute();
@ -664,11 +671,18 @@ void DistributedSink::writeToLocal(const Block & block, size_t repeats)
writeBlockConvert(executor, block, repeats, log);
executor.finish();
}
catch (...)
{
span.addAttribute(std::current_exception());
throw;
}
}
void DistributedSink::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const Block & block, const std::vector<std::string> & dir_names)
{
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
const auto & settings = context->getSettingsRef();
const auto & distributed_settings = storage.getDistributedSettingsRef();
@ -759,6 +773,11 @@ void DistributedSink::writeToShard(const Block & block, const std::vector<std::s
header_stream.write(block.cloneEmpty());
}
writeVarUInt(shard_info.shard_num, header_buf);
writeStringBinary(this->storage.cluster_name, header_buf);
writeStringBinary(this->storage.getStorageID().getFullNameNotQuoted(), header_buf);
writeStringBinary(this->storage.remote_database + "." + this->storage.remote_table, header_buf);
/// Add new fields here, for example:
/// writeVarUInt(my_new_data, header_buf);
/// And note that it is safe, because we have checksum and size for header.

View File

@ -69,9 +69,9 @@ private:
Block removeSuperfluousColumns(Block block) const;
/// Increments finished_writings_count after each repeat.
void writeToLocal(const Block & block, size_t repeats);
void writeToLocal(const Cluster::ShardInfo & shard_info, const Block & block, size_t repeats);
void writeToShard(const Block & block, const std::vector<std::string> & dir_names);
void writeToShard(const Cluster::ShardInfo & shard_info, const Block & block, const std::vector<std::string> & dir_names);
/// Performs synchronous insertion to remote nodes. If timeout_exceeded flag was set, throws.

View File

@ -0,0 +1,8 @@
{"operation_name":"void DB::DistributedSink::writeToLocal(const Cluster::ShardInfo &, const DB::Block &, size_t)","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"}
{"operation_name":"void DB::DistributedSink::writeToLocal(const Cluster::ShardInfo &, const DB::Block &, size_t)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
{"operation_name":"void DB::StorageDistributedDirectoryMonitor::processFile(const std::string &)","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"}
{"operation_name":"void DB::StorageDistributedDirectoryMonitor::processFile(const std::string &)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
{"operation_name":"auto DB::DistributedSink::runWritingJob(DB::DistributedSink::JobReplica &, const DB::Block &, size_t)::(anonymous class)::operator()() const","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"}
{"operation_name":"auto DB::DistributedSink::runWritingJob(DB::DistributedSink::JobReplica &, const DB::Block &, size_t)::(anonymous class)::operator()() const","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
{"operation_name":"auto DB::DistributedSink::runWritingJob(DB::DistributedSink::JobReplica &, const DB::Block &, size_t)::(anonymous class)::operator()() const","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"}
{"operation_name":"auto DB::DistributedSink::runWritingJob(DB::DistributedSink::JobReplica &, const DB::Block &, size_t)::(anonymous class)::operator()() const","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}

View File

@ -0,0 +1,91 @@
#!/usr/bin/env bash
# Tags: no-fasttest, distributed
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# This function takes 4 arguments:
# $1 - OpenTelemetry Trace Id
# $2 - value of insert_distributed_sync
# $3 - value of prefer_localhost_replica
# $4 - a String that helps to debug
function insert()
{
echo "INSERT INTO ${CLICKHOUSE_DATABASE}.dist_opentelemetry SETTINGS insert_distributed_sync=$2, prefer_localhost_replica=$3 VALUES(1),(2)" |
${CLICKHOUSE_CURL} \
-X POST \
-H "traceparent: 00-$1-5150000000000515-01" \
-H "tracestate: $4" \
"${CLICKHOUSE_URL}" \
--data @-
}
function check_span()
{
${CLICKHOUSE_CLIENT} -nq "
SYSTEM FLUSH LOGS;
SELECT operation_name,
attribute['clickhouse.cluster'] AS cluster,
attribute['clickhouse.shard_num'] AS shard,
attribute['clickhouse.rows'] AS rows,
attribute['clickhouse.bytes'] AS bytes
FROM system.opentelemetry_span_log
WHERE finish_date >= yesterday()
AND lower(hex(trace_id)) = '${1}'
AND attribute['clickhouse.distributed'] = '${CLICKHOUSE_DATABASE}.dist_opentelemetry'
AND attribute['clickhouse.remote'] = '${CLICKHOUSE_DATABASE}.local_opentelemetry'
ORDER BY attribute['clickhouse.shard_num']
Format JSONEachRow
;"
}
#
# Prepare tables for tests
#
${CLICKHOUSE_CLIENT} -nq "
DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.dist_opentelemetry;
DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.local_opentelemetry;
CREATE TABLE ${CLICKHOUSE_DATABASE}.dist_opentelemetry (key UInt64) Engine=Distributed('test_cluster_two_shards_localhost', ${CLICKHOUSE_DATABASE}, local_opentelemetry, key % 2);
CREATE TABLE ${CLICKHOUSE_DATABASE}.local_opentelemetry (key UInt64) Engine=MergeTree ORDER BY key;
"
#
# test1
#
trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))");
insert $trace_id 0 1 "async-insert-writeToLocal"
check_span $trace_id
#
# test2
#
trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))");
insert $trace_id 0 0 "async-insert-writeToRemote"
check_span $trace_id
#
# test3
#
trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))");
insert $trace_id 1 1 "sync-insert-writeToLocal"
check_span $trace_id
#
# test4
#
trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))");
insert $trace_id 1 0 "sync-insert-writeToRemote"
check_span $trace_id
#
# Cleanup
#
${CLICKHOUSE_CLIENT} -nq "
DROP TABLE ${CLICKHOUSE_DATABASE}.dist_opentelemetry;
DROP TABLE ${CLICKHOUSE_DATABASE}.local_opentelemetry;
"