diff --git a/base/base/ReplxxLineReader.cpp b/base/base/ReplxxLineReader.cpp
index b7c18110503..75c48f690f8 100644
--- a/base/base/ReplxxLineReader.cpp
+++ b/base/base/ReplxxLineReader.cpp
@@ -220,6 +220,35 @@ ReplxxLineReader::ReplxxLineReader(
     rx.bind_key(Replxx::KEY::control('W'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::KILL_TO_WHITESPACE_ON_LEFT, code); });
     rx.bind_key(Replxx::KEY::meta('E'), [this](char32_t) { openEditor(); return Replxx::ACTION_RESULT::CONTINUE; });
+
+    /// readline insert-comment
+    auto insert_comment_action = [this](char32_t code)
+    {
+        replxx::Replxx::State state(rx.get_state());
+        const char * line = state.text();
+        const char * line_end = line + strlen(line);
+
+        std::string commented_line;
+        if (std::find(line, line_end, '\n') != line_end)
+        {
+            /// If the query has multiple lines, a multiline comment is used instead
+            /// of commenting each line separately, since it is easier to uncomment
+            /// (with the external editor, uncommenting multiple lines is simple anyway).
+            ///
+            /// Note that wrapping the query in a multiline comment is OK even if it
+            /// already contains comments, since nested comments are supported.
+            commented_line = fmt::format("/* {} */", state.text());
+        }
+        else
+        {
+            // In the simplest case, use a single-line comment.
+            commented_line = fmt::format("-- {}", state.text());
+        }
+        rx.set_state(replxx::Replxx::State(commented_line.c_str(), commented_line.size()));
+
+        return rx.invoke(Replxx::ACTION::COMMIT_LINE, code);
+    };
+    rx.bind_key(Replxx::KEY::meta('#'), insert_comment_action);
 }
 
 ReplxxLineReader::~ReplxxLineReader()
diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h
index fc5377b3783..76ada9e0d75 100644
--- a/src/Common/ThreadPool.h
+++ b/src/Common/ThreadPool.h
@@ -264,6 +264,18 @@ protected:
     }
 };
 
+/// Schedules jobs/tasks on the global thread pool without implicitly passing the tracing context of the current thread to the underlying worker as the parent tracing context.
+///
+/// If you implement your own job/task scheduling on top of the global thread pool, or schedule a long-running job that loops forever,
+/// use this class; otherwise use ThreadFromGlobalPool below.
+///
+/// See the comments on ThreadPool below for how this works.
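+///
+/// A minimal usage sketch (illustrative only; `runJob` stands in for real work):
+///
+///     ThreadFromGlobalPoolNoTracingContextPropagation worker([] { runJob(); });
+///     worker.join();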
+using ThreadFromGlobalPoolNoTracingContextPropagation = ThreadFromGlobalPoolImpl<false>;
+
+/// An alias for a thread that executes jobs/tasks on the global thread pool, implicitly passing the tracing context of the current thread to the underlying worker as the parent tracing context.
+/// If jobs/tasks are scheduled directly via the APIs of the global thread pool, use this class or the class above.
+using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl<true>;
+
 /// Recommended thread pool for the case when multiple thread pools are created and destroyed.
 ///
 /// The template parameter of ThreadFromGlobalPool is set to false to disable tracing context propagation to underlying worker.
 ///
@@ -274,9 +286,6 @@ protected:
 /// which means the tracing context initialized at underlying worker level won't be delete for a very long time.
 /// This would cause wrong context for further jobs scheduled in ThreadPool.
 ///
-/// To make sure the tracing context are correctly propagated, we explicitly disable context propagation(including initialization and de-initialization) at underlying worker level.
+/// To make sure the tracing context is correctly propagated, we explicitly disable context propagation (including initialization and de-initialization) at the underlying worker level.
 ///
-using ThreadPool = ThreadPoolImpl<ThreadFromGlobalPoolImpl<false>>;
-
-/// An alias for user code to execute a job in the global thread pool
-using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl<true>;
+using ThreadPool = ThreadPoolImpl<ThreadFromGlobalPoolNoTracingContextPropagation>;
diff --git a/src/Coordination/CoordinationSettings.h b/src/Coordination/CoordinationSettings.h
index 5247f5d7ec8..c436c1b6635 100644
--- a/src/Coordination/CoordinationSettings.h
+++ b/src/Coordination/CoordinationSettings.h
@@ -30,6 +30,7 @@ struct Settings;
     M(UInt64, snapshot_distance, 100000, "How many log items we have to collect to write new snapshot", 0) \
     M(Bool, auto_forwarding, true, "Allow to forward write requests from followers to leader", 0) \
     M(Milliseconds, shutdown_timeout, 5000, "How much time we will wait until RAFT shutdown", 0) \
+    M(Milliseconds, session_shutdown_timeout, 10000, "How much time we will wait until sessions are closed during shutdown", 0) \
     M(Milliseconds, startup_timeout, 180000, "How much time we will wait until RAFT to start.", 0) \
     M(LogsLevel, raft_logs_level, LogsLevel::information, "Log internal RAFT logs into main server log level. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \
     M(UInt64, rotate_log_storage_interval, 100000, "How many records will be stored in one log storage file", 0) \
diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp
index 5b376a03b02..261e43d80e4 100644
--- a/src/Coordination/KeeperDispatcher.cpp
+++ b/src/Coordination/KeeperDispatcher.cpp
@@ -354,9 +354,6 @@ void KeeperDispatcher::shutdown()
             update_configuration_thread.join();
         }
 
-        if (server)
-            server->shutdown();
-
         KeeperStorage::RequestForSession request_for_session;
 
         /// Set session expired for all pending requests
@@ -368,10 +365,58 @@ void KeeperDispatcher::shutdown()
             setResponse(request_for_session.session_id, response);
         }
 
-        /// Clear all registered sessions
-        std::lock_guard lock(session_to_response_callback_mutex);
-        session_to_response_callback.clear();
+        KeeperStorage::RequestsForSessions close_requests;
+        {
+            /// Clear all registered sessions
+            std::lock_guard lock(session_to_response_callback_mutex);
+
+            if (hasLeader())
+            {
+                close_requests.reserve(session_to_response_callback.size());
+                // Send CLOSE requests for the active sessions to the leader
+                for (const auto & [session, response] : session_to_response_callback)
+                {
+                    auto request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
+                    request->xid = Coordination::CLOSE_XID;
+                    using namespace std::chrono;
+                    KeeperStorage::RequestForSession request_info
+                    {
+                        .session_id = session,
+                        .time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(),
+                        .request = std::move(request),
+                    };
+
+                    close_requests.push_back(std::move(request_info));
+                }
+            }
+
+            session_to_response_callback.clear();
+        }
+
+        // If there is no leader, there is no reason to send CLOSE requests, because they are write requests
+        if (hasLeader() && !close_requests.empty())
+        {
+            LOG_INFO(log, "Trying to close {} session(s)", close_requests.size());
+            const auto raft_result = server->putRequestBatch(close_requests);
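+
+            /// putRequestBatch() completes asynchronously: bridge its completion callback
+            /// to a std::future so that shutdown can wait for it with a bounded timeout.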
+            auto sessions_closing_done_promise = std::make_shared<std::promise<void>>();
+            auto sessions_closing_done = sessions_closing_done_promise->get_future();
+            raft_result->when_ready([sessions_closing_done_promise = std::move(sessions_closing_done_promise)](
+                                        nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & /*result*/,
+                                        nuraft::ptr<std::exception> & /*exception*/) { sessions_closing_done_promise->set_value(); });
+
+            auto session_shutdown_timeout = configuration_and_settings->coordination_settings->session_shutdown_timeout.totalMilliseconds();
+            if (sessions_closing_done.wait_for(std::chrono::milliseconds(session_shutdown_timeout)) != std::future_status::ready)
+                LOG_WARNING(
+                    log,
+                    "Failed to close sessions in {}ms. If they are not closed, they will be closed after session timeout.",
+                    session_shutdown_timeout);
+        }
+
+        if (server)
+            server->shutdown();
+
         CurrentMetrics::set(CurrentMetrics::KeeperAliveConnections, 0);
+
     }
     catch (...)
     {
@@ -418,13 +463,15 @@ void KeeperDispatcher::sessionCleanerTask()
                     LOG_INFO(log, "Found dead session {}, will try to close it", dead_session);
 
                     /// Close session == send close request to raft server
-                    Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
+                    auto request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
                     request->xid = Coordination::CLOSE_XID;
-                    KeeperStorage::RequestForSession request_info;
-                    request_info.request = request;
                     using namespace std::chrono;
-                    request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
-                    request_info.session_id = dead_session;
+                    KeeperStorage::RequestForSession request_info
+                    {
+                        .session_id = dead_session,
+                        .time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(),
+                        .request = std::move(request),
+                    };
                     {
                         std::lock_guard lock(push_request_mutex);
                         if (!requests_queue->push(std::move(request_info)))
diff --git a/src/Core/BackgroundSchedulePool.cpp b/src/Core/BackgroundSchedulePool.cpp
index b7a33c4930d..29cd3c1c540 100644
--- a/src/Core/BackgroundSchedulePool.cpp
+++ b/src/Core/BackgroundSchedulePool.cpp
@@ -149,9 +149,9 @@ BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Met
     threads.resize(size_);
     for (auto & thread : threads)
-        thread = ThreadFromGlobalPool([this] { threadFunction(); });
+        thread = ThreadFromGlobalPoolNoTracingContextPropagation([this] { threadFunction(); });
 
-    delayed_thread = ThreadFromGlobalPool([this] { delayExecutionThreadFunction(); });
+    delayed_thread = ThreadFromGlobalPoolNoTracingContextPropagation([this] { delayExecutionThreadFunction(); });
 }
 
@@ -168,7 +168,7 @@ void BackgroundSchedulePool::increaseThreadsCount(size_t new_threads_count)
     threads.resize(new_threads_count);
 
     for (size_t i = old_threads_count; i < new_threads_count; ++i)
-        threads[i] = ThreadFromGlobalPool([this] { threadFunction(); });
+        threads[i] = ThreadFromGlobalPoolNoTracingContextPropagation([this] { threadFunction(); });
 }
 
diff --git a/src/Core/BackgroundSchedulePool.h b/src/Core/BackgroundSchedulePool.h
index 36cbad145c9..1001d98e643 100644
--- a/src/Core/BackgroundSchedulePool.h
+++ b/src/Core/BackgroundSchedulePool.h
@@ -57,7 +57,9 @@ public:
     ~BackgroundSchedulePool();
 
 private:
-    using Threads = std::vector<ThreadFromGlobalPool>;
+    /// BackgroundSchedulePool schedules tasks on its own task queue, so there is no need to construct/restore the tracing context at this level.
+    /// This is also how the ThreadPool class treats the tracing context. See ThreadPool for more information.
+    using Threads = std::vector<ThreadFromGlobalPoolNoTracingContextPropagation>;
 
     void threadFunction();
     void delayExecutionThreadFunction();
@@ -83,7 +85,7 @@ private:
     std::condition_variable delayed_tasks_cond_var;
    std::mutex delayed_tasks_mutex;
 
     /// Thread waiting for next delayed task.
-    ThreadFromGlobalPool delayed_thread;
+    ThreadFromGlobalPoolNoTracingContextPropagation delayed_thread;
 
     /// Tasks ordered by scheduled time.
     DelayedTasks delayed_tasks;
diff --git a/src/Interpreters/SortedBlocksWriter.cpp b/src/Interpreters/SortedBlocksWriter.cpp
index 82d501451a6..8f598f3dd3f 100644
--- a/src/Interpreters/SortedBlocksWriter.cpp
+++ b/src/Interpreters/SortedBlocksWriter.cpp
@@ -28,6 +28,11 @@ namespace CurrentMetrics
 namespace DB
 {
 
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+
 namespace
 {
 
@@ -84,10 +89,13 @@ void SortedBlocksWriter::insert(Block && block)
     size_t bytes = 0;
     size_t flush_no = 0;
 
+    if (!block.rows())
+        return;
+
     {
         std::lock_guard lock{insert_mutex};
 
-        /// insert bock into BlocksList undef lock
+        /// insert block into BlocksList under lock
         inserted_blocks.insert(std::move(block));
 
         size_t total_row_count = inserted_blocks.row_count + row_count_in_flush;
@@ -145,7 +153,7 @@ SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & bloc
         pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(block.cloneEmpty(), Chunk(block.getColumns(), num_rows)));
 
     if (pipes.empty())
-        return {};
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty block");
 
     QueryPipelineBuilder pipeline;
     pipeline.init(Pipe::unitePipes(std::move(pipes)));
diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp
index 16981d26146..e8d48431a9e 100644
--- a/src/Storages/Distributed/DirectoryMonitor.cpp
+++ b/src/Storages/Distributed/DirectoryMonitor.cpp
@@ -140,6 +140,11 @@ namespace
         size_t rows = 0;
         size_t bytes = 0;
 
+        UInt32 shard_num = 0;
+        std::string cluster;
+        std::string distributed_table;
+        std::string remote_table;
+
         /// dumpStructure() of the header -- obsolete
         std::string block_header_string;
         Block block_header;
@@ -195,6 +200,14 @@ namespace
                     in.getFileName(), distributed_header.revision, DBMS_TCP_PROTOCOL_VERSION);
             }
 
+            if (header_buf.hasPendingData())
+            {
+                readVarUInt(distributed_header.shard_num, header_buf);
+                readStringBinary(distributed_header.cluster, header_buf);
+                readStringBinary(distributed_header.distributed_table, header_buf);
+                readStringBinary(distributed_header.remote_table, header_buf);
+            }
+
             /// Add handling new data here, for example:
             ///
             /// if (header_buf.hasPendingData())
@@ -621,18 +634,23 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
     ReadBufferFromFile in(file_path);
     const auto & distributed_header = readDistributedHeader(in, log);
 
-    auto connection = pool->get(timeouts, &distributed_header.insert_settings);
+    thread_trace_context = std::make_unique<OpenTelemetry::TracingContextHolder>(__PRETTY_FUNCTION__,
+        distributed_header.client_info.client_trace_context,
+        this->storage.getContext()->getOpenTelemetrySpanLog());
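+
+    /// The tracing context is now restored from the file header before the connection
+    /// is acquired, so acquiring the connection is covered by the restored context too.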
+    thread_trace_context->root_span.addAttribute("clickhouse.shard_num", distributed_header.shard_num);
+    thread_trace_context->root_span.addAttribute("clickhouse.cluster", distributed_header.cluster);
+    thread_trace_context->root_span.addAttribute("clickhouse.distributed", distributed_header.distributed_table);
+    thread_trace_context->root_span.addAttribute("clickhouse.remote", distributed_header.remote_table);
+    thread_trace_context->root_span.addAttribute("clickhouse.rows", distributed_header.rows);
+    thread_trace_context->root_span.addAttribute("clickhouse.bytes", distributed_header.bytes);
+
+    auto connection = pool->get(timeouts, &distributed_header.insert_settings);
 
     LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)",
         file_path,
         connection->getDescription(),
         formatReadableQuantity(distributed_header.rows),
         formatReadableSizeWithBinarySuffix(distributed_header.bytes));
 
-    thread_trace_context = std::make_unique<OpenTelemetry::TracingContextHolder>(__PRETTY_FUNCTION__,
-        distributed_header.client_info.client_trace_context,
-        this->storage.getContext()->getOpenTelemetrySpanLog());
-
     RemoteInserter remote{*connection, timeouts,
         distributed_header.insert_query,
         distributed_header.insert_settings,
diff --git a/src/Storages/Distributed/DistributedSink.cpp b/src/Storages/Distributed/DistributedSink.cpp
index ae72fdd84e2..8099a7f2002 100644
--- a/src/Storages/Distributed/DistributedSink.cpp
+++ b/src/Storages/Distributed/DistributedSink.cpp
@@ -171,7 +171,6 @@ void DistributedSink::writeAsync(const Block & block)
     }
     else
     {
-
         if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1))
             return writeSplitAsync(block);
 
@@ -291,6 +290,8 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
     auto thread_group = CurrentThread::getGroup();
     return [this, thread_group, &job, &current_block, num_shards]()
     {
+        OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
+
         if (thread_group)
             CurrentThread::attachToIfDetached(thread_group);
         setThreadName("DistrOutStrProc");
@@ -331,15 +332,19 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
             const Block & shard_block = (num_shards > 1) ? job.current_shard_block : current_block;
             const Settings & settings = context->getSettingsRef();
 
-            /// Do not initiate INSERT for empty block.
             size_t rows = shard_block.rows();
+
+            span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
+            span.addAttribute("clickhouse.cluster", this->storage.cluster_name);
+            span.addAttribute("clickhouse.distributed", this->storage.getStorageID().getFullNameNotQuoted());
+            span.addAttribute("clickhouse.remote", [this]() { return storage.remote_database + "." + storage.remote_table; });
+            span.addAttribute("clickhouse.rows", rows);
+            span.addAttribute("clickhouse.bytes", [&shard_block]() { return toString(shard_block.bytes()); });
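+
+            /// Note: the addAttribute() overloads that take a callable defer the value
+            /// computation until the attribute is actually recorded on the span.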
+
+            /// Do not initiate INSERT for empty block.
             if (rows == 0)
                 return;
 
-            OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
-            span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
-            span.addAttribute("clickhouse.written_rows", rows);
-
             if (!job.is_local_job || !settings.prefer_localhost_replica)
             {
                 if (!job.executor)
@@ -610,20 +615,15 @@ void DistributedSink::writeSplitAsync(const Block & block)
 
 void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
 {
-    OpenTelemetry::SpanHolder span("DistributedSink::writeAsyncImpl()");
-
     const auto & shard_info = cluster->getShardsInfo()[shard_id];
     const auto & settings = context->getSettingsRef();
     Block block_to_send = removeSuperfluousColumns(block);
 
-    span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
-    span.addAttribute("clickhouse.written_rows", block.rows());
-
     if (shard_info.hasInternalReplication())
     {
         if (shard_info.isLocal() && settings.prefer_localhost_replica)
             /// Prefer insert into current instance directly
-            writeToLocal(block_to_send, shard_info.getLocalNodeCount());
+            writeToLocal(shard_info, block_to_send, shard_info.getLocalNodeCount());
         else
         {
             const auto & path = shard_info.insertPathForInternalReplication(
@@ -631,13 +631,13 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
                 settings.use_compact_format_in_distributed_parts_names);
             if (path.empty())
                 throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
-            writeToShard(block_to_send, {path});
+            writeToShard(shard_info, block_to_send, {path});
         }
     }
     else
     {
         if (shard_info.isLocal() && settings.prefer_localhost_replica)
-            writeToLocal(block_to_send, shard_info.getLocalNodeCount());
+            writeToLocal(shard_info, block_to_send, shard_info.getLocalNodeCount());
 
         std::vector<std::string> dir_names;
         for (const auto & address : cluster->getShardsAddresses()[shard_id])
@@ -645,30 +645,44 @@ void DistributedSink::writeAsyncImpl(const Block & block, size_t shard_id)
             dir_names.push_back(address.toFullString(settings.use_compact_format_in_distributed_parts_names));
 
         if (!dir_names.empty())
-            writeToShard(block_to_send, dir_names);
+            writeToShard(shard_info, block_to_send, dir_names);
     }
 }
 
-void DistributedSink::writeToLocal(const Block & block, size_t repeats)
+void DistributedSink::writeToLocal(const Cluster::ShardInfo & shard_info, const Block & block, size_t repeats)
 {
     OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
-    span.addAttribute("db.statement", this->query_string);
+    span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
+    span.addAttribute("clickhouse.cluster", this->storage.cluster_name);
+    span.addAttribute("clickhouse.distributed", this->storage.getStorageID().getFullNameNotQuoted());
+    span.addAttribute("clickhouse.remote", [this]() { return storage.remote_database + "." + storage.remote_table; });
+    span.addAttribute("clickhouse.rows", [&block]() { return toString(block.rows()); });
+    span.addAttribute("clickhouse.bytes", [&block]() { return toString(block.bytes()); });
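+
+    /// Record any exception on the span before rethrowing (see the catch block below),
+    /// so that failed local writes are visible in system.opentelemetry_span_log.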
 
-    InterpreterInsertQuery interp(query_ast, context, allow_materialized);
+    try
+    {
+        InterpreterInsertQuery interp(query_ast, context, allow_materialized);
 
-    auto block_io = interp.execute();
-    PushingPipelineExecutor executor(block_io.pipeline);
+        auto block_io = interp.execute();
+        PushingPipelineExecutor executor(block_io.pipeline);
 
-    executor.start();
-    writeBlockConvert(executor, block, repeats, log);
-    executor.finish();
+        executor.start();
+        writeBlockConvert(executor, block, repeats, log);
+        executor.finish();
+    }
+    catch (...)
+    {
+        span.addAttribute(std::current_exception());
+        throw;
+    }
 }
 
-void DistributedSink::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
+void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const Block & block, const std::vector<std::string> & dir_names)
 {
     OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
+    span.addAttribute("clickhouse.shard_num", shard_info.shard_num);
 
     const auto & settings = context->getSettingsRef();
     const auto & distributed_settings = storage.getDistributedSettingsRef();
@@ -759,6 +773,11 @@ void DistributedSink::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
 
+            writeVarUInt(shard_info.shard_num, header_buf);
+            writeStringBinary(this->storage.cluster_name, header_buf);
+            writeStringBinary(this->storage.getStorageID().getFullNameNotQuoted(), header_buf);
+            writeStringBinary(this->storage.remote_database + "." + this->storage.remote_table, header_buf);
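+
+            /// Note: the field order here must match the order in which
+            /// readDistributedHeader() consumes them in DirectoryMonitor.cpp.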
+
             /// Add new fields here, for example:
             /// writeVarUInt(my_new_data, header_buf);
             /// And note that it is safe, because we have checksum and size for header.
diff --git a/src/Storages/Distributed/DistributedSink.h b/src/Storages/Distributed/DistributedSink.h
index 668cec22e8b..af0c64cbd78 100644
--- a/src/Storages/Distributed/DistributedSink.h
+++ b/src/Storages/Distributed/DistributedSink.h
@@ -69,9 +69,9 @@ private:
     Block removeSuperfluousColumns(Block block) const;
 
     /// Increments finished_writings_count after each repeat.
-    void writeToLocal(const Block & block, size_t repeats);
+    void writeToLocal(const Cluster::ShardInfo & shard_info, const Block & block, size_t repeats);
 
-    void writeToShard(const Block & block, const std::vector<std::string> & dir_names);
+    void writeToShard(const Cluster::ShardInfo & shard_info, const Block & block, const std::vector<std::string> & dir_names);
 
     /// Performs synchronous insertion to remote nodes. If timeout_exceeded flag was set, throws.
diff --git a/tests/integration/test_keeper_session/configs/keeper_config.xml b/tests/integration/test_keeper_session/configs/keeper_config1.xml
similarity index 67%
rename from tests/integration/test_keeper_session/configs/keeper_config.xml
rename to tests/integration/test_keeper_session/configs/keeper_config1.xml
index ed0bb52bd51..fd308fe8a2f 100644
--- a/tests/integration/test_keeper_session/configs/keeper_config.xml
+++ b/tests/integration/test_keeper_session/configs/keeper_config1.xml
@@ -1,4 +1,4 @@
-<yandex>
+<clickhouse>
     <keeper_server>
         <tcp_port>9181</tcp_port>
         <server_id>1</server_id>
@@ -19,9 +19,19 @@
                 <id>1</id>
                 <hostname>node1</hostname>
                 <port>9234</port>
-                <can_become_leader>true</can_become_leader>
-                <priority>3</priority>
+            </server>
+            <server>
+                <id>2</id>
+                <hostname>node2</hostname>
+                <port>9234</port>
+                <start_as_follower>true</start_as_follower>
+            </server>
+            <server>
+                <id>3</id>
+                <hostname>node3</hostname>
+                <port>9234</port>
+                <start_as_follower>true</start_as_follower>
             </server>
         </raft_configuration>
     </keeper_server>
-</yandex>
+</clickhouse>
diff --git a/tests/integration/test_keeper_session/configs/keeper_config2.xml b/tests/integration/test_keeper_session/configs/keeper_config2.xml
new file mode 100644
index 00000000000..ad558fbccad
--- /dev/null
+++ b/tests/integration/test_keeper_session/configs/keeper_config2.xml
@@ -0,0 +1,37 @@
+<clickhouse>
+    <keeper_server>
+        <tcp_port>9181</tcp_port>
+        <server_id>2</server_id>
+        <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
+        <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
+        <four_letter_word_white_list>*</four_letter_word_white_list>
+
+        <coordination_settings>
+            <operation_timeout_ms>5000</operation_timeout_ms>
+            <session_timeout_ms>10000</session_timeout_ms>
+            <min_session_timeout_ms>5000</min_session_timeout_ms>
+            <snapshot_distance>75</snapshot_distance>
+            <raft_logs_level>trace</raft_logs_level>
+        </coordination_settings>
+
+        <raft_configuration>
+            <server>
+                <id>1</id>
+                <hostname>node1</hostname>
+                <port>9234</port>
+            </server>
+            <server>
+                <id>2</id>
+                <hostname>node2</hostname>
+                <port>9234</port>
+                <start_as_follower>true</start_as_follower>
+            </server>
+            <server>
+                <id>3</id>
+                <hostname>node3</hostname>
+                <port>9234</port>
+                <start_as_follower>true</start_as_follower>
+            </server>
+        </raft_configuration>
+    </keeper_server>
+</clickhouse>
diff --git a/tests/integration/test_keeper_session/configs/keeper_config3.xml b/tests/integration/test_keeper_session/configs/keeper_config3.xml
new file mode 100644
index 00000000000..2a21f959816
--- /dev/null
+++ b/tests/integration/test_keeper_session/configs/keeper_config3.xml
@@ -0,0 +1,37 @@
+<clickhouse>
+    <keeper_server>
+        <tcp_port>9181</tcp_port>
+        <server_id>3</server_id>
+        <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
+        <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
+        <four_letter_word_white_list>*</four_letter_word_white_list>
+
+        <coordination_settings>
+            <operation_timeout_ms>5000</operation_timeout_ms>
+            <session_timeout_ms>10000</session_timeout_ms>
+            <min_session_timeout_ms>5000</min_session_timeout_ms>
+            <snapshot_distance>75</snapshot_distance>
+            <raft_logs_level>trace</raft_logs_level>
+        </coordination_settings>
+
+        <raft_configuration>
+            <server>
+                <id>1</id>
+                <hostname>node1</hostname>
+                <port>9234</port>
+            </server>
+            <server>
+                <id>2</id>
+                <hostname>node2</hostname>
+                <port>9234</port>
+                <start_as_follower>true</start_as_follower>
+            </server>
+            <server>
+                <id>3</id>
+                <hostname>node3</hostname>
+                <port>9234</port>
+                <start_as_follower>true</start_as_follower>
+            </server>
+        </raft_configuration>
+    </keeper_server>
+</clickhouse>
diff --git a/tests/integration/test_keeper_session/test.py b/tests/integration/test_keeper_session/test.py
index 30db4d9548c..4b3aa7e3fdf 100644
--- a/tests/integration/test_keeper_session/test.py
+++ b/tests/integration/test_keeper_session/test.py
@@ -10,7 +10,15 @@ from kazoo.client import KazooClient
 cluster = ClickHouseCluster(__file__)
 
 node1 = cluster.add_instance(
-    "node1", main_configs=["configs/keeper_config.xml"], stay_alive=True
+    "node1", main_configs=["configs/keeper_config1.xml"], stay_alive=True
+)
+
+node2 = cluster.add_instance(
+    "node2", main_configs=["configs/keeper_config2.xml"], stay_alive=True
+)
+
+node3 = cluster.add_instance(
+    "node3", main_configs=["configs/keeper_config3.xml"], stay_alive=True
 )
 
 bool_struct = struct.Struct("B")
@@ -61,7 +69,7 @@ def wait_node(node):
 
 
 def wait_nodes():
-    for n in [node1]:
+    for n in [node1, node2, node3]:
         wait_node(n)
 
 
@@ -165,3 +173,21 @@ def test_session_timeout(started_cluster):
     negotiated_timeout, _ = handshake(node1.name, session_timeout=20000, session_id=0)
 
     assert negotiated_timeout == 10000
+
+
+def test_session_close_shutdown(started_cluster):
+    wait_nodes()
+
+    node1_zk = get_fake_zk(node1.name)
+    node2_zk = get_fake_zk(node2.name)
+
+    eph_node = "/test_node"
+    node2_zk.create(eph_node, ephemeral=True)
+    assert node1_zk.exists(eph_node) != None
+
+    # shutdown while session is active
+    node2.stop_clickhouse()
+
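+    # Keeper now sends CLOSE requests for active sessions during shutdown, so
+    # node2's session is terminated immediately and its ephemeral node must
+    # disappear right away instead of lingering until the session timeout.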
+    assert node1_zk.exists(eph_node) == None
+
+    node2.start_clickhouse()
diff --git a/tests/queries/0_stateless/02070_join_on_disk.reference b/tests/queries/0_stateless/02070_join_on_disk.reference
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/queries/0_stateless/02070_join_on_disk.sql b/tests/queries/0_stateless/02070_join_on_disk.sql
new file mode 100644
index 00000000000..eabf31df25f
--- /dev/null
+++ b/tests/queries/0_stateless/02070_join_on_disk.sql
@@ -0,0 +1,21 @@
+-- Regression test for the case when the Join stores data on disk and receives an empty block:
+-- previously it did not create a file for such a block, while the reader expected one to exist.
+
+SET max_threads = 1;
+SET join_algorithm = 'auto';
+SET max_rows_in_join = 1000;
+SET optimize_aggregation_in_order = 1;
+SET max_block_size = 1000;
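+
+-- With join_algorithm = 'auto' the join starts as an in-memory hash join and
+-- falls back to an on-disk merge join once max_rows_in_join (1000) is exceeded,
+-- so the grouped right-hand side below forces the on-disk code path.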
"$CURDIR"/../shell_config.sh + + +# This function takes 4 arguments: +# $1 - OpenTelemetry Trace Id +# $2 - value of insert_distributed_sync +# $3 - value of prefer_localhost_replica +# $4 - a String that helps to debug +function insert() +{ + echo "INSERT INTO ${CLICKHOUSE_DATABASE}.dist_opentelemetry SETTINGS insert_distributed_sync=$2, prefer_localhost_replica=$3 VALUES(1),(2)" | + ${CLICKHOUSE_CURL} \ + -X POST \ + -H "traceparent: 00-$1-5150000000000515-01" \ + -H "tracestate: $4" \ + "${CLICKHOUSE_URL}" \ + --data @- +} + +function check_span() +{ +${CLICKHOUSE_CLIENT} -nq " + SYSTEM FLUSH LOGS; + + SELECT operation_name, + attribute['clickhouse.cluster'] AS cluster, + attribute['clickhouse.shard_num'] AS shard, + attribute['clickhouse.rows'] AS rows, + attribute['clickhouse.bytes'] AS bytes + FROM system.opentelemetry_span_log + WHERE finish_date >= yesterday() + AND lower(hex(trace_id)) = '${1}' + AND attribute['clickhouse.distributed'] = '${CLICKHOUSE_DATABASE}.dist_opentelemetry' + AND attribute['clickhouse.remote'] = '${CLICKHOUSE_DATABASE}.local_opentelemetry' + ORDER BY attribute['clickhouse.shard_num'] + Format JSONEachRow + ;" +} + + +# +# Prepare tables for tests +# +${CLICKHOUSE_CLIENT} -nq " +DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.dist_opentelemetry; +DROP TABLE IF EXISTS ${CLICKHOUSE_DATABASE}.local_opentelemetry; + +CREATE TABLE ${CLICKHOUSE_DATABASE}.dist_opentelemetry (key UInt64) Engine=Distributed('test_cluster_two_shards_localhost', ${CLICKHOUSE_DATABASE}, local_opentelemetry, key % 2); +CREATE TABLE ${CLICKHOUSE_DATABASE}.local_opentelemetry (key UInt64) Engine=MergeTree ORDER BY key; +" + +# +# test1 +# +trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))"); +insert $trace_id 0 1 "async-insert-writeToLocal" +check_span $trace_id + +# +# test2 +# +trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))"); +insert $trace_id 0 0 "async-insert-writeToRemote" +check_span $trace_id + +# +# test3 +# +trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))"); +insert $trace_id 1 1 "sync-insert-writeToLocal" +check_span $trace_id + +# +# test4 +# +trace_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(generateUUIDv4()))"); +insert $trace_id 1 0 "sync-insert-writeToRemote" +check_span $trace_id + +# +# Cleanup +# +${CLICKHOUSE_CLIENT} -nq " +DROP TABLE ${CLICKHOUSE_DATABASE}.dist_opentelemetry; +DROP TABLE ${CLICKHOUSE_DATABASE}.local_opentelemetry; +"