Fix, make better

This commit is contained in:
avogar 2023-03-17 13:02:20 +00:00
parent 2f2d7133c5
commit 4b46b5b748
7 changed files with 84 additions and 15 deletions

View File

@ -25,7 +25,7 @@ class PacketReceiver : public AsyncTaskExecutor
public:
explicit PacketReceiver(Connection * connection_);
bool isPacketReady() const { return !is_read_in_process && !is_timeout_expired; }
bool isPacketReady() const { return !is_read_in_process && !is_timeout_expired && !exception; }
Packet getPacket() { return std::move(packet); }
bool hasException() const { return exception.operator bool(); }
@ -49,6 +49,8 @@ private:
void processAsyncEvent(int fd, Poco::Timespan socket_timeout, AsyncEventTimeoutType, const std::string &, uint32_t) override;
void clearAsyncEvent() override;
void processException(std::exception_ptr e) override { exception = e; }
struct Task : public AsyncTask
{
Task(PacketReceiver & receiver_) : receiver(receiver_) {}

View File

@ -0,0 +1,3 @@
//
// Created by Павел Круглов on 16/03/2023.
//

View File

@ -110,7 +110,22 @@ std::optional<Chunk> RemoteSource::tryGenerate()
rows_before_limit->set(info.getRowsBeforeLimit());
});
query_executor->sendQuery();
if (async_read)
{
int fd_ = query_executor->sendQueryAsync();
if (fd_ >= 0)
{
fd = fd_;
is_async_state = true;
return Chunk();
}
is_async_state = false;
}
else
{
query_executor->sendQuery();
}
was_query_sent = true;
}
@ -119,7 +134,7 @@ std::optional<Chunk> RemoteSource::tryGenerate()
if (async_read)
{
auto res = query_executor->asyncRead();
auto res = query_executor->readAsync();
if (res.getType() == RemoteQueryExecutor::ReadResult::Type::Nothing)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got an empty packet from the RemoteQueryExecutor. This is a bug");

View File

@ -153,7 +153,7 @@ RemoteQueryExecutor::~RemoteQueryExecutor()
* all connections, then read and skip the remaining packets to make sure
* these connections did not remain hanging in the out-of-sync state.
*/
if (established || isQueryPending())
if (established || (isQueryPending() && connections))
connections->disconnect();
}
@ -218,6 +218,13 @@ void RemoteQueryExecutor::sendQuery(ClientInfo::QueryKind query_kind, AsyncCallb
if (needToSkipUnavailableShard())
return;
/// The query could be cancelled while connections are being created, in which case this code
/// is executed inside read_context->cancel() under was_cancelled_mutex. That can happen only when
/// was_cancelled = true (because it is set to true before read_context->cancel() is called).
/// To avoid a deadlock, we must check was_cancelled before locking was_cancelled_mutex.
if (was_cancelled)
return;
/// Query cannot be canceled in the middle of the send query,
/// since there are multiple packets:
/// - Query
@ -252,6 +259,26 @@ void RemoteQueryExecutor::sendQuery(ClientInfo::QueryKind query_kind, AsyncCallb
sendExternalTables();
}
/// Start (or continue) sending the query through the read context.
/// Returns -1 if the query had already been sent before this call or became sent during it;
/// otherwise returns the file descriptor reported by the read context.
int RemoteQueryExecutor::sendQueryAsync()
{
    /// Create the read context on first use; the creation itself is done under was_cancelled_mutex.
    if (!read_context)
    {
        std::lock_guard guard(was_cancelled_mutex);
        read_context = std::make_unique<ReadContext>(*this, /*suspend_when_query_sent*/ true);
    }

    /// The sent_query flag cannot be used for this check, because scalars or
    /// external tables may still be in the process of being sent.
    const bool already_sent = read_context->isQuerySent();
    if (already_sent)
        return -1;

    read_context->resume();

    /// After resuming, either the query is fully sent (nothing to wait for)
    /// or the caller gets a descriptor from the read context.
    return read_context->isQuerySent() ? -1 : read_context->getFileDescriptor();
}
Block RemoteQueryExecutor::readBlock()
{
@ -292,7 +319,7 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::read()
}
}
RemoteQueryExecutor::ReadResult RemoteQueryExecutor::asyncRead()
RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
{
#if defined(OS_LINUX)
if (!read_context || (resent_query && recreate_read_context))
@ -356,7 +383,7 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::restartQueryWithoutDuplicat
if (!read_context)
return read();
else
return asyncRead();
return readAsync();
}
throw Exception(ErrorCodes::DUPLICATED_PART_UUIDS, "Found duplicate uuids while processing query");
}
@ -517,6 +544,10 @@ void RemoteQueryExecutor::finish()
/// Send the request to abort the execution of the request, if not already sent.
tryCancel("Cancelling query because enough data has been read");
/// If connections weren't created yet, nothing to do.
if (!connections)
return;
/// Get the remaining packets so that there is no out of sync in the connections to the replicas.
Packet packet = connections->drain();
switch (packet.type)
@ -647,15 +678,19 @@ void RemoteQueryExecutor::tryCancel(const char * reason)
if (read_context)
read_context->cancel();
connections->sendCancel();
if (log)
LOG_TRACE(log, "({}) {}", connections->dumpAddresses(), reason);
/// Query could be cancelled during connection creation, we should check
/// if connections were already created.
if (connections && sent_query)
{
connections->sendCancel();
if (log)
LOG_TRACE(log, "({}) {}", connections->dumpAddresses(), reason);
}
}
/// Reports whether a query is considered pending, i.e. it was started and `finished` is not set yet.
bool RemoteQueryExecutor::isQueryPending() const
{
/// NOTE(review): two return statements appear here. Presumably the first one is the line removed
/// by this commit and the second one (based on read_context) is its replacement - confirm against
/// the actual file. As written, only the first return is reachable.
return sent_query && !finished;
return read_context && !finished;
}
bool RemoteQueryExecutor::hasThrownException() const

