Mirror of https://github.com/ClickHouse/ClickHouse.git, synced 2024-11-21 15:12:02 +00:00
Merge pull request #21886 from Avogar/hedged
Add a profile event for hedged requests and change the related timeouts from seconds to milliseconds.
Commit: 78c56b8913
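The diff below does two things: it introduces a new ProfileEvents counter, HedgedRequestsChangeReplica, which is incremented every time the change-replica timeout expires and a hedged query starts probing another replica, and it renames the hedged-request timeout settings so that both are expressed in milliseconds. For orientation, the counting pattern boils down to a process-wide named counter that hot-path code bumps atomically and monitoring code reads later. Below is a minimal, self-contained sketch of that pattern (plain std::atomic, not ClickHouse's actual ProfileEvents machinery; the names are illustrative):

```cpp
#include <atomic>
#include <cstdint>
#include <iostream>

namespace profile_events_sketch
{
    // Analogous to `extern const Event HedgedRequestsChangeReplica;` in the diff.
    std::atomic<uint64_t> hedged_requests_change_replica{0};

    // Analogous to `ProfileEvents::increment(...)`, called when the
    // change-replica timeout fires and a new replica is started.
    void increment(std::atomic<uint64_t> & counter, uint64_t amount = 1)
    {
        counter.fetch_add(amount, std::memory_order_relaxed);
    }
}

int main()
{
    using namespace profile_events_sketch;
    increment(hedged_requests_change_replica);  // timeout expired once
    increment(hedged_requests_change_replica);  // and again for another offset
    std::cout << hedged_requests_change_replica.load() << '\n';  // prints 2
}
```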
@@ -13,12 +13,6 @@
 #include <IO/ConnectionTimeouts.h>

-namespace ProfileEvents
-{
-    extern const Event DistributedConnectionMissingTable;
-    extern const Event DistributedConnectionStaleReplica;
-}
-
 namespace DB
 {
@@ -1,8 +1,14 @@
 #if defined(OS_LINUX)

 #include <Client/HedgedConnections.h>
+#include <Common/ProfileEvents.h>
 #include <Interpreters/ClientInfo.h>

+namespace ProfileEvents
+{
+    extern const Event HedgedRequestsChangeReplica;
+}
+
 namespace DB
 {
 namespace ErrorCodes
@@ -321,6 +327,7 @@ HedgedConnections::ReplicaLocation HedgedConnections::getReadyReplicaLocation(As
             offset_states[location.offset].replicas[location.index].is_change_replica_timeout_expired = true;
             offset_states[location.offset].next_replica_in_process = true;
             offsets_queue.push(location.offset);
+            ProfileEvents::increment(ProfileEvents::HedgedRequestsChangeReplica);
             startNewReplica();
         }
         else
@@ -399,11 +406,21 @@ Packet HedgedConnections::receivePacketFromReplica(const ReplicaLocation & repli
             break;

         case Protocol::Server::EndOfStream:
+            /// Check case when we receive EndOfStream before first not empty data packet
+            /// or positive progress. It may happen if max_parallel_replicas > 1 and
+            /// there is no way to sample data in this query.
+            if (offset_states[replica_location.offset].can_change_replica)
+                disableChangingReplica(replica_location);
             finishProcessReplica(replica, false);
             break;

         case Protocol::Server::Exception:
         default:
+            /// Check case when we receive Exception before first not empty data packet
+            /// or positive progress. It may happen if max_parallel_replicas > 1 and
+            /// there is no way to sample data in this query.
+            if (offset_states[replica_location.offset].can_change_replica)
+                disableChangingReplica(replica_location);
             finishProcessReplica(replica, true);
             break;
     }
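The handling added above can be read as a small decision rule: if a replica terminates the stream (normally or with an exception) before it has produced any non-empty data packet or positive progress, the query for that offset is already finished, so the hedging logic must stop waiting to swap in a faster replica. This can otherwise happen when max_parallel_replicas > 1 and the query has no way to sample data. A self-contained sketch of that rule follows, with simplified names that are not the real HedgedConnections API:

```cpp
#include <cassert>

enum class ServerPacket { Data, Progress, EndOfStream, Exception };

struct OffsetState
{
    bool can_change_replica = true;   // still waiting for the first data/progress packet
};

// Returns true if replica changing had to be disabled for this offset.
bool on_terminal_packet(OffsetState & state, ServerPacket packet)
{
    if ((packet == ServerPacket::EndOfStream || packet == ServerPacket::Exception)
        && state.can_change_replica)
    {
        state.can_change_replica = false;   // mirrors the role of disableChangingReplica(...)
        return true;
    }
    return false;
}

int main()
{
    OffsetState state;
    assert(on_terminal_packet(state, ServerPacket::EndOfStream));   // first terminal packet disables changing
    assert(!on_terminal_packet(state, ServerPacket::Exception));    // already disabled, nothing to do
}
```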
@@ -2,10 +2,16 @@

 #include <Client/HedgedConnectionsFactory.h>
 #include <Common/typeid_cast.h>
+#include <Common/ProfileEvents.h>
+
+namespace ProfileEvents
+{
+    extern const Event HedgedRequestsChangeReplica;
+}

 namespace DB
 {

 namespace ErrorCodes
 {
     extern const int ALL_CONNECTION_TRIES_FAILED;
@@ -219,6 +225,7 @@ HedgedConnectionsFactory::State HedgedConnectionsFactory::processEpollEvents(boo
             int index = timeout_fd_to_replica_index[event_fd];
             replicas[index].change_replica_timeout.reset();
             ++shuffled_pools[index].slowdown_count;
+            ProfileEvents::increment(ProfileEvents::HedgedRequestsChangeReplica);
         }
         else
             throw Exception("Unknown event from epoll", ErrorCodes::LOGICAL_ERROR);
@@ -97,6 +97,8 @@
     M(DistributedConnectionStaleReplica, "") \
     M(DistributedConnectionFailAtAll, "Total count when distributed connection fails after all retries finished") \
     \
+    M(HedgedRequestsChangeReplica, "Total count when timeout for changing replica expired in hedged requests.") \
+    \
     M(CompileFunction, "Number of times a compilation of generated LLVM code (to create fused function for complex expressions) was initiated.") \
     M(CompiledFunctionExecute, "Number of times a compiled function was executed.") \
     M(CompileExpressionsMicroseconds, "Total time spent for compilation of expressions to LLVM code.") \
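The `M(...)` entries above belong to a single macro list that is expanded several times to generate the event identifiers, their names, and their documentation strings (the classic X-macro technique). Below is a minimal sketch of that technique with illustrative macro names, not the ones ClickHouse actually uses:

```cpp
#include <cstddef>
#include <iostream>

#define APPLY_FOR_EVENTS_SKETCH(M) \
    M(DistributedConnectionStaleReplica, "") \
    M(HedgedRequestsChangeReplica, "Total count when timeout for changing replica expired in hedged requests.") \
    M(CompileFunction, "Number of times a compilation of generated LLVM code was initiated.")

// One list, three expansions: ids, names, docs.
enum class Event
{
#define M(NAME, DOC) NAME,
    APPLY_FOR_EVENTS_SKETCH(M)
#undef M
    COUNT
};

static const char * event_names[] =
{
#define M(NAME, DOC) #NAME,
    APPLY_FOR_EVENTS_SKETCH(M)
#undef M
};

static const char * event_docs[] =
{
#define M(NAME, DOC) DOC,
    APPLY_FOR_EVENTS_SKETCH(M)
#undef M
};

int main()
{
    for (std::size_t i = 0; i < static_cast<std::size_t>(Event::COUNT); ++i)
        std::cout << event_names[i] << ": " << event_docs[i] << '\n';
}
```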
@@ -13,7 +13,7 @@
 #define DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC 300
 /// Timeouts for hedged requests.
 #define DBMS_DEFAULT_HEDGED_CONNECTION_TIMEOUT_MS 100
-#define DBMS_DEFAULT_RECEIVE_DATA_TIMEOUT_SEC 2
+#define DBMS_DEFAULT_RECEIVE_DATA_TIMEOUT_MS 2000
 /// Timeout for synchronous request-result protocol call (like Ping or TablesStatus).
 #define DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC 5
 #define DBMS_DEFAULT_POLL_INTERVAL 10
@@ -55,8 +55,8 @@ class IColumn;
     M(Seconds, receive_timeout, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, "", 0) \
     M(Seconds, send_timeout, DBMS_DEFAULT_SEND_TIMEOUT_SEC, "", 0) \
     M(Seconds, tcp_keep_alive_timeout, 0, "The time in seconds the connection needs to remain idle before TCP starts sending keepalive probes", 0) \
-    M(Milliseconds, hedged_connection_timeout, DBMS_DEFAULT_HEDGED_CONNECTION_TIMEOUT_MS, "Connection timeout for establishing connection with replica for Hedged requests", 0) \
-    M(Seconds, receive_data_timeout, DBMS_DEFAULT_RECEIVE_DATA_TIMEOUT_SEC, "Connection timeout for receiving first packet of data or packet with positive progress from replica", 0) \
+    M(Milliseconds, hedged_connection_timeout_ms, DBMS_DEFAULT_HEDGED_CONNECTION_TIMEOUT_MS, "Connection timeout for establishing connection with replica for Hedged requests", 0) \
+    M(Milliseconds, receive_data_timeout_ms, DBMS_DEFAULT_RECEIVE_DATA_TIMEOUT_MS, "Connection timeout for receiving first packet of data or packet with positive progress from replica", 0) \
     M(Bool, use_hedged_requests, true, "Use hedged requests for distributed queries", 0) \
     M(Bool, allow_changing_replica_until_first_data_packet, false, "Allow HedgedConnections to change replica until receiving first data packet", 0) \
     M(Milliseconds, queue_max_wait_ms, 0, "The wait time in the request queue, if the number of concurrent requests exceeds the maximum.", 0) \
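The rename from `receive_data_timeout` (a Seconds setting defaulting to 2) to `receive_data_timeout_ms` (a Milliseconds setting defaulting to 2000) keeps the default duration identical while making sub-second values expressible. A small std::chrono sketch of that equivalence, with chrono types standing in for ClickHouse's Settings field types:

```cpp
#include <cassert>
#include <chrono>

int main()
{
    using namespace std::chrono;

    seconds      old_receive_data_timeout{2};     // DBMS_DEFAULT_RECEIVE_DATA_TIMEOUT_SEC 2
    milliseconds new_receive_data_timeout{2000};  // DBMS_DEFAULT_RECEIVE_DATA_TIMEOUT_MS 2000

    // Same default duration after the rename.
    assert(duration_cast<milliseconds>(old_receive_data_timeout) == new_receive_data_timeout);

    // Sub-second timeouts become representable instead of truncating to 0 seconds.
    milliseconds aggressive_timeout{500};
    assert(duration_cast<seconds>(aggressive_timeout).count() == 0);  // lost with a Seconds setting
    assert(aggressive_timeout.count() == 500);                        // preserved in milliseconds
}
```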
@@ -23,8 +23,8 @@ inline ConnectionTimeouts ConnectionTimeouts::getTCPTimeoutsWithFailover(const S
         settings.tcp_keep_alive_timeout,
         0,
         settings.connect_timeout_with_failover_secure_ms,
-        settings.hedged_connection_timeout,
-        settings.receive_data_timeout);
+        settings.hedged_connection_timeout_ms,
+        settings.receive_data_timeout_ms);
 }

 inline ConnectionTimeouts ConnectionTimeouts::getHTTPTimeouts(const Context & context)
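getTCPTimeoutsWithFailover now forwards the renamed settings into the ConnectionTimeouts object, so both hedged-request timeouts travel through the code as millisecond values. A rough sketch of such a timeouts aggregate, assuming a simplified struct rather than ClickHouse's real ConnectionTimeouts class:

```cpp
#include <chrono>
#include <iostream>

// Hypothetical, simplified stand-in for a connection-timeouts aggregate:
// only the two hedged-request timeouts touched by this PR are modelled.
struct TcpTimeoutsSketch
{
    std::chrono::milliseconds hedged_connection_timeout;
    std::chrono::milliseconds receive_data_timeout;
};

// Mirrors the role of getTCPTimeoutsWithFailover: take the (here hard-coded)
// settings values and hand them to the timeouts object.
TcpTimeoutsSketch makeTimeoutsWithFailover()
{
    const std::chrono::milliseconds hedged_connection_timeout_ms{100};   // default from Defines.h
    const std::chrono::milliseconds receive_data_timeout_ms{2000};       // default from Defines.h
    return {hedged_connection_timeout_ms, receive_data_timeout_ms};
}

int main()
{
    const auto timeouts = makeTimeoutsWithFailover();
    std::cout << timeouts.hedged_connection_timeout.count() << " ms / "
              << timeouts.receive_data_timeout.count() << " ms\n";  // 100 ms / 2000 ms
}
```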
@@ -3,8 +3,8 @@
     <profiles>
         <default>
             <load_balancing>in_order</load_balancing>
-            <hedged_connection_timeout>100</hedged_connection_timeout>
-            <receive_data_timeout>2</receive_data_timeout>
+            <hedged_connection_timeout_ms>100</hedged_connection_timeout_ms>
+            <receive_data_timeout_ms>2000</receive_data_timeout_ms>
         </default>
     </profiles>
 </yandex>
@@ -83,10 +83,16 @@ def check_settings(node_name, sleep_in_send_tables_status, sleep_in_send_data):
     assert attempts < 1000


+def check_changing_replica_events(expected_count):
+    result = NODES['node'].query("SELECT value FROM system.events WHERE event='HedgedRequestsChangeReplica'")
+    assert int(result) == expected_count
+
+
 def test_stuck_replica(started_cluster):
     cluster.pause_container("node_1")

     check_query(expected_replica="node_2")
+    check_changing_replica_events(1)

     result = NODES['node'].query("SELECT slowdowns_count FROM system.clusters WHERE cluster='test_cluster' and host_name='node_1'")
@@ -132,6 +138,7 @@ def test_send_table_status_sleep(started_cluster):
     check_settings('node_3', 0, 0)

     check_query(expected_replica="node_2")
+    check_changing_replica_events(1)


 def test_send_table_status_sleep2(started_cluster):
@@ -152,6 +159,7 @@ def test_send_table_status_sleep2(started_cluster):
     check_settings('node_3', 0, 0)

     check_query(expected_replica="node_3")
+    check_changing_replica_events(2)


 def test_send_data(started_cluster):
@@ -172,6 +180,7 @@ def test_send_data(started_cluster):
     check_settings('node_3', 0, 0)

     check_query(expected_replica="node_2")
+    check_changing_replica_events(1)


 def test_send_data2(started_cluster):
@@ -192,6 +201,7 @@ def test_send_data2(started_cluster):
     check_settings('node_3', 0, 0)

     check_query(expected_replica="node_3")
+    check_changing_replica_events(2)


 def test_combination1(started_cluster):
@@ -212,6 +222,7 @@ def test_combination1(started_cluster):
     check_settings('node_3', 0, 0)

     check_query(expected_replica="node_3")
+    check_changing_replica_events(2)


 def test_combination2(started_cluster):
@@ -232,6 +243,7 @@ def test_combination2(started_cluster):
     check_settings('node_3', 0, 0)

     check_query(expected_replica="node_3")
+    check_changing_replica_events(2)


 def test_combination3(started_cluster):
@@ -252,6 +264,7 @@ def test_combination3(started_cluster):
     check_settings('node_3', 0, sleep_time)

     check_query(expected_replica="node_2")
+    check_changing_replica_events(3)


 def test_combination4(started_cluster):
@@ -272,6 +285,7 @@ def test_combination4(started_cluster):
     check_settings('node_3', 2, 0)

     check_query(expected_replica="node_2")
+    check_changing_replica_events(4)


 def test_receive_timeout1(started_cluster):
@@ -294,6 +308,7 @@ def test_receive_timeout1(started_cluster):
     check_settings('node_3', 0, 1)

     check_query(expected_replica="node_3", receive_timeout=2)
+    check_changing_replica_events(2)


 def test_receive_timeout2(started_cluster):
@@ -317,4 +332,5 @@ def test_receive_timeout2(started_cluster):
     check_settings('node_3', 2, 0)

     check_query(expected_replica="node_2", receive_timeout=3)
+    check_changing_replica_events(3)
@@ -4,8 +4,8 @@
         <default>
             <load_balancing>in_order</load_balancing>
             <max_parallel_replicas>2</max_parallel_replicas>
-            <hedged_connection_timeout>100</hedged_connection_timeout>
-            <receive_data_timeout>2</receive_data_timeout>
+            <hedged_connection_timeout_ms>100</hedged_connection_timeout_ms>
+            <receive_data_timeout_ms>2000</receive_data_timeout_ms>
         </default>
     </profiles>
 </yandex>
@@ -36,7 +36,7 @@ def started_cluster():
         NODES['node'].query('''CREATE TABLE distributed (id UInt32, date Date) ENGINE =
             Distributed('test_cluster', 'default', 'replicated')''')

-        NODES['node'].query("INSERT INTO distributed VALUES (1, '2020-01-01'), (2, '2020-01-02')")
+        NODES['node'].query("INSERT INTO distributed SELECT number, toDateTime(number) FROM numbers(100)")

         yield cluster
@@ -54,17 +54,21 @@ config = '''<yandex>
 </yandex>'''


-def check_query():
+QUERY_1 = "SELECT count() FROM distributed"
+QUERY_2 = "SELECT * FROM distributed"
+
+
+def check_query(query=QUERY_1):
     NODES['node'].restart_clickhouse()

     # Without hedged requests select query will last more than 30 seconds,
     # with hedged requests it will last just around 1-2 second

     start = time.time()
-    NODES['node'].query("SELECT * FROM distributed");
+    NODES['node'].query(query);
     query_time = time.time() - start
     print("Query time:", query_time)

     assert query_time < 5
@@ -81,6 +85,11 @@ def check_settings(node_name, sleep_in_send_tables_status, sleep_in_send_data):
     assert attempts < 1000


+def check_changing_replica_events(expected_count):
+    result = NODES['node'].query("SELECT value FROM system.events WHERE event='HedgedRequestsChangeReplica'")
+    assert int(result) == expected_count
+
+
 def test_send_table_status_sleep(started_cluster):
     NODES['node_1'].replace_config(
         '/etc/clickhouse-server/users.d/users1.xml',
@@ -94,9 +103,11 @@ def test_send_table_status_sleep(started_cluster):
     check_settings('node_2', sleep_time, 0)

     check_query()
+    check_changing_replica_events(2)


 def test_send_data(started_cluster):

     NODES['node_1'].replace_config(
         '/etc/clickhouse-server/users.d/users1.xml',
         config.format(sleep_in_send_tables_status=0, sleep_in_send_data=sleep_time))
@@ -109,6 +120,7 @@
     check_settings('node_2', 0, sleep_time)

     check_query()
+    check_changing_replica_events(2)


 def test_combination1(started_cluster):
@@ -129,6 +141,7 @@ def test_combination1(started_cluster):
     check_settings('node_3', 0, sleep_time)

     check_query()
+    check_changing_replica_events(3)


 def test_combination2(started_cluster):
@@ -155,4 +168,33 @@
     check_settings('node_4', 1, 0)

     check_query()
+    check_changing_replica_events(4)
+
+
+def test_query_with_no_data_to_sample(started_cluster):
+    NODES['node_1'].replace_config(
+        '/etc/clickhouse-server/users.d/users1.xml',
+        config.format(sleep_in_send_tables_status=0, sleep_in_send_data=sleep_time))
+
+    NODES['node_2'].replace_config(
+        '/etc/clickhouse-server/users.d/users1.xml',
+        config.format(sleep_in_send_tables_status=0, sleep_in_send_data=sleep_time))
+
+    NODES['node_3'].replace_config(
+        '/etc/clickhouse-server/users.d/users1.xml',
+        config.format(sleep_in_send_tables_status=0, sleep_in_send_data=0))
+
+    NODES['node_4'].replace_config(
+        '/etc/clickhouse-server/users.d/users1.xml',
+        config.format(sleep_in_send_tables_status=0, sleep_in_send_data=0))
+
+    check_settings('node_1', 0, sleep_time)
+    check_settings('node_2', 0, sleep_time)
+    check_settings('node_3', 0, 0)
+    check_settings('node_4', 0, 0)
+
+    # When there is no way to sample data, the whole query will be performed by
+    # the first replica and the second replica will just send EndOfStream,
+    # so we will change only the first replica here.
+    check_query(query=QUERY_2)
+    check_changing_replica_events(1)