cloud: fixed funny bug that cloud cannot be created if some host is unknown by DNS [#METR-10969]

This commit is contained in:
Pavel Kartavyy 2014-10-08 23:00:25 +04:00
parent 10f948d3a3
commit d9bba776eb
2 changed files with 42 additions and 18 deletions

View File

@ -18,6 +18,8 @@
#include <DB/Interpreters/Settings.h>
#include <atomic>
namespace DB
{
@ -54,7 +56,7 @@ public:
server_version_major(0), server_version_minor(0), server_revision(0),
query_id(""), compression(compression_), data_type_factory(data_type_factory_),
connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_),
log(&Logger::get("Connection (" + Poco::Net::SocketAddress(host, port).toString() + ")"))
log_wrapper(host, port)
{
/// Соединеняемся не сразу, а при первой необходимости.
@ -162,7 +164,29 @@ private:
SharedPtr<WriteBuffer> maybe_compressed_out;
BlockOutputStreamPtr block_out;
Logger * log;
/// логгер, создаваемый лениво, чтобы не обращаться к DNS в конструкторе
class LoggerWrapper
{
public:
LoggerWrapper(std::string & host_, size_t port_) : log(nullptr), host(host_), port(port_)
{
}
Logger * get()
{
if (!log)
log = &Logger::get("Connection (" + Poco::Net::SocketAddress(host, port).toString() + ")");
return log;
}
private:
std::atomic<Logger *> log;
std::string host;
size_t port;
};
LoggerWrapper log_wrapper;
void connect();
void sendHello();

View File

@ -32,7 +32,7 @@ void Connection::connect()
if (connected)
disconnect();
LOG_TRACE(log, "Connecting to " << default_database << "@" << host << ":" << port);
LOG_TRACE(log_wrapper.get(), "Connecting to " << default_database << "@" << host << ":" << port);
socket.connect(Poco::Net::SocketAddress(host, port), connect_timeout);
socket.setReceiveTimeout(receive_timeout);
@ -47,7 +47,7 @@ void Connection::connect()
sendHello();
receiveHello();
LOG_TRACE(log, "Connected to " << server_name
LOG_TRACE(log_wrapper.get(), "Connected to " << server_name
<< " server version " << server_version_major
<< "." << server_version_minor
<< "." << server_revision
@ -72,7 +72,7 @@ void Connection::connect()
void Connection::disconnect()
{
//LOG_TRACE(log, "Disconnecting (" << getServerAddress() << ")");
//LOG_TRACE(log_wrapper.get(), "Disconnecting (" << getServerAddress() << ")");
socket.close();
in = nullptr;
@ -83,7 +83,7 @@ void Connection::disconnect()
void Connection::sendHello()
{
//LOG_TRACE(log, "Sending hello (" << getServerAddress() << ")");
//LOG_TRACE(log_wrapper.get(), "Sending hello (" << getServerAddress() << ")");
writeVarUInt(Protocol::Client::Hello, *out);
writeStringBinary((DBMS_NAME " ") + client_name, *out);
@ -100,7 +100,7 @@ void Connection::sendHello()
void Connection::receiveHello()
{
//LOG_TRACE(log, "Receiving hello (" << getServerAddress() << ")");
//LOG_TRACE(log_wrapper.get(), "Receiving hello (" << getServerAddress() << ")");
/// Получить hello пакет.
UInt64 packet_type = 0;
@ -157,7 +157,7 @@ void Connection::forceConnected()
}
else if (!ping())
{
LOG_TRACE(log, "Connection was closed, will reconnect.");
LOG_TRACE(log_wrapper.get(), "Connection was closed, will reconnect.");
connect();
}
}
@ -165,7 +165,7 @@ void Connection::forceConnected()
bool Connection::ping()
{
LOG_TRACE(log, "Ping (" << getServerAddress() << ")");
LOG_TRACE(log_wrapper.get(), "Ping (" << getServerAddress() << ")");
try
{
@ -198,7 +198,7 @@ bool Connection::ping()
}
catch (const Poco::Exception & e)
{
LOG_TRACE(log, e.displayText());
LOG_TRACE(log_wrapper.get(), e.displayText());
return false;
}
@ -212,7 +212,7 @@ void Connection::sendQuery(const String & query, const String & query_id_, UInt6
query_id = query_id_;
//LOG_TRACE(log, "Sending query (" << getServerAddress() << ")");
//LOG_TRACE(log_wrapper.get(), "Sending query (" << getServerAddress() << ")");
writeVarUInt(Protocol::Client::Query, *out);
@ -251,7 +251,7 @@ void Connection::sendQuery(const String & query, const String & query_id_, UInt6
void Connection::sendCancel()
{
//LOG_TRACE(log, "Sending cancel (" << getServerAddress() << ")");
//LOG_TRACE(log_wrapper.get(), "Sending cancel (" << getServerAddress() << ")");
writeVarUInt(Protocol::Client::Cancel, *out);
out->next();
@ -260,7 +260,7 @@ void Connection::sendCancel()
void Connection::sendData(const Block & block, const String & name)
{
//LOG_TRACE(log, "Sending data (" << getServerAddress() << ")");
//LOG_TRACE(log_wrapper.get(), "Sending data (" << getServerAddress() << ")");
if (!block_out)
{
@ -350,7 +350,7 @@ void Connection::sendExternalTablesData(ExternalTablesData & data)
else
msg << ", no compression.";
LOG_DEBUG(log, msg.rdbuf());
LOG_DEBUG(log_wrapper.get(), msg.rdbuf());
}
@ -362,7 +362,7 @@ bool Connection::poll(size_t timeout_microseconds)
Connection::Packet Connection::receivePacket()
{
//LOG_TRACE(log, "Receiving packet (" << getServerAddress() << ")");
//LOG_TRACE(log_wrapper.get(), "Receiving packet (" << getServerAddress() << ")");
try
{
@ -421,7 +421,7 @@ Connection::Packet Connection::receivePacket()
Block Connection::receiveData()
{
//LOG_TRACE(log, "Receiving data (" << getServerAddress() << ")");
//LOG_TRACE(log_wrapper.get(), "Receiving data (" << getServerAddress() << ")");
initBlockInput();
@ -457,7 +457,7 @@ String Connection::getServerAddress() const
SharedPtr<Exception> Connection::receiveException()
{
//LOG_TRACE(log, "Receiving exception (" << getServerAddress() << ")");
//LOG_TRACE(log_wrapper.get(), "Receiving exception (" << getServerAddress() << ")");
Exception e;
readException(e, *in, "Received from " + getServerAddress());
@ -467,7 +467,7 @@ SharedPtr<Exception> Connection::receiveException()
Progress Connection::receiveProgress()
{
//LOG_TRACE(log, "Receiving progress (" << getServerAddress() << ")");
//LOG_TRACE(log_wrapper.get(), "Receiving progress (" << getServerAddress() << ")");
Progress progress;
progress.read(*in);