mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Refactor 4.0
This commit is contained in:
parent
9bd9ea9fbc
commit
dd5185c779
@ -16,153 +16,24 @@ namespace ErrorCodes
|
|||||||
extern const int ATTEMPT_TO_READ_AFTER_EOF;
|
extern const int ATTEMPT_TO_READ_AFTER_EOF;
|
||||||
extern const int NETWORK_ERROR;
|
extern const int NETWORK_ERROR;
|
||||||
extern const int SOCKET_TIMEOUT;
|
extern const int SOCKET_TIMEOUT;
|
||||||
|
extern const int LOGICAL_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
ConnectionEstablisher::ConnectionEstablisher(
|
ConnectionEstablisher::ConnectionEstablisher(
|
||||||
IConnectionPool * pool_,
|
IConnectionPool * pool_,
|
||||||
const ConnectionTimeouts * timeouts_,
|
const ConnectionTimeouts * timeouts_,
|
||||||
const Settings * settings_,
|
const Settings * settings_,
|
||||||
const QualifiedTableName * table_to_check_)
|
Poco::Logger * log_,
|
||||||
: pool(pool_), timeouts(timeouts_), settings(settings_), table_to_check(table_to_check_),
|
const QualifiedTableName * table_to_check_) : pool(pool_), timeouts(timeouts_), settings(settings_), log(log_), table_to_check(table_to_check_)
|
||||||
stage(Stage::INITIAL), log(&Poco::Logger::get("ConnectionEstablisher"))
|
|
||||||
{
|
{
|
||||||
#if defined(OS_LINUX)
|
|
||||||
epoll.add(receive_timeout.getDescriptor());
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void ConnectionEstablisher::Routine::ReadCallback::operator()(int fd, const Poco::Timespan & timeout, const std::string &)
|
void ConnectionEstablisher::run(ConnectionEstablisher::TryResult & result, std::string & fail_message)
|
||||||
{
|
|
||||||
#if defined(OS_LINUX)
|
|
||||||
if (connection_establisher.socket_fd != fd)
|
|
||||||
{
|
|
||||||
if (connection_establisher.socket_fd != -1)
|
|
||||||
connection_establisher.epoll.remove(connection_establisher.socket_fd);
|
|
||||||
|
|
||||||
connection_establisher.epoll.add(fd);
|
|
||||||
connection_establisher.socket_fd = fd;
|
|
||||||
}
|
|
||||||
|
|
||||||
connection_establisher.receive_timeout.setRelative(timeout);
|
|
||||||
fiber = std::move(fiber).resume();
|
|
||||||
connection_establisher.receive_timeout.reset();
|
|
||||||
#else
|
|
||||||
(void) fd;
|
|
||||||
(void) timeout;
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
Fiber ConnectionEstablisher::Routine::operator()(Fiber && sink)
|
|
||||||
{
|
{
|
||||||
|
is_finished = false;
|
||||||
|
SCOPE_EXIT(is_finished = true);
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
connection_establisher.establishConnection(ReadCallback{connection_establisher, sink});
|
|
||||||
}
|
|
||||||
catch (const boost::context::detail::forced_unwind &)
|
|
||||||
{
|
|
||||||
/// This exception is thrown by the fiber implementation if the fiber is being deleted but hasn't exited.
|
|
||||||
/// It should not be caught or it will segfault.
|
|
||||||
/// Other exceptions must be caught
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
connection_establisher.exception = std::current_exception();
|
|
||||||
}
|
|
||||||
|
|
||||||
return std::move(sink);
|
|
||||||
}
|
|
||||||
|
|
||||||
void ConnectionEstablisher::resume()
|
|
||||||
{
|
|
||||||
if (!fiber_created)
|
|
||||||
{
|
|
||||||
reset();
|
|
||||||
fiber = boost::context::fiber(std::allocator_arg_t(), fiber_stack, Routine{*this});
|
|
||||||
fiber_created = true;
|
|
||||||
resumeFiber();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
#if defined(OS_LINUX)
|
|
||||||
bool is_socket_ready = false;
|
|
||||||
bool is_receive_timeout_alarmed = false;
|
|
||||||
|
|
||||||
epoll_event events[2];
|
|
||||||
events[0].data.fd = events[1].data.fd = -1;
|
|
||||||
size_t ready_count = epoll.getManyReady(2, events, true);
|
|
||||||
for (size_t i = 0; i != ready_count; ++i)
|
|
||||||
{
|
|
||||||
if (events[i].data.fd == socket_fd)
|
|
||||||
is_socket_ready = true;
|
|
||||||
if (events[i].data.fd == receive_timeout.getDescriptor())
|
|
||||||
is_receive_timeout_alarmed = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (is_receive_timeout_alarmed && !is_socket_ready)
|
|
||||||
processReceiveTimeout();
|
|
||||||
#endif
|
|
||||||
|
|
||||||
resumeFiber();
|
|
||||||
}
|
|
||||||
|
|
||||||
void ConnectionEstablisher::cancel()
|
|
||||||
{
|
|
||||||
destroyFiber();
|
|
||||||
reset();
|
|
||||||
}
|
|
||||||
|
|
||||||
void ConnectionEstablisher::processReceiveTimeout()
|
|
||||||
{
|
|
||||||
#if defined(OS_LINUX)
|
|
||||||
destroyFiber();
|
|
||||||
stage = Stage::FAILED;
|
|
||||||
fail_message = "Code: 209, e.displayText() = DB::NetException: Timeout exceeded while reading from socket (" + result.entry->getDescription() + ")";
|
|
||||||
epoll.remove(socket_fd);
|
|
||||||
resetResult();
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
void ConnectionEstablisher::resetResult()
|
|
||||||
{
|
|
||||||
if (!result.entry.isNull())
|
|
||||||
{
|
|
||||||
result.entry->disconnect();
|
|
||||||
result.reset();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void ConnectionEstablisher::reset()
|
|
||||||
{
|
|
||||||
stage = Stage::INITIAL;
|
|
||||||
resetResult();
|
|
||||||
fail_message.clear();
|
|
||||||
socket_fd = -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
void ConnectionEstablisher::resumeFiber()
|
|
||||||
{
|
|
||||||
fiber = std::move(fiber).resume();
|
|
||||||
|
|
||||||
if (exception)
|
|
||||||
std::rethrow_exception(std::move(exception));
|
|
||||||
|
|
||||||
if (stage == Stage::FAILED)
|
|
||||||
destroyFiber();
|
|
||||||
}
|
|
||||||
|
|
||||||
void ConnectionEstablisher::destroyFiber()
|
|
||||||
{
|
|
||||||
Fiber to_destroy = std::move(fiber);
|
|
||||||
fiber_created = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
void ConnectionEstablisher::establishConnection(AsyncCallback async_callback)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
stage = Stage::IN_PROCESS;
|
|
||||||
result.entry = pool->get(*timeouts, settings, /* force_connected = */ false);
|
result.entry = pool->get(*timeouts, settings, /* force_connected = */ false);
|
||||||
AsyncCallbackSetter async_setter(&*result.entry, std::move(async_callback));
|
AsyncCallbackSetter async_setter(&*result.entry, std::move(async_callback));
|
||||||
|
|
||||||
@ -175,7 +46,6 @@ void ConnectionEstablisher::establishConnection(AsyncCallback async_callback)
|
|||||||
result.entry->forceConnected(*timeouts);
|
result.entry->forceConnected(*timeouts);
|
||||||
result.is_usable = true;
|
result.is_usable = true;
|
||||||
result.is_up_to_date = true;
|
result.is_up_to_date = true;
|
||||||
stage = Stage::FINISHED;
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -192,8 +62,6 @@ void ConnectionEstablisher::establishConnection(AsyncCallback async_callback)
|
|||||||
fail_message = fmt::format(message_pattern, backQuote(table_to_check->database), backQuote(table_to_check->table), result.entry->getDescription());
|
fail_message = fmt::format(message_pattern, backQuote(table_to_check->database), backQuote(table_to_check->table), result.entry->getDescription());
|
||||||
LOG_WARNING(log, fail_message);
|
LOG_WARNING(log, fail_message);
|
||||||
ProfileEvents::increment(ProfileEvents::DistributedConnectionMissingTable);
|
ProfileEvents::increment(ProfileEvents::DistributedConnectionMissingTable);
|
||||||
|
|
||||||
stage = Stage::FINISHED;
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -203,7 +71,6 @@ void ConnectionEstablisher::establishConnection(AsyncCallback async_callback)
|
|||||||
if (!max_allowed_delay)
|
if (!max_allowed_delay)
|
||||||
{
|
{
|
||||||
result.is_up_to_date = true;
|
result.is_up_to_date = true;
|
||||||
stage = Stage::FINISHED;
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -219,7 +86,6 @@ void ConnectionEstablisher::establishConnection(AsyncCallback async_callback)
|
|||||||
LOG_TRACE(log, "Server {} has unacceptable replica delay for table {}.{}: {}", result.entry->getDescription(), table_to_check->database, table_to_check->table, delay);
|
LOG_TRACE(log, "Server {} has unacceptable replica delay for table {}.{}: {}", result.entry->getDescription(), table_to_check->database, table_to_check->table, delay);
|
||||||
ProfileEvents::increment(ProfileEvents::DistributedConnectionStaleReplica);
|
ProfileEvents::increment(ProfileEvents::DistributedConnectionStaleReplica);
|
||||||
}
|
}
|
||||||
stage = Stage::FINISHED;
|
|
||||||
}
|
}
|
||||||
catch (const Exception & e)
|
catch (const Exception & e)
|
||||||
{
|
{
|
||||||
@ -228,9 +94,144 @@ void ConnectionEstablisher::establishConnection(AsyncCallback async_callback)
|
|||||||
throw;
|
throw;
|
||||||
|
|
||||||
fail_message = getCurrentExceptionMessage(/* with_stacktrace = */ false);
|
fail_message = getCurrentExceptionMessage(/* with_stacktrace = */ false);
|
||||||
resetResult();
|
|
||||||
stage = Stage::FAILED;
|
if (!result.entry.isNull())
|
||||||
|
{
|
||||||
|
result.entry->disconnect();
|
||||||
|
result.reset();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ConnectionEstablisherAsync::ConnectionEstablisherAsync(
|
||||||
|
IConnectionPool * pool_,
|
||||||
|
const ConnectionTimeouts * timeouts_,
|
||||||
|
const Settings * settings_,
|
||||||
|
Poco::Logger * log_,
|
||||||
|
const QualifiedTableName * table_to_check_)
|
||||||
|
: connection_establisher(pool_, timeouts_, settings_, log_, table_to_check_)
|
||||||
|
{
|
||||||
|
epoll.add(receive_timeout.getDescriptor());
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConnectionEstablisherAsync::Routine::ReadCallback::operator()(int fd, const Poco::Timespan & timeout, const std::string &)
|
||||||
|
{
|
||||||
|
if (connection_establisher_async.socket_fd != fd)
|
||||||
|
{
|
||||||
|
if (connection_establisher_async.socket_fd != -1)
|
||||||
|
connection_establisher_async.epoll.remove(connection_establisher_async.socket_fd);
|
||||||
|
|
||||||
|
connection_establisher_async.epoll.add(fd);
|
||||||
|
connection_establisher_async.socket_fd = fd;
|
||||||
|
}
|
||||||
|
|
||||||
|
connection_establisher_async.receive_timeout.setRelative(timeout);
|
||||||
|
fiber = std::move(fiber).resume();
|
||||||
|
connection_establisher_async.receive_timeout.reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
Fiber ConnectionEstablisherAsync::Routine::operator()(Fiber && sink)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
connection_establisher_async.connection_establisher.setAsyncCallback(ReadCallback{connection_establisher_async, sink});
|
||||||
|
connection_establisher_async.connection_establisher.run(connection_establisher_async.result, connection_establisher_async.fail_message);
|
||||||
|
}
|
||||||
|
catch (const boost::context::detail::forced_unwind &)
|
||||||
|
{
|
||||||
|
/// This exception is thrown by the fiber implementation if the fiber is being deleted but hasn't exited.
|
||||||
|
/// It should not be caught or it will segfault.
|
||||||
|
/// Other exceptions must be caught
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
connection_establisher_async.exception = std::current_exception();
|
||||||
|
}
|
||||||
|
|
||||||
|
return std::move(sink);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::variant<int, ConnectionEstablisher::TryResult> ConnectionEstablisherAsync::resume()
|
||||||
|
{
|
||||||
|
if (!fiber_created)
|
||||||
|
{
|
||||||
|
reset();
|
||||||
|
fiber = boost::context::fiber(std::allocator_arg_t(), fiber_stack, Routine{*this});
|
||||||
|
fiber_created = true;
|
||||||
|
} else if (!checkReceiveTimeout())
|
||||||
|
return result;
|
||||||
|
|
||||||
|
fiber = std::move(fiber).resume();
|
||||||
|
|
||||||
|
if (exception)
|
||||||
|
std::rethrow_exception(std::move(exception));
|
||||||
|
|
||||||
|
if (connection_establisher.isFinished())
|
||||||
|
{
|
||||||
|
destroyFiber();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
return epoll.getFileDescriptor();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ConnectionEstablisherAsync::checkReceiveTimeout()
|
||||||
|
{
|
||||||
|
bool is_socket_ready = false;
|
||||||
|
bool is_receive_timeout_alarmed = false;
|
||||||
|
|
||||||
|
epoll_event events[2];
|
||||||
|
events[0].data.fd = events[1].data.fd = -1;
|
||||||
|
size_t ready_count = epoll.getManyReady(2, events, false);
|
||||||
|
for (size_t i = 0; i != ready_count; ++i)
|
||||||
|
{
|
||||||
|
if (events[i].data.fd == socket_fd)
|
||||||
|
is_socket_ready = true;
|
||||||
|
if (events[i].data.fd == receive_timeout.getDescriptor())
|
||||||
|
is_receive_timeout_alarmed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (is_receive_timeout_alarmed && !is_socket_ready)
|
||||||
|
{
|
||||||
|
destroyFiber();
|
||||||
|
/// In the non-async case this exception would be thrown and caught in ConnectionEstablisher::run,
|
||||||
|
/// but in async case we process timeout outside and cannot throw exception. So, we just save fail message.
|
||||||
|
fail_message = "Timeout exceeded while reading from socket (" + result.entry->getDescription() + ")";
|
||||||
|
epoll.remove(socket_fd);
|
||||||
|
resetResult();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConnectionEstablisherAsync::cancel()
|
||||||
|
{
|
||||||
|
destroyFiber();
|
||||||
|
reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConnectionEstablisherAsync::reset()
|
||||||
|
{
|
||||||
|
resetResult();
|
||||||
|
fail_message.clear();
|
||||||
|
socket_fd = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConnectionEstablisherAsync::resetResult()
|
||||||
|
{
|
||||||
|
if (!result.entry.isNull())
|
||||||
|
{
|
||||||
|
result.entry->disconnect();
|
||||||
|
result.reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConnectionEstablisherAsync::destroyFiber()
|
||||||
|
{
|
||||||
|
Fiber to_destroy = std::move(fiber);
|
||||||
|
fiber_created = false;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <variant>
|
||||||
|
|
||||||
#include <Common/Epoll.h>
|
#include <Common/Epoll.h>
|
||||||
#include <Common/Fiber.h>
|
#include <Common/Fiber.h>
|
||||||
#include <Common/FiberStack.h>
|
#include <Common/FiberStack.h>
|
||||||
@ -10,12 +12,8 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
/// Class for nonblocking establishing connection to the replica.
|
/// Class for establishing connection to the replica. It supports setting up
|
||||||
/// It runs establishing connection process in fiber and sets special
|
/// an async callback that will be called when reading from socket blocks.
|
||||||
/// read callback which is called when reading from socket blocks.
|
|
||||||
/// When read callback is called, socket and receive timeout are added in epoll
|
|
||||||
/// and execution returns to the main program.
|
|
||||||
/// So, you can poll this epoll file descriptor to determine when to resume.
|
|
||||||
class ConnectionEstablisher
|
class ConnectionEstablisher
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -24,60 +22,74 @@ public:
|
|||||||
ConnectionEstablisher(IConnectionPool * pool_,
|
ConnectionEstablisher(IConnectionPool * pool_,
|
||||||
const ConnectionTimeouts * timeouts_,
|
const ConnectionTimeouts * timeouts_,
|
||||||
const Settings * settings_,
|
const Settings * settings_,
|
||||||
|
Poco::Logger * log,
|
||||||
const QualifiedTableName * table_to_check = nullptr);
|
const QualifiedTableName * table_to_check = nullptr);
|
||||||
|
|
||||||
/// Establish connection with replica, call async_callback when
|
/// Establish connection and save it in result, write possible exception message in fail_message.
|
||||||
/// reading from socket blocks.
|
void run(TryResult & result, std::string & fail_message);
|
||||||
void establishConnection(AsyncCallback async_callback = {});
|
|
||||||
|
|
||||||
/// In the first call create fiber with establishConnection function,
|
/// Set async callback that will be called when reading from socket blocks.
|
||||||
/// in the next - check timeout and resume fiber.
|
void setAsyncCallback(AsyncCallback async_callback_) { async_callback = std::move(async_callback_); }
|
||||||
void resume();
|
|
||||||
|
bool isFinished() const { return is_finished; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
IConnectionPool * pool;
|
||||||
|
const ConnectionTimeouts * timeouts;
|
||||||
|
const Settings * settings;
|
||||||
|
Poco::Logger * log;
|
||||||
|
const QualifiedTableName * table_to_check;
|
||||||
|
|
||||||
|
bool is_finished;
|
||||||
|
AsyncCallback async_callback = {};
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
#if defined(OS_LINUX)
|
||||||
|
|
||||||
|
/// Class for nonblocking establishing connection to the replica.
|
||||||
|
/// It runs establishing connection process in fiber and sets special
|
||||||
|
/// read callback which is called when reading from socket blocks.
|
||||||
|
/// When read callback is called, socket and receive timeout are added in epoll
|
||||||
|
/// and execution returns to the main program.
|
||||||
|
/// So, you can poll this epoll file descriptor to determine when to resume.
|
||||||
|
class ConnectionEstablisherAsync
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
using TryResult = PoolWithFailoverBase<IConnectionPool>::TryResult;
|
||||||
|
|
||||||
|
ConnectionEstablisherAsync(IConnectionPool * pool_,
|
||||||
|
const ConnectionTimeouts * timeouts_,
|
||||||
|
const Settings * settings_,
|
||||||
|
Poco::Logger * log_,
|
||||||
|
const QualifiedTableName * table_to_check = nullptr);
|
||||||
|
|
||||||
|
/// Resume establishing connection. If the process was not finished,
|
||||||
|
/// return file descriptor (you can add it in epoll and poll it,
|
||||||
|
/// when this fd becomes ready, call resume again),
|
||||||
|
/// if the process failed or finished, return its result.
|
||||||
|
std::variant<int, TryResult> resume();
|
||||||
|
|
||||||
/// Cancel establishing connections. Fiber will be destroyed,
|
/// Cancel establishing connections. Fiber will be destroyed,
|
||||||
/// class will be set in initial stage.
|
/// class will be set in initial stage.
|
||||||
void cancel();
|
void cancel();
|
||||||
|
|
||||||
bool isInProcess() const { return stage == Stage::IN_PROCESS; }
|
TryResult getResult() const { return result; }
|
||||||
|
|
||||||
bool isFinished() const { return stage == Stage::FINISHED; }
|
|
||||||
|
|
||||||
bool isFailed() const { return stage == Stage::FAILED; }
|
|
||||||
|
|
||||||
int getFileDescriptor() const
|
|
||||||
{
|
|
||||||
int fd = -1;
|
|
||||||
#if defined(OS_LINUX)
|
|
||||||
fd = epoll.getFileDescriptor();
|
|
||||||
#endif
|
|
||||||
return fd;
|
|
||||||
}
|
|
||||||
|
|
||||||
const std::string & getFailMessage() const { return fail_message; }
|
const std::string & getFailMessage() const { return fail_message; }
|
||||||
|
|
||||||
TryResult getResult() { return result; }
|
|
||||||
|
|
||||||
Connection * getConnection() { return &*result.entry; }
|
|
||||||
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void processReceiveTimeout();
|
/// When epoll file descriptor is ready, check if it's an expired timeout.
|
||||||
|
/// Return false if receive timeout expired and socket is not ready, return true otherwise.
|
||||||
enum class Stage
|
bool checkReceiveTimeout();
|
||||||
{
|
|
||||||
INITIAL,
|
|
||||||
IN_PROCESS,
|
|
||||||
FINISHED,
|
|
||||||
FAILED,
|
|
||||||
};
|
|
||||||
|
|
||||||
struct Routine
|
struct Routine
|
||||||
{
|
{
|
||||||
ConnectionEstablisher & connection_establisher;
|
ConnectionEstablisherAsync & connection_establisher_async;
|
||||||
|
|
||||||
struct ReadCallback
|
struct ReadCallback
|
||||||
{
|
{
|
||||||
ConnectionEstablisher & connection_establisher;
|
ConnectionEstablisherAsync & connection_establisher_async;
|
||||||
Fiber & fiber;
|
Fiber & fiber;
|
||||||
|
|
||||||
void operator()(int fd, const Poco::Timespan & timeout, const std::string &);
|
void operator()(int fd, const Poco::Timespan & timeout, const std::string &);
|
||||||
@ -86,31 +98,34 @@ private:
|
|||||||
Fiber operator()(Fiber && sink);
|
Fiber operator()(Fiber && sink);
|
||||||
};
|
};
|
||||||
|
|
||||||
void resetResult();
|
|
||||||
|
|
||||||
void reset();
|
void reset();
|
||||||
|
|
||||||
|
void resetResult();
|
||||||
|
|
||||||
void destroyFiber();
|
void destroyFiber();
|
||||||
|
|
||||||
void resumeFiber();
|
ConnectionEstablisher connection_establisher;
|
||||||
|
|
||||||
IConnectionPool * pool;
|
|
||||||
const ConnectionTimeouts * timeouts;
|
|
||||||
std::string fail_message;
|
|
||||||
const Settings * settings;
|
|
||||||
const QualifiedTableName * table_to_check;
|
|
||||||
TryResult result;
|
TryResult result;
|
||||||
Stage stage;
|
std::string fail_message;
|
||||||
Poco::Logger * log;
|
|
||||||
Fiber fiber;
|
Fiber fiber;
|
||||||
FiberStack fiber_stack;
|
FiberStack fiber_stack;
|
||||||
std::exception_ptr exception;
|
|
||||||
int socket_fd = -1;
|
/// We use timer descriptor for checking socket receive timeout.
|
||||||
bool fiber_created = false;
|
|
||||||
#if defined(OS_LINUX)
|
|
||||||
TimerDescriptor receive_timeout;
|
TimerDescriptor receive_timeout;
|
||||||
|
|
||||||
|
/// In read callback we add socket file descriptor and timer descriptor with receive timeout
|
||||||
|
/// in epoll, so we can return epoll file descriptor outside for polling.
|
||||||
Epoll epoll;
|
Epoll epoll;
|
||||||
#endif
|
int socket_fd = -1;
|
||||||
|
std::string socket_description;
|
||||||
|
|
||||||
|
/// If an exception occurred in fiber resume, we save it and rethrow.
|
||||||
|
std::exception_ptr exception;
|
||||||
|
|
||||||
|
bool fiber_created = false;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -247,10 +247,10 @@ ConnectionPoolWithFailover::tryGetEntry(
|
|||||||
const Settings * settings,
|
const Settings * settings,
|
||||||
const QualifiedTableName * table_to_check)
|
const QualifiedTableName * table_to_check)
|
||||||
{
|
{
|
||||||
ConnectionEstablisher connection_establisher(&pool, &timeouts, settings, table_to_check);
|
ConnectionEstablisher connection_establisher(&pool, &timeouts, settings, log, table_to_check);
|
||||||
connection_establisher.establishConnection();
|
TryResult result;
|
||||||
fail_message = connection_establisher.getFailMessage();
|
connection_establisher.run(result, fail_message);
|
||||||
return connection_establisher.getResult();
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<ConnectionPoolWithFailover::Base::ShuffledPool> ConnectionPoolWithFailover::getShuffledPools(const Settings * settings)
|
std::vector<ConnectionPoolWithFailover::Base::ShuffledPool> ConnectionPoolWithFailover::getShuffledPools(const Settings * settings)
|
||||||
|
@ -99,7 +99,7 @@ private:
|
|||||||
/// Try to get a connection from the pool and check that it is good.
|
/// Try to get a connection from the pool and check that it is good.
|
||||||
/// If table_to_check is not null and the check is enabled in settings, check that replication delay
|
/// If table_to_check is not null and the check is enabled in settings, check that replication delay
|
||||||
/// for this table is not too large.
|
/// for this table is not too large.
|
||||||
static TryResult tryGetEntry(
|
TryResult tryGetEntry(
|
||||||
IConnectionPool & pool,
|
IConnectionPool & pool,
|
||||||
const ConnectionTimeouts & timeouts,
|
const ConnectionTimeouts & timeouts,
|
||||||
std::string & fail_message,
|
std::string & fail_message,
|
||||||
|
@ -39,8 +39,8 @@ HedgedConnections::HedgedConnections(
|
|||||||
ReplicaState & replica = offset_states[i].replicas.back();
|
ReplicaState & replica = offset_states[i].replicas.back();
|
||||||
replica.connection->setThrottler(throttler_);
|
replica.connection->setThrottler(throttler_);
|
||||||
|
|
||||||
epoll.add(replica.packet_receiver.getFileDescriptor());
|
epoll.add(replica.packet_receiver->getFileDescriptor());
|
||||||
fd_to_replica_location[replica.packet_receiver.getFileDescriptor()] = ReplicaLocation{i, 0};
|
fd_to_replica_location[replica.packet_receiver->getFileDescriptor()] = ReplicaLocation{i, 0};
|
||||||
|
|
||||||
epoll.add(replica.change_replica_timeout.getDescriptor());
|
epoll.add(replica.change_replica_timeout.getDescriptor());
|
||||||
timeout_fd_to_replica_location[replica.change_replica_timeout.getDescriptor()] = ReplicaLocation{i, 0};
|
timeout_fd_to_replica_location[replica.change_replica_timeout.getDescriptor()] = ReplicaLocation{i, 0};
|
||||||
@ -143,6 +143,16 @@ void HedgedConnections::sendQuery(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!disable_two_level_aggregation)
|
||||||
|
{
|
||||||
|
/// Tell hedged_connections_factory to skip replicas that don't support two-level aggregation.
|
||||||
|
hedged_connections_factory.setSkipPredicate(
|
||||||
|
[timeouts](Connection * connection)
|
||||||
|
{
|
||||||
|
return connection->getServerRevision(timeouts) < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
auto send_query = [this, timeouts, query, query_id, stage, client_info, with_pending_data](ReplicaState & replica)
|
auto send_query = [this, timeouts, query, query_id, stage, client_info, with_pending_data](ReplicaState & replica)
|
||||||
{
|
{
|
||||||
Settings modified_settings = settings;
|
Settings modified_settings = settings;
|
||||||
@ -157,7 +167,7 @@ void HedgedConnections::sendQuery(
|
|||||||
if (offset_states.size() > 1)
|
if (offset_states.size() > 1)
|
||||||
{
|
{
|
||||||
modified_settings.parallel_replicas_count = offset_states.size();
|
modified_settings.parallel_replicas_count = offset_states.size();
|
||||||
modified_settings.parallel_replica_offset = fd_to_replica_location[replica.packet_receiver.getFileDescriptor()].offset;
|
modified_settings.parallel_replica_offset = fd_to_replica_location[replica.packet_receiver->getFileDescriptor()].offset;
|
||||||
}
|
}
|
||||||
|
|
||||||
replica.connection->sendQuery(timeouts, query, query_id, stage, &modified_settings, &client_info, with_pending_data);
|
replica.connection->sendQuery(timeouts, query, query_id, stage, &modified_settings, &client_info, with_pending_data);
|
||||||
@ -183,11 +193,8 @@ void HedgedConnections::disconnect()
|
|||||||
|
|
||||||
if (hedged_connections_factory.hasEventsInProcess())
|
if (hedged_connections_factory.hasEventsInProcess())
|
||||||
{
|
{
|
||||||
if (next_replica_in_process)
|
if (hedged_connections_factory.numberOfProcessingReplicas() > 0)
|
||||||
{
|
|
||||||
epoll.remove(hedged_connections_factory.getFileDescriptor());
|
epoll.remove(hedged_connections_factory.getFileDescriptor());
|
||||||
next_replica_in_process = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
hedged_connections_factory.stopChoosingReplicas();
|
hedged_connections_factory.stopChoosingReplicas();
|
||||||
}
|
}
|
||||||
@ -291,33 +298,23 @@ HedgedConnections::ReplicaLocation HedgedConnections::getReadyReplicaLocation(As
|
|||||||
int event_fd;
|
int event_fd;
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
/// Check connections for pending data in buffer.
|
|
||||||
ReplicaLocation location;
|
|
||||||
if (checkPendingData(location))
|
|
||||||
{
|
|
||||||
ReplicaState & replica_state = offset_states[location.offset].replicas[location.index];
|
|
||||||
|
|
||||||
replica_state.packet_receiver.resume();
|
|
||||||
if (replica_state.packet_receiver.isPacketReady())
|
|
||||||
return location;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get ready file descriptor from epoll and process it.
|
/// Get ready file descriptor from epoll and process it.
|
||||||
event_fd = getReadyFileDescriptor(async_callback);
|
event_fd = getReadyFileDescriptor(async_callback);
|
||||||
|
|
||||||
if (event_fd == hedged_connections_factory.getFileDescriptor())
|
if (event_fd == hedged_connections_factory.getFileDescriptor())
|
||||||
tryGetNewReplica(false);
|
checkNewReplica();
|
||||||
else if (fd_to_replica_location.contains(event_fd))
|
else if (fd_to_replica_location.contains(event_fd))
|
||||||
{
|
{
|
||||||
location = fd_to_replica_location[event_fd];
|
ReplicaLocation location = fd_to_replica_location[event_fd];
|
||||||
ReplicaState & replica_state = offset_states[location.offset].replicas[location.index];
|
ReplicaState & replica_state = offset_states[location.offset].replicas[location.index];
|
||||||
replica_state.packet_receiver.resume();
|
auto res = replica_state.packet_receiver->resume();
|
||||||
|
|
||||||
if (replica_state.packet_receiver.isPacketReady())
|
if (std::holds_alternative<Packet>(res))
|
||||||
|
{
|
||||||
|
last_received_packet = std::move(std::get<Packet>(res));
|
||||||
return location;
|
return location;
|
||||||
|
}
|
||||||
if (replica_state.packet_receiver.isReceiveTimeoutExpired())
|
else if (std::holds_alternative<Poco::Timespan>(res))
|
||||||
{
|
{
|
||||||
finishProcessReplica(replica_state, true);
|
finishProcessReplica(replica_state, true);
|
||||||
|
|
||||||
@ -328,11 +325,11 @@ HedgedConnections::ReplicaLocation HedgedConnections::getReadyReplicaLocation(As
|
|||||||
}
|
}
|
||||||
else if (timeout_fd_to_replica_location.contains(event_fd))
|
else if (timeout_fd_to_replica_location.contains(event_fd))
|
||||||
{
|
{
|
||||||
location = timeout_fd_to_replica_location[event_fd];
|
ReplicaLocation location = timeout_fd_to_replica_location[event_fd];
|
||||||
offset_states[location.offset].replicas[location.index].change_replica_timeout.reset();
|
offset_states[location.offset].replicas[location.index].change_replica_timeout.reset();
|
||||||
offset_states[location.offset].next_replica_in_process = true;
|
offset_states[location.offset].next_replica_in_process = true;
|
||||||
offsets_queue.push(location.offset);
|
offsets_queue.push(location.offset);
|
||||||
tryGetNewReplica(true);
|
startNewReplica();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
throw Exception("Unknown event from epoll", ErrorCodes::LOGICAL_ERROR);
|
throw Exception("Unknown event from epoll", ErrorCodes::LOGICAL_ERROR);
|
||||||
@ -343,28 +340,20 @@ int HedgedConnections::getReadyFileDescriptor(AsyncCallback async_callback)
|
|||||||
{
|
{
|
||||||
epoll_event event;
|
epoll_event event;
|
||||||
event.data.fd = -1;
|
event.data.fd = -1;
|
||||||
epoll.getManyReady(1, &event, true, std::move(async_callback));
|
size_t events_count = 0;
|
||||||
|
while (events_count == 0)
|
||||||
|
{
|
||||||
|
events_count = epoll.getManyReady(1, &event, false);
|
||||||
|
if (!events_count && async_callback)
|
||||||
|
async_callback(epoll.getFileDescriptor(), 0, epoll.getDescription());
|
||||||
|
}
|
||||||
return event.data.fd;
|
return event.data.fd;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool HedgedConnections::checkPendingData(ReplicaLocation & location_out)
|
|
||||||
{
|
|
||||||
for (auto & [fd, location] : fd_to_replica_location)
|
|
||||||
{
|
|
||||||
if (offset_states[location.offset].replicas[location.index].connection->hasReadPendingData())
|
|
||||||
{
|
|
||||||
location_out = location;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
Packet HedgedConnections::receivePacketFromReplica(const ReplicaLocation & replica_location)
|
Packet HedgedConnections::receivePacketFromReplica(const ReplicaLocation & replica_location)
|
||||||
{
|
{
|
||||||
ReplicaState & replica = offset_states[replica_location.offset].replicas[replica_location.index];
|
ReplicaState & replica = offset_states[replica_location.offset].replicas[replica_location.index];
|
||||||
Packet packet = replica.packet_receiver.getPacket();
|
Packet packet = std::move(last_received_packet);
|
||||||
switch (packet.type)
|
switch (packet.type)
|
||||||
{
|
{
|
||||||
case Protocol::Server::Data:
|
case Protocol::Server::Data:
|
||||||
@ -413,27 +402,41 @@ void HedgedConnections::processReceivedFirstDataPacket(const ReplicaLocation & r
|
|||||||
/// If we received data from replicas with all offsets, we need to stop choosing new replicas.
|
/// If we received data from replicas with all offsets, we need to stop choosing new replicas.
|
||||||
if (hedged_connections_factory.hasEventsInProcess() && offsets_with_received_first_data_packet == offset_states.size())
|
if (hedged_connections_factory.hasEventsInProcess() && offsets_with_received_first_data_packet == offset_states.size())
|
||||||
{
|
{
|
||||||
if (next_replica_in_process)
|
if (hedged_connections_factory.numberOfProcessingReplicas() > 0)
|
||||||
{
|
|
||||||
epoll.remove(hedged_connections_factory.getFileDescriptor());
|
epoll.remove(hedged_connections_factory.getFileDescriptor());
|
||||||
next_replica_in_process = false;
|
|
||||||
}
|
|
||||||
hedged_connections_factory.stopChoosingReplicas();
|
hedged_connections_factory.stopChoosingReplicas();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void HedgedConnections::tryGetNewReplica(bool start_new_connection)
|
void HedgedConnections::startNewReplica()
|
||||||
{
|
{
|
||||||
Connection * connection = nullptr;
|
Connection * connection = nullptr;
|
||||||
HedgedConnectionsFactory::State state = hedged_connections_factory.getNextConnection(start_new_connection, false, connection);
|
HedgedConnectionsFactory::State state = hedged_connections_factory.startNewConnection(connection);
|
||||||
|
|
||||||
/// Skip replicas that doesn't support two-level aggregation if we didn't disable it in sendQuery.
|
/// Check if we need to add hedged_connections_factory file descriptor to epoll.
|
||||||
while (state == HedgedConnectionsFactory::State::READY && !disable_two_level_aggregation
|
if (state == HedgedConnectionsFactory::State::NOT_READY && hedged_connections_factory.numberOfProcessingReplicas() == 1)
|
||||||
&& connection->getServerRevision(hedged_connections_factory.getConnectionTimeouts())
|
epoll.add(hedged_connections_factory.getFileDescriptor());
|
||||||
< DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD)
|
|
||||||
state = hedged_connections_factory.getNextConnection(true, false, connection);
|
|
||||||
|
|
||||||
if (state == HedgedConnectionsFactory::State::READY)
|
processNewReplicaState(state, connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
void HedgedConnections::checkNewReplica()
|
||||||
|
{
|
||||||
|
Connection * connection = nullptr;
|
||||||
|
HedgedConnectionsFactory::State state = hedged_connections_factory.waitForReadyConnections(/*blocking = */false, connection);
|
||||||
|
|
||||||
|
processNewReplicaState(state, connection);
|
||||||
|
|
||||||
|
/// Check if we don't need to listen hedged_connections_factory file descriptor in epoll anymore.
|
||||||
|
if (hedged_connections_factory.numberOfProcessingReplicas() == 0)
|
||||||
|
epoll.remove(hedged_connections_factory.getFileDescriptor());
|
||||||
|
}
|
||||||
|
|
||||||
|
void HedgedConnections::processNewReplicaState(HedgedConnectionsFactory::State state, Connection * connection)
|
||||||
|
{
|
||||||
|
switch (state)
|
||||||
|
{
|
||||||
|
case HedgedConnectionsFactory::State::READY:
|
||||||
{
|
{
|
||||||
size_t offset = offsets_queue.front();
|
size_t offset = offsets_queue.front();
|
||||||
offsets_queue.pop();
|
offsets_queue.pop();
|
||||||
@ -444,47 +447,39 @@ void HedgedConnections::tryGetNewReplica(bool start_new_connection)
|
|||||||
++active_connection_count;
|
++active_connection_count;
|
||||||
|
|
||||||
ReplicaState & replica = offset_states[offset].replicas.back();
|
ReplicaState & replica = offset_states[offset].replicas.back();
|
||||||
epoll.add(replica.packet_receiver.getFileDescriptor());
|
epoll.add(replica.packet_receiver->getFileDescriptor());
|
||||||
fd_to_replica_location[replica.packet_receiver.getFileDescriptor()] = ReplicaLocation{offset, offset_states[offset].replicas.size() - 1};
|
fd_to_replica_location[replica.packet_receiver->getFileDescriptor()] = ReplicaLocation{offset, offset_states[offset].replicas.size() - 1};
|
||||||
epoll.add(replica.change_replica_timeout.getDescriptor());
|
epoll.add(replica.change_replica_timeout.getDescriptor());
|
||||||
timeout_fd_to_replica_location[replica.change_replica_timeout.getDescriptor()] = ReplicaLocation{offset, offset_states[offset].replicas.size() - 1};
|
timeout_fd_to_replica_location[replica.change_replica_timeout.getDescriptor()] = ReplicaLocation{offset, offset_states[offset].replicas.size() - 1};
|
||||||
|
|
||||||
pipeline_for_new_replicas.run(replica);
|
pipeline_for_new_replicas.run(replica);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
else if (state == HedgedConnectionsFactory::State::NOT_READY && !next_replica_in_process)
|
case HedgedConnectionsFactory::State::CANNOT_CHOOSE:
|
||||||
{
|
|
||||||
epoll.add(hedged_connections_factory.getFileDescriptor());
|
|
||||||
next_replica_in_process = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Check if we cannot get new replica and there is no active replica with needed offsets.
|
|
||||||
else if (state == HedgedConnectionsFactory::State::CANNOT_CHOOSE)
|
|
||||||
{
|
{
|
||||||
while (!offsets_queue.empty())
|
while (!offsets_queue.empty())
|
||||||
{
|
{
|
||||||
|
/// Check if there is no active replica with needed offsets.
|
||||||
if (offset_states[offsets_queue.front()].active_connection_count == 0)
|
if (offset_states[offsets_queue.front()].active_connection_count == 0)
|
||||||
throw Exception("Cannot find enough connections to replicas", ErrorCodes::ALL_CONNECTION_TRIES_FAILED);
|
throw Exception("Cannot find enough connections to replicas", ErrorCodes::ALL_CONNECTION_TRIES_FAILED);
|
||||||
offset_states[offsets_queue.front()].next_replica_in_process = false;
|
offset_states[offsets_queue.front()].next_replica_in_process = false;
|
||||||
offsets_queue.pop();
|
offsets_queue.pop();
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
case HedgedConnectionsFactory::State::NOT_READY:
|
||||||
/// Check if we don't need to listen hedged_connections_factory file descriptor in epoll anymore.
|
break;
|
||||||
if (next_replica_in_process && (state == HedgedConnectionsFactory::State::CANNOT_CHOOSE || offsets_queue.empty()))
|
|
||||||
{
|
|
||||||
epoll.remove(hedged_connections_factory.getFileDescriptor());
|
|
||||||
next_replica_in_process = false;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void HedgedConnections::finishProcessReplica(ReplicaState & replica, bool disconnect)
|
void HedgedConnections::finishProcessReplica(ReplicaState & replica, bool disconnect)
|
||||||
{
|
{
|
||||||
replica.packet_receiver.cancel();
|
replica.packet_receiver->cancel();
|
||||||
replica.change_replica_timeout.reset();
|
replica.change_replica_timeout.reset();
|
||||||
|
|
||||||
epoll.remove(replica.packet_receiver.getFileDescriptor());
|
epoll.remove(replica.packet_receiver->getFileDescriptor());
|
||||||
--offset_states[fd_to_replica_location[replica.packet_receiver.getFileDescriptor()].offset].active_connection_count;
|
--offset_states[fd_to_replica_location[replica.packet_receiver->getFileDescriptor()].offset].active_connection_count;
|
||||||
fd_to_replica_location.erase(replica.packet_receiver.getFileDescriptor());
|
fd_to_replica_location.erase(replica.packet_receiver->getFileDescriptor());
|
||||||
|
|
||||||
epoll.remove(replica.change_replica_timeout.getDescriptor());
|
epoll.remove(replica.change_replica_timeout.getDescriptor());
|
||||||
timeout_fd_to_replica_location.erase(replica.change_replica_timeout.getDescriptor());
|
timeout_fd_to_replica_location.erase(replica.change_replica_timeout.getDescriptor());
|
||||||
|
@ -21,14 +21,15 @@ namespace DB
|
|||||||
class HedgedConnections : public IConnections
|
class HedgedConnections : public IConnections
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
using PacketReceiverPtr = std::unique_ptr<PacketReceiver>;
|
||||||
struct ReplicaState
|
struct ReplicaState
|
||||||
{
|
{
|
||||||
ReplicaState(Connection * connection_) : connection(connection_), packet_receiver(connection_)
|
explicit ReplicaState(Connection * connection_) : connection(connection_), packet_receiver(std::make_unique<PacketReceiver>(connection_))
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
Connection * connection = nullptr;
|
Connection * connection = nullptr;
|
||||||
PacketReceiver packet_receiver;
|
PacketReceiverPtr packet_receiver;
|
||||||
TimerDescriptor change_replica_timeout;
|
TimerDescriptor change_replica_timeout;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -119,14 +120,16 @@ private:
|
|||||||
|
|
||||||
void processReceivedFirstDataPacket(const ReplicaLocation & replica_location);
|
void processReceivedFirstDataPacket(const ReplicaLocation & replica_location);
|
||||||
|
|
||||||
void tryGetNewReplica(bool start_new_connection);
|
void startNewReplica();
|
||||||
|
|
||||||
|
void checkNewReplica();
|
||||||
|
|
||||||
|
void processNewReplicaState(HedgedConnectionsFactory::State state, Connection * connection);
|
||||||
|
|
||||||
void finishProcessReplica(ReplicaState & replica, bool disconnect);
|
void finishProcessReplica(ReplicaState & replica, bool disconnect);
|
||||||
|
|
||||||
int getReadyFileDescriptor(AsyncCallback async_callback = {});
|
int getReadyFileDescriptor(AsyncCallback async_callback = {});
|
||||||
|
|
||||||
bool checkPendingData(ReplicaLocation & location_out);
|
|
||||||
|
|
||||||
HedgedConnectionsFactory hedged_connections_factory;
|
HedgedConnectionsFactory hedged_connections_factory;
|
||||||
|
|
||||||
/// All replicas in offset_states[offset] is responsible for process query
|
/// All replicas in offset_states[offset] is responsible for process query
|
||||||
@ -159,9 +162,7 @@ private:
|
|||||||
/// If we didn't disabled it, we need to skip this replica.
|
/// If we didn't disabled it, we need to skip this replica.
|
||||||
bool disable_two_level_aggregation = false;
|
bool disable_two_level_aggregation = false;
|
||||||
|
|
||||||
/// This flag means we need to get connection with new replica, but no replica is ready.
|
Packet last_received_packet;
|
||||||
/// When it's true, hedged_connections_factory.getFileDescriptor() is in epoll.
|
|
||||||
bool next_replica_in_process = false;
|
|
||||||
|
|
||||||
Epoll epoll;
|
Epoll epoll;
|
||||||
const Settings & settings;
|
const Settings & settings;
|
||||||
|
@ -22,15 +22,12 @@ HedgedConnectionsFactory::HedgedConnectionsFactory(
|
|||||||
{
|
{
|
||||||
shuffled_pools = pool->getShuffledPools(settings);
|
shuffled_pools = pool->getShuffledPools(settings);
|
||||||
for (size_t i = 0; i != shuffled_pools.size(); ++i)
|
for (size_t i = 0; i != shuffled_pools.size(); ++i)
|
||||||
replicas.emplace_back(ConnectionEstablisher(shuffled_pools[i].pool, &timeouts, settings, table_to_check.get()));
|
replicas.emplace_back(ConnectionEstablisherAsync(shuffled_pools[i].pool, &timeouts, settings, log, table_to_check.get()));
|
||||||
|
|
||||||
max_tries
|
max_tries
|
||||||
= (settings ? size_t{settings->connections_with_failover_max_tries} : size_t{DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES});
|
= (settings ? size_t{settings->connections_with_failover_max_tries} : size_t{DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES});
|
||||||
|
|
||||||
fallback_to_stale_replicas = settings && settings->fallback_to_stale_replicas_for_distributed_queries;
|
fallback_to_stale_replicas = settings && settings->fallback_to_stale_replicas_for_distributed_queries;
|
||||||
entries_count = 0;
|
|
||||||
usable_count = 0;
|
|
||||||
failed_pools_count = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
HedgedConnectionsFactory::~HedgedConnectionsFactory()
|
HedgedConnectionsFactory::~HedgedConnectionsFactory()
|
||||||
@ -62,18 +59,22 @@ std::vector<Connection *> HedgedConnectionsFactory::getManyConnections(PoolMode
|
|||||||
/// Try to start establishing connections with max_entries replicas.
|
/// Try to start establishing connections with max_entries replicas.
|
||||||
for (size_t i = 0; i != max_entries; ++i)
|
for (size_t i = 0; i != max_entries; ++i)
|
||||||
{
|
{
|
||||||
int index = startEstablishingNewConnection(connection);
|
++requested_connections_count;
|
||||||
if (index == -1)
|
State state = startNewConnectionImpl(connection);
|
||||||
break;
|
if (state == State::READY)
|
||||||
if (replicas[index].is_ready)
|
|
||||||
connections.push_back(connection);
|
connections.push_back(connection);
|
||||||
|
if (state == State::CANNOT_CHOOSE)
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process connections until we get enough READY connections
|
/// Process connections until we get enough READY connections
|
||||||
/// (work asynchronously with all connections we started).
|
/// (work asynchronously with all connections we started).
|
||||||
|
/// TODO: when we get GET_ALL mode we can start reading packets from ready
|
||||||
|
/// TODO: connection as soon as we got it, not even waiting for the others.
|
||||||
while (connections.size() < max_entries)
|
while (connections.size() < max_entries)
|
||||||
{
|
{
|
||||||
auto state = getNextConnection(false, true, connection);
|
/// Set blocking = true to avoid busy-waiting here.
|
||||||
|
auto state = waitForReadyConnections(/*blocking = */true, connection);
|
||||||
if (state == State::READY)
|
if (state == State::READY)
|
||||||
connections.push_back(connection);
|
connections.push_back(connection);
|
||||||
else if (state == State::CANNOT_CHOOSE)
|
else if (state == State::CANNOT_CHOOSE)
|
||||||
@ -82,29 +83,38 @@ std::vector<Connection *> HedgedConnectionsFactory::getManyConnections(PoolMode
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
/// Determine the reason of not enough replicas.
|
/// Determine the reason of not enough replicas.
|
||||||
if (!fallback_to_stale_replicas && usable_count >= min_entries)
|
if (!fallback_to_stale_replicas && up_to_date_count < min_entries)
|
||||||
throw DB::Exception(
|
throw Exception(
|
||||||
"Could not find enough connections to up-to-date replicas. Got: " + std::to_string(connections.size())
|
"Could not find enough connections to up-to-date replicas. Got: " + std::to_string(connections.size())
|
||||||
+ ", needed: " + std::to_string(min_entries),
|
+ ", needed: " + std::to_string(min_entries),
|
||||||
DB::ErrorCodes::ALL_REPLICAS_ARE_STALE);
|
DB::ErrorCodes::ALL_REPLICAS_ARE_STALE);
|
||||||
|
if (usable_count < min_entries)
|
||||||
throw DB::NetException(
|
throw NetException(
|
||||||
"All connection tries failed. Log: \n\n" + fail_messages + "\n",
|
"All connection tries failed. Log: \n\n" + fail_messages + "\n",
|
||||||
DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED);
|
DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED);
|
||||||
|
|
||||||
|
throw Exception("Unknown reason of not enough replicas.", ErrorCodes::LOGICAL_ERROR);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return connections;
|
return connections;
|
||||||
}
|
}
|
||||||
|
|
||||||
HedgedConnectionsFactory::State HedgedConnectionsFactory::getNextConnection(bool start_new_connection, bool blocking, Connection *& connection_out)
|
HedgedConnectionsFactory::State HedgedConnectionsFactory::startNewConnection(Connection *& connection_out)
|
||||||
{
|
{
|
||||||
if (start_new_connection)
|
LOG_DEBUG(log, "startNewConnection");
|
||||||
{
|
++requested_connections_count;
|
||||||
int index = startEstablishingNewConnection(connection_out);
|
State state = startNewConnectionImpl(connection_out);
|
||||||
if (index != -1 && replicas[index].is_ready)
|
/// If we cannot start new connection but there are connections in epoll, return NOT_READY.
|
||||||
return State::READY;
|
if (state == State::CANNOT_CHOOSE && !epoll.empty())
|
||||||
}
|
state = State::NOT_READY;
|
||||||
|
|
||||||
|
return state;
|
||||||
|
}
|
||||||
|
|
||||||
|
HedgedConnectionsFactory::State HedgedConnectionsFactory::waitForReadyConnections(bool blocking, Connection *& connection_out)
|
||||||
|
{
|
||||||
|
LOG_DEBUG(log, "waitForReadyConnections");
|
||||||
|
|
||||||
State state = processEpollEvents(blocking, connection_out);
|
State state = processEpollEvents(blocking, connection_out);
|
||||||
if (state != State::CANNOT_CHOOSE)
|
if (state != State::CANNOT_CHOOSE)
|
||||||
@ -120,24 +130,6 @@ HedgedConnectionsFactory::State HedgedConnectionsFactory::getNextConnection(bool
|
|||||||
return setBestUsableReplica(connection_out);
|
return setBestUsableReplica(connection_out);
|
||||||
}
|
}
|
||||||
|
|
||||||
void HedgedConnectionsFactory::stopChoosingReplicas()
|
|
||||||
{
|
|
||||||
for (auto & [fd, index] : fd_to_replica_index)
|
|
||||||
{
|
|
||||||
epoll.remove(fd);
|
|
||||||
replicas[index].connection_establisher.cancel();
|
|
||||||
}
|
|
||||||
|
|
||||||
for (auto & [fd, index] : timeout_fd_to_replica_index)
|
|
||||||
{
|
|
||||||
replicas[index].change_replica_timeout.reset();
|
|
||||||
epoll.remove(fd);
|
|
||||||
}
|
|
||||||
|
|
||||||
fd_to_replica_index.clear();
|
|
||||||
timeout_fd_to_replica_index.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
int HedgedConnectionsFactory::getNextIndex()
|
int HedgedConnectionsFactory::getNextIndex()
|
||||||
{
|
{
|
||||||
/// Check if there is no free replica.
|
/// Check if there is no free replica.
|
||||||
@ -158,8 +150,7 @@ int HedgedConnectionsFactory::getNextIndex()
|
|||||||
next_index = (next_index + 1) % shuffled_pools.size();
|
next_index = (next_index + 1) % shuffled_pools.size();
|
||||||
|
|
||||||
/// Check if we can try this replica.
|
/// Check if we can try this replica.
|
||||||
if (!replicas[next_index].connection_establisher.isInProcess()
|
if (replicas[next_index].connection_establisher.getResult().entry.isNull()
|
||||||
&& !replicas[next_index].connection_establisher.isFinished()
|
|
||||||
&& (max_tries == 0 || shuffled_pools[next_index].error_count < max_tries))
|
&& (max_tries == 0 || shuffled_pools[next_index].error_count < max_tries))
|
||||||
finish = true;
|
finish = true;
|
||||||
|
|
||||||
@ -172,100 +163,34 @@ int HedgedConnectionsFactory::getNextIndex()
|
|||||||
return next_index;
|
return next_index;
|
||||||
}
|
}
|
||||||
|
|
||||||
int HedgedConnectionsFactory::startEstablishingNewConnection(Connection *& connection_out)
|
HedgedConnectionsFactory::State HedgedConnectionsFactory::startNewConnectionImpl(Connection *& connection_out)
|
||||||
{
|
{
|
||||||
|
LOG_DEBUG(log, "startNewConnectionImpl");
|
||||||
|
|
||||||
int index;
|
int index;
|
||||||
|
State state;
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
index = getNextIndex();
|
index = getNextIndex();
|
||||||
if (index == -1)
|
if (index == -1)
|
||||||
return -1;
|
return State::CANNOT_CHOOSE;
|
||||||
|
|
||||||
ReplicaStatus & replica = replicas[index];
|
state = resumeConnectionEstablisher(index, connection_out);
|
||||||
++replicas_in_process_count;
|
|
||||||
replica.connection_establisher.resume();
|
|
||||||
|
|
||||||
processConnectionEstablisherStage(index);
|
|
||||||
|
|
||||||
if (replica.connection_establisher.isInProcess())
|
|
||||||
{
|
|
||||||
epoll.add(replica.connection_establisher.getFileDescriptor());
|
|
||||||
fd_to_replica_index[replica.connection_establisher.getFileDescriptor()] = index;
|
|
||||||
|
|
||||||
/// Add timeout for changing replica.
|
|
||||||
replica.change_replica_timeout.setRelative(timeouts.hedged_connection_timeout);
|
|
||||||
epoll.add(replica.change_replica_timeout.getDescriptor());
|
|
||||||
timeout_fd_to_replica_index[replica.change_replica_timeout.getDescriptor()] = index;
|
|
||||||
}
|
}
|
||||||
}
|
while (state == State::CANNOT_CHOOSE);
|
||||||
while (!replicas[index].connection_establisher.isInProcess() && !replicas[index].is_ready);
|
|
||||||
|
|
||||||
if (replicas[index].is_ready)
|
return state;
|
||||||
connection_out = replicas[index].connection_establisher.getConnection();
|
|
||||||
|
|
||||||
return index;
|
|
||||||
}
|
|
||||||
|
|
||||||
void HedgedConnectionsFactory::processConnectionEstablisherStage(int index, bool remove_from_epoll)
|
|
||||||
{
|
|
||||||
ReplicaStatus & replica = replicas[index];
|
|
||||||
|
|
||||||
if (replica.connection_establisher.isFinished())
|
|
||||||
{
|
|
||||||
--replicas_in_process_count;
|
|
||||||
++entries_count;
|
|
||||||
|
|
||||||
if (remove_from_epoll)
|
|
||||||
removeReplicaFromEpoll(index);
|
|
||||||
|
|
||||||
if (replica.connection_establisher.getResult().is_usable)
|
|
||||||
{
|
|
||||||
++usable_count;
|
|
||||||
if (replica.connection_establisher.getResult().is_up_to_date)
|
|
||||||
replica.is_ready = true;
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// If replica is not usable, we need to save fail message.
|
|
||||||
if (!replica.connection_establisher.getFailMessage().empty())
|
|
||||||
fail_messages += replica.connection_establisher.getFailMessage() + "\n";
|
|
||||||
}
|
|
||||||
else if (replica.connection_establisher.isFailed())
|
|
||||||
processFailedConnection(index, remove_from_epoll);
|
|
||||||
}
|
|
||||||
|
|
||||||
void HedgedConnectionsFactory::processFailedConnection(int index, bool remove_from_epoll)
|
|
||||||
{
|
|
||||||
ConnectionEstablisher & connection_establisher = replicas[index].connection_establisher;
|
|
||||||
|
|
||||||
if (remove_from_epoll)
|
|
||||||
removeReplicaFromEpoll(index);
|
|
||||||
|
|
||||||
if (!connection_establisher.getFailMessage().empty())
|
|
||||||
fail_messages += connection_establisher.getFailMessage() + "\n";
|
|
||||||
|
|
||||||
ShuffledPool & shuffled_pool = shuffled_pools[index];
|
|
||||||
LOG_WARNING(
|
|
||||||
log, "Connection failed at try №{}, reason: {}", (shuffled_pool.error_count + 1), connection_establisher.getFailMessage());
|
|
||||||
ProfileEvents::increment(ProfileEvents::DistributedConnectionFailTry);
|
|
||||||
|
|
||||||
shuffled_pool.error_count = std::min(pool->getMaxErrorCup(), shuffled_pool.error_count + 1);
|
|
||||||
|
|
||||||
if (shuffled_pool.error_count >= max_tries)
|
|
||||||
{
|
|
||||||
++failed_pools_count;
|
|
||||||
ProfileEvents::increment(ProfileEvents::DistributedConnectionFailAtAll);
|
|
||||||
}
|
|
||||||
|
|
||||||
--replicas_in_process_count;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
HedgedConnectionsFactory::State HedgedConnectionsFactory::processEpollEvents(bool blocking, Connection *& connection_out)
|
HedgedConnectionsFactory::State HedgedConnectionsFactory::processEpollEvents(bool blocking, Connection *& connection_out)
|
||||||
{
|
{
|
||||||
|
LOG_DEBUG(log, "processEpollEvents");
|
||||||
|
|
||||||
int event_fd;
|
int event_fd;
|
||||||
while (!epoll.empty())
|
while (!epoll.empty())
|
||||||
{
|
{
|
||||||
|
LOG_DEBUG(log, "loop");
|
||||||
|
|
||||||
event_fd = getReadyFileDescriptor(blocking);
|
event_fd = getReadyFileDescriptor(blocking);
|
||||||
|
|
||||||
if (event_fd == -1)
|
if (event_fd == -1)
|
||||||
@ -274,24 +199,29 @@ HedgedConnectionsFactory::State HedgedConnectionsFactory::processEpollEvents(boo
|
|||||||
if (fd_to_replica_index.contains(event_fd))
|
if (fd_to_replica_index.contains(event_fd))
|
||||||
{
|
{
|
||||||
int index = fd_to_replica_index[event_fd];
|
int index = fd_to_replica_index[event_fd];
|
||||||
processConnectionEstablisherEvent(index, connection_out);
|
State state = resumeConnectionEstablisher(index, connection_out);
|
||||||
|
if (state == State::NOT_READY)
|
||||||
if (replicas[index].is_ready)
|
|
||||||
return State::READY;
|
|
||||||
if (replicas[index].connection_establisher.isInProcess())
|
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
|
/// Connection establishing not in process now, remove all
|
||||||
|
/// information about it from epoll.
|
||||||
|
removeReplicaFromEpoll(index, event_fd);
|
||||||
|
|
||||||
|
if (state == State::READY)
|
||||||
|
return state;
|
||||||
}
|
}
|
||||||
else if (timeout_fd_to_replica_index.contains(event_fd))
|
else if (timeout_fd_to_replica_index.contains(event_fd))
|
||||||
|
{
|
||||||
|
LOG_DEBUG(log, "change_replica_timeout");
|
||||||
replicas[timeout_fd_to_replica_index[event_fd]].change_replica_timeout.reset();
|
replicas[timeout_fd_to_replica_index[event_fd]].change_replica_timeout.reset();
|
||||||
|
}
|
||||||
else
|
else
|
||||||
throw Exception("Unknown event from epoll", ErrorCodes::LOGICAL_ERROR);
|
throw Exception("Unknown event from epoll", ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
/// We reach this point only if we need to start new connection
|
/// We reach this point only if we need to start new connection
|
||||||
/// (Special timeout expired or one of the previous connections failed).
|
/// (Special timeout expired or one of the previous connections failed).
|
||||||
int index = startEstablishingNewConnection(connection_out);
|
|
||||||
|
|
||||||
/// Return only if replica is ready.
|
/// Return only if replica is ready.
|
||||||
if (index != -1 && replicas[index].is_ready)
|
if (startNewConnectionImpl(connection_out) == State::READY)
|
||||||
return State::READY;
|
return State::READY;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -306,35 +236,138 @@ int HedgedConnectionsFactory::getReadyFileDescriptor(bool blocking)
|
|||||||
return event.data.fd;
|
return event.data.fd;
|
||||||
}
|
}
|
||||||
|
|
||||||
void HedgedConnectionsFactory::removeReplicaFromEpoll(int index)
|
HedgedConnectionsFactory::State HedgedConnectionsFactory::resumeConnectionEstablisher(int index, Connection *& connection_out)
|
||||||
{
|
{
|
||||||
ReplicaStatus & replica = replicas[index];
|
LOG_DEBUG(log, "resumeConnectionEstablisher");
|
||||||
epoll.remove(replica.connection_establisher.getFileDescriptor());
|
|
||||||
fd_to_replica_index.erase(replica.connection_establisher.getFileDescriptor());
|
|
||||||
|
|
||||||
replica.change_replica_timeout.reset();
|
auto res = replicas[index].connection_establisher.resume();
|
||||||
epoll.remove(replica.change_replica_timeout.getDescriptor());
|
|
||||||
timeout_fd_to_replica_index.erase(replica.change_replica_timeout.getDescriptor());
|
if (std::holds_alternative<TryResult>(res))
|
||||||
|
return processFinishedConnection(index, std::get<TryResult>(res), connection_out);
|
||||||
|
|
||||||
|
int fd = std::get<int>(res);
|
||||||
|
if (!fd_to_replica_index.contains(fd))
|
||||||
|
addNewReplicaToEpoll(index, fd);
|
||||||
|
|
||||||
|
return State::NOT_READY;
|
||||||
}
|
}
|
||||||
|
|
||||||
void HedgedConnectionsFactory::processConnectionEstablisherEvent(int index, Connection *& connection_out)
|
HedgedConnectionsFactory::State HedgedConnectionsFactory::processFinishedConnection(int index, TryResult result, Connection *& connection_out)
|
||||||
{
|
{
|
||||||
replicas[index].connection_establisher.resume();
|
LOG_DEBUG(log, "processFinishedConnection");
|
||||||
processConnectionEstablisherStage(index, true);
|
|
||||||
if (replicas[index].is_ready)
|
const std::string & fail_message = replicas[index].connection_establisher.getFailMessage();
|
||||||
connection_out = replicas[index].connection_establisher.getConnection();
|
if (!fail_message.empty())
|
||||||
|
fail_messages += fail_message + "\n";
|
||||||
|
|
||||||
|
if (!result.entry.isNull())
|
||||||
|
{
|
||||||
|
++entries_count;
|
||||||
|
|
||||||
|
if (result.is_usable)
|
||||||
|
{
|
||||||
|
++usable_count;
|
||||||
|
if (result.is_up_to_date)
|
||||||
|
{
|
||||||
|
++up_to_date_count;
|
||||||
|
if (!skip_predicate || !skip_predicate(&*result.entry))
|
||||||
|
{
|
||||||
|
replicas[index].is_ready = true;
|
||||||
|
++ready_replicas_count;
|
||||||
|
connection_out = &*result.entry;
|
||||||
|
return State::READY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ShuffledPool & shuffled_pool = shuffled_pools[index];
|
||||||
|
LOG_WARNING(
|
||||||
|
log, "Connection failed at try №{}, reason: {}", (shuffled_pool.error_count + 1), fail_message);
|
||||||
|
ProfileEvents::increment(ProfileEvents::DistributedConnectionFailTry);
|
||||||
|
|
||||||
|
shuffled_pool.error_count = std::min(pool->getMaxErrorCup(), shuffled_pool.error_count + 1);
|
||||||
|
|
||||||
|
if (shuffled_pool.error_count >= max_tries)
|
||||||
|
{
|
||||||
|
++failed_pools_count;
|
||||||
|
ProfileEvents::increment(ProfileEvents::DistributedConnectionFailAtAll);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return State::CANNOT_CHOOSE;
|
||||||
|
}
|
||||||
|
|
||||||
|
void HedgedConnectionsFactory::stopChoosingReplicas()
|
||||||
|
{
|
||||||
|
LOG_DEBUG(log, "stopChoosingReplicas");
|
||||||
|
|
||||||
|
for (auto & [fd, index] : fd_to_replica_index)
|
||||||
|
{
|
||||||
|
--replicas_in_process_count;
|
||||||
|
epoll.remove(fd);
|
||||||
|
replicas[index].connection_establisher.cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto & [timeout_fd, index] : timeout_fd_to_replica_index)
|
||||||
|
{
|
||||||
|
replicas[index].change_replica_timeout.reset();
|
||||||
|
epoll.remove(timeout_fd);
|
||||||
|
}
|
||||||
|
|
||||||
|
fd_to_replica_index.clear();
|
||||||
|
timeout_fd_to_replica_index.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
void HedgedConnectionsFactory::addNewReplicaToEpoll(int index, int fd)
|
||||||
|
{
|
||||||
|
LOG_DEBUG(log, "addNewReplicaToEpoll");
|
||||||
|
|
||||||
|
++replicas_in_process_count;
|
||||||
|
epoll.add(fd);
|
||||||
|
fd_to_replica_index[fd] = index;
|
||||||
|
|
||||||
|
/// Add timeout for changing replica.
|
||||||
|
replicas[index].change_replica_timeout.setRelative(timeouts.hedged_connection_timeout);
|
||||||
|
epoll.add(replicas[index].change_replica_timeout.getDescriptor());
|
||||||
|
timeout_fd_to_replica_index[replicas[index].change_replica_timeout.getDescriptor()] = index;
|
||||||
|
}
|
||||||
|
|
||||||
|
void HedgedConnectionsFactory::removeReplicaFromEpoll(int index, int fd)
|
||||||
|
{
|
||||||
|
LOG_DEBUG(log, "removeReplicaFromEpoll");
|
||||||
|
|
||||||
|
--replicas_in_process_count;
|
||||||
|
epoll.remove(fd);
|
||||||
|
fd_to_replica_index.erase(fd);
|
||||||
|
|
||||||
|
replicas[index].change_replica_timeout.reset();
|
||||||
|
epoll.remove(replicas[index].change_replica_timeout.getDescriptor());
|
||||||
|
timeout_fd_to_replica_index.erase(replicas[index].change_replica_timeout.getDescriptor());
|
||||||
|
}
|
||||||
|
|
||||||
|
int HedgedConnectionsFactory::numberOfProcessingReplicas() const
|
||||||
|
{
|
||||||
|
if (epoll.empty())
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
return requested_connections_count - ready_replicas_count;
|
||||||
}
|
}
|
||||||
|
|
||||||
HedgedConnectionsFactory::State HedgedConnectionsFactory::setBestUsableReplica(Connection *& connection_out)
|
HedgedConnectionsFactory::State HedgedConnectionsFactory::setBestUsableReplica(Connection *& connection_out)
|
||||||
{
|
{
|
||||||
|
LOG_DEBUG(log, "setBestUsableReplica");
|
||||||
|
|
||||||
std::vector<int> indexes;
|
std::vector<int> indexes;
|
||||||
for (size_t i = 0; i != replicas.size(); ++i)
|
for (size_t i = 0; i != replicas.size(); ++i)
|
||||||
{
|
{
|
||||||
/// Don't add unusable, failed replicas and replicas that are ready or in process.
|
/// Don't add unusable, failed replicas and replicas that are ready or in process.
|
||||||
if (!replicas[i].connection_establisher.getResult().entry.isNull()
|
TryResult result = replicas[i].connection_establisher.getResult();
|
||||||
&& replicas[i].connection_establisher.getResult().is_usable
|
if (!result.entry.isNull()
|
||||||
&& !replicas[i].connection_establisher.isInProcess()
|
&& result.is_usable
|
||||||
&& !replicas[i].is_ready)
|
&& !replicas[i].is_ready
|
||||||
|
&& (!skip_predicate || !skip_predicate(&*result.entry)))
|
||||||
indexes.push_back(i);
|
indexes.push_back(i);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -350,9 +383,9 @@ HedgedConnectionsFactory::State HedgedConnectionsFactory::setBestUsableReplica(C
|
|||||||
return replicas[lhs].connection_establisher.getResult().staleness < replicas[rhs].connection_establisher.getResult().staleness;
|
return replicas[lhs].connection_establisher.getResult().staleness < replicas[rhs].connection_establisher.getResult().staleness;
|
||||||
});
|
});
|
||||||
|
|
||||||
++ready_replicas_count;
|
|
||||||
replicas[indexes[0]].is_ready = true;
|
replicas[indexes[0]].is_ready = true;
|
||||||
connection_out = replicas[indexes[0]].connection_establisher.getConnection();
|
TryResult result = replicas[indexes[0]].connection_establisher.getResult();
|
||||||
|
connection_out = &*result.entry;
|
||||||
return State::READY;
|
return State::READY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -25,6 +25,7 @@ class HedgedConnectionsFactory
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
using ShuffledPool = ConnectionPoolWithFailover::Base::ShuffledPool;
|
using ShuffledPool = ConnectionPoolWithFailover::Base::ShuffledPool;
|
||||||
|
using TryResult = PoolWithFailoverBase<IConnectionPool>::TryResult;
|
||||||
|
|
||||||
enum class State
|
enum class State
|
||||||
{
|
{
|
||||||
@ -35,11 +36,11 @@ public:
|
|||||||
|
|
||||||
struct ReplicaStatus
|
struct ReplicaStatus
|
||||||
{
|
{
|
||||||
ReplicaStatus(ConnectionEstablisher connection_stablisher_) : connection_establisher(std::move(connection_stablisher_))
|
explicit ReplicaStatus(ConnectionEstablisherAsync connection_stablisher_) : connection_establisher(std::move(connection_stablisher_))
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
ConnectionEstablisher connection_establisher;
|
ConnectionEstablisherAsync connection_establisher;
|
||||||
TimerDescriptor change_replica_timeout;
|
TimerDescriptor change_replica_timeout;
|
||||||
bool is_ready = false;
|
bool is_ready = false;
|
||||||
};
|
};
|
||||||
@ -57,10 +58,9 @@ public:
|
|||||||
/// if there is no events in epoll and blocking is false, return NOT_READY.
|
/// if there is no events in epoll and blocking is false, return NOT_READY.
|
||||||
/// Returned state might be READY, NOT_READY and CANNOT_CHOOSE.
|
/// Returned state might be READY, NOT_READY and CANNOT_CHOOSE.
|
||||||
/// If state is READY, replica connection will be written in connection_out.
|
/// If state is READY, replica connection will be written in connection_out.
|
||||||
State getNextConnection(bool start_new_connection, bool blocking, Connection *& connection_out);
|
State waitForReadyConnections(bool blocking, Connection *& connection_out);
|
||||||
|
|
||||||
/// Check if we can try to produce new READY replica.
|
State startNewConnection(Connection *& connection_out);
|
||||||
// bool canGetNewConnection() const { return ready_replicas_count + failed_pools_count < shuffled_pools.size(); }
|
|
||||||
|
|
||||||
/// Stop working with all replicas that are not READY.
|
/// Stop working with all replicas that are not READY.
|
||||||
void stopChoosingReplicas();
|
void stopChoosingReplicas();
|
||||||
@ -71,14 +71,16 @@ public:
|
|||||||
|
|
||||||
const ConnectionTimeouts & getConnectionTimeouts() const { return timeouts; }
|
const ConnectionTimeouts & getConnectionTimeouts() const { return timeouts; }
|
||||||
|
|
||||||
|
int numberOfProcessingReplicas() const;
|
||||||
|
|
||||||
|
void setSkipPredicate(std::function<bool(Connection *)> pred) { skip_predicate = std::move(pred); }
|
||||||
|
|
||||||
~HedgedConnectionsFactory();
|
~HedgedConnectionsFactory();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/// Try to start establishing connection to the new replica. Return
|
/// Try to start establishing connection to the new replica. Return
|
||||||
/// the index of the new replica or -1 if cannot start new connection.
|
/// the index of the new replica or -1 if cannot start new connection.
|
||||||
int startEstablishingNewConnection(Connection *& connection_out);
|
State startNewConnectionImpl(Connection *& connection_out);
|
||||||
|
|
||||||
void processConnectionEstablisherStage(int replica_index, bool remove_from_epoll = false);
|
|
||||||
|
|
||||||
/// Find an index of the next free replica to start connection.
|
/// Find an index of the next free replica to start connection.
|
||||||
/// Return -1 if there is no free replica.
|
/// Return -1 if there is no free replica.
|
||||||
@ -86,11 +88,15 @@ private:
|
|||||||
|
|
||||||
int getReadyFileDescriptor(bool blocking);
|
int getReadyFileDescriptor(bool blocking);
|
||||||
|
|
||||||
void processFailedConnection(int replica_index, bool remove_from_epoll);
|
void processFailedConnection(int index, const std::string & fail_message);
|
||||||
|
|
||||||
void processConnectionEstablisherEvent(int replica_index, Connection *& connection_out);
|
State resumeConnectionEstablisher(int index, Connection *& connection_out);
|
||||||
|
|
||||||
void removeReplicaFromEpoll(int index);
|
State processFinishedConnection(int index, TryResult result, Connection *& connection_out);
|
||||||
|
|
||||||
|
void removeReplicaFromEpoll(int index, int fd);
|
||||||
|
|
||||||
|
void addNewReplicaToEpoll(int index, int fd);
|
||||||
|
|
||||||
/// Return NOT_READY state if there is no ready events, READY if replica is ready
|
/// Return NOT_READY state if there is no ready events, READY if replica is ready
|
||||||
/// and CANNOT_CHOOSE if there is no more events in epoll.
|
/// and CANNOT_CHOOSE if there is no more events in epoll.
|
||||||
@ -111,10 +117,7 @@ private:
|
|||||||
/// Map timeout for changing replica to replica index.
|
/// Map timeout for changing replica to replica index.
|
||||||
std::unordered_map<int, int> timeout_fd_to_replica_index;
|
std::unordered_map<int, int> timeout_fd_to_replica_index;
|
||||||
|
|
||||||
/// Indexes of replicas, that are in process of connection.
|
std::function<bool(Connection *)> skip_predicate;
|
||||||
size_t replicas_in_process_count = 0;
|
|
||||||
/// Indexes of ready replicas.
|
|
||||||
size_t ready_replicas_count = 0;
|
|
||||||
|
|
||||||
std::shared_ptr<QualifiedTableName> table_to_check;
|
std::shared_ptr<QualifiedTableName> table_to_check;
|
||||||
int last_used_index = -1;
|
int last_used_index = -1;
|
||||||
@ -122,10 +125,14 @@ private:
|
|||||||
Epoll epoll;
|
Epoll epoll;
|
||||||
Poco::Logger * log;
|
Poco::Logger * log;
|
||||||
std::string fail_messages;
|
std::string fail_messages;
|
||||||
size_t entries_count;
|
|
||||||
size_t usable_count;
|
|
||||||
size_t failed_pools_count;
|
|
||||||
size_t max_tries;
|
size_t max_tries;
|
||||||
|
size_t entries_count = 0;
|
||||||
|
size_t usable_count = 0;
|
||||||
|
size_t up_to_date_count = 0;
|
||||||
|
size_t failed_pools_count= 0;
|
||||||
|
size_t replicas_in_process_count = 0;
|
||||||
|
size_t requested_connections_count = 0;
|
||||||
|
size_t ready_replicas_count = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -2,41 +2,86 @@
|
|||||||
|
|
||||||
#if defined(OS_LINUX)
|
#if defined(OS_LINUX)
|
||||||
|
|
||||||
|
#include <variant>
|
||||||
|
|
||||||
#include <Client/IConnections.h>
|
#include <Client/IConnections.h>
|
||||||
#include <Common/FiberStack.h>
|
#include <Common/FiberStack.h>
|
||||||
#include <Common/Fiber.h>
|
#include <Common/Fiber.h>
|
||||||
|
#include <Common/Epoll.h>
|
||||||
|
#include <Common/TimerDescriptor.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int CANNOT_OPEN_FILE;
|
||||||
|
extern const int CANNOT_READ_FROM_SOCKET;
|
||||||
|
}
|
||||||
|
|
||||||
/// Class for nonblocking packet receiving. It runs connection->receivePacket
|
/// Class for nonblocking packet receiving. It runs connection->receivePacket
|
||||||
/// in fiber and sets special read callback which is called when
|
/// in fiber and sets special read callback which is called when
|
||||||
/// reading from socket blocks. When read callback is called,
|
/// reading from socket blocks. When read callback is called,
|
||||||
/// socket and receive timeout are added in epoll and execution returns to the main program.
|
/// socket and receive timeout are added in epoll and execution returns to the main program.
|
||||||
/// So, you can poll this epoll file descriptor to determine when to resume
|
/// So, you can poll this epoll file descriptor to determine when to resume
|
||||||
/// packet receiving (beside polling epoll descriptor, you also need to check connection->hasPendingData(),
|
/// packet receiving.
|
||||||
/// because small packet can be read in buffer with the previous one, so new packet will be ready in buffer,
|
|
||||||
/// but there is no data socket to poll).
|
|
||||||
class PacketReceiver
|
class PacketReceiver
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
PacketReceiver(Connection * connection_) : connection(connection_)
|
explicit PacketReceiver(Connection * connection_) : connection(connection_)
|
||||||
{
|
{
|
||||||
epoll.add(receive_timeout.getDescriptor());
|
epoll.add(receive_timeout.getDescriptor());
|
||||||
epoll.add(connection->getSocket()->impl()->sockfd());
|
epoll.add(connection->getSocket()->impl()->sockfd());
|
||||||
|
|
||||||
|
if (-1 == pipe2(pipe_fd, O_NONBLOCK))
|
||||||
|
throwFromErrno("Cannot create pipe", ErrorCodes::CANNOT_OPEN_FILE);
|
||||||
|
epoll.add(pipe_fd[0]);
|
||||||
|
|
||||||
fiber = boost::context::fiber(std::allocator_arg_t(), fiber_stack, Routine{*this});
|
fiber = boost::context::fiber(std::allocator_arg_t(), fiber_stack, Routine{*this});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
~PacketReceiver()
|
||||||
|
{
|
||||||
|
close(pipe_fd[0]);
|
||||||
|
close(pipe_fd[1]);
|
||||||
|
}
|
||||||
|
|
||||||
/// Resume packet receiving.
|
/// Resume packet receiving.
|
||||||
void resume()
|
std::variant<int, Packet, Poco::Timespan> resume()
|
||||||
{
|
{
|
||||||
/// If there is no pending data, check receive timeout.
|
/// If there is no pending data, check receive timeout.
|
||||||
if (!connection->hasReadPendingData() && !checkReceiveTimeout())
|
if (!connection->hasReadPendingData() && !checkReceiveTimeout())
|
||||||
return;
|
{
|
||||||
|
/// Receive timeout expired.
|
||||||
|
return Poco::Timespan();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Resume fiber.
|
||||||
fiber = std::move(fiber).resume();
|
fiber = std::move(fiber).resume();
|
||||||
if (exception)
|
if (exception)
|
||||||
std::rethrow_exception(std::move(exception));
|
std::rethrow_exception(std::move(exception));
|
||||||
|
|
||||||
|
if (is_read_in_process)
|
||||||
|
return epoll.getFileDescriptor();
|
||||||
|
|
||||||
|
/// Write something in pipe when buffer has pending data, because
|
||||||
|
/// in this case socket won't be ready in epoll but we need to tell
|
||||||
|
/// outside that there is more data in buffer.
|
||||||
|
if (connection->hasReadPendingData())
|
||||||
|
{
|
||||||
|
uint64_t buf = 0;
|
||||||
|
while (-1 == write(pipe_fd[1], &buf, sizeof(buf)))
|
||||||
|
{
|
||||||
|
if (errno == EAGAIN)
|
||||||
|
break;
|
||||||
|
|
||||||
|
if (errno != EINTR)
|
||||||
|
throwFromErrno("Cannot write to pipe", ErrorCodes::CANNOT_READ_FROM_SOCKET);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Receiving packet was finished.
|
||||||
|
return std::move(packet);
|
||||||
}
|
}
|
||||||
|
|
||||||
void cancel()
|
void cancel()
|
||||||
@ -45,20 +90,16 @@ public:
|
|||||||
connection = nullptr;
|
connection = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
Packet getPacket() { return std::move(packet); }
|
|
||||||
|
|
||||||
int getFileDescriptor() const { return epoll.getFileDescriptor(); }
|
int getFileDescriptor() const { return epoll.getFileDescriptor(); }
|
||||||
|
|
||||||
bool isPacketReady() const { return !is_read_in_process; }
|
|
||||||
|
|
||||||
bool isReceiveTimeoutExpired() const { return is_receive_timeout_expired; }
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/// When epoll file descriptor is ready, check if it's an expired timeout
|
/// When epoll file descriptor is ready, check if it's an expired timeout.
|
||||||
|
/// Return false if receive timeout expired and socket is not ready, return true otherwise.
|
||||||
bool checkReceiveTimeout()
|
bool checkReceiveTimeout()
|
||||||
{
|
{
|
||||||
bool is_socket_ready = false;
|
bool is_socket_ready = false;
|
||||||
is_receive_timeout_expired = false;
|
bool is_pipe_ready = false;
|
||||||
|
bool is_receive_timeout_expired = false;
|
||||||
|
|
||||||
epoll_event events[2];
|
epoll_event events[2];
|
||||||
events[0].data.fd = events[1].data.fd = -1;
|
events[0].data.fd = events[1].data.fd = -1;
|
||||||
@ -68,10 +109,19 @@ private:
|
|||||||
{
|
{
|
||||||
if (events[i].data.fd == connection->getSocket()->impl()->sockfd())
|
if (events[i].data.fd == connection->getSocket()->impl()->sockfd())
|
||||||
is_socket_ready = true;
|
is_socket_ready = true;
|
||||||
|
if (events[i].data.fd == pipe_fd[0])
|
||||||
|
is_pipe_ready = true;
|
||||||
if (events[i].data.fd == receive_timeout.getDescriptor())
|
if (events[i].data.fd == receive_timeout.getDescriptor())
|
||||||
is_receive_timeout_expired = true;
|
is_receive_timeout_expired = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (is_pipe_ready)
|
||||||
|
{
|
||||||
|
LOG_DEBUG(&Poco::Logger::get("PacketReceiver"), "pipe");
|
||||||
|
drainPipe();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
if (is_receive_timeout_expired && !is_socket_ready)
|
if (is_receive_timeout_expired && !is_socket_ready)
|
||||||
{
|
{
|
||||||
receive_timeout.reset();
|
receive_timeout.reset();
|
||||||
@ -81,6 +131,23 @@ private:
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void drainPipe()
|
||||||
|
{
|
||||||
|
uint64_t buf;
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
ssize_t res = read(pipe_fd[0], &buf, sizeof(buf));
|
||||||
|
if (res < 0)
|
||||||
|
{
|
||||||
|
if (errno == EAGAIN)
|
||||||
|
break;
|
||||||
|
|
||||||
|
if (errno != EINTR)
|
||||||
|
throwFromErrno("Cannot drain pipe_fd", ErrorCodes::CANNOT_READ_FROM_SOCKET);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct Routine
|
struct Routine
|
||||||
{
|
{
|
||||||
PacketReceiver & receiver;
|
PacketReceiver & receiver;
|
||||||
@ -131,14 +198,28 @@ private:
|
|||||||
};
|
};
|
||||||
|
|
||||||
Connection * connection;
|
Connection * connection;
|
||||||
TimerDescriptor receive_timeout;
|
Packet packet;
|
||||||
Epoll epoll;
|
|
||||||
Fiber fiber;
|
Fiber fiber;
|
||||||
FiberStack fiber_stack;
|
FiberStack fiber_stack;
|
||||||
Packet packet;
|
|
||||||
bool is_read_in_process = false;
|
/// We use timer descriptor for checking socket receive timeout.
|
||||||
bool is_receive_timeout_expired = false;
|
TimerDescriptor receive_timeout;
|
||||||
|
|
||||||
|
/// In read callback we add socket file descriptor and timer descriptor with receive timeout
|
||||||
|
/// in epoll, so we can return epoll file descriptor outside for polling.
|
||||||
|
Epoll epoll;
|
||||||
|
|
||||||
|
/// Pipe is used when there is pending data in buffer
|
||||||
|
/// after receiving packet socket won't be ready in epoll in this case),
|
||||||
|
/// so we add pipe_fd in epoll and write something in it to tell
|
||||||
|
/// outside that we are ready to receive new packet.
|
||||||
|
int pipe_fd[2];
|
||||||
|
|
||||||
|
/// If and exception occurred in fiber resume, we save it and rethrow.
|
||||||
std::exception_ptr exception;
|
std::exception_ptr exception;
|
||||||
|
|
||||||
|
bool is_read_in_process = false;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -57,22 +57,19 @@ void Epoll::remove(int fd)
|
|||||||
throwFromErrno("Cannot remove descriptor from epoll", DB::ErrorCodes::EPOLL_ERROR);
|
throwFromErrno("Cannot remove descriptor from epoll", DB::ErrorCodes::EPOLL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t Epoll::getManyReady(int max_events, epoll_event * events_out, bool blocking, AsyncCallback async_callback) const
|
size_t Epoll::getManyReady(int max_events, epoll_event * events_out, bool blocking) const
|
||||||
{
|
{
|
||||||
if (events_count == 0)
|
if (events_count == 0)
|
||||||
throw Exception("There is no events in epoll", ErrorCodes::LOGICAL_ERROR);
|
throw Exception("There is no events in epoll", ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
int ready_size;
|
int ready_size;
|
||||||
int timeout = blocking && !async_callback ? -1 : 0;
|
int timeout = blocking ? -1 : 0;
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
ready_size = epoll_wait(epoll_fd, events_out, max_events, timeout);
|
ready_size = epoll_wait(epoll_fd, events_out, max_events, timeout);
|
||||||
|
|
||||||
if (ready_size == -1 && errno != EINTR)
|
if (ready_size == -1 && errno != EINTR)
|
||||||
throwFromErrno("Error in epoll_wait", DB::ErrorCodes::EPOLL_ERROR);
|
throwFromErrno("Error in epoll_wait", DB::ErrorCodes::EPOLL_ERROR);
|
||||||
|
|
||||||
if (ready_size == 0 && blocking && async_callback)
|
|
||||||
async_callback(epoll_fd, 0, "epoll");
|
|
||||||
}
|
}
|
||||||
while (ready_size <= 0 && (ready_size != 0 || blocking));
|
while (ready_size <= 0 && (ready_size != 0 || blocking));
|
||||||
|
|
||||||
|
@ -31,10 +31,8 @@ public:
|
|||||||
|
|
||||||
/// Get events from epoll. Events are written in events_out, this function returns an amount of ready events.
|
/// Get events from epoll. Events are written in events_out, this function returns an amount of ready events.
|
||||||
/// If blocking is false and there are no ready events,
|
/// If blocking is false and there are no ready events,
|
||||||
/// return empty vector, otherwise wait for ready events. If blocking is true,
|
/// return empty vector, otherwise wait for ready events.
|
||||||
/// async_callback is given and there is no ready events, async_callback is called
|
size_t getManyReady(int max_events, epoll_event * events_out, bool blocking) const;
|
||||||
/// with epoll file descriptor.
|
|
||||||
size_t getManyReady(int max_events, epoll_event * events_out, bool blocking, AsyncCallback async_callback = {}) const;
|
|
||||||
|
|
||||||
int getFileDescriptor() const { return epoll_fd; }
|
int getFileDescriptor() const { return epoll_fd; }
|
||||||
|
|
||||||
@ -42,11 +40,14 @@ public:
|
|||||||
|
|
||||||
bool empty() const { return events_count == 0; }
|
bool empty() const { return events_count == 0; }
|
||||||
|
|
||||||
|
const std::string & getDescription() const { return fd_description; }
|
||||||
|
|
||||||
~Epoll();
|
~Epoll();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int epoll_fd;
|
int epoll_fd;
|
||||||
std::atomic<int> events_count;
|
std::atomic<int> events_count;
|
||||||
|
const std::string fd_description = "epoll";
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,7 @@
|
|||||||
#include <Common/Exception.h>
|
#include <Common/Exception.h>
|
||||||
#include <Common/NetException.h>
|
#include <Common/NetException.h>
|
||||||
#include <Common/Stopwatch.h>
|
#include <Common/Stopwatch.h>
|
||||||
|
#include <common/logger_useful.h>
|
||||||
|
|
||||||
|
|
||||||
namespace ProfileEvents
|
namespace ProfileEvents
|
||||||
@ -34,11 +35,16 @@ bool ReadBufferFromPocoSocket::nextImpl()
|
|||||||
/// Add more details to exceptions.
|
/// Add more details to exceptions.
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
if (!async_callback)
|
||||||
|
LOG_DEBUG(&Poco::Logger::get("ReadBufferFromPocoSocket"), "Don't have async callback");
|
||||||
|
|
||||||
bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), internal_buffer.size(), flags);
|
bytes_read = socket.impl()->receiveBytes(internal_buffer.begin(), internal_buffer.size(), flags);
|
||||||
|
|
||||||
/// If async_callback is specified, and read is blocking, run async_callback and try again later.
|
/// If async_callback is specified, and read is blocking, run async_callback and try again later.
|
||||||
/// It is expected that file descriptor may be polled externally.
|
/// It is expected that file descriptor may be polled externally.
|
||||||
/// Note that receive timeout is not checked here. External code should check it while polling.
|
/// Note that receive timeout is not checked here. External code should check it while polling.
|
||||||
|
LOG_DEBUG(&Poco::Logger::get("ReadBufferFromPocoSocket"), "Don't have async callback");
|
||||||
|
|
||||||
while (bytes_read < 0 && async_callback && errno == EAGAIN)
|
while (bytes_read < 0 && async_callback && errno == EAGAIN)
|
||||||
{
|
{
|
||||||
async_callback(socket.impl()->sockfd(), socket.getReceiveTimeout(), socket_description);
|
async_callback(socket.impl()->sockfd(), socket.getReceiveTimeout(), socket_description);
|
||||||
|
@ -40,6 +40,9 @@ add_executable (in_join_subqueries_preprocessor in_join_subqueries_preprocessor.
|
|||||||
target_link_libraries (in_join_subqueries_preprocessor PRIVATE clickhouse_aggregate_functions dbms clickhouse_parsers)
|
target_link_libraries (in_join_subqueries_preprocessor PRIVATE clickhouse_aggregate_functions dbms clickhouse_parsers)
|
||||||
add_check(in_join_subqueries_preprocessor)
|
add_check(in_join_subqueries_preprocessor)
|
||||||
|
|
||||||
|
add_executable (context context.cpp)
|
||||||
|
target_link_libraries (context PRIVATE dbms)
|
||||||
|
|
||||||
if (OS_LINUX)
|
if (OS_LINUX)
|
||||||
add_executable (internal_iotop internal_iotop.cpp)
|
add_executable (internal_iotop internal_iotop.cpp)
|
||||||
target_link_libraries (internal_iotop PRIVATE dbms)
|
target_link_libraries (internal_iotop PRIVATE dbms)
|
||||||
|
@ -78,6 +78,7 @@ def _check_exception(exception, expected_tries=3):
|
|||||||
expected_lines = (
|
expected_lines = (
|
||||||
'Code: 209, ' + EXCEPTION_NETWORK + EXCEPTION_TIMEOUT,
|
'Code: 209, ' + EXCEPTION_NETWORK + EXCEPTION_TIMEOUT,
|
||||||
'Code: 209, ' + EXCEPTION_NETWORK + EXCEPTION_CONNECT,
|
'Code: 209, ' + EXCEPTION_NETWORK + EXCEPTION_CONNECT,
|
||||||
|
EXCEPTION_TIMEOUT,
|
||||||
)
|
)
|
||||||
|
|
||||||
assert any(line.startswith(expected) for expected in expected_lines), \
|
assert any(line.startswith(expected) for expected in expected_lines), \
|
||||||
|
Loading…
Reference in New Issue
Block a user