mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #23309 from ClickHouse/try-fix-crash-with-unknown-packet
Fix crash in case of unknown packet
This commit is contained in:
commit
44112587d4
@ -13,6 +13,7 @@ namespace ErrorCodes
|
||||
extern const int MISMATCH_REPLICAS_DATA_SOURCES;
|
||||
extern const int NO_AVAILABLE_REPLICA;
|
||||
extern const int TIMEOUT_EXCEEDED;
|
||||
extern const int UNKNOWN_PACKET_FROM_SERVER;
|
||||
}
|
||||
|
||||
|
||||
@ -278,7 +279,22 @@ Packet MultiplexedConnections::receivePacketUnlocked(AsyncCallback async_callbac
|
||||
Packet packet;
|
||||
{
|
||||
AsyncCallbackSetter async_setter(current_connection, std::move(async_callback));
|
||||
packet = current_connection->receivePacket();
|
||||
|
||||
try
|
||||
{
|
||||
packet = current_connection->receivePacket();
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_SERVER)
|
||||
{
|
||||
/// Exception may happen when packet is received, e.g. when got unknown packet.
|
||||
/// In this case, invalidate replica, so that we would not read from it anymore.
|
||||
current_connection->disconnect();
|
||||
invalidateReplica(state);
|
||||
}
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
switch (packet.type)
|
||||
|
@ -225,6 +225,7 @@ class IColumn;
|
||||
/** Settings for testing hedged requests */ \
|
||||
M(Milliseconds, sleep_in_send_tables_status_ms, 0, "Time to sleep in sending tables status response in TCPHandler", 0) \
|
||||
M(Milliseconds, sleep_in_send_data_ms, 0, "Time to sleep in sending data in TCPHandler", 0) \
|
||||
M(UInt64, unknown_packet_in_send_data, 0, "Send unknown packet instead of data Nth data packet", 0) \
|
||||
\
|
||||
M(Bool, insert_allow_materialized_columns, 0, "If setting is enabled, Allow materialized columns in INSERT.", 0) \
|
||||
M(Seconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.", 0) \
|
||||
|
@ -311,6 +311,8 @@ void TCPHandler::runImpl()
|
||||
/// Processing Query
|
||||
state.io = executeQuery(state.query, query_context, false, state.stage, may_have_embedded_data);
|
||||
|
||||
unknown_packet_in_send_data = query_context->getSettingsRef().unknown_packet_in_send_data;
|
||||
|
||||
after_check_cancelled.restart();
|
||||
after_send_progress.restart();
|
||||
|
||||
@ -1472,6 +1474,14 @@ void TCPHandler::sendData(const Block & block)
|
||||
|
||||
try
|
||||
{
|
||||
/// For testing hedged requests
|
||||
if (unknown_packet_in_send_data)
|
||||
{
|
||||
--unknown_packet_in_send_data;
|
||||
if (unknown_packet_in_send_data == 0)
|
||||
writeVarUInt(UInt64(-1), *out);
|
||||
}
|
||||
|
||||
writeVarUInt(Protocol::Server::Data, *out);
|
||||
/// Send external table name (empty name is the main table)
|
||||
writeStringBinary("", *out);
|
||||
|
@ -135,6 +135,8 @@ private:
|
||||
ContextPtr connection_context;
|
||||
ContextPtr query_context;
|
||||
|
||||
size_t unknown_packet_in_send_data = 0;
|
||||
|
||||
/// Streams for reading/writing from/to client connection socket.
|
||||
std::shared_ptr<ReadBuffer> in;
|
||||
std::shared_ptr<WriteBuffer> out;
|
||||
|
9
tests/queries/0_stateless/01822_async_read_from_socket_crash.sh
Executable file
9
tests/queries/0_stateless/01822_async_read_from_socket_crash.sh
Executable file
@ -0,0 +1,9 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
|
||||
|
||||
for _ in {1..10}; do $CLICKHOUSE_CLIENT -q "select number from remote('127.0.0.{2,3}', numbers(20)) limit 8 settings max_block_size = 2, unknown_packet_in_send_data=4, sleep_in_send_data_ms=100, async_socket_for_remote=1 format Null" > /dev/null 2>&1 || true; done
|
Loading…
Reference in New Issue
Block a user