Fixing tests.

This commit is contained in:
Nikolai Kochetov 2020-12-14 19:16:08 +03:00
parent 8de5cd5bc7
commit db9ad80168
7 changed files with 53 additions and 7 deletions

View File

@ -201,6 +201,7 @@ Block RemoteQueryExecutor::read()
}
}
#if defined(OS_LINUX)
std::variant<Block, int> RemoteQueryExecutor::read(std::unique_ptr<ReadContext> & read_context)
{
if (!sent_query)
@ -232,6 +233,7 @@ std::variant<Block, int> RemoteQueryExecutor::read(std::unique_ptr<ReadContext>
}
while (true);
}
#endif
std::optional<Block> RemoteQueryExecutor::processPacket(Packet packet)
{

View File

@ -59,10 +59,11 @@ public:
/// Read next block of data. Returns empty block if query is finished.
Block read();
#if defined(OS_LINUX)
/// Async variant of read. Returns ready block or file descriptor which may be used for polling.
/// ReadContext is an internal read state. Pass empty ptr first time, reuse created one for every call.
std::variant<Block, int> read(std::unique_ptr<ReadContext> & read_context);
#endif
/// Receive all remain packets and finish query.
/// It should be cancelled after read returned empty block.
void finish(std::unique_ptr<ReadContext> * read_context = nullptr);

View File

@ -1,4 +1,7 @@
#pragma once
#if defined(OS_LINUX)
#include <sys/epoll.h>
#include <Common/Fiber.h>
#include <Common/FiberStack.h>
@ -25,6 +28,8 @@ public:
std::exception_ptr exception;
FiberStack<> stack;
boost::context::fiber fiber;
std::mutex fiber_lock;
std::unique_lock<std::mutex> * connection_lock;
Poco::Timespan receive_timeout;
MultiplexedConnections & connections;
@ -61,7 +66,9 @@ public:
throwFromErrno("Cannot add timer descriptor to epoll", ErrorCodes::CANNOT_OPEN_FILE);
}
fiber = boost::context::fiber(std::allocator_arg_t(), stack, Routine{connections, *this});
auto routine = Routine{connections, *this, std::unique_lock(connections.cancel_mutex, std::defer_lock)};
connection_lock = &routine.connection_lock;
fiber = boost::context::fiber(std::allocator_arg_t(), stack, std::move(routine));
}
void setSocket(Poco::Net::Socket & socket)
@ -147,17 +154,23 @@ public:
timer.setRelative(receive_timeout);
}
bool resumeRoutine(std::mutex & mutex)
bool resumeRoutine()
{
if (is_read_in_progress && !checkTimeout())
return false;
{
std::lock_guard guard(mutex);
std::lock_guard guard(fiber_lock);
if (!fiber)
return false;
if (!connection_lock->owns_lock())
connection_lock->lock();
fiber = std::move(fiber).resume();
if (!is_read_in_progress)
connection_lock->unlock();
}
if (exception)
@ -180,9 +193,9 @@ public:
return true;
}
void cancel(std::mutex & mutex)
void cancel()
{
std::lock_guard guard(mutex);
std::lock_guard guard(fiber_lock);
boost::context::fiber to_destroy = std::move(fiber);
uint64_t buf = 0;
@ -207,6 +220,7 @@ public:
{
MultiplexedConnections & connections;
Self & read_context;
std::unique_lock<std::mutex> connection_lock;
Fiber operator()(Fiber && sink) const
{
@ -236,5 +250,13 @@ public:
}
};
};
}
#else
namespace DB
{
class RemoteQueryExecutorReadContext
{
};
}
#endif

View File

@ -1,4 +1,7 @@
#include <Processors/Executors/AsyncTaskQueue.h>
#if defined(OS_LINUX)
#include <Common/Exception.h>
#include <sys/epoll.h>
#include <unistd.h>
@ -101,3 +104,4 @@ void AsyncTaskQueue::finish()
}
}
#endif

View File

@ -7,6 +7,7 @@
namespace DB
{
#if defined(OS_LINUX)
class AsyncTaskQueue
{
public:
@ -46,5 +47,12 @@ public:
/// Interrupt waiting.
void finish();
};
#else
class AsyncTaskQueue
{
bool empty() { return true; }
void finish() {}
};
#endif
}

View File

@ -531,6 +531,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, st
break;
}
#if defined(OS_LINUX)
if (num_threads == 1)
{
/// If we execute in single thread, wait for async tasks here.
@ -541,6 +542,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, st
node = static_cast<ExecutingGraph::Node *>(res.data);
break;
}
#endif
threads_queue.push(thread_num);
}
@ -619,11 +621,13 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, st
{
std::unique_lock lock(task_queue_mutex);
#if defined(OS_LINUX)
while (!async_queue.empty() && !finished)
{
async_task_queue.addTask(thread_num, async_queue.front(), async_queue.front()->processor->schedule());
async_queue.pop();
}
#endif
while (!queue.empty() && !finished)
{
@ -772,6 +776,7 @@ void PipelineExecutor::executeImpl(size_t num_threads)
});
}
#if defined(OS_LINUX)
{
/// Wait for async tasks.
std::unique_lock lock(task_queue_mutex);
@ -788,6 +793,7 @@ void PipelineExecutor::executeImpl(size_t num_threads)
}
}
}
#endif
for (auto & thread : threads)
if (thread.joinable())

View File

@ -65,7 +65,7 @@ std::optional<Chunk> RemoteSource::tryGenerate()
was_query_sent = true;
}
#if defined(OS_LINUX)
auto res = query_executor->read(read_context);
if (std::holds_alternative<int>(res))
{
@ -77,6 +77,9 @@ std::optional<Chunk> RemoteSource::tryGenerate()
is_async_state = false;
auto block = std::get<Block>(std::move(res));
#else
auto block = query_executor->read();
#endif
if (!block)
{