This commit is contained in:
Pavel Kruglov 2021-02-01 20:09:55 +03:00
parent d27f5114c5
commit 7d9eb966f0
5 changed files with 13 additions and 8 deletions

View File

@ -343,7 +343,7 @@ GetHedgedConnections::ReplicaStatePtr GetHedgedConnections::processEpollEvents(b
else if (timeout_fd_to_replica.find(event_fd) != timeout_fd_to_replica.end()) else if (timeout_fd_to_replica.find(event_fd) != timeout_fd_to_replica.end())
{ {
replica = timeout_fd_to_replica[event_fd]; replica = timeout_fd_to_replica[event_fd];
finish = processTimeoutEvent(replica, replica->active_timeouts[event_fd].get(), non_blocking); finish = processTimeoutEvent(replica, replica->active_timeouts[event_fd], non_blocking);
} }
else else
throw Exception("Unknown event from epoll", ErrorCodes::LOGICAL_ERROR); throw Exception("Unknown event from epoll", ErrorCodes::LOGICAL_ERROR);
@ -476,7 +476,7 @@ void addTimeoutToReplica(
throw Exception("Unknown timeout type", ErrorCodes::BAD_ARGUMENTS); throw Exception("Unknown timeout type", ErrorCodes::BAD_ARGUMENTS);
} }
std::unique_ptr<TimerDescriptor> timeout_descriptor = std::make_unique<TimerDescriptor>(); TimerDescriptorPtr timeout_descriptor = std::make_shared<TimerDescriptor>();
timeout_descriptor->setType(type); timeout_descriptor->setType(type);
timeout_descriptor->setRelative(timeout); timeout_descriptor->setRelative(timeout);
epoll.add(timeout_descriptor->getDescriptor()); epoll.add(timeout_descriptor->getDescriptor());

View File

@ -34,7 +34,7 @@ public:
int index = -1; int index = -1;
int fd = -1; int fd = -1;
size_t parallel_replica_offset = 0; size_t parallel_replica_offset = 0;
std::unordered_map<int, std::unique_ptr<TimerDescriptor>> active_timeouts; std::unordered_map<int, std::shared_ptr<TimerDescriptor>> active_timeouts;
void reset() void reset()
{ {

View File

@ -294,7 +294,7 @@ Packet HedgedConnections::receivePacketImpl(AsyncCallback async_callback)
{ {
LOG_DEBUG(log, "event is timeout"); LOG_DEBUG(log, "event is timeout");
replica = timeout_fd_to_replica[event_fd]; replica = timeout_fd_to_replica[event_fd];
processTimeoutEvent(replica, replica->active_timeouts[event_fd].get()); processTimeoutEvent(replica, replica->active_timeouts[event_fd]);
} }
else if (event_fd == get_hedged_connections.getFileDescriptor()) else if (event_fd == get_hedged_connections.getFileDescriptor())
tryGetNewReplica(); tryGetNewReplica();
@ -375,12 +375,14 @@ void HedgedConnections::processReceiveData(ReplicaStatePtr & replica)
void HedgedConnections::processTimeoutEvent(ReplicaStatePtr & replica, TimerDescriptorPtr timeout_descriptor) void HedgedConnections::processTimeoutEvent(ReplicaStatePtr & replica, TimerDescriptorPtr timeout_descriptor)
{ {
LOG_DEBUG(log, "processTimeoutEvent");
epoll.remove(timeout_descriptor->getDescriptor()); epoll.remove(timeout_descriptor->getDescriptor());
replica->active_timeouts.erase(timeout_descriptor->getDescriptor()); replica->active_timeouts.erase(timeout_descriptor->getDescriptor());
timeout_fd_to_replica.erase(timeout_descriptor->getDescriptor()); timeout_fd_to_replica.erase(timeout_descriptor->getDescriptor());
if (timeout_descriptor->getType() == TimerTypes::RECEIVE_TIMEOUT) if (timeout_descriptor->getType() == TimerTypes::RECEIVE_TIMEOUT)
{ {
LOG_DEBUG(log, "process RECEIVE_TIMEOUT");
size_t offset = replica->parallel_replica_offset; size_t offset = replica->parallel_replica_offset;
finishProcessReplica(replica, true); finishProcessReplica(replica, true);
@ -390,6 +392,7 @@ void HedgedConnections::processTimeoutEvent(ReplicaStatePtr & replica, TimerDesc
} }
else if (timeout_descriptor->getType() == TimerTypes::RECEIVE_DATA_TIMEOUT) else if (timeout_descriptor->getType() == TimerTypes::RECEIVE_DATA_TIMEOUT)
{ {
LOG_DEBUG(log, "process RECEIVE_DATA_TIMEOUT");
offsets_queue.push(replica->parallel_replica_offset); offsets_queue.push(replica->parallel_replica_offset);
tryGetNewReplica(); tryGetNewReplica();
} }

View File

@ -39,7 +39,7 @@ public:
void setType(int type_) { type = type_; } void setType(int type_) { type = type_; }
}; };
using TimerDescriptorPtr = TimerDescriptor *; using TimerDescriptorPtr = std::shared_ptr<TimerDescriptor>;
} }
#endif #endif

View File

@ -17,7 +17,7 @@ node = cluster.add_instance(
node_1 = cluster.add_instance('node_1', with_zookeeper=True, stay_alive=True, user_configs=['configs/users1.xml']) node_1 = cluster.add_instance('node_1', with_zookeeper=True, stay_alive=True, user_configs=['configs/users1.xml'])
node_2 = cluster.add_instance('node_2', with_zookeeper=True) node_2 = cluster.add_instance('node_2', with_zookeeper=True)
sleep_timeout = 5 sleep_timeout = 30
receive_timeout = 1 receive_timeout = 1
config = '''<yandex> config = '''<yandex>
@ -62,12 +62,14 @@ def process_test(sleep_setting_name, receive_timeout_name):
start = time.time() start = time.time()
node.query("SELECT * FROM distributed"); node.query("SELECT * FROM distributed");
query_time = time.time() - start query_time = time.time() - start
print(query_time)
# Check that query time is not long # Check that query time is not long
assert query_time < sleep_timeout # assert query_time < sleep_timeout
def test_change_replica_on_receive_hello(started_cluster): def test(started_cluster):
node.query("INSERT INTO distributed VALUES (1, '2020-01-01')") node.query("INSERT INTO distributed VALUES (1, '2020-01-01')")
process_test("sleep_before_send_hello", "receive_hello_timeout") process_test("sleep_before_send_hello", "receive_hello_timeout")