Fixing tests.

This commit is contained in:
Nikolai Kochetov 2020-12-15 21:20:14 +03:00
parent 4905201985
commit 3d6ace5890
4 changed files with 8 additions and 21 deletions

View File

@ -742,8 +742,11 @@ std::optional<UInt64> Connection::checkPacket(size_t timeout_microseconds)
} }
Packet Connection::receivePacket() Packet Connection::receivePacket(Fiber * fiber)
{ {
in->setFiber(fiber);
SCOPE_EXIT(in->setFiber(nullptr));
try try
{ {
Packet res; Packet res;

View File

@ -171,7 +171,7 @@ public:
std::optional<UInt64> checkPacket(size_t timeout_microseconds = 0); std::optional<UInt64> checkPacket(size_t timeout_microseconds = 0);
/// Receive packet from server. /// Receive packet from server.
Packet receivePacket(); Packet receivePacket(Fiber * fiber);
/// If not connected yet, or if connection is broken - then connect. If cannot connect - throw an exception. /// If not connected yet, or if connection is broken - then connect. If cannot connect - throw an exception.
void forceConnected(const ConnectionTimeouts & timeouts); void forceConnected(const ConnectionTimeouts & timeouts);
@ -190,7 +190,6 @@ public:
size_t outBytesCount() const { return out ? out->count() : 0; } size_t outBytesCount() const { return out ? out->count() : 0; }
size_t inBytesCount() const { return in ? in->count() : 0; } size_t inBytesCount() const { return in ? in->count() : 0; }
void setFiber(Fiber * fiber) { in->setFiber(fiber); }
Poco::Net::Socket & getSocket() { return in->getSocket(); } Poco::Net::Socket & getSocket() { return in->getSocket(); }
private: private:

View File

@ -237,7 +237,7 @@ std::string MultiplexedConnections::dumpAddressesUnlocked() const
return buf.str(); return buf.str();
} }
Packet MultiplexedConnections::receivePacketUnlocked() Packet MultiplexedConnections::receivePacketUnlocked(Fiber * fiber)
{ {
if (!sent_query) if (!sent_query)
throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR); throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR);
@ -249,19 +249,7 @@ Packet MultiplexedConnections::receivePacketUnlocked()
if (current_connection == nullptr) if (current_connection == nullptr)
throw Exception("Logical error: no available replica", ErrorCodes::NO_AVAILABLE_REPLICA); throw Exception("Logical error: no available replica", ErrorCodes::NO_AVAILABLE_REPLICA);
if (fiber) Packet packet = current_connection->receivePacket(fiber);
current_connection->setFiber(fiber);
Packet packet;
{
SCOPE_EXIT(
{
fiber = nullptr;
current_connection->setFiber(fiber);
});
packet = current_connection->receivePacket();
}
switch (packet.type) switch (packet.type)
{ {

View File

@ -67,12 +67,11 @@ public:
/// Without locking, because sendCancel() does not change the state of the replicas. /// Without locking, because sendCancel() does not change the state of the replicas.
bool hasActiveConnections() const { return active_connection_count > 0; } bool hasActiveConnections() const { return active_connection_count > 0; }
void setFiber(Fiber * fiber_) { fiber = fiber_; }
Poco::Net::Socket & getSocket() { return current_connection->getSocket(); } Poco::Net::Socket & getSocket() { return current_connection->getSocket(); }
private: private:
/// Internal version of `receivePacket` function without locking. /// Internal version of `receivePacket` function without locking.
Packet receivePacketUnlocked(); Packet receivePacketUnlocked(Fiber * fiber = nullptr);
/// Internal version of `dumpAddresses` function without locking. /// Internal version of `dumpAddresses` function without locking.
std::string dumpAddressesUnlocked() const; std::string dumpAddressesUnlocked() const;
@ -109,8 +108,6 @@ private:
/// in separate thread. /// in separate thread.
mutable std::mutex cancel_mutex; mutable std::mutex cancel_mutex;
boost::context::fiber * fiber = nullptr;
friend class RemoteQueryExecutorReadContext; friend class RemoteQueryExecutorReadContext;
}; };