View File

@ -90,6 +90,8 @@ public:
/// and it should pass INITIAL_QUERY.
void sendQuery(ClientInfo::QueryKind query_kind = ClientInfo::QueryKind::SECONDARY_QUERY, AsyncCallback async_callback = {});
int sendQueryAsync();
/// Query is resent to a replica, the query itself can be modified.
std::atomic<bool> resent_query { false };
std::atomic<bool> recreate_read_context { false };
@ -146,7 +148,7 @@ public:
ReadResult read();
/// Async variant of read. Returns ready block or file descriptor which may be used for polling.
ReadResult asyncRead();
ReadResult readAsync();
/// Receive all remaining packets and finish the query.
/// It should be cancelled after read returned empty block.

View File

@ -18,8 +18,8 @@ namespace ErrorCodes
extern const int SOCKET_TIMEOUT;
}
RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext(RemoteQueryExecutor & executor_)
: AsyncTaskExecutor(std::make_unique<Task>(*this)), executor(executor_)
RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext(RemoteQueryExecutor & executor_, bool suspend_when_query_sent_)
: AsyncTaskExecutor(std::make_unique<Task>(*this)), executor(executor_), suspend_when_query_sent(suspend_when_query_sent_)
{
if (-1 == pipe2(pipe_fd, O_NONBLOCK))
throwFromErrno("Cannot create pipe", ErrorCodes::CANNOT_OPEN_FILE);
@ -37,6 +37,10 @@ bool RemoteQueryExecutorReadContext::checkBeforeTaskResume()
void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, ResumeCallback suspend_callback)
{
read_context.executor.sendQuery(ClientInfo::QueryKind::SECONDARY_QUERY, async_callback);
read_context.is_query_sent = true;
if (read_context.suspend_when_query_sent)
suspend_callback();
if (read_context.executor.needToSkipUnavailableShard())
return;
@ -108,6 +112,10 @@ void RemoteQueryExecutorReadContext::cancelImpl()
/// (disconnected), so it will not be left in an unsynchronised state.
if (!is_timer_alarmed)
{
/// If query wasn't sent, just complete sending it.
if (!is_query_sent)
suspend_when_query_sent = true;
/// Wait for current pending packet, to avoid leaving connection in unsynchronised state.
while (is_in_progress.load(std::memory_order_relaxed))
{

View File

@ -27,7 +27,7 @@ class RemoteQueryExecutor;
class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
{
public:
explicit RemoteQueryExecutorReadContext(RemoteQueryExecutor & executor_);
explicit RemoteQueryExecutorReadContext(RemoteQueryExecutor & executor_, bool suspend_when_query_sent_ = false);
~RemoteQueryExecutorReadContext() override;
@ -35,6 +35,8 @@ public:
bool isCancelled() const { return AsyncTaskExecutor::isCancelled() || is_pipe_alarmed; }
bool isQuerySent() const { return is_query_sent; }
int getFileDescriptor() const { return epoll.getFileDescriptor(); }
Packet getPacket() { return std::move(packet); }
@ -80,6 +82,8 @@ private:
Epoll epoll;
std::string connection_fd_description;
bool suspend_when_query_sent = false;
bool is_query_sent = false;
};
}