Fixing build.

This commit is contained in:
Nikolai Kochetov 2020-12-10 15:20:18 +03:00
parent e295dfe6e3
commit 75ac87c241
8 changed files with 64 additions and 24 deletions

View File

@ -15,7 +15,7 @@ if (NOT USE_INTERNAL_BOOST_LIBRARY)
)
if(Boost_INCLUDE_DIR AND Boost_FILESYSTEM_LIBRARY AND Boost_FILESYSTEM_LIBRARY AND
Boost_PROGRAM_OPTIONS_LIBRARY AND Boost_REGEX_LIBRARY AND Boost_SYSTEM_LIBRARY)
Boost_PROGRAM_OPTIONS_LIBRARY AND Boost_REGEX_LIBRARY AND Boost_SYSTEM_LIBRARY AND Boost_CONTEXT_LIBRARY)
set(EXTERNAL_BOOST_FOUND 1)
@ -151,7 +151,7 @@ if (NOT EXTERNAL_BOOST_FOUND)
enable_language(ASM)
SET(ASM_OPTIONS "-x assembler-with-cpp")
if (SANITIZE)
if (SANITIZE AND SANITIZE STREQUAL "address")
add_compile_definitions(BOOST_USE_UCONTEXT)
add_compile_definitions(BOOST_USE_ASAN)

View File

@ -21,6 +21,7 @@ RUN apt-get update \
libboost-thread-dev \
libboost-iostreams-dev \
libboost-regex-dev \
libboost-context-dev \
zlib1g-dev \
liblz4-dev \
libdouble-conversion-dev \

View File

@ -110,6 +110,8 @@ private:
mutable std::mutex cancel_mutex;
boost::context::fiber * fiber = nullptr;
friend class RemoteQueryExecutorReadContext;
};
}

View File

