mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-16 19:32:07 +00:00
71 lines
1.8 KiB
C++
71 lines
1.8 KiB
C++
|
#include <Client/PacketReceiver.h>
|
||
|
|
||
|
namespace DB
|
||
|
{
|
||
|
|
||
|
PacketReceiver::PacketReceiver(Connection * connection_) : AsyncTaskExecutor(std::make_unique<Task>(*this)), connection(connection_)
|
||
|
{
|
||
|
epoll.add(timeout_descriptor.getDescriptor());
|
||
|
socket_fd = connection->getSocket()->impl()->sockfd();
|
||
|
epoll.add(socket_fd);
|
||
|
}
|
||
|
|
||
|
bool PacketReceiver::checkBeforeTaskResume()
|
||
|
{
|
||
|
/// If there is no pending data, check timeout.
|
||
|
return connection->hasReadPendingData() || checkTimeout();
|
||
|
}
|
||
|
|
||
|
void PacketReceiver::processAsyncEvent(int fd [[maybe_unused]], Poco::Timespan socket_timeout, AsyncEventTimeoutType, const std::string &, uint32_t)
|
||
|
{
|
||
|
assert(fd == socket_fd);
|
||
|
timeout_descriptor.setRelative(socket_timeout);
|
||
|
timeout = socket_timeout;
|
||
|
is_read_in_process = true;
|
||
|
}
|
||
|
|
||
|
void PacketReceiver::clearAsyncEvent()
|
||
|
{
|
||
|
is_read_in_process = false;
|
||
|
timeout_descriptor.reset();
|
||
|
}
|
||
|
|
||
|
bool PacketReceiver::checkTimeout()
|
||
|
{
|
||
|
bool is_socket_ready = false;
|
||
|
|
||
|
epoll_event events[2];
|
||
|
events[0].data.fd = events[1].data.fd = -1;
|
||
|
size_t ready_count = epoll.getManyReady(2, events, true);
|
||
|
|
||
|
for (size_t i = 0; i != ready_count; ++i)
|
||
|
{
|
||
|
if (events[i].data.fd == socket_fd)
|
||
|
is_socket_ready = true;
|
||
|
if (events[i].data.fd == timeout_descriptor.getDescriptor())
|
||
|
is_timeout_expired = true;
|
||
|
}
|
||
|
|
||
|
if (is_timeout_expired && !is_socket_ready)
|
||
|
{
|
||
|
timeout_descriptor.reset();
|
||
|
return false;
|
||
|
}
|
||
|
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
void PacketReceiver::Task::run(AsyncCallback async_callback, ResumeCallback suspend_callback)
|
||
|
{
|
||
|
while (true)
|
||
|
{
|
||
|
{
|
||
|
AsyncCallbackSetter async_setter(receiver.connection, async_callback);
|
||
|
receiver.packet = receiver.connection->receivePacket();
|
||
|
}
|
||
|
suspend_callback();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
}
|