mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-30 05:30:51 +00:00
Add async read to RemoteQueryExecutor.
This commit is contained in:
parent
0fae325d76
commit
e3946bc2b5
@ -16,6 +16,7 @@
|
|||||||
#include <DataStreams/BlockStreamProfileInfo.h>
|
#include <DataStreams/BlockStreamProfileInfo.h>
|
||||||
|
|
||||||
#include <IO/ConnectionTimeouts.h>
|
#include <IO/ConnectionTimeouts.h>
|
||||||
|
#include <IO/ReadBufferFromPocoSocket.h>
|
||||||
|
|
||||||
#include <Core/Settings.h>
|
#include <Core/Settings.h>
|
||||||
#include <Interpreters/TablesStatus.h>
|
#include <Interpreters/TablesStatus.h>
|
||||||
@ -188,6 +189,8 @@ public:
|
|||||||
size_t outBytesCount() const { return out ? out->count() : 0; }
|
size_t outBytesCount() const { return out ? out->count() : 0; }
|
||||||
size_t inBytesCount() const { return in ? in->count() : 0; }
|
size_t inBytesCount() const { return in ? in->count() : 0; }
|
||||||
|
|
||||||
|
void setFiber(ReadBufferFromPocoSocket::Fiber * fiber) { in->setFiber(fiber); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
String host;
|
String host;
|
||||||
UInt16 port;
|
UInt16 port;
|
||||||
@ -224,7 +227,7 @@ private:
|
|||||||
String server_display_name;
|
String server_display_name;
|
||||||
|
|
||||||
std::unique_ptr<Poco::Net::StreamSocket> socket;
|
std::unique_ptr<Poco::Net::StreamSocket> socket;
|
||||||
std::shared_ptr<ReadBuffer> in;
|
std::shared_ptr<ReadBufferFromPocoSocket> in;
|
||||||
std::shared_ptr<WriteBuffer> out;
|
std::shared_ptr<WriteBuffer> out;
|
||||||
std::optional<UInt64> last_input_packet_type;
|
std::optional<UInt64> last_input_packet_type;
|
||||||
|
|
||||||
|
@ -67,6 +67,8 @@ public:
|
|||||||
/// Without locking, because sendCancel() does not change the state of the replicas.
|
/// Without locking, because sendCancel() does not change the state of the replicas.
|
||||||
bool hasActiveConnections() const { return active_connection_count > 0; }
|
bool hasActiveConnections() const { return active_connection_count > 0; }
|
||||||
|
|
||||||
|
void setFiber(ReadBufferFromPocoSocket::Fiber * fiber) { current_connection->setFiber(fiber); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/// Internal version of `receivePacket` function without locking.
|
/// Internal version of `receivePacket` function without locking.
|
||||||
Packet receivePacketUnlocked();
|
Packet receivePacketUnlocked();
|
||||||
|
@ -9,6 +9,7 @@
|
|||||||
#include <Interpreters/castColumn.h>
|
#include <Interpreters/castColumn.h>
|
||||||
#include <Interpreters/Cluster.h>
|
#include <Interpreters/Cluster.h>
|
||||||
#include <Interpreters/InternalTextLogsQueue.h>
|
#include <Interpreters/InternalTextLogsQueue.h>
|
||||||
|
#include <Common/FiberStack.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
@ -199,65 +200,136 @@ Block RemoteQueryExecutor::read()
|
|||||||
|
|
||||||
Packet packet = multiplexed_connections->receivePacket();
|
Packet packet = multiplexed_connections->receivePacket();
|
||||||
|
|
||||||
switch (packet.type)
|
if (auto block = processPacket(std::move(packet)))
|
||||||
|
return *block;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void RemoteQueryExecutor::read(ReadContext & read_context)
|
||||||
|
{
|
||||||
|
if (!sent_query)
|
||||||
|
{
|
||||||
|
sendQuery();
|
||||||
|
|
||||||
|
if (context.getSettingsRef().skip_unavailable_shards && (0 == multiplexed_connections->size()))
|
||||||
{
|
{
|
||||||
case Protocol::Server::Data:
|
read_context.is_read_in_progress = false;
|
||||||
/// If the block is not empty and is not a header block
|
read_context.result.clear();
|
||||||
if (packet.block && (packet.block.rows() > 0))
|
return;
|
||||||
return adaptBlockStructure(packet.block, header);
|
|
||||||
break; /// If the block is empty - we will receive other packets before EndOfStream.
|
|
||||||
|
|
||||||
case Protocol::Server::Exception:
|
|
||||||
got_exception_from_replica = true;
|
|
||||||
packet.exception->rethrow();
|
|
||||||
break;
|
|
||||||
|
|
||||||
case Protocol::Server::EndOfStream:
|
|
||||||
if (!multiplexed_connections->hasActiveConnections())
|
|
||||||
{
|
|
||||||
finished = true;
|
|
||||||
return Block();
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
|
|
||||||
case Protocol::Server::Progress:
|
|
||||||
/** We use the progress from a remote server.
|
|
||||||
* We also include in ProcessList,
|
|
||||||
* and we use it to check
|
|
||||||
* constraints (for example, the minimum speed of query execution)
|
|
||||||
* and quotas (for example, the number of lines to read).
|
|
||||||
*/
|
|
||||||
if (progress_callback)
|
|
||||||
progress_callback(packet.progress);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case Protocol::Server::ProfileInfo:
|
|
||||||
/// Use own (client-side) info about read bytes, it is more correct info than server-side one.
|
|
||||||
if (profile_info_callback)
|
|
||||||
profile_info_callback(packet.profile_info);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case Protocol::Server::Totals:
|
|
||||||
totals = packet.block;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case Protocol::Server::Extremes:
|
|
||||||
extremes = packet.block;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case Protocol::Server::Log:
|
|
||||||
/// Pass logs from remote server to client
|
|
||||||
if (auto log_queue = CurrentThread::getInternalTextLogsQueue())
|
|
||||||
log_queue->pushBlock(std::move(packet.block));
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
|
||||||
got_unknown_packet_from_replica = true;
|
|
||||||
throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_SERVER, "Unknown packet {} from one of the following replicas: {}",
|
|
||||||
toString(packet.type),
|
|
||||||
multiplexed_connections->dumpAddresses());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
do
|
||||||
|
{
|
||||||
|
if (!read_context.is_read_in_progress)
|
||||||
|
{
|
||||||
|
auto routine = [&read_context, this](boost::context::fiber && sink)
|
||||||
|
{
|
||||||
|
read_context.fiber_context.fiber = std::move(sink);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
multiplexed_connections->setFiber(&read_context.fiber_context);
|
||||||
|
read_context.packet = multiplexed_connections->receivePacket();
|
||||||
|
multiplexed_connections->setFiber(nullptr);
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
read_context.exception = std::current_exception();
|
||||||
|
}
|
||||||
|
|
||||||
|
return std::move(read_context.fiber_context.fiber);
|
||||||
|
};
|
||||||
|
|
||||||
|
read_context.fiber = boost::context::fiber(std::allocator_arg_t(), read_context.stack, std::move(routine));
|
||||||
|
}
|
||||||
|
|
||||||
|
read_context.fiber = std::move(read_context.fiber).resume();
|
||||||
|
|
||||||
|
if (read_context.exception)
|
||||||
|
std::rethrow_exception(std::move(read_context.exception));
|
||||||
|
|
||||||
|
if (read_context.fiber)
|
||||||
|
{
|
||||||
|
read_context.is_read_in_progress = true;
|
||||||
|
read_context.fd = read_context.fiber_context.fd;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
read_context.is_read_in_progress = false;
|
||||||
|
if (auto data = processPacket(std::move(read_context.packet)))
|
||||||
|
{
|
||||||
|
read_context.result = std::move(*data);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
while (true);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::optional<Block> RemoteQueryExecutor::processPacket(Packet packet)
|
||||||
|
{
|
||||||
|
switch (packet.type)
|
||||||
|
{
|
||||||
|
case Protocol::Server::Data:
|
||||||
|
/// If the block is not empty and is not a header block
|
||||||
|
if (packet.block && (packet.block.rows() > 0))
|
||||||
|
return adaptBlockStructure(packet.block, header);
|
||||||
|
break; /// If the block is empty - we will receive other packets before EndOfStream.
|
||||||
|
|
||||||
|
case Protocol::Server::Exception:
|
||||||
|
got_exception_from_replica = true;
|
||||||
|
packet.exception->rethrow();
|
||||||
|
break;
|
||||||
|
|
||||||
|
case Protocol::Server::EndOfStream:
|
||||||
|
if (!multiplexed_connections->hasActiveConnections())
|
||||||
|
{
|
||||||
|
finished = true;
|
||||||
|
return Block();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case Protocol::Server::Progress:
|
||||||
|
/** We use the progress from a remote server.
|
||||||
|
* We also include in ProcessList,
|
||||||
|
* and we use it to check
|
||||||
|
* constraints (for example, the minimum speed of query execution)
|
||||||
|
* and quotas (for example, the number of lines to read).
|
||||||
|
*/
|
||||||
|
if (progress_callback)
|
||||||
|
progress_callback(packet.progress);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case Protocol::Server::ProfileInfo:
|
||||||
|
/// Use own (client-side) info about read bytes, it is more correct info than server-side one.
|
||||||
|
if (profile_info_callback)
|
||||||
|
profile_info_callback(packet.profile_info);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case Protocol::Server::Totals:
|
||||||
|
totals = packet.block;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case Protocol::Server::Extremes:
|
||||||
|
extremes = packet.block;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case Protocol::Server::Log:
|
||||||
|
/// Pass logs from remote server to client
|
||||||
|
if (auto log_queue = CurrentThread::getInternalTextLogsQueue())
|
||||||
|
log_queue->pushBlock(std::move(packet.block));
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
got_unknown_packet_from_replica = true;
|
||||||
|
throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_SERVER, "Unknown packet {} from one of the following replicas: {}",
|
||||||
|
toString(packet.type),
|
||||||
|
multiplexed_connections->dumpAddresses());
|
||||||
|
}
|
||||||
|
|
||||||
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
void RemoteQueryExecutor::finish()
|
void RemoteQueryExecutor::finish()
|
||||||
|
@ -3,6 +3,7 @@
|
|||||||
#include <Interpreters/Context.h>
|
#include <Interpreters/Context.h>
|
||||||
#include <Client/ConnectionPool.h>
|
#include <Client/ConnectionPool.h>
|
||||||
#include <Client/MultiplexedConnections.h>
|
#include <Client/MultiplexedConnections.h>
|
||||||
|
#include <Common/FiberStack.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
@ -46,11 +47,31 @@ public:
|
|||||||
|
|
||||||
~RemoteQueryExecutor();
|
~RemoteQueryExecutor();
|
||||||
|
|
||||||
|
struct ReadContext
|
||||||
|
{
|
||||||
|
bool is_read_in_progress = false;
|
||||||
|
|
||||||
|
/// If is_read_in_progress, use this fd to poll
|
||||||
|
int fd;
|
||||||
|
|
||||||
|
/// If not is_read_in_progress, result block is set.
|
||||||
|
Block result;
|
||||||
|
|
||||||
|
/// Internal data
|
||||||
|
|
||||||
|
boost::context::fiber fiber;
|
||||||
|
Packet packet;
|
||||||
|
std::exception_ptr exception;
|
||||||
|
FiberStack<> stack;
|
||||||
|
ReadBufferFromPocoSocket::Fiber fiber_context;
|
||||||
|
};
|
||||||
|
|
||||||
/// Create connection and send query, external tables and scalars.
|
/// Create connection and send query, external tables and scalars.
|
||||||
void sendQuery();
|
void sendQuery();
|
||||||
|
|
||||||
/// Read next block of data. Returns empty block if query is finished.
|
/// Read next block of data. Returns empty block if query is finished.
|
||||||
Block read();
|
Block read();
|
||||||
|
void read(ReadContext & read_context);
|
||||||
|
|
||||||
/// Receive all remain packets and finish query.
|
/// Receive all remain packets and finish query.
|
||||||
/// It should be cancelled after read returned empty block.
|
/// It should be cancelled after read returned empty block.
|
||||||
@ -159,6 +180,9 @@ private:
|
|||||||
|
|
||||||
/// Returns true if exception was thrown
|
/// Returns true if exception was thrown
|
||||||
bool hasThrownException() const;
|
bool hasThrownException() const;
|
||||||
|
|
||||||
|
/// Process packet for read and return data block if possible.
|
||||||
|
std::optional<Block> processPacket(Packet packet);
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -28,10 +28,24 @@ bool ReadBufferFromPocoSocket::nextImpl()
|
|||||||
ssize_t bytes_read = 0;
|
ssize_t bytes_read = 0;
|
||||||
Stopwatch watch;
|
Stopwatch watch;
|
||||||
|
|
||||||
|
int flags = 0;
|
||||||
|
if (fiber)
|
||||||
|
flags |= MSG_DONTWAIT;
|
||||||
|
|
||||||
/// Add more details to exceptions.
|
/// Add more details to exceptions.
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), internal_buffer.size());
|
bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), internal_buffer.size(), flags);
|
||||||
|
|
||||||
|
/// If fiber is specified, and read is blocking, run fiber and try again later.
|
||||||
|
/// It is expected that file descriptor may be polled externally.
|
||||||
|
/// Note that receive timeout is not checked here. External code should check it while polling.
|
||||||
|
while (bytes_read < 0 && fiber && (errno == POCO_EAGAIN || errno == POCO_EWOULDBLOCK))
|
||||||
|
{
|
||||||
|
fiber->fd = socket.impl()->sockfd();
|
||||||
|
fiber->fiber = std::move(fiber->fiber).resume();
|
||||||
|
bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), internal_buffer.size(), flags);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch (const Poco::Net::NetException & e)
|
catch (const Poco::Net::NetException & e)
|
||||||
{
|
{
|
||||||
|
@ -5,6 +5,8 @@
|
|||||||
#include <IO/ReadBuffer.h>
|
#include <IO/ReadBuffer.h>
|
||||||
#include <IO/BufferWithOwnMemory.h>
|
#include <IO/BufferWithOwnMemory.h>
|
||||||
|
|
||||||
|
#include <boost/context/fiber.hpp>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
@ -28,6 +30,17 @@ public:
|
|||||||
ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
|
ReadBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
|
||||||
|
|
||||||
bool poll(size_t timeout_microseconds);
|
bool poll(size_t timeout_microseconds);
|
||||||
|
|
||||||
|
struct Fiber
|
||||||
|
{
|
||||||
|
boost::context::fiber fiber;
|
||||||
|
int fd;
|
||||||
|
};
|
||||||
|
|
||||||
|
void setFiber(Fiber * fiber_) { fiber = fiber_; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
Fiber * fiber;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user