@ -223,7 +223,7 @@ std::variant<Block, int> RemoteQueryExecutor::read(std::unique_ptr<ReadContext>
do
{
if (!read_context->resumeRoutine())
if (!read_context->resumeRoutine(was_cancelled_mutex))
return Block();
if (read_context->is_read_in_progress)
@ -317,7 +317,7 @@ void RemoteQueryExecutor::finish(std::unique_ptr<ReadContext> * read_context)
return;
if (read_context && *read_context)
(*read_context)->cancel();
(*read_context)->cancel(was_cancelled_mutex);
/** If you have not read all the data yet, but they are no longer needed.
* This may be due to the fact that the data is sufficient (for example, when using LIMIT).
@ -368,7 +368,7 @@ void RemoteQueryExecutor::cancel(std::unique_ptr<ReadContext> * read_context)
return;
if (read_context && *read_context)
(*read_context)->cancel();
(*read_context)->cancel(was_cancelled_mutex);
tryCancel("Cancelling query");
}

View File

@ -24,7 +24,6 @@ public:
std::exception_ptr exception;
FiberStack<> stack;
std::mutex fiber_mutex;
boost::context::fiber fiber;
Poco::Timespan receive_timeout;
@ -33,6 +32,7 @@ public:
TimerDescriptor timer{CLOCK_MONOTONIC, 0};
int socket_fd = -1;
int epoll_fd;
int pipe_fd[2];
explicit RemoteQueryExecutorReadContext(MultiplexedConnections & connections_) : connections(connections_)
{
@ -40,6 +40,18 @@ public:
if (-1 == epoll_fd)
throwFromErrno("Cannot create epoll descriptor", ErrorCodes::CANNOT_OPEN_FILE);
if (-1 == pipe2(pipe_fd, O_NONBLOCK))
throwFromErrno("Cannot create pipe", ErrorCodes::CANNOT_OPEN_FILE);
{
epoll_event socket_event;
socket_event.events = EPOLLIN | EPOLLPRI;
socket_event.data.fd = pipe_fd[0];
if (-1 == epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pipe_fd[0], &socket_event))
throwFromErrno("Cannot add pipe descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE);
}
{
epoll_event timer_event;
timer_event.events = EPOLLIN | EPOLLPRI;
@ -76,11 +88,11 @@ public:
receive_timeout = socket.impl()->getReceiveTimeout();
}
void checkTimeout() const
bool checkTimeout() const
{
try
{
checkTimeoutImpl();
return checkTimeoutImpl();
}
catch (DB::Exception & e)
{
@ -89,16 +101,18 @@ public:
}
}
void checkTimeoutImpl() const
bool checkTimeoutImpl() const
{
epoll_event events[2];
epoll_event events[3];
events[0].data.fd = events[1].data.fd = events[2].data.fd = -1;
/// Wait for epoll_fd will not block if it was polled externally.
int num_events = epoll_wait(epoll_fd, events, 2, 0);
int num_events = epoll_wait(epoll_fd, events, 3, 0);
if (num_events == -1)
throwFromErrno("Failed to epoll_wait", ErrorCodes::CANNOT_READ_FROM_SOCKET);
bool is_socket_ready = false;
bool is_pipe_alarmed = false;
bool has_timer_alarm = false;
for (int i = 0; i < num_events; ++i)
@ -107,14 +121,21 @@ public:
is_socket_ready = true;
if (events[i].data.fd == timer.getDescriptor())
has_timer_alarm = true;
if (events[i].data.fd == pipe_fd[0])
is_pipe_alarmed = true;
}
if (is_pipe_alarmed)
return false;
if (has_timer_alarm && !is_socket_ready)
{
/// Socket receive timeout. Drain it in case or error, or it may be hide by timeout exception.
timer.drain();
throw NetException("Timeout exceeded", ErrorCodes::SOCKET_TIMEOUT);
}
return true;
}
void setTimer() const
@ -126,13 +147,13 @@ public:
timer.setRelative(receive_timeout);
}
bool resumeRoutine()
bool resumeRoutine(std::mutex & mutex)
{
if (is_read_in_progress)
checkTimeout();
if (is_read_in_progress && !checkTimeout())
return false;
{
std::lock_guard guard(fiber_mutex);
std::lock_guard guard(mutex);
if (!fiber)
return false;
@ -159,10 +180,20 @@ public:
return true;
}
void cancel()
void cancel(std::mutex & mutex)
{
std::lock_guard guard(fiber_mutex);
std::lock_guard guard(mutex);
boost::context::fiber to_destroy = std::move(fiber);
uint64_t buf = 0;
while (-1 == write(pipe_fd[1], &buf, sizeof(buf)))
{
if (errno == EAGAIN)
break;
if (errno != EINTR)
throwFromErrno("Cannot write to pipe", ErrorCodes::CANNOT_READ_FROM_SOCKET);
}
}
~RemoteQueryExecutorReadContext()
@ -186,7 +217,7 @@ public:
connections.setFiber(&sink);
read_context.is_read_in_progress = true;
read_context.packet = connections.receivePacket();
read_context.packet = connections.receivePacketUnlocked();
read_context.is_read_in_progress = false;
sink = std::move(sink).resume();

View File

@ -92,7 +92,7 @@ void AsyncTaskQueue::finish()
uint64_t buf = 0;
while (-1 == write(pipe_fd[1], &buf, sizeof(buf)))
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
if (errno == EAGAIN)
break;
if (errno != EINTR)

View File

@ -105,6 +105,16 @@ void RemoteSource::onCancel()
is_async_state = false;
}
void RemoteSource::onUpdatePorts()
{
if (getPort().isFinished())
{
was_query_canceled = true;
query_executor->finish(&read_context);
is_async_state = false;
}
}
RemoteTotalsSource::RemoteTotalsSource(RemoteQueryExecutorPtr executor)
: ISource(executor->getHeader())

View File

@ -29,11 +29,7 @@ public:
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit.swap(counter); }
/// Stop reading from stream if output port is finished.
void onUpdatePorts() override
{
if (getPort().isFinished())
cancel();
}
void onUpdatePorts() override;
int schedule() override { return fd; }