From 3d65e3f2eed31c9891d989e1a3cb437dcd5a431d Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Mon, 5 Sep 2022 16:37:55 +0800 Subject: [PATCH 01/27] Add cluster/distributed/remote to file --- src/Storages/Distributed/DirectoryMonitor.cpp | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index 16981d26146..f84ddeb4f5e 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -140,6 +140,11 @@ namespace size_t rows = 0; size_t bytes = 0; + UInt32 shard_num = 0; + std::string cluster_name; + std::string distributed_table; + std::string remote_table; + /// dumpStructure() of the header -- obsolete std::string block_header_string; Block block_header; @@ -195,6 +200,14 @@ namespace in.getFileName(), distributed_header.revision, DBMS_TCP_PROTOCOL_VERSION); } + if (header_buf.hasPendingData()) + { + readVarUInt(distributed_header.shard_num, header_buf); + readStringBinary(distributed_header.cluster_name, header_buf); + readStringBinary(distributed_header.distributed_table, header_buf); + readStringBinary(distributed_header.remote_table, header_buf); + } + /// Add handling new data here, for example: /// /// if (header_buf.hasPendingData()) @@ -621,18 +634,23 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa ReadBufferFromFile in(file_path); const auto & distributed_header = readDistributedHeader(in, log); - auto connection = pool->get(timeouts, &distributed_header.insert_settings); + thread_trace_context = std::make_unique(__PRETTY_FUNCTION__, + distributed_header.client_info.client_trace_context, + this->storage.getContext()->getOpenTelemetrySpanLog()); + thread_trace_context->root_span.addAttribute("clickhouse.shard_num", distributed_header.shard_num); + thread_trace_context->root_span.addAttribute("clickhouse.cluster", distributed_header.cluster_name); + 
thread_trace_context->root_span.addAttribute("clickhouse.distributed", distributed_header.distributed_table); + thread_trace_context->root_span.addAttribute("clickhouse.remote", distributed_header.remote_table); + thread_trace_context->root_span.addAttribute("clickhouse.rows", distributed_header.rows); + thread_trace_context->root_span.addAttribute("clickhouse.bytes", distributed_header.bytes); + auto connection = pool->get(timeouts, &distributed_header.insert_settings); LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)", file_path, connection->getDescription(), formatReadableQuantity(distributed_header.rows), formatReadableSizeWithBinarySuffix(distributed_header.bytes)); - thread_trace_context = std::make_unique(__PRETTY_FUNCTION__, - distributed_header.client_info.client_trace_context, - this->storage.getContext()->getOpenTelemetrySpanLog()); - RemoteInserter remote{*connection, timeouts, distributed_header.insert_query, distributed_header.insert_settings, From a17bc51d5b245b40870abb6caaecd16924eeac32 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Mon, 5 Sep 2022 16:39:47 +0800 Subject: [PATCH 02/27] Save cluster/distributed/table to log --- src/Storages/Distributed/DistributedSink.cpp | 27 ++++++++++++-------- src/Storages/Distributed/DistributedSink.h | 4 +-- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/src/Storages/Distributed/DistributedSink.cpp b/src/Storages/Distributed/DistributedSink.cpp index ae72fdd84e2..0e379a7bd89 100644 --- a/src/Storages/Distributed/DistributedSink.cpp +++ b/src/Storages/Distributed/DistributedSink.cpp @@ -338,7 +338,11 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__); span.addAttribute("clickhouse.shard_num", shard_info.shard_num); - span.addAttribute("clickhouse.written_rows", rows); + span.addAttribute("clickhouse.cluster", this->storage.cluster_name); + span.addAttribute("clickhouse.distributed", 
this->storage.getStorageID().getFullNameNotQuoted()); + span.addAttribute("clickhouse.remote", [this]() { return storage.remote_database + "." + storage.remote_table; }); + span.addAttribute("clickhouse.rows", [rows]() { return std::to_string(rows); }); + span.addAttribute("clickhouse.bytes", [&shard_block]() { return std::to_string(shard_block.bytes()); }); if (!job.is_local_job || !settings.prefer_localhost_replica) { @@ -610,20 +614,15 @@ void DistributedSink::writeSplitAsync(const Block & block) void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id) { - OpenTelemetry::SpanHolder span("DistributedSink::writeAsyncImpl()"); - const auto & shard_info = cluster->getShardsInfo()[shard_id]; const auto & settings = context->getSettingsRef(); Block block_to_send = removeSuperfluousColumns(block); - span.addAttribute("clickhouse.shard_num", shard_info.shard_num); - span.addAttribute("clickhouse.written_rows", block.rows()); - if (shard_info.hasInternalReplication()) { if (shard_info.isLocal() && settings.prefer_localhost_replica) /// Prefer insert into current instance directly - writeToLocal(block_to_send, shard_info.getLocalNodeCount()); + writeToLocal(shard_info, block_to_send, shard_info.getLocalNodeCount()); else { const auto & path = shard_info.insertPathForInternalReplication( @@ -631,13 +630,13 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id) settings.use_compact_format_in_distributed_parts_names); if (path.empty()) throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR); - writeToShard(block_to_send, {path}); + writeToShard(shard_info, block_to_send, {path}); } } else { if (shard_info.isLocal() && settings.prefer_localhost_replica) - writeToLocal(block_to_send, shard_info.getLocalNodeCount()); + writeToLocal(shard_info, block_to_send, shard_info.getLocalNodeCount()); std::vector dir_names; for (const auto & address : cluster->getShardsAddresses()[shard_id]) @@ -645,7 +644,7 @@ void 
DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id) dir_names.push_back(address.toFullString(settings.use_compact_format_in_distributed_parts_names)); if (!dir_names.empty()) - writeToShard(block_to_send, dir_names); + writeToShard(shard_info, block_to_send, dir_names); } } @@ -666,9 +665,10 @@ void DistributedSink::writeToLocal(const Block & block, size_t repeats) } -void DistributedSink::writeToShard(const Block & block, const std::vector & dir_names) +void DistributedSink::writeToShard(const Cluster::ShardInfo& shard_info, const Block & block, const std::vector & dir_names) { OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__); + span.addAttribute("clickhouse.shard_num", shard_info.shard_num); const auto & settings = context->getSettingsRef(); const auto & distributed_settings = storage.getDistributedSettingsRef(); @@ -759,6 +759,11 @@ void DistributedSink::writeToShard(const Block & block, const std::vectorstorage.cluster_name, header_buf); + writeStringBinary(this->storage.getStorageID().getFullNameNotQuoted(), header_buf); + writeStringBinary(this->storage.remote_database + "." + this->storage.remote_table, header_buf); + /// Add new fields here, for example: /// writeVarUInt(my_new_data, header_buf); /// And note that it is safe, because we have checksum and size for header. diff --git a/src/Storages/Distributed/DistributedSink.h b/src/Storages/Distributed/DistributedSink.h index 668cec22e8b..5d7a5268865 100644 --- a/src/Storages/Distributed/DistributedSink.h +++ b/src/Storages/Distributed/DistributedSink.h @@ -69,9 +69,9 @@ private: Block removeSuperfluousColumns(Block block) const; /// Increments finished_writings_count after each repeat. 
- void writeToLocal(const Block & block, size_t repeats); + void writeToLocal(const Cluster::ShardInfo& shard_info, const Block & block, size_t repeats); - void writeToShard(const Block & block, const std::vector & dir_names); + void writeToShard(const Cluster::ShardInfo& shard_info, const Block & block, const std::vector & dir_names); /// Performs synchronous insertion to remote nodes. If timeout_exceeded flag was set, throws. From 6ab1549d6c32182253586c0c7714a2ecce7a8fd1 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Mon, 5 Sep 2022 16:40:48 +0800 Subject: [PATCH 03/27] Update writeToLocal to record related info --- src/Storages/Distributed/DistributedSink.cpp | 28 +++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/src/Storages/Distributed/DistributedSink.cpp b/src/Storages/Distributed/DistributedSink.cpp index 0e379a7bd89..dc33cfa4b60 100644 --- a/src/Storages/Distributed/DistributedSink.cpp +++ b/src/Storages/Distributed/DistributedSink.cpp @@ -649,19 +649,33 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id) } -void DistributedSink::writeToLocal(const Block & block, size_t repeats) +void DistributedSink::writeToLocal(const Cluster::ShardInfo& shard_info, const Block & block, size_t repeats) { OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__); span.addAttribute("db.statement", this->query_string); + span.addAttribute("clickhouse.shard_num", shard_info.shard_num); + span.addAttribute("clickhouse.cluster", this->storage.cluster_name); + span.addAttribute("clickhouse.distributed", this->storage.getStorageID().getFullNameNotQuoted()); + span.addAttribute("clickhouse.remote", [this]() { return storage.remote_database + "." 
+ storage.remote_table; }); + span.addAttribute("clickhouse.rows", [&block]() { return std::to_string(block.rows()); }); + span.addAttribute("clickhouse.bytes", [&block]() { return std::to_string(block.bytes()); }); - InterpreterInsertQuery interp(query_ast, context, allow_materialized); + try + { + InterpreterInsertQuery interp(query_ast, context, allow_materialized); - auto block_io = interp.execute(); - PushingPipelineExecutor executor(block_io.pipeline); + auto block_io = interp.execute(); + PushingPipelineExecutor executor(block_io.pipeline); - executor.start(); - writeBlockConvert(executor, block, repeats, log); - executor.finish(); + executor.start(); + writeBlockConvert(executor, block, repeats, log); + executor.finish(); + } + catch (...) + { + span.addAttribute(std::current_exception()); + throw; + } } From 8365e7bfac1f210cf664b833507d3771f1981640 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Tue, 6 Sep 2022 15:41:21 +0800 Subject: [PATCH 04/27] Remove extra attribute --- src/Storages/Distributed/DistributedSink.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/Storages/Distributed/DistributedSink.cpp b/src/Storages/Distributed/DistributedSink.cpp index dc33cfa4b60..57397c6908e 100644 --- a/src/Storages/Distributed/DistributedSink.cpp +++ b/src/Storages/Distributed/DistributedSink.cpp @@ -171,7 +171,6 @@ void DistributedSink::writeAsync(const Block & block) } else { - if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1)) return writeSplitAsync(block); @@ -652,7 +651,6 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id) void DistributedSink::writeToLocal(const Cluster::ShardInfo& shard_info, const Block & block, size_t repeats) { OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__); - span.addAttribute("db.statement", this->query_string); span.addAttribute("clickhouse.shard_num", shard_info.shard_num); span.addAttribute("clickhouse.cluster", this->storage.cluster_name); 
span.addAttribute("clickhouse.distributed", this->storage.getStorageID().getFullNameNotQuoted()); From 49556dad975052c748bb8db6ae414da58bf2094d Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Tue, 6 Sep 2022 15:42:45 +0800 Subject: [PATCH 05/27] Add test cases --- ...etry_insert_on_distributed_table.reference | 4 + ...entelemetry_insert_on_distributed_table.sh | 84 +++++++++++++++++++ 2 files changed, 88 insertions(+) create mode 100644 tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference create mode 100755 tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh diff --git a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference new file mode 100644 index 00000000000..fac9fabce8a --- /dev/null +++ b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference @@ -0,0 +1,4 @@ +{'clickhouse.shard_num':'1','clickhouse.cluster':'test_cluster_two_shards','clickhouse.distributed':'default.dist_opentelemetry','clickhouse.remote':'default.local_opentelemetry','clickhouse.rows':'1','clickhouse.bytes':'8'} +{'clickhouse.shard_num':'2','clickhouse.cluster':'test_cluster_two_shards','clickhouse.distributed':'default.dist_opentelemetry','clickhouse.remote':'default.local_opentelemetry','clickhouse.rows':'1','clickhouse.bytes':'8'} +{'clickhouse.shard_num':'1','clickhouse.cluster':'test_cluster_two_shards','clickhouse.distributed':'default.dist_opentelemetry','clickhouse.remote':'default.local_opentelemetry','clickhouse.rows':'1','clickhouse.bytes':'8'} +{'clickhouse.shard_num':'2','clickhouse.cluster':'test_cluster_two_shards','clickhouse.distributed':'default.dist_opentelemetry','clickhouse.remote':'default.local_opentelemetry','clickhouse.rows':'1','clickhouse.bytes':'8'} diff --git a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh 
b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh new file mode 100755 index 00000000000..6f766e9f3bb --- /dev/null +++ b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh @@ -0,0 +1,84 @@ +#!/usr/bin/env bash +# Tags: distributed + +set -ue + +unset CLICKHOUSE_LOG_COMMENT + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + + +${CLICKHOUSE_CLIENT} -nq " +SET distributed_ddl_output_mode = 'none'; + +SYSTEM FLUSH LOGS ON CLUSTER test_cluster_two_shards; +TRUNCATE TABLE IF EXISTS system.opentelemetry_span_log ON CLUSTER test_cluster_two_shards; + +DROP TABLE IF EXISTS default.dist_opentelemetry ON CLUSTER test_cluster_two_shards; +DROP TABLE IF EXISTS default.local_opentelemetry ON CLUSTER test_cluster_two_shards; + +CREATE TABLE default.dist_opentelemetry ON CLUSTER test_cluster_two_shards (key UInt64) Engine=Distributed('test_cluster_two_shards', default, local_opentelemetry, key % 2); +CREATE TABLE default.local_opentelemetry ON CLUSTER test_cluster_two_shards (key UInt64) Engine=MergeTree ORDER BY key; +" + +# +# INSERT ASYNC test +# Do test with opentelemetry enabled +# +${CLICKHOUSE_CLIENT} -nq " +-- Make sure it's async +SET insert_distributed_sync=0; +INSERT INTO default.dist_opentelemetry SETTINGS opentelemetry_start_trace_probability=1 VALUES(1),(2); +" + +# Wait complete of ASYNC INSERT on distributed table +wait + +# Check log +${CLICKHOUSE_CLIENT} -nq " +-- Flush opentelemetry span log on all nodes +SET distributed_ddl_output_mode = 'none'; +SYSTEM FLUSH LOGS ON CLUSTER test_cluster_two_shards; + +-- Above INSERT will insert data to two shards respectively, so there will be two spans generated +SELECT attribute FROM cluster('test_cluster_two_shards', system, opentelemetry_span_log) WHERE operation_name like '%writeToLocal%'; +SELECT attribute FROM cluster('test_cluster_two_shards', system, opentelemetry_span_log) WHERE 
operation_name like '%processFile%'; +" + +# +# INSERT SYNC test +# Do test with opentelemetry enabled and in SYNC mode +# +${CLICKHOUSE_CLIENT} -nq " + +-- Clear log +SET distributed_ddl_output_mode = 'none'; +TRUNCATE TABLE IF EXISTS system.opentelemetry_span_log ON CLUSTER test_cluster_two_shards; + +-- Make sure it's SYNC +SET insert_distributed_sync=1; + +-- INSERT test +INSERT INTO default.dist_opentelemetry SETTINGS opentelemetry_start_trace_probability=1 VALUES(1),(2); +" + +# Check log +${CLICKHOUSE_CLIENT} -nq " +-- Flush opentelemetry span log on all nodes +SET distributed_ddl_output_mode = 'none'; +SYSTEM FLUSH LOGS ON CLUSTER test_cluster_two_shards; + +-- Above INSERT will insert data to two shards in the same flow, so there should be two spans generated with the same operation name +SELECT attribute FROM cluster('test_cluster_two_shards', system, opentelemetry_span_log) WHERE operation_name like '%runWritingJob%'; +" + +# +# Cleanup +# +${CLICKHOUSE_CLIENT} -nq " +SET distributed_ddl_output_mode = 'none'; +DROP TABLE default.dist_opentelemetry ON CLUSTER test_cluster_two_shards; +DROP TABLE default.local_opentelemetry ON CLUSTER test_cluster_two_shards; +" From 206709603502b8f0f8f99996d332458976f15c7f Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Tue, 6 Sep 2022 16:01:31 +0800 Subject: [PATCH 06/27] Optimize span log for SYNC insert --- src/Storages/Distributed/DistributedSink.cpp | 22 +++++++++++--------- src/Storages/Distributed/DistributedSink.h | 4 ++-- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/Storages/Distributed/DistributedSink.cpp b/src/Storages/Distributed/DistributedSink.cpp index 57397c6908e..8099a7f2002 100644 --- a/src/Storages/Distributed/DistributedSink.cpp +++ b/src/Storages/Distributed/DistributedSink.cpp @@ -290,6 +290,8 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si auto thread_group = CurrentThread::getGroup(); return [this, thread_group, &job, ¤t_block, 
num_shards]() { + OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__); + if (thread_group) CurrentThread::attachToIfDetached(thread_group); setThreadName("DistrOutStrProc"); @@ -330,18 +332,18 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si const Block & shard_block = (num_shards > 1) ? job.current_shard_block : current_block; const Settings & settings = context->getSettingsRef(); - /// Do not initiate INSERT for empty block. size_t rows = shard_block.rows(); - if (rows == 0) - return; - OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__); span.addAttribute("clickhouse.shard_num", shard_info.shard_num); span.addAttribute("clickhouse.cluster", this->storage.cluster_name); span.addAttribute("clickhouse.distributed", this->storage.getStorageID().getFullNameNotQuoted()); span.addAttribute("clickhouse.remote", [this]() { return storage.remote_database + "." + storage.remote_table; }); - span.addAttribute("clickhouse.rows", [rows]() { return std::to_string(rows); }); - span.addAttribute("clickhouse.bytes", [&shard_block]() { return std::to_string(shard_block.bytes()); }); + span.addAttribute("clickhouse.rows", rows); + span.addAttribute("clickhouse.bytes", [&shard_block]() { return toString(shard_block.bytes()); }); + + /// Do not initiate INSERT for empty block. 
+ if (rows == 0) + return; if (!job.is_local_job || !settings.prefer_localhost_replica) { @@ -648,15 +650,15 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id) } -void DistributedSink::writeToLocal(const Cluster::ShardInfo& shard_info, const Block & block, size_t repeats) +void DistributedSink::writeToLocal(const Cluster::ShardInfo & shard_info, const Block & block, size_t repeats) { OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__); span.addAttribute("clickhouse.shard_num", shard_info.shard_num); span.addAttribute("clickhouse.cluster", this->storage.cluster_name); span.addAttribute("clickhouse.distributed", this->storage.getStorageID().getFullNameNotQuoted()); span.addAttribute("clickhouse.remote", [this]() { return storage.remote_database + "." + storage.remote_table; }); - span.addAttribute("clickhouse.rows", [&block]() { return std::to_string(block.rows()); }); - span.addAttribute("clickhouse.bytes", [&block]() { return std::to_string(block.bytes()); }); + span.addAttribute("clickhouse.rows", [&block]() { return toString(block.rows()); }); + span.addAttribute("clickhouse.bytes", [&block]() { return toString(block.bytes()); }); try { @@ -677,7 +679,7 @@ void DistributedSink::writeToLocal(const Cluster::ShardInfo& shard_info, const B } -void DistributedSink::writeToShard(const Cluster::ShardInfo& shard_info, const Block & block, const std::vector & dir_names) +void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const Block & block, const std::vector & dir_names) { OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__); span.addAttribute("clickhouse.shard_num", shard_info.shard_num); diff --git a/src/Storages/Distributed/DistributedSink.h b/src/Storages/Distributed/DistributedSink.h index 5d7a5268865..af0c64cbd78 100644 --- a/src/Storages/Distributed/DistributedSink.h +++ b/src/Storages/Distributed/DistributedSink.h @@ -69,9 +69,9 @@ private: Block removeSuperfluousColumns(Block block) const; /// Increments 
finished_writings_count after each repeat. - void writeToLocal(const Cluster::ShardInfo& shard_info, const Block & block, size_t repeats); + void writeToLocal(const Cluster::ShardInfo & shard_info, const Block & block, size_t repeats); - void writeToShard(const Cluster::ShardInfo& shard_info, const Block & block, const std::vector & dir_names); + void writeToShard(const Cluster::ShardInfo & shard_info, const Block & block, const std::vector & dir_names); /// Performs synchronous insertion to remote nodes. If timeout_exceeded flag was set, throws. From f21ab12d8e364e0fce760bb5228a8ef00c8c0c66 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Tue, 6 Sep 2022 22:21:31 +0800 Subject: [PATCH 07/27] Use sleep to wait for flush --- ...7_opentelemetry_insert_on_distributed_table.sh | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh index 6f766e9f3bb..b9b5dd2d424 100755 --- a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh +++ b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh @@ -33,15 +33,11 @@ SET insert_distributed_sync=0; INSERT INTO default.dist_opentelemetry SETTINGS opentelemetry_start_trace_probability=1 VALUES(1),(2); " -# Wait complete of ASYNC INSERT on distributed table -wait +# Wait 10s to complete of ASYNC INSERT on distributed table and flush of system.opentelemetry_span_log +sleep 10 # Check log ${CLICKHOUSE_CLIENT} -nq " --- Flush opentelemetry span log on all nodes -SET distributed_ddl_output_mode = 'none'; -SYSTEM FLUSH LOGS ON CLUSTER test_cluster_two_shards; - -- Above INSERT will insert data to two shards respectively, so there will be two spans generated SELECT attribute FROM cluster('test_cluster_two_shards', system, opentelemetry_span_log) WHERE operation_name like '%writeToLocal%'; SELECT attribute FROM 
cluster('test_cluster_two_shards', system, opentelemetry_span_log) WHERE operation_name like '%processFile%'; @@ -64,12 +60,11 @@ SET insert_distributed_sync=1; INSERT INTO default.dist_opentelemetry SETTINGS opentelemetry_start_trace_probability=1 VALUES(1),(2); " +# Wait 10s to flush system.opentelemetry_span_log +sleep 10 + # Check log ${CLICKHOUSE_CLIENT} -nq " --- Flush opentelemetry span log on all nodes -SET distributed_ddl_output_mode = 'none'; -SYSTEM FLUSH LOGS ON CLUSTER test_cluster_two_shards; - -- Above INSERT will insert data to two shards in the same flow, so there should be two spans generated with the same operation name SELECT attribute FROM cluster('test_cluster_two_shards', system, opentelemetry_span_log) WHERE operation_name like '%runWritingJob%'; " From 329f31e7ab2f2436a9dc45ec001a06563893852d Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Thu, 8 Sep 2022 11:38:10 +0800 Subject: [PATCH 08/27] Address review comments Signed-off-by: Frank Chen --- src/Storages/Distributed/DirectoryMonitor.cpp | 6 +-- ...entelemetry_insert_on_distributed_table.sh | 41 ++++++++----------- 2 files changed, 19 insertions(+), 28 deletions(-) diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index f84ddeb4f5e..b3d2494c7e1 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -141,7 +141,7 @@ namespace size_t bytes = 0; UInt32 shard_num = 0; - std::string cluster_name; + std::string cluster; std::string distributed_table; std::string remote_table; @@ -203,7 +203,7 @@ namespace if (header_buf.hasPendingData()) { readVarUInt(distributed_header.shard_num, header_buf); - readStringBinary(distributed_header.cluster_name, header_buf); + readStringBinary(distributed_header.cluster, header_buf); readStringBinary(distributed_header.distributed_table, header_buf); readStringBinary(distributed_header.remote_table, header_buf); } @@ -638,7 +638,7 @@ void 
StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa distributed_header.client_info.client_trace_context, this->storage.getContext()->getOpenTelemetrySpanLog()); thread_trace_context->root_span.addAttribute("clickhouse.shard_num", distributed_header.shard_num); - thread_trace_context->root_span.addAttribute("clickhouse.cluster", distributed_header.cluster_name); + thread_trace_context->root_span.addAttribute("clickhouse.cluster", distributed_header.cluster); thread_trace_context->root_span.addAttribute("clickhouse.distributed", distributed_header.distributed_table); thread_trace_context->root_span.addAttribute("clickhouse.remote", distributed_header.remote_table); thread_trace_context->root_span.addAttribute("clickhouse.rows", distributed_header.rows); diff --git a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh index b9b5dd2d424..319f0151b1d 100755 --- a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh +++ b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh @@ -1,5 +1,5 @@ #!/usr/bin/env bash -# Tags: distributed +# Tags: no-fasttest, distributed set -ue @@ -10,9 +10,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) . 
"$CURDIR"/../shell_config.sh -${CLICKHOUSE_CLIENT} -nq " -SET distributed_ddl_output_mode = 'none'; - +${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none -nq " SYSTEM FLUSH LOGS ON CLUSTER test_cluster_two_shards; TRUNCATE TABLE IF EXISTS system.opentelemetry_span_log ON CLUSTER test_cluster_two_shards; @@ -28,16 +26,17 @@ CREATE TABLE default.local_opentelemetry ON CLUSTER test_cluster_two_shards (key # Do test with opentelemetry enabled # ${CLICKHOUSE_CLIENT} -nq " --- Make sure it's async -SET insert_distributed_sync=0; -INSERT INTO default.dist_opentelemetry SETTINGS opentelemetry_start_trace_probability=1 VALUES(1),(2); +INSERT INTO default.dist_opentelemetry SETTINGS opentelemetry_start_trace_probability=1, insert_distributed_sync=0 VALUES(1),(2); " -# Wait 10s to complete of ASYNC INSERT on distributed table and flush of system.opentelemetry_span_log -sleep 10 - # Check log -${CLICKHOUSE_CLIENT} -nq " +${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none -nq " +-- Make sure INSERT on distributed finishes +SYSTEM FLUSH DISTRIBUTED default.dist_opentelemetry ON CLUSTER test_cluster_two_shards; + +-- Make sure opentelemetry span log flushed +SYSTEM FLUSH LOGS ON CLUSTER test_cluster_two_shards; + -- Above INSERT will insert data to two shards respectively, so there will be two spans generated SELECT attribute FROM cluster('test_cluster_two_shards', system, opentelemetry_span_log) WHERE operation_name like '%writeToLocal%'; SELECT attribute FROM cluster('test_cluster_two_shards', system, opentelemetry_span_log) WHERE operation_name like '%processFile%'; @@ -47,24 +46,17 @@ SELECT attribute FROM cluster('test_cluster_two_shards', system, opentelemetry_s # INSERT SYNC test # Do test with opentelemetry enabled and in SYNC mode # -${CLICKHOUSE_CLIENT} -nq " - +${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none -nq " -- Clear log -SET distributed_ddl_output_mode = 'none'; TRUNCATE TABLE IF EXISTS system.opentelemetry_span_log ON CLUSTER 
test_cluster_two_shards; --- Make sure it's SYNC -SET insert_distributed_sync=1; - --- INSERT test -INSERT INTO default.dist_opentelemetry SETTINGS opentelemetry_start_trace_probability=1 VALUES(1),(2); +INSERT INTO default.dist_opentelemetry SETTINGS opentelemetry_start_trace_probability=1, insert_distributed_sync=1 VALUES(1),(2); " -# Wait 10s to flush system.opentelemetry_span_log -sleep 10 - # Check log -${CLICKHOUSE_CLIENT} -nq " +${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none -nq " +SYSTEM FLUSH LOGS ON CLUSTER test_cluster_two_shards; + -- Above INSERT will insert data to two shards in the same flow, so there should be two spans generated with the same operation name SELECT attribute FROM cluster('test_cluster_two_shards', system, opentelemetry_span_log) WHERE operation_name like '%runWritingJob%'; " @@ -72,8 +64,7 @@ SELECT attribute FROM cluster('test_cluster_two_shards', system, opentelemetry_s # # Cleanup # -${CLICKHOUSE_CLIENT} -nq " -SET distributed_ddl_output_mode = 'none'; +${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none -nq " DROP TABLE default.dist_opentelemetry ON CLUSTER test_cluster_two_shards; DROP TABLE default.local_opentelemetry ON CLUSTER test_cluster_two_shards; " From a9863805222bbb90152b57540a3577caa104096a Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Thu, 8 Sep 2022 17:25:29 +0800 Subject: [PATCH 09/27] Update src/Storages/Distributed/DirectoryMonitor.cpp Co-authored-by: Azat Khuzhin --- src/Storages/Distributed/DirectoryMonitor.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index b3d2494c7e1..e8d48431a9e 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -202,10 +202,10 @@ namespace if (header_buf.hasPendingData()) { - readVarUInt(distributed_header.shard_num, header_buf); - readStringBinary(distributed_header.cluster, header_buf); 
- readStringBinary(distributed_header.distributed_table, header_buf); - readStringBinary(distributed_header.remote_table, header_buf); + readVarUInt(distributed_header.shard_num, header_buf); + readStringBinary(distributed_header.cluster, header_buf); + readStringBinary(distributed_header.distributed_table, header_buf); + readStringBinary(distributed_header.remote_table, header_buf); } /// Add handling new data here, for example: From 237abffdba538c9a4acc85db4b15dcdba4e735ac Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Fri, 9 Sep 2022 11:59:53 +0800 Subject: [PATCH 10/27] Improve test Signed-off-by: Frank Chen --- ...etry_insert_on_distributed_table.reference | 8 +- ...entelemetry_insert_on_distributed_table.sh | 92 ++++++++++++++----- 2 files changed, 72 insertions(+), 28 deletions(-) diff --git a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference index fac9fabce8a..98fb6a68656 100644 --- a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference +++ b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference @@ -1,4 +1,4 @@ -{'clickhouse.shard_num':'1','clickhouse.cluster':'test_cluster_two_shards','clickhouse.distributed':'default.dist_opentelemetry','clickhouse.remote':'default.local_opentelemetry','clickhouse.rows':'1','clickhouse.bytes':'8'} -{'clickhouse.shard_num':'2','clickhouse.cluster':'test_cluster_two_shards','clickhouse.distributed':'default.dist_opentelemetry','clickhouse.remote':'default.local_opentelemetry','clickhouse.rows':'1','clickhouse.bytes':'8'} -{'clickhouse.shard_num':'1','clickhouse.cluster':'test_cluster_two_shards','clickhouse.distributed':'default.dist_opentelemetry','clickhouse.remote':'default.local_opentelemetry','clickhouse.rows':'1','clickhouse.bytes':'8'} 
-{'clickhouse.shard_num':'2','clickhouse.cluster':'test_cluster_two_shards','clickhouse.distributed':'default.dist_opentelemetry','clickhouse.remote':'default.local_opentelemetry','clickhouse.rows':'1','clickhouse.bytes':'8'} +1 +1 +1 +1 diff --git a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh index 319f0151b1d..7ec0650aaac 100755 --- a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh +++ b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh @@ -11,60 +11,104 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) ${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none -nq " -SYSTEM FLUSH LOGS ON CLUSTER test_cluster_two_shards; -TRUNCATE TABLE IF EXISTS system.opentelemetry_span_log ON CLUSTER test_cluster_two_shards; +DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.dist_opentelemetry; +DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.local_opentelemetry; -DROP TABLE IF EXISTS default.dist_opentelemetry ON CLUSTER test_cluster_two_shards; -DROP TABLE IF EXISTS default.local_opentelemetry ON CLUSTER test_cluster_two_shards; - -CREATE TABLE default.dist_opentelemetry ON CLUSTER test_cluster_two_shards (key UInt64) Engine=Distributed('test_cluster_two_shards', default, local_opentelemetry, key % 2); -CREATE TABLE default.local_opentelemetry ON CLUSTER test_cluster_two_shards (key UInt64) Engine=MergeTree ORDER BY key; +CREATE TABLE ${CLICKHOUSE_DATABASE}.dist_opentelemetry (key UInt64) Engine=Distributed('test_cluster_two_shards', ${CLICKHOUSE_DATABASE}, local_opentelemetry, key % 2); +CREATE TABLE ${CLICKHOUSE_DATABASE}.local_opentelemetry (key UInt64) Engine=MergeTree ORDER BY key; " # # INSERT ASYNC test # Do test with opentelemetry enabled # -${CLICKHOUSE_CLIENT} -nq " -INSERT INTO default.dist_opentelemetry SETTINGS opentelemetry_start_trace_probability=1, insert_distributed_sync=0 VALUES(1),(2); -" 
+trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))"); +echo "INSERT INTO ${CLICKHOUSE_DATABASE}.dist_opentelemetry SETTINGS insert_distributed_sync=0 VALUES(1),(2)" | +${CLICKHOUSE_CURL} \ + -X POST \ + -H "traceparent: 00-$trace_id-5250000000000525-01" \ + -H "tracestate: some custom state" \ + "${CLICKHOUSE_URL}" \ + --data @- # Check log ${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none -nq " -- Make sure INSERT on distributed finishes -SYSTEM FLUSH DISTRIBUTED default.dist_opentelemetry ON CLUSTER test_cluster_two_shards; +SYSTEM FLUSH DISTRIBUTED ${CLICKHOUSE_DATABASE}.dist_opentelemetry; -- Make sure opentelemetry span log flushed -SYSTEM FLUSH LOGS ON CLUSTER test_cluster_two_shards; +SYSTEM FLUSH LOGS; -- Above INSERT will insert data to two shards respectively, so there will be two spans generated -SELECT attribute FROM cluster('test_cluster_two_shards', system, opentelemetry_span_log) WHERE operation_name like '%writeToLocal%'; -SELECT attribute FROM cluster('test_cluster_two_shards', system, opentelemetry_span_log) WHERE operation_name like '%processFile%'; +SELECT count() FROM system.opentelemetry_span_log +WHERE lower(hex(trace_id)) = '${trace_id}' +AND operation_name like '%writeToLocal%' +AND attribute['clickhouse.shard_num'] = '1' +AND attribute['clickhouse.cluster'] = 'test_cluster_two_shards' +AND attribute['clickhouse.distributed'] = '${CLICKHOUSE_DATABASE}.dist_opentelemetry' +AND attribute['clickhouse.remote'] = '${CLICKHOUSE_DATABASE}.local_opentelemetry' +AND attribute['clickhouse.rows'] = '1' +AND attribute['clickhouse.bytes'] = '8' +; + +SELECT count() FROM system.opentelemetry_span_log +WHERE lower(hex(trace_id)) = '${trace_id}' +AND operation_name like '%writeToLocal%' +AND attribute['clickhouse.shard_num'] = '2' +AND attribute['clickhouse.cluster'] = 'test_cluster_two_shards' +AND attribute['clickhouse.distributed'] = '${CLICKHOUSE_DATABASE}.dist_opentelemetry' +AND attribute['clickhouse.remote'] = 
'${CLICKHOUSE_DATABASE}.local_opentelemetry' +AND attribute['clickhouse.rows'] = '1' +AND attribute['clickhouse.bytes'] = '8' +; + " # # INSERT SYNC test # Do test with opentelemetry enabled and in SYNC mode # -${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none -nq " --- Clear log -TRUNCATE TABLE IF EXISTS system.opentelemetry_span_log ON CLUSTER test_cluster_two_shards; - -INSERT INTO default.dist_opentelemetry SETTINGS opentelemetry_start_trace_probability=1, insert_distributed_sync=1 VALUES(1),(2); -" +trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))"); +echo "INSERT INTO ${CLICKHOUSE_DATABASE}.dist_opentelemetry SETTINGS insert_distributed_sync=1 VALUES(1),(2)" | +${CLICKHOUSE_CURL} \ + -X POST \ + -H "traceparent: 00-$trace_id-5250000000000525-01" \ + -H "tracestate: some custom state" \ + "${CLICKHOUSE_URL}" \ + --data @- # Check log ${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none -nq " -SYSTEM FLUSH LOGS ON CLUSTER test_cluster_two_shards; +SYSTEM FLUSH LOGS; -- Above INSERT will insert data to two shards in the same flow, so there should be two spans generated with the same operation name -SELECT attribute FROM cluster('test_cluster_two_shards', system, opentelemetry_span_log) WHERE operation_name like '%runWritingJob%'; +SELECT count() FROM system.opentelemetry_span_log +WHERE lower(hex(trace_id)) = '${trace_id}' +AND operation_name like '%runWritingJob%' +AND attribute['clickhouse.shard_num'] = '1' +AND attribute['clickhouse.cluster'] = 'test_cluster_two_shards' +AND attribute['clickhouse.distributed'] = '${CLICKHOUSE_DATABASE}.dist_opentelemetry' +AND attribute['clickhouse.remote'] = '${CLICKHOUSE_DATABASE}.local_opentelemetry' +AND attribute['clickhouse.rows'] = '1' +AND attribute['clickhouse.bytes'] = '8' +; + +SELECT count() FROM system.opentelemetry_span_log +WHERE lower(hex(trace_id)) = '${trace_id}' +AND operation_name like '%runWritingJob%' +AND attribute['clickhouse.shard_num'] = '2' +AND 
attribute['clickhouse.cluster'] = 'test_cluster_two_shards' +AND attribute['clickhouse.distributed'] = '${CLICKHOUSE_DATABASE}.dist_opentelemetry' +AND attribute['clickhouse.remote'] = '${CLICKHOUSE_DATABASE}.local_opentelemetry' +AND attribute['clickhouse.rows'] = '1' +AND attribute['clickhouse.bytes'] = '8' +; " # # Cleanup # ${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none -nq " -DROP TABLE default.dist_opentelemetry ON CLUSTER test_cluster_two_shards; -DROP TABLE default.local_opentelemetry ON CLUSTER test_cluster_two_shards; +DROP TABLE ${CLICKHOUSE_DATABASE}.dist_opentelemetry; +DROP TABLE ${CLICKHOUSE_DATABASE}.local_opentelemetry; " From 92a92baa33842f6a1aeee232f22a17b4fd71e923 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Fri, 9 Sep 2022 12:18:27 +0800 Subject: [PATCH 11/27] Simplify test Signed-off-by: Frank Chen --- ...entelemetry_insert_on_distributed_table.sh | 126 ++++++------------ 1 file changed, 42 insertions(+), 84 deletions(-) diff --git a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh index 7ec0650aaac..1b4e1da97f9 100755 --- a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh +++ b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh @@ -1,16 +1,44 @@ #!/usr/bin/env bash # Tags: no-fasttest, distributed -set -ue - -unset CLICKHOUSE_LOG_COMMENT - CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh . 
"$CURDIR"/../shell_config.sh -${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none -nq " +function insert() +{ + echo "INSERT INTO ${CLICKHOUSE_DATABASE}.dist_opentelemetry SETTINGS insert_distributed_sync=$2 VALUES(1),(2)" | + ${CLICKHOUSE_CURL} \ + -X POST \ + -H "traceparent: 00-$1-5150000000000515-01" \ + -H "tracestate: some custom state" \ + "${CLICKHOUSE_URL}" \ + --data @- +} + +function check_span() +{ +${CLICKHOUSE_CLIENT} -nq " + SYSTEM FLUSH LOGS; + + SELECT count() FROM system.opentelemetry_span_log + WHERE lower(hex(trace_id)) = '${1}' + AND operation_name like '${2}' + AND attribute['clickhouse.shard_num'] = '${3}' + AND attribute['clickhouse.cluster'] = 'test_cluster_two_shards' + AND attribute['clickhouse.distributed'] = '${CLICKHOUSE_DATABASE}.dist_opentelemetry' + AND attribute['clickhouse.remote'] = '${CLICKHOUSE_DATABASE}.local_opentelemetry' + AND attribute['clickhouse.rows'] = '1' + AND attribute['clickhouse.bytes'] = '8' + ;" +} + + +# +# Prepare tables for tests +# +${CLICKHOUSE_CLIENT} -nq " DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.dist_opentelemetry; DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.local_opentelemetry; @@ -19,96 +47,26 @@ CREATE TABLE ${CLICKHOUSE_DATABASE}.local_opentelemetry (key UInt64) Engine=Merg " # -# INSERT ASYNC test -# Do test with opentelemetry enabled +# ASYNC INSERT test with opentelemetry enabled # trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))"); -echo "INSERT INTO ${CLICKHOUSE_DATABASE}.dist_opentelemetry SETTINGS insert_distributed_sync=0 VALUES(1),(2)" | -${CLICKHOUSE_CURL} \ - -X POST \ - -H "traceparent: 00-$trace_id-5250000000000525-01" \ - -H "tracestate: some custom state" \ - "${CLICKHOUSE_URL}" \ - --data @- +insert $trace_id 0 +check_span $trace_id '%writeToLocal%' '1' +check_span $trace_id '%writeToLocal%' '2' -# Check log -${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none -nq " --- Make sure INSERT on distributed finishes -SYSTEM FLUSH DISTRIBUTED 
${CLICKHOUSE_DATABASE}.dist_opentelemetry; - --- Make sure opentelemetry span log flushed -SYSTEM FLUSH LOGS; - --- Above INSERT will insert data to two shards respectively, so there will be two spans generated -SELECT count() FROM system.opentelemetry_span_log -WHERE lower(hex(trace_id)) = '${trace_id}' -AND operation_name like '%writeToLocal%' -AND attribute['clickhouse.shard_num'] = '1' -AND attribute['clickhouse.cluster'] = 'test_cluster_two_shards' -AND attribute['clickhouse.distributed'] = '${CLICKHOUSE_DATABASE}.dist_opentelemetry' -AND attribute['clickhouse.remote'] = '${CLICKHOUSE_DATABASE}.local_opentelemetry' -AND attribute['clickhouse.rows'] = '1' -AND attribute['clickhouse.bytes'] = '8' -; - -SELECT count() FROM system.opentelemetry_span_log -WHERE lower(hex(trace_id)) = '${trace_id}' -AND operation_name like '%writeToLocal%' -AND attribute['clickhouse.shard_num'] = '2' -AND attribute['clickhouse.cluster'] = 'test_cluster_two_shards' -AND attribute['clickhouse.distributed'] = '${CLICKHOUSE_DATABASE}.dist_opentelemetry' -AND attribute['clickhouse.remote'] = '${CLICKHOUSE_DATABASE}.local_opentelemetry' -AND attribute['clickhouse.rows'] = '1' -AND attribute['clickhouse.bytes'] = '8' -; - -" # -# INSERT SYNC test -# Do test with opentelemetry enabled and in SYNC mode +# SYNC INSERT SYNC test with opentelemetry enabled # trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))"); -echo "INSERT INTO ${CLICKHOUSE_DATABASE}.dist_opentelemetry SETTINGS insert_distributed_sync=1 VALUES(1),(2)" | -${CLICKHOUSE_CURL} \ - -X POST \ - -H "traceparent: 00-$trace_id-5250000000000525-01" \ - -H "tracestate: some custom state" \ - "${CLICKHOUSE_URL}" \ - --data @- - -# Check log -${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none -nq " -SYSTEM FLUSH LOGS; - --- Above INSERT will insert data to two shards in the same flow, so there should be two spans generated with the same operation name -SELECT count() FROM system.opentelemetry_span_log -WHERE 
lower(hex(trace_id)) = '${trace_id}' -AND operation_name like '%runWritingJob%' -AND attribute['clickhouse.shard_num'] = '1' -AND attribute['clickhouse.cluster'] = 'test_cluster_two_shards' -AND attribute['clickhouse.distributed'] = '${CLICKHOUSE_DATABASE}.dist_opentelemetry' -AND attribute['clickhouse.remote'] = '${CLICKHOUSE_DATABASE}.local_opentelemetry' -AND attribute['clickhouse.rows'] = '1' -AND attribute['clickhouse.bytes'] = '8' -; - -SELECT count() FROM system.opentelemetry_span_log -WHERE lower(hex(trace_id)) = '${trace_id}' -AND operation_name like '%runWritingJob%' -AND attribute['clickhouse.shard_num'] = '2' -AND attribute['clickhouse.cluster'] = 'test_cluster_two_shards' -AND attribute['clickhouse.distributed'] = '${CLICKHOUSE_DATABASE}.dist_opentelemetry' -AND attribute['clickhouse.remote'] = '${CLICKHOUSE_DATABASE}.local_opentelemetry' -AND attribute['clickhouse.rows'] = '1' -AND attribute['clickhouse.bytes'] = '8' -; -" +insert $trace_id 1 +check_span $trace_id '%runWritingJob%' '1' +check_span $trace_id '%runWritingJob%' '2' # # Cleanup # -${CLICKHOUSE_CLIENT} --distributed_ddl_output_mode=none -nq " +${CLICKHOUSE_CLIENT} -nq " DROP TABLE ${CLICKHOUSE_DATABASE}.dist_opentelemetry; DROP TABLE ${CLICKHOUSE_DATABASE}.local_opentelemetry; " From 2fb0ae7002938720751fe606c3725a84dfbcad88 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Fri, 9 Sep 2022 19:02:42 +0800 Subject: [PATCH 12/27] Update test case --- .../02417_opentelemetry_insert_on_distributed_table.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh index 1b4e1da97f9..55457d26249 100755 --- a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh +++ b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh @@ -52,7 +52,7 @@ CREATE TABLE ${CLICKHOUSE_DATABASE}.local_opentelemetry 
(key UInt64) Engine=Merg trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))"); insert $trace_id 0 check_span $trace_id '%writeToLocal%' '1' -check_span $trace_id '%writeToLocal%' '2' +check_span $trace_id '%processFile%' '2' # From 57146c9361fd7ba87f7798f9a001db2a1b1d523d Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 9 Sep 2022 19:43:14 +0200 Subject: [PATCH 13/27] Fix typos in SortedBlocksWriter Signed-off-by: Azat Khuzhin --- src/Interpreters/SortedBlocksWriter.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Interpreters/SortedBlocksWriter.cpp b/src/Interpreters/SortedBlocksWriter.cpp index 0acb056690f..20859e23ea7 100644 --- a/src/Interpreters/SortedBlocksWriter.cpp +++ b/src/Interpreters/SortedBlocksWriter.cpp @@ -87,7 +87,7 @@ void SortedBlocksWriter::insert(Block && block) { std::lock_guard lock{insert_mutex}; - /// insert bock into BlocksList undef lock + /// insert block into BlocksList under lock inserted_blocks.insert(std::move(block)); size_t total_row_count = inserted_blocks.row_count + row_count_in_flush; From 763bb18f98ac34521fe342158a44527e3318d58a Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 9 Sep 2022 19:23:45 +0200 Subject: [PATCH 14/27] Fix SIGSEGV in SortedBlocksWriter in case of empty block CI found one issue [1]. Here is the stack trace for invalid read:
stack trace ``` 0: DB::TemporaryFileLazySource::TemporaryFileLazySource(std::__1::basic_string, std::__1::allocator > const&, DB::Block const&) [inlined] std::__1::basic_string, std::__1::allocator >::__is_long(this="") const at string:1445:22 1: DB::TemporaryFileLazySource::TemporaryFileLazySource(std::__1::basic_string, std::__1::allocator > const&, DB::Block const&) [inlined] std::__1::basic_string, std::__1::allocator >::basic_string(this="", __str="") at string:1927 2: DB::TemporaryFileLazySource::TemporaryFileLazySource(this=0x00007f3aec105f58, path_="", header_=0x00007f38ffd93b40) at TemporaryFileLazySource.cpp:11 3: DB::SortedBlocksWriter::streamFromFile(std::__1::unique_ptr > const&) const [inlined] DB::TemporaryFileLazySource* std::__1::construct_at, std::__1::allocator > const&, DB::Block, DB::TemporaryFileLazySource*>(__args=0x00007f38ffd91560) at construct_at.h:38:50 4: DB::SortedBlocksWriter::streamFromFile(std::__1::unique_ptr > const&) const [inlined] void std::__1::allocator_traits >::construct, std::__1::allocator > const&, DB::Block, void, void>(__args=0x00007f38ffd91560) at allocator_traits.h:298 5: DB::SortedBlocksWriter::streamFromFile(std::__1::unique_ptr > const&) const [inlined] std::__1::__shared_ptr_emplace >::__shared_ptr_emplace, std::__1::allocator > const&, DB::Block>(this=0x00007f3aec105f40, __args=0x00007f38ffd91560) at shared_ptr.h:293 6: DB::SortedBlocksWriter::streamFromFile(std::__1::unique_ptr > const&) const [inlined] std::__1::shared_ptr std::__1::allocate_shared, std::__1::basic_string, std::__1::allocator > const&, DB::Block, void>(__args=, __args=) at shared_ptr.h:954 7: DB::SortedBlocksWriter::streamFromFile(std::__1::unique_ptr > const&) const [inlined] std::__1::shared_ptr std::__1::make_shared, std::__1::allocator > const&, DB::Block, void>(__args=, __args=) at shared_ptr.h:963 8: DB::SortedBlocksWriter::streamFromFile(this=, file=) const at SortedBlocksWriter.cpp:238 9: DB::SortedBlocksWriter::premerge(this=) at 
SortedBlocksWriter.cpp:209:32 ```
[1]: https://s3.amazonaws.com/clickhouse-test-reports/41046/adea92f847373d1fcfd733d8979c63024f9b80bf/stress_test__asan_.html So the problem here is that there was an empty unique_ptr<> reference to the temporary file, because of an empty block that was accepted by SortedBlocksWriter::insert(). But insert() is not the problem; the problem is premerge(), which steals blocks from insert() and does not check that there are some rows. However, this check exists in SortedBlocksWriter::flush(), and in that case the temporary file is not created. Signed-off-by: Azat Khuzhin --- src/Interpreters/SortedBlocksWriter.cpp | 10 +++++++++- .../0_stateless/02070_join_on_disk.reference | 0 .../0_stateless/02070_join_on_disk.sql | 19 +++++++++++++++++++ 3 files changed, 28 insertions(+), 1 deletion(-) create mode 100644 tests/queries/0_stateless/02070_join_on_disk.reference create mode 100644 tests/queries/0_stateless/02070_join_on_disk.sql diff --git a/src/Interpreters/SortedBlocksWriter.cpp b/src/Interpreters/SortedBlocksWriter.cpp index 20859e23ea7..755c43df635 100644 --- a/src/Interpreters/SortedBlocksWriter.cpp +++ b/src/Interpreters/SortedBlocksWriter.cpp @@ -28,6 +28,11 @@ namespace CurrentMetrics namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + namespace { @@ -84,6 +89,9 @@ void SortedBlocksWriter::insert(Block && block) size_t bytes = 0; size_t flush_no = 0; + if (!block.rows()) + return; + { std::lock_guard lock{insert_mutex}; @@ -145,7 +153,7 @@ SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & bloc pipes.emplace_back(std::make_shared(block.cloneEmpty(), Chunk(block.getColumns(), num_rows))); if (pipes.empty()) - return {}; + throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty block"); QueryPipelineBuilder pipeline; pipeline.init(Pipe::unitePipes(std::move(pipes))); diff --git a/tests/queries/0_stateless/02070_join_on_disk.reference b/tests/queries/0_stateless/02070_join_on_disk.reference new file mode 100644 index 00000000000..e69de29bb2d 
diff --git a/tests/queries/0_stateless/02070_join_on_disk.sql b/tests/queries/0_stateless/02070_join_on_disk.sql new file mode 100644 index 00000000000..c25a7a1ffac --- /dev/null +++ b/tests/queries/0_stateless/02070_join_on_disk.sql @@ -0,0 +1,19 @@ +-- Regression test when Join stores data on disk and receive empty block. +-- Because of this it does not create empty file, while expect it. + +SET max_threads = 1; +SET join_algorithm = 'auto'; +SET max_rows_in_join = 1000; +SET optimize_aggregation_in_order = 1; +SET max_block_size = 1000; + +DROP TABLE IF EXISTS join_on_disk; + +SYSTEM STOP MERGES join_on_disk; + +CREATE TABLE join_on_disk (id Int) Engine=MergeTree() ORDER BY id; + +INSERT INTO join_on_disk SELECT number as id FROM numbers_mt(50000); +INSERT INTO join_on_disk SELECT number as id FROM numbers_mt(1000); + +SELECT id FROM join_on_disk lhs LEFT JOIN (SELECT id FROM join_on_disk GROUP BY id) rhs USING (id) FORMAT Null; From b2b5091ffd04164ffcfee7986b0973c31e2fbc3c Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 9 Sep 2022 22:01:52 +0300 Subject: [PATCH 15/27] Update 02070_join_on_disk.sql --- tests/queries/0_stateless/02070_join_on_disk.sql | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/queries/0_stateless/02070_join_on_disk.sql b/tests/queries/0_stateless/02070_join_on_disk.sql index c25a7a1ffac..eabf31df25f 100644 --- a/tests/queries/0_stateless/02070_join_on_disk.sql +++ b/tests/queries/0_stateless/02070_join_on_disk.sql @@ -17,3 +17,5 @@ INSERT INTO join_on_disk SELECT number as id FROM numbers_mt(50000); INSERT INTO join_on_disk SELECT number as id FROM numbers_mt(1000); SELECT id FROM join_on_disk lhs LEFT JOIN (SELECT id FROM join_on_disk GROUP BY id) rhs USING (id) FORMAT Null; + +DROP TABLE join_on_disk; From 27b6a25473ef898b39baed2d6432778070a1d619 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Mon, 12 Sep 2022 11:04:30 +0800 Subject: [PATCH 16/27] Compare content instead of count for easier problem solving 
Signed-off-by: Frank Chen --- ...etry_insert_on_distributed_table.reference | 8 +++---- ...entelemetry_insert_on_distributed_table.sh | 23 +++++++++---------- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference index 98fb6a68656..71bbd6f22ae 100644 --- a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference +++ b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference @@ -1,4 +1,4 @@ -1 -1 -1 -1 +{"operation_name":"void DB::DistributedSink::writeToLocal(const Cluster::ShardInfo &, const DB::Block &, size_t)","cluster":"test_cluster_two_shards","shard":"1","rows":"1","bytes":"8"} +{"operation_name":"void DB::DistributedSink::writeToLocal(const Cluster::ShardInfo &, const DB::Block &, size_t)","cluster":"test_cluster_two_shards","shard":"2","rows":"1","bytes":"8"} +{"operation_name":"auto DB::DistributedSink::runWritingJob(DB::DistributedSink::JobReplica &, const DB::Block &, size_t)::(anonymous class)::operator()() const","cluster":"test_cluster_two_shards","shard":"1","rows":"1","bytes":"8"} +{"operation_name":"auto DB::DistributedSink::runWritingJob(DB::DistributedSink::JobReplica &, const DB::Block &, size_t)::(anonymous class)::operator()() const","cluster":"test_cluster_two_shards","shard":"2","rows":"1","bytes":"8"} diff --git a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh index 55457d26249..06b35298b89 100755 --- a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh +++ b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh @@ -22,15 +22,17 @@ function check_span() ${CLICKHOUSE_CLIENT} -nq " SYSTEM FLUSH LOGS; - SELECT count() FROM system.opentelemetry_span_log - 
WHERE lower(hex(trace_id)) = '${1}' - AND operation_name like '${2}' - AND attribute['clickhouse.shard_num'] = '${3}' - AND attribute['clickhouse.cluster'] = 'test_cluster_two_shards' + SELECT operation_name, + attribute['clickhouse.cluster'] AS cluster, + attribute['clickhouse.shard_num'] AS shard, + attribute['clickhouse.rows'] AS rows, + attribute['clickhouse.bytes'] AS bytes + FROM system.opentelemetry_span_log + WHERE finish_date >= yesterday() + AND lower(hex(trace_id)) = '${1}' AND attribute['clickhouse.distributed'] = '${CLICKHOUSE_DATABASE}.dist_opentelemetry' AND attribute['clickhouse.remote'] = '${CLICKHOUSE_DATABASE}.local_opentelemetry' - AND attribute['clickhouse.rows'] = '1' - AND attribute['clickhouse.bytes'] = '8' + Format JSONEachRow ;" } @@ -51,17 +53,14 @@ CREATE TABLE ${CLICKHOUSE_DATABASE}.local_opentelemetry (key UInt64) Engine=Merg # trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))"); insert $trace_id 0 -check_span $trace_id '%writeToLocal%' '1' -check_span $trace_id '%processFile%' '2' - +check_span $trace_id # # SYNC INSERT SYNC test with opentelemetry enabled # trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))"); insert $trace_id 1 -check_span $trace_id '%runWritingJob%' '1' -check_span $trace_id '%runWritingJob%' '2' +check_span $trace_id # # Cleanup From 16975ff4a834d50499db9bf03977bfbb6115a188 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Mon, 12 Sep 2022 14:18:21 +0800 Subject: [PATCH 17/27] Fix testcase Signed-off-by: Frank Chen --- .../02417_opentelemetry_insert_on_distributed_table.reference | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference index 71bbd6f22ae..ee5d97c601b 100644 --- a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference +++ 
b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference @@ -1,4 +1,4 @@ {"operation_name":"void DB::DistributedSink::writeToLocal(const Cluster::ShardInfo &, const DB::Block &, size_t)","cluster":"test_cluster_two_shards","shard":"1","rows":"1","bytes":"8"} -{"operation_name":"void DB::DistributedSink::writeToLocal(const Cluster::ShardInfo &, const DB::Block &, size_t)","cluster":"test_cluster_two_shards","shard":"2","rows":"1","bytes":"8"} +{"operation_name":"void DB::StorageDistributedDirectoryMonitor::processFile(const std::string &)","cluster":"test_cluster_two_shards","shard":"2","rows":"1","bytes":"8"} {"operation_name":"auto DB::DistributedSink::runWritingJob(DB::DistributedSink::JobReplica &, const DB::Block &, size_t)::(anonymous class)::operator()() const","cluster":"test_cluster_two_shards","shard":"1","rows":"1","bytes":"8"} {"operation_name":"auto DB::DistributedSink::runWritingJob(DB::DistributedSink::JobReplica &, const DB::Block &, size_t)::(anonymous class)::operator()() const","cluster":"test_cluster_two_shards","shard":"2","rows":"1","bytes":"8"} From e985ee335463aa5ca8f2c93c4d996ac6a70132db Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Mon, 12 Sep 2022 12:22:48 +0000 Subject: [PATCH 18/27] Close sessions on Keeper shutdown --- src/Coordination/CoordinationSettings.h | 1 + src/Coordination/KeeperDispatcher.cpp | 51 ++++++++++++++++--- .../{keeper_config.xml => keeper_config1.xml} | 18 +++++-- .../configs/keeper_config2.xml | 37 ++++++++++++++ .../configs/keeper_config3.xml | 37 ++++++++++++++ tests/integration/test_keeper_session/test.py | 28 +++++++++- 6 files changed, 160 insertions(+), 12 deletions(-) rename tests/integration/test_keeper_session/configs/{keeper_config.xml => keeper_config1.xml} (67%) create mode 100644 tests/integration/test_keeper_session/configs/keeper_config2.xml create mode 100644 tests/integration/test_keeper_session/configs/keeper_config3.xml diff --git 
a/src/Coordination/CoordinationSettings.h b/src/Coordination/CoordinationSettings.h index 5247f5d7ec8..c436c1b6635 100644 --- a/src/Coordination/CoordinationSettings.h +++ b/src/Coordination/CoordinationSettings.h @@ -30,6 +30,7 @@ struct Settings; M(UInt64, snapshot_distance, 100000, "How many log items we have to collect to write new snapshot", 0) \ M(Bool, auto_forwarding, true, "Allow to forward write requests from followers to leader", 0) \ M(Milliseconds, shutdown_timeout, 5000, "How much time we will wait until RAFT shutdown", 0) \ + M(Milliseconds, session_shutdown_timeout, 10000, "How much time we will wait until sessions are closed during shutdown", 0) \ M(Milliseconds, startup_timeout, 180000, "How much time we will wait until RAFT to start.", 0) \ M(LogsLevel, raft_logs_level, LogsLevel::information, "Log internal RAFT logs into main server log level. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \ M(UInt64, rotate_log_storage_interval, 100000, "How many records will be stored in one log storage file", 0) \ diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 5b376a03b02..48030ef86d2 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -354,9 +354,6 @@ void KeeperDispatcher::shutdown() update_configuration_thread.join(); } - if (server) - server->shutdown(); - KeeperStorage::RequestForSession request_for_session; /// Set session expired for all pending requests @@ -368,10 +365,52 @@ void KeeperDispatcher::shutdown() setResponse(request_for_session.session_id, response); } - /// Clear all registered sessions - std::lock_guard lock(session_to_response_callback_mutex); - session_to_response_callback.clear(); + KeeperStorage::RequestsForSessions close_requests; + { + /// Clear all registered sessions + std::lock_guard lock(session_to_response_callback_mutex); + + if (hasLeader()) + { + // send to leader CLOSE requests for active 
sessions + for (const auto & [session, response] : session_to_response_callback) + { + Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close); + request->xid = Coordination::CLOSE_XID; + KeeperStorage::RequestForSession request_info; + request_info.request = request; + using namespace std::chrono; + request_info.time = duration_cast(system_clock::now().time_since_epoch()).count(); + request_info.session_id = session; + + close_requests.push_back(std::move(request_info)); + } + } + + session_to_response_callback.clear(); + } + + // if there is no leader, there is no reason to do CLOSE because it's a write request + if (hasLeader() && !close_requests.empty()) + { + LOG_INFO(log, "Trying to close {} session(s)", close_requests.size()); + const auto raft_result = server->putRequestBatch(close_requests); + Poco::Event sessions_closing_done; + raft_result->when_ready([&](nuraft::cmd_result> & /*result*/, nuraft::ptr & /*exception*/) + { + sessions_closing_done.set(); + }); + + auto session_shutdown_timeout = configuration_and_settings->coordination_settings->session_shutdown_timeout.totalMilliseconds(); + if (!sessions_closing_done.tryWait(session_shutdown_timeout)) + LOG_WARNING(log, "Failed to close sessions in {}ms. If they are not closed, they will be closed after session timeout.", session_shutdown_timeout); + } + + if (server) + server->shutdown(); + CurrentMetrics::set(CurrentMetrics::KeeperAliveConnections, 0); + } catch (...) 
{ diff --git a/tests/integration/test_keeper_session/configs/keeper_config.xml b/tests/integration/test_keeper_session/configs/keeper_config1.xml similarity index 67% rename from tests/integration/test_keeper_session/configs/keeper_config.xml rename to tests/integration/test_keeper_session/configs/keeper_config1.xml index ed0bb52bd51..fd308fe8a2f 100644 --- a/tests/integration/test_keeper_session/configs/keeper_config.xml +++ b/tests/integration/test_keeper_session/configs/keeper_config1.xml @@ -1,4 +1,4 @@ - + 9181 1 @@ -19,9 +19,19 @@ 1 node1 9234 - true - 3 + + + 2 + node2 + 9234 + true + + + 3 + node3 + 9234 + true - + diff --git a/tests/integration/test_keeper_session/configs/keeper_config2.xml b/tests/integration/test_keeper_session/configs/keeper_config2.xml new file mode 100644 index 00000000000..ad558fbccad --- /dev/null +++ b/tests/integration/test_keeper_session/configs/keeper_config2.xml @@ -0,0 +1,37 @@ + + + 9181 + 2 + /var/lib/clickhouse/coordination/log + /var/lib/clickhouse/coordination/snapshots + * + + + 5000 + 10000 + 5000 + 75 + trace + + + + + 1 + node1 + 9234 + + + 2 + node2 + 9234 + true + + + 3 + node3 + 9234 + true + + + + diff --git a/tests/integration/test_keeper_session/configs/keeper_config3.xml b/tests/integration/test_keeper_session/configs/keeper_config3.xml new file mode 100644 index 00000000000..2a21f959816 --- /dev/null +++ b/tests/integration/test_keeper_session/configs/keeper_config3.xml @@ -0,0 +1,37 @@ + + + 9181 + 3 + /var/lib/clickhouse/coordination/log + /var/lib/clickhouse/coordination/snapshots + * + + + 5000 + 10000 + 5000 + 75 + trace + + + + + 1 + node1 + 9234 + + + 2 + node2 + 9234 + true + + + 3 + node3 + 9234 + true + + + + diff --git a/tests/integration/test_keeper_session/test.py b/tests/integration/test_keeper_session/test.py index 30db4d9548c..bb72a30359d 100644 --- a/tests/integration/test_keeper_session/test.py +++ b/tests/integration/test_keeper_session/test.py @@ -10,7 +10,15 @@ from kazoo.client import 
KazooClient cluster = ClickHouseCluster(__file__) node1 = cluster.add_instance( - "node1", main_configs=["configs/keeper_config.xml"], stay_alive=True + "node1", main_configs=["configs/keeper_config1.xml"], stay_alive=True +) + +node2 = cluster.add_instance( + "node2", main_configs=["configs/keeper_config2.xml"], stay_alive=True +) + +node3 = cluster.add_instance( + "node3", main_configs=["configs/keeper_config3.xml"], stay_alive=True ) bool_struct = struct.Struct("B") @@ -61,7 +69,7 @@ def wait_node(node): def wait_nodes(): - for n in [node1]: + for n in [node1, node2, node3]: wait_node(n) @@ -165,3 +173,19 @@ def test_session_timeout(started_cluster): negotiated_timeout, _ = handshake(node1.name, session_timeout=20000, session_id=0) assert negotiated_timeout == 10000 + + +def test_session_close_shutdown(started_cluster): + wait_nodes() + + node1_zk = get_fake_zk(node1.name) + node2_zk = get_fake_zk(node2.name) + + eph_node = "/test_node" + node2_zk.create(eph_node, ephemeral=True) + assert node1_zk.exists(eph_node) != None + + # shutdown while session is active + node2.stop_clickhouse() + + assert node1_zk.exists(eph_node) == None From c2dfabe51c7ecbfb32aac7cfe07e577acf04c3dd Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Mon, 12 Sep 2022 12:25:39 +0000 Subject: [PATCH 19/27] Use shared_ptr for Event --- src/Coordination/KeeperDispatcher.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 48030ef86d2..9684f085f4a 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -395,14 +395,14 @@ void KeeperDispatcher::shutdown() { LOG_INFO(log, "Trying to close {} session(s)", close_requests.size()); const auto raft_result = server->putRequestBatch(close_requests); - Poco::Event sessions_closing_done; - raft_result->when_ready([&](nuraft::cmd_result> & /*result*/, nuraft::ptr & /*exception*/) + auto 
sessions_closing_done = std::make_shared(); + raft_result->when_ready([sessions_closing_done](nuraft::cmd_result> & /*result*/, nuraft::ptr & /*exception*/) { - sessions_closing_done.set(); + sessions_closing_done->set(); }); auto session_shutdown_timeout = configuration_and_settings->coordination_settings->session_shutdown_timeout.totalMilliseconds(); - if (!sessions_closing_done.tryWait(session_shutdown_timeout)) + if (!sessions_closing_done->tryWait(session_shutdown_timeout)) LOG_WARNING(log, "Failed to close sessions in {}ms. If they are not closed, they will be closed after session timeout.", session_shutdown_timeout); } From ebaa24ecaeedc6ba64fe4334f5a9e10583ecbd8c Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Mon, 12 Sep 2022 22:15:30 +0800 Subject: [PATCH 20/27] Fix flaky tests Signed-off-by: Frank Chen --- src/Common/ThreadPool.h | 18 +++++++--- src/Core/BackgroundSchedulePool.cpp | 6 ++-- src/Core/BackgroundSchedulePool.h | 6 ++-- ...etry_insert_on_distributed_table.reference | 12 ++++--- ...entelemetry_insert_on_distributed_table.sh | 34 +++++++++++++++---- 5 files changed, 56 insertions(+), 20 deletions(-) diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h index fc5377b3783..45f3455bf8f 100644 --- a/src/Common/ThreadPool.h +++ b/src/Common/ThreadPool.h @@ -264,6 +264,18 @@ protected: } }; +/// Schedule jobs/tasks on global thread pool without implicit passing tracing context on current thread to underlying worker as parent tracing context. +/// +/// If you implement your own job/task scheduling upon global thread pool or schedules a long time running job in a infinite loop way, +/// you need to use class, or you need to use ThreadFromGlobalPool below. +/// +/// See the comments of ThreadPool below to know how it works. 
+using ThreadFromGlobalPoolWithoutTracingContext = ThreadFromGlobalPoolImpl; + +/// An alias of thread that execute jobs/tasks on global thread pool by implicit passing tracing context on current thread to underlying worker as parent tracing context. +/// If jobs/tasks are directly scheduled by using APIs of this class, you need to use this class or you need to use class above. +using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl; + /// Recommended thread pool for the case when multiple thread pools are created and destroyed. /// /// The template parameter of ThreadFromGlobalPool is set to false to disable tracing context propagation to underlying worker. @@ -274,9 +286,7 @@ protected: /// which means the tracing context initialized at underlying worker level won't be delete for a very long time. /// This would cause wrong context for further jobs scheduled in ThreadPool. /// -/// To make sure the tracing context are correctly propagated, we explicitly disable context propagation(including initialization and de-initialization) at underlying worker level. +/// To make sure the tracing context is correctly propagated, we explicitly disable context propagation(including initialization and de-initialization) at underlying worker level. 
/// -using ThreadPool = ThreadPoolImpl>; +using ThreadPool = ThreadPoolImpl; -/// An alias for user code to execute a job in the global thread pool -using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl; diff --git a/src/Core/BackgroundSchedulePool.cpp b/src/Core/BackgroundSchedulePool.cpp index b7a33c4930d..b2adc07d92f 100644 --- a/src/Core/BackgroundSchedulePool.cpp +++ b/src/Core/BackgroundSchedulePool.cpp @@ -149,9 +149,9 @@ BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Met threads.resize(size_); for (auto & thread : threads) - thread = ThreadFromGlobalPool([this] { threadFunction(); }); + thread = ThreadFromGlobalPoolWithoutTracingContext([this] { threadFunction(); }); - delayed_thread = ThreadFromGlobalPool([this] { delayExecutionThreadFunction(); }); + delayed_thread = ThreadFromGlobalPoolWithoutTracingContext([this] { delayExecutionThreadFunction(); }); } @@ -168,7 +168,7 @@ void BackgroundSchedulePool::increaseThreadsCount(size_t new_threads_count) threads.resize(new_threads_count); for (size_t i = old_threads_count; i < new_threads_count; ++i) - threads[i] = ThreadFromGlobalPool([this] { threadFunction(); }); + threads[i] = ThreadFromGlobalPoolWithoutTracingContext([this] { threadFunction(); }); } diff --git a/src/Core/BackgroundSchedulePool.h b/src/Core/BackgroundSchedulePool.h index 36cbad145c9..e7abc99a4a8 100644 --- a/src/Core/BackgroundSchedulePool.h +++ b/src/Core/BackgroundSchedulePool.h @@ -57,7 +57,9 @@ public: ~BackgroundSchedulePool(); private: - using Threads = std::vector; + /// BackgroundSchedulePool schedules a task on its own task queue, there's no need to construct/restore tracing context on this level. + /// This is also how ThreadPool class treats the tracing context. See ThreadPool for more information. 
+ using Threads = std::vector; void threadFunction(); void delayExecutionThreadFunction(); @@ -83,7 +85,7 @@ private: std::condition_variable delayed_tasks_cond_var; std::mutex delayed_tasks_mutex; /// Thread waiting for next delayed task. - ThreadFromGlobalPool delayed_thread; + ThreadFromGlobalPoolImpl delayed_thread; /// Tasks ordered by scheduled time. DelayedTasks delayed_tasks; diff --git a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference index ee5d97c601b..dde07d4540d 100644 --- a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference +++ b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.reference @@ -1,4 +1,8 @@ -{"operation_name":"void DB::DistributedSink::writeToLocal(const Cluster::ShardInfo &, const DB::Block &, size_t)","cluster":"test_cluster_two_shards","shard":"1","rows":"1","bytes":"8"} -{"operation_name":"void DB::StorageDistributedDirectoryMonitor::processFile(const std::string &)","cluster":"test_cluster_two_shards","shard":"2","rows":"1","bytes":"8"} -{"operation_name":"auto DB::DistributedSink::runWritingJob(DB::DistributedSink::JobReplica &, const DB::Block &, size_t)::(anonymous class)::operator()() const","cluster":"test_cluster_two_shards","shard":"1","rows":"1","bytes":"8"} -{"operation_name":"auto DB::DistributedSink::runWritingJob(DB::DistributedSink::JobReplica &, const DB::Block &, size_t)::(anonymous class)::operator()() const","cluster":"test_cluster_two_shards","shard":"2","rows":"1","bytes":"8"} +{"operation_name":"void DB::DistributedSink::writeToLocal(const Cluster::ShardInfo &, const DB::Block &, size_t)","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"} +{"operation_name":"void DB::DistributedSink::writeToLocal(const Cluster::ShardInfo &, const DB::Block &, 
size_t)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"} +{"operation_name":"void DB::StorageDistributedDirectoryMonitor::processFile(const std::string &)","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"} +{"operation_name":"void DB::StorageDistributedDirectoryMonitor::processFile(const std::string &)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"} +{"operation_name":"auto DB::DistributedSink::runWritingJob(DB::DistributedSink::JobReplica &, const DB::Block &, size_t)::(anonymous class)::operator()() const","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"} +{"operation_name":"auto DB::DistributedSink::runWritingJob(DB::DistributedSink::JobReplica &, const DB::Block &, size_t)::(anonymous class)::operator()() const","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"} +{"operation_name":"auto DB::DistributedSink::runWritingJob(DB::DistributedSink::JobReplica &, const DB::Block &, size_t)::(anonymous class)::operator()() const","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"} +{"operation_name":"auto DB::DistributedSink::runWritingJob(DB::DistributedSink::JobReplica &, const DB::Block &, size_t)::(anonymous class)::operator()() const","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"} diff --git a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh index 06b35298b89..9ac5f061d4a 100755 --- a/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh +++ b/tests/queries/0_stateless/02417_opentelemetry_insert_on_distributed_table.sh @@ -6,13 +6,18 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) . 
"$CURDIR"/../shell_config.sh +# This function takes 4 arguments: +# $1 - OpenTelemetry Trace Id +# $2 - value of insert_distributed_sync +# $3 - value of prefer_localhost_replica +# $4 - a String that helps to debug function insert() { - echo "INSERT INTO ${CLICKHOUSE_DATABASE}.dist_opentelemetry SETTINGS insert_distributed_sync=$2 VALUES(1),(2)" | + echo "INSERT INTO ${CLICKHOUSE_DATABASE}.dist_opentelemetry SETTINGS insert_distributed_sync=$2, prefer_localhost_replica=$3 VALUES(1),(2)" | ${CLICKHOUSE_CURL} \ -X POST \ -H "traceparent: 00-$1-5150000000000515-01" \ - -H "tracestate: some custom state" \ + -H "tracestate: $4" \ "${CLICKHOUSE_URL}" \ --data @- } @@ -32,6 +37,7 @@ ${CLICKHOUSE_CLIENT} -nq " AND lower(hex(trace_id)) = '${1}' AND attribute['clickhouse.distributed'] = '${CLICKHOUSE_DATABASE}.dist_opentelemetry' AND attribute['clickhouse.remote'] = '${CLICKHOUSE_DATABASE}.local_opentelemetry' + ORDER BY attribute['clickhouse.shard_num'] Format JSONEachRow ;" } @@ -44,22 +50,36 @@ ${CLICKHOUSE_CLIENT} -nq " DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.dist_opentelemetry; DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.local_opentelemetry; -CREATE TABLE ${CLICKHOUSE_DATABASE}.dist_opentelemetry (key UInt64) Engine=Distributed('test_cluster_two_shards', ${CLICKHOUSE_DATABASE}, local_opentelemetry, key % 2); +CREATE TABLE ${CLICKHOUSE_DATABASE}.dist_opentelemetry (key UInt64) Engine=Distributed('test_cluster_two_shards_localhost', ${CLICKHOUSE_DATABASE}, local_opentelemetry, key % 2); CREATE TABLE ${CLICKHOUSE_DATABASE}.local_opentelemetry (key UInt64) Engine=MergeTree ORDER BY key; " # -# ASYNC INSERT test with opentelemetry enabled +# test1 # trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))"); -insert $trace_id 0 +insert $trace_id 0 1 "async-insert-writeToLocal" check_span $trace_id # -# SYNC INSERT SYNC test with opentelemetry enabled +# test2 # trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))"); -insert $trace_id 1 
+insert $trace_id 0 0 "async-insert-writeToRemote" +check_span $trace_id + +# +# test3 +# +trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))"); +insert $trace_id 1 1 "sync-insert-writeToLocal" +check_span $trace_id + +# +# test4 +# +trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))"); +insert $trace_id 1 0 "sync-insert-writeToRemote" check_span $trace_id # From 7d6903bdc0d2b3d43b13bfadb1e048ba403668ad Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Tue, 13 Sep 2022 00:06:03 +0800 Subject: [PATCH 21/27] Fix Signed-off-by: Frank Chen --- src/Core/BackgroundSchedulePool.cpp | 6 +++--- src/Core/BackgroundSchedulePool.h | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Core/BackgroundSchedulePool.cpp b/src/Core/BackgroundSchedulePool.cpp index b2adc07d92f..29cd3c1c540 100644 --- a/src/Core/BackgroundSchedulePool.cpp +++ b/src/Core/BackgroundSchedulePool.cpp @@ -149,9 +149,9 @@ BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Met threads.resize(size_); for (auto & thread : threads) - thread = ThreadFromGlobalPoolWithoutTracingContext([this] { threadFunction(); }); + thread = ThreadFromGlobalPoolNoTracingContextPropagation([this] { threadFunction(); }); - delayed_thread = ThreadFromGlobalPoolWithoutTracingContext([this] { delayExecutionThreadFunction(); }); + delayed_thread = ThreadFromGlobalPoolNoTracingContextPropagation([this] { delayExecutionThreadFunction(); }); } @@ -168,7 +168,7 @@ void BackgroundSchedulePool::increaseThreadsCount(size_t new_threads_count) threads.resize(new_threads_count); for (size_t i = old_threads_count; i < new_threads_count; ++i) - threads[i] = ThreadFromGlobalPoolWithoutTracingContext([this] { threadFunction(); }); + threads[i] = ThreadFromGlobalPoolNoTracingContextPropagation([this] { threadFunction(); }); } diff --git a/src/Core/BackgroundSchedulePool.h b/src/Core/BackgroundSchedulePool.h index e7abc99a4a8..1001d98e643 100644 --- 
a/src/Core/BackgroundSchedulePool.h +++ b/src/Core/BackgroundSchedulePool.h @@ -59,7 +59,7 @@ public: private: /// BackgroundSchedulePool schedules a task on its own task queue, there's no need to construct/restore tracing context on this level. /// This is also how ThreadPool class treats the tracing context. See ThreadPool for more information. - using Threads = std::vector; + using Threads = std::vector; void threadFunction(); void delayExecutionThreadFunction(); @@ -85,7 +85,7 @@ private: std::condition_variable delayed_tasks_cond_var; std::mutex delayed_tasks_mutex; /// Thread waiting for next delayed task. - ThreadFromGlobalPoolImpl delayed_thread; + ThreadFromGlobalPoolNoTracingContextPropagation delayed_thread; /// Tasks ordered by scheduled time. DelayedTasks delayed_tasks; From 7e1f2901daa68da3765aebfc6efa937f4ad9d03c Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Tue, 13 Sep 2022 00:06:17 +0800 Subject: [PATCH 22/27] Fix Signed-off-by: Frank Chen --- src/Common/ThreadPool.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h index 45f3455bf8f..af221449be9 100644 --- a/src/Common/ThreadPool.h +++ b/src/Common/ThreadPool.h @@ -270,7 +270,7 @@ protected: /// you need to use class, or you need to use ThreadFromGlobalPool below. /// /// See the comments of ThreadPool below to know how it works. -using ThreadFromGlobalPoolWithoutTracingContext = ThreadFromGlobalPoolImpl; +using ThreadFromGlobalPoolNoTracingContextPropagation = ThreadFromGlobalPoolImpl; /// An alias of thread that execute jobs/tasks on global thread pool by implicit passing tracing context on current thread to underlying worker as parent tracing context. /// If jobs/tasks are directly scheduled by using APIs of this class, you need to use this class or you need to use class above. 
@@ -288,5 +288,5 @@ using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl; /// /// To make sure the tracing context is correctly propagated, we explicitly disable context propagation(including initialization and de-initialization) at underlying worker level. /// -using ThreadPool = ThreadPoolImpl; +using ThreadPool = ThreadPoolImpl; From 20191932dfe6a4d1dcb3def80b24f9fe19a99167 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Tue, 13 Sep 2022 00:41:05 +0800 Subject: [PATCH 23/27] Fix style Signed-off-by: Frank Chen --- src/Common/ThreadPool.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h index af221449be9..76ada9e0d75 100644 --- a/src/Common/ThreadPool.h +++ b/src/Common/ThreadPool.h @@ -266,7 +266,7 @@ protected: /// Schedule jobs/tasks on global thread pool without implicit passing tracing context on current thread to underlying worker as parent tracing context. /// -/// If you implement your own job/task scheduling upon global thread pool or schedules a long time running job in a infinite loop way, +/// If you implement your own job/task scheduling upon global thread pool or schedules a long time running job in a infinite loop way, /// you need to use class, or you need to use ThreadFromGlobalPool below. /// /// See the comments of ThreadPool below to know how it works. @@ -289,4 +289,3 @@ using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl; /// To make sure the tracing context is correctly propagated, we explicitly disable context propagation(including initialization and de-initialization) at underlying worker level. 
/// using ThreadPool = ThreadPoolImpl; - From b2cc6a8cc6d14cf743c53c29485ea220055529cd Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Mon, 12 Sep 2022 18:19:41 +0000 Subject: [PATCH 24/27] Use promise/future --- src/Coordination/KeeperDispatcher.cpp | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 9684f085f4a..8084bf1d513 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -372,6 +372,7 @@ void KeeperDispatcher::shutdown() if (hasLeader()) { + close_requests.reserve(session_to_response_callback.size()); // send to leader CLOSE requests for active sessions for (const auto & [session, response] : session_to_response_callback) { @@ -395,15 +396,18 @@ void KeeperDispatcher::shutdown() { LOG_INFO(log, "Trying to close {} session(s)", close_requests.size()); const auto raft_result = server->putRequestBatch(close_requests); - auto sessions_closing_done = std::make_shared(); - raft_result->when_ready([sessions_closing_done](nuraft::cmd_result> & /*result*/, nuraft::ptr & /*exception*/) - { - sessions_closing_done->set(); - }); + auto sessions_closing_done_promise = std::make_shared>(); + auto sessions_closing_done = sessions_closing_done_promise->get_future(); + raft_result->when_ready([sessions_closing_done_promise = std::move(sessions_closing_done_promise)]( + nuraft::cmd_result> & /*result*/, + nuraft::ptr & /*exception*/) { sessions_closing_done_promise->set_value(); }); auto session_shutdown_timeout = configuration_and_settings->coordination_settings->session_shutdown_timeout.totalMilliseconds(); - if (!sessions_closing_done->tryWait(session_shutdown_timeout)) - LOG_WARNING(log, "Failed to close sessions in {}ms. 
If they are not closed, they will be closed after session timeout.", session_shutdown_timeout); + if (sessions_closing_done.wait_for(std::chrono::milliseconds(session_shutdown_timeout)) != std::future_status::ready) + LOG_WARNING( + log, + "Failed to close sessions in {}ms. If they are not closed, they will be closed after session timeout.", + session_shutdown_timeout); } if (server) From 569b4bb63102472328f0ea02bb16cd0532046465 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Mon, 12 Sep 2022 16:16:40 +0200 Subject: [PATCH 25/27] Add ability to automatically comment SQL queries in clickhouse-client/local This is like Alt-# in readline, it is useful when you need to look at something else, and need to save current query/command somewhere, and a commented line in the history is a good option. Signed-off-by: Azat Khuzhin --- base/base/ReplxxLineReader.cpp | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/base/base/ReplxxLineReader.cpp b/base/base/ReplxxLineReader.cpp index b7c18110503..75c48f690f8 100644 --- a/base/base/ReplxxLineReader.cpp +++ b/base/base/ReplxxLineReader.cpp @@ -220,6 +220,35 @@ ReplxxLineReader::ReplxxLineReader( rx.bind_key(Replxx::KEY::control('W'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::KILL_TO_WHITESPACE_ON_LEFT, code); }); rx.bind_key(Replxx::KEY::meta('E'), [this](char32_t) { openEditor(); return Replxx::ACTION_RESULT::CONTINUE; }); + + /// readline insert-comment + auto insert_comment_action = [this](char32_t code) + { + replxx::Replxx::State state(rx.get_state()); + const char * line = state.text(); + const char * line_end = line + strlen(line); + + std::string commented_line; + if (std::find(line, line_end, '\n') != line_end) + { + /// If query has multiple lines, multiline comment is used over + /// commenting each line separately for easier uncomment (though + /// with invoking editor it is simpler to uncomment multiple lines) + /// + /// Note, that using multiline comment is OK even with nested
+ /// comments, since nested comments are supported. + commented_line = fmt::format("/* {} */", state.text()); + } + else + { + // In a simplest case use simple comment. + commented_line = fmt::format("-- {}", state.text()); + } + rx.set_state(replxx::Replxx::State(commented_line.c_str(), commented_line.size())); + + return rx.invoke(Replxx::ACTION::COMMIT_LINE, code); + }; + rx.bind_key(Replxx::KEY::meta('#'), insert_comment_action); } ReplxxLineReader::~ReplxxLineReader() From 45cde902190310ad4f189850aa265aabfea48acd Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Tue, 13 Sep 2022 09:51:31 +0000 Subject: [PATCH 26/27] Fix test --- tests/integration/test_keeper_session/test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/integration/test_keeper_session/test.py b/tests/integration/test_keeper_session/test.py index bb72a30359d..4b3aa7e3fdf 100644 --- a/tests/integration/test_keeper_session/test.py +++ b/tests/integration/test_keeper_session/test.py @@ -189,3 +189,5 @@ def test_session_close_shutdown(started_cluster): node2.stop_clickhouse() assert node1_zk.exists(eph_node) == None + + node2.start_clickhouse() From a6b5ffec5d9f9af25f339f9f73d5e382e1bde62d Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Tue, 13 Sep 2022 09:51:46 +0000 Subject: [PATCH 27/27] Polishing --- src/Coordination/KeeperDispatcher.cpp | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 8084bf1d513..261e43d80e4 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -376,13 +376,15 @@ void KeeperDispatcher::shutdown() // send to leader CLOSE requests for active sessions for (const auto & [session, response] : session_to_response_callback) { - Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close); + auto request = 
Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close); request->xid = Coordination::CLOSE_XID; - KeeperStorage::RequestForSession request_info; - request_info.request = request; using namespace std::chrono; - request_info.time = duration_cast(system_clock::now().time_since_epoch()).count(); - request_info.session_id = session; + KeeperStorage::RequestForSession request_info + { + .session_id = session, + .time = duration_cast(system_clock::now().time_since_epoch()).count(), + .request = std::move(request), + }; close_requests.push_back(std::move(request_info)); } @@ -461,13 +463,15 @@ void KeeperDispatcher::sessionCleanerTask() LOG_INFO(log, "Found dead session {}, will try to close it", dead_session); /// Close session == send close request to raft server - Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close); + auto request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close); request->xid = Coordination::CLOSE_XID; - KeeperStorage::RequestForSession request_info; - request_info.request = request; using namespace std::chrono; - request_info.time = duration_cast(system_clock::now().time_since_epoch()).count(); - request_info.session_id = dead_session; + KeeperStorage::RequestForSession request_info + { + .session_id = dead_session, + .time = duration_cast(system_clock::now().time_since_epoch()).count(), + .request = std::move(request), + }; { std::lock_guard lock(push_request_mutex); if (!requests_queue->push(std::move(request_info)))