mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Add desperate instrumentation for debugging deadlock in MultiplexedConnections (#54940)
This commit is contained in:
parent
23bc286280
commit
1ac2247b3c
@ -289,3 +289,13 @@ inline void writeBinByte(UInt8 byte, void * out)
|
||||
{
|
||||
memcpy(out, &impl::bin_byte_to_char_table[static_cast<size_t>(byte) * 8], 8);
|
||||
}
|
||||
|
||||
/// Converts byte array to a hex string. Useful for debug logging.
|
||||
inline std::string hexString(const void * data, size_t size)
|
||||
{
|
||||
const char * p = reinterpret_cast<const char *>(data);
|
||||
std::string s(size * 2, '\0');
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
writeHexByteLowercase(p[i], s.data() + i * 2);
|
||||
return s;
|
||||
}
|
||||
|
@ -1,10 +1,13 @@
|
||||
#include <Client/MultiplexedConnections.h>
|
||||
|
||||
#include <Common/thread_local_rng.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Core/Protocol.h>
|
||||
#include <IO/ConnectionTimeouts.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <Interpreters/ClientInfo.h>
|
||||
#include <base/getThreadId.h>
|
||||
#include <base/hex.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -19,6 +22,13 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
|
||||
#define MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION \
|
||||
mutex_last_locked_by.store((getThreadId() << 32) | __LINE__); \
|
||||
memcpy(mutex_memory_dump.data(), &cancel_mutex, mutex_memory_dump.size()); \
|
||||
mutex_locked += 1; \
|
||||
SCOPE_EXIT({ mutex_locked -= 1; });
|
||||
|
||||
|
||||
MultiplexedConnections::MultiplexedConnections(Connection & connection, const Settings & settings_, const ThrottlerPtr & throttler)
|
||||
: settings(settings_)
|
||||
{
|
||||
@ -73,6 +83,7 @@ MultiplexedConnections::MultiplexedConnections(
|
||||
void MultiplexedConnections::sendScalarsData(Scalars & data)
|
||||
{
|
||||
std::lock_guard lock(cancel_mutex);
|
||||
MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION
|
||||
|
||||
if (!sent_query)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send scalars data: query not yet sent.");
|
||||
@ -88,6 +99,7 @@ void MultiplexedConnections::sendScalarsData(Scalars & data)
|
||||
void MultiplexedConnections::sendExternalTablesData(std::vector<ExternalTablesData> & data)
|
||||
{
|
||||
std::lock_guard lock(cancel_mutex);
|
||||
MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION
|
||||
|
||||
if (!sent_query)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send external tables data: query not yet sent.");
|
||||
@ -116,6 +128,7 @@ void MultiplexedConnections::sendQuery(
|
||||
bool with_pending_data)
|
||||
{
|
||||
std::lock_guard lock(cancel_mutex);
|
||||
MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION
|
||||
|
||||
if (sent_query)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query already sent.");
|
||||
@ -173,6 +186,7 @@ void MultiplexedConnections::sendQuery(
|
||||
void MultiplexedConnections::sendIgnoredPartUUIDs(const std::vector<UUID> & uuids)
|
||||
{
|
||||
std::lock_guard lock(cancel_mutex);
|
||||
MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION
|
||||
|
||||
if (sent_query)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send uuids after query is sent.");
|
||||
@ -189,6 +203,7 @@ void MultiplexedConnections::sendIgnoredPartUUIDs(const std::vector<UUID> & uuid
|
||||
void MultiplexedConnections::sendReadTaskResponse(const String & response)
|
||||
{
|
||||
std::lock_guard lock(cancel_mutex);
|
||||
MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION
|
||||
if (cancelled)
|
||||
return;
|
||||
current_connection->sendReadTaskResponse(response);
|
||||
@ -198,6 +213,7 @@ void MultiplexedConnections::sendReadTaskResponse(const String & response)
|
||||
void MultiplexedConnections::sendMergeTreeReadTaskResponse(const ParallelReadResponse & response)
|
||||
{
|
||||
std::lock_guard lock(cancel_mutex);
|
||||
MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION
|
||||
if (cancelled)
|
||||
return;
|
||||
current_connection->sendMergeTreeReadTaskResponse(response);
|
||||
@ -207,13 +223,29 @@ void MultiplexedConnections::sendMergeTreeReadTaskResponse(const ParallelReadRes
|
||||
Packet MultiplexedConnections::receivePacket()
|
||||
{
|
||||
std::lock_guard lock(cancel_mutex);
|
||||
MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION
|
||||
Packet packet = receivePacketUnlocked({});
|
||||
return packet;
|
||||
}
|
||||
|
||||
void MultiplexedConnections::disconnect()
|
||||
{
|
||||
std::lock_guard lock(cancel_mutex);
|
||||
/// We've seen this lock mysteriously get stuck forever, without any other thread seeming to
|
||||
/// hold the mutex. This is temporary code to print some extra information next time it happens.
|
||||
/// std::lock_guard lock(cancel_mutex);
|
||||
if (!cancel_mutex.try_lock_for(std::chrono::hours(1)))
|
||||
{
|
||||
UInt64 last_locked = mutex_last_locked_by.load();
|
||||
std::array<UInt8, sizeof(std::timed_mutex)> new_memory_dump;
|
||||
memcpy(new_memory_dump.data(), &cancel_mutex, new_memory_dump.size());
|
||||
LOG_ERROR(&Poco::Logger::get("MultiplexedConnections"), "Deadlock in MultiplexedConnections::disconnect()! Mutex was last (instrumentedly) locked by thread {} on line {}, lock balance: {}, mutex memory when last locked: {}, mutex memory now: {}", last_locked >> 32, last_locked & 0xffffffff, mutex_locked.load(), hexString(mutex_memory_dump.data(), mutex_memory_dump.size()), hexString(new_memory_dump.data(), new_memory_dump.size()));
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Deadlock in MultiplexedConnections::disconnect()");
|
||||
}
|
||||
#pragma clang diagnostic push
|
||||
#pragma clang diagnostic ignored "-Wthread-safety-analysis"
|
||||
std::lock_guard lock(cancel_mutex, std::adopt_lock);
|
||||
#pragma clang diagnostic pop
|
||||
MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION
|
||||
|
||||
for (ReplicaState & state : replica_states)
|
||||
{
|
||||
@ -229,6 +261,7 @@ void MultiplexedConnections::disconnect()
|
||||
void MultiplexedConnections::sendCancel()
|
||||
{
|
||||
std::lock_guard lock(cancel_mutex);
|
||||
MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION
|
||||
|
||||
if (!sent_query || cancelled)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot cancel. Either no query sent or already cancelled.");
|
||||
@ -246,6 +279,7 @@ void MultiplexedConnections::sendCancel()
|
||||
Packet MultiplexedConnections::drain()
|
||||
{
|
||||
std::lock_guard lock(cancel_mutex);
|
||||
MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION
|
||||
|
||||
if (!cancelled)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot drain connections: cancel first.");
|
||||
@ -286,6 +320,7 @@ Packet MultiplexedConnections::drain()
|
||||
std::string MultiplexedConnections::dumpAddresses() const
|
||||
{
|
||||
std::lock_guard lock(cancel_mutex);
|
||||
MUTEX_LOCK_TEMPORARY_DEBUG_INSTRUMENTATION
|
||||
return dumpAddressesUnlocked();
|
||||
}
|
||||
|
||||
|
@ -105,9 +105,15 @@ private:
|
||||
/// std::nullopt if parallel reading from replicas is not used
|
||||
std::optional<ReplicaInfo> replica_info;
|
||||
|
||||
/// A mutex for the sendCancel function to execute safely
|
||||
/// in separate thread.
|
||||
mutable std::mutex cancel_mutex;
|
||||
/// A mutex for the sendCancel function to execute safely in separate thread.
|
||||
mutable std::timed_mutex cancel_mutex;
|
||||
|
||||
/// Temporary instrumentation to debug a weird deadlock on cancel_mutex.
|
||||
/// TODO: Once the investigation is done, get rid of these, and of INSTRUMENTED_LOCK_MUTEX, and
|
||||
/// change cancel_mutex to std::mutex.
|
||||
mutable std::atomic<UInt64> mutex_last_locked_by{0};
|
||||
mutable std::atomic<Int64> mutex_locked{0};
|
||||
mutable std::array<UInt8, sizeof(std::timed_mutex)> mutex_memory_dump;
|
||||
|
||||
friend struct RemoteQueryExecutorRoutine;
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user