diff --git a/base/base/hex.h b/base/base/hex.h index 937218fec5a..931f220aa08 100644 --- a/base/base/hex.h +++ b/base/base/hex.h @@ -289,3 +289,13 @@ inline void writeBinByte(UInt8 byte, void * out) { memcpy(out, &impl::bin_byte_to_char_table[static_cast(byte) * 8], 8); } + +/// Converts byte array to a hex string. Useful for debug logging. +inline std::string hexString(const void * data, size_t size) +{ + const char * p = reinterpret_cast(data); + std::string s(size * 2, '\0'); + for (size_t i = 0; i < size; ++i) + writeHexByteLowercase(p[i], s.data() + i * 2); + return s; +} diff --git a/src/Client/MultiplexedConnections.cpp b/src/Client/MultiplexedConnections.cpp index c0218568d67..80d35b7dc1a 100644 --- a/src/Client/MultiplexedConnections.cpp +++ b/src/Client/MultiplexedConnections.cpp @@ -1,10 +1,13 @@ #include #include +#include #include #include #include #include +#include +#include namespace DB { @@ -19,6 +22,13 @@ namespace ErrorCodes } +#define MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION \ + mutex_last_locked_by.store((getThreadId() << 32) | __LINE__); \ + memcpy(mutex_memory_dump.data(), &cancel_mutex, mutex_memory_dump.size()); \ + mutex_locked += 1; \ + SCOPE_EXIT({ mutex_locked -= 1; }); + + MultiplexedConnections::MultiplexedConnections(Connection & connection, const Settings & settings_, const ThrottlerPtr & throttler) : settings(settings_) { @@ -73,6 +83,7 @@ MultiplexedConnections::MultiplexedConnections( void MultiplexedConnections::sendScalarsData(Scalars & data) { std::lock_guard lock(cancel_mutex); + MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION if (!sent_query) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send scalars data: query not yet sent."); @@ -88,6 +99,7 @@ void MultiplexedConnections::sendScalarsData(Scalars & data) void MultiplexedConnections::sendExternalTablesData(std::vector & data) { std::lock_guard lock(cancel_mutex); + MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION if (!sent_query) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send external tables data: query not yet sent."); @@ -116,6 +128,7 @@ void MultiplexedConnections::sendQuery( bool with_pending_data) { std::lock_guard lock(cancel_mutex); + MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION if (sent_query) throw Exception(ErrorCodes::LOGICAL_ERROR, "Query already sent."); @@ -173,6 +186,7 @@ void MultiplexedConnections::sendQuery( void MultiplexedConnections::sendIgnoredPartUUIDs(const std::vector & uuids) { std::lock_guard lock(cancel_mutex); + MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION if (sent_query) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send uuids after query is sent."); @@ -189,6 +203,7 @@ void MultiplexedConnections::sendIgnoredPartUUIDs(const std::vector & uuid void MultiplexedConnections::sendReadTaskResponse(const String & response) { std::lock_guard lock(cancel_mutex); + MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION if (cancelled) return; current_connection->sendReadTaskResponse(response); @@ -198,6 +213,7 @@ void MultiplexedConnections::sendReadTaskResponse(const String & response) void MultiplexedConnections::sendMergeTreeReadTaskResponse(const ParallelReadResponse & response) { std::lock_guard lock(cancel_mutex); + MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION if (cancelled) return; current_connection->sendMergeTreeReadTaskResponse(response); @@ -207,13 +223,29 @@ void MultiplexedConnections::sendMergeTreeReadTaskResponse(const ParallelReadRes Packet MultiplexedConnections::receivePacket() { std::lock_guard lock(cancel_mutex); + MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION Packet packet = receivePacketUnlocked({}); return packet; } void MultiplexedConnections::disconnect() { - std::lock_guard lock(cancel_mutex); + /// We've seen this lock mysteriously get stuck forever, without any other thread seeming to + /// hold the mutex. This is temporary code to print some extra information next time it happens. + /// std::lock_guard lock(cancel_mutex); + if (!cancel_mutex.try_lock_for(std::chrono::hours(1))) + { + UInt64 last_locked = mutex_last_locked_by.load(); + std::array new_memory_dump; + memcpy(new_memory_dump.data(), &cancel_mutex, new_memory_dump.size()); + LOG_ERROR(&Poco::Logger::get("MultiplexedConnections"), "Deadlock in MultiplexedConnections::disconnect()! Mutex was last (instrumentedly) locked by thread {} on line {}, lock balance: {}, mutex memory when last locked: {}, mutex memory now: {}", last_locked >> 32, last_locked & 0xffffffff, mutex_locked.load(), hexString(mutex_memory_dump.data(), mutex_memory_dump.size()), hexString(new_memory_dump.data(), new_memory_dump.size())); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Deadlock in MultiplexedConnections::disconnect()"); + } +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wthread-safety-analysis" + std::lock_guard lock(cancel_mutex, std::adopt_lock); +#pragma clang diagnostic pop + MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION for (ReplicaState & state : replica_states) { @@ -229,6 +261,7 @@ void MultiplexedConnections::disconnect() void MultiplexedConnections::sendCancel() { std::lock_guard lock(cancel_mutex); + MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION if (!sent_query || cancelled) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot cancel. Either no query sent or already cancelled."); @@ -246,6 +279,7 @@ void MultiplexedConnections::sendCancel() Packet MultiplexedConnections::drain() { std::lock_guard lock(cancel_mutex); + MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION if (!cancelled) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot drain connections: cancel first."); @@ -286,6 +320,7 @@ Packet MultiplexedConnections::drain() std::string MultiplexedConnections::dumpAddresses() const { std::lock_guard lock(cancel_mutex); + MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION return dumpAddressesUnlocked(); } diff --git a/src/Client/MultiplexedConnections.h b/src/Client/MultiplexedConnections.h index 371639d14c4..4c776e6e168 100644 --- a/src/Client/MultiplexedConnections.h +++ b/src/Client/MultiplexedConnections.h @@ -105,9 +105,15 @@ private: /// std::nullopt if parallel reading from replicas is not used std::optional replica_info; - /// A mutex for the sendCancel function to execute safely - /// in separate thread. - mutable std::mutex cancel_mutex; + /// A mutex for the sendCancel function to execute safely in separate thread. + mutable std::timed_mutex cancel_mutex; + + /// Temporary instrumentation to debug a weird deadlock on cancel_mutex. + /// TODO: Once the investigation is done, get rid of these, and of INSTRUMENTED_LOCK_MUTEX, and + /// change cancel_mutex to std::mutex. + mutable std::atomic mutex_last_locked_by{0}; + mutable std::atomic mutex_locked{0}; + mutable std::array mutex_memory_dump; friend struct RemoteQueryExecutorRoutine; };