This commit is contained in:
Evgeniy Gatov 2015-05-31 04:02:09 +03:00
commit d378c76964
23 changed files with 316 additions and 158 deletions

View File

@ -56,17 +56,47 @@ public:
Poco::Timespan ping_timeout_ = Poco::Timespan(DBMS_DEFAULT_PING_TIMEOUT_SEC, 0))
:
host(host_), port(port_), default_database(default_database_),
user(user_), password(password_),
user(user_), password(password_), resolved_address(host, port),
client_name(client_name_),
compression(compression_),
connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_),
ping_timeout(ping_timeout_),
log_wrapper(host, port)
log_wrapper(*this)
{
/// Соединеняемся не сразу, а при первой необходимости.
if (user.empty())
user = "default";
setDescription();
}
Connection(const String & host_, UInt16 port_, const Poco::Net::SocketAddress & resolved_address_,
const String & default_database_,
const String & user_, const String & password_,
const String & client_name_ = "client",
Protocol::Compression::Enum compression_ = Protocol::Compression::Enable,
Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
Poco::Timespan receive_timeout_ = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0),
Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0),
Poco::Timespan ping_timeout_ = Poco::Timespan(DBMS_DEFAULT_PING_TIMEOUT_SEC, 0))
:
host(host_), port(port_),
default_database(default_database_),
user(user_), password(password_),
resolved_address(resolved_address_),
client_name(client_name_),
compression(compression_),
connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_),
ping_timeout(ping_timeout_),
log_wrapper(*this)
{
/// Соединеняемся не сразу, а при первой необходимости.
if (user.empty())
user = "default";
setDescription();
}
virtual ~Connection() {};
@ -96,8 +126,21 @@ public:
void getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & revision);
/// Адрес сервера - для сообщений в логе и в эксепшенах.
String getServerAddress() const;
/// Для сообщений в логе и в эксепшенах.
const String & getDescription() const
{
return description;
}
const String & getHost() const
{
return host;
}
UInt16 getPort() const
{
return port;
}
/// Если последний флаг true, то затем необходимо вызвать sendExternalTablesData
void sendQuery(const String & query, const String & query_id_ = "", UInt64 stage = QueryProcessingStage::Complete,
@ -131,16 +174,6 @@ public:
*/
void disconnect();
const std::string & getHost() const
{
return host;
}
UInt16 getPort() const
{
return port;
}
size_t outBytesCount() const { return !out.isNull() ? out->count() : 0; }
size_t inBytesCount() const { return !in.isNull() ? in->count() : 0; }
@ -151,6 +184,15 @@ private:
String user;
String password;
/** Адрес может быть заранее отрезолвен и передан в конструктор. Тогда поля host и port имеют смысл только для логгирования.
* Иначе адрес резолвится в конструкторе. То есть, DNS балансировка не поддерживается.
*/
Poco::Net::SocketAddress resolved_address;
/// Для сообщений в логе и в эксепшенах.
String description;
void setDescription();
String client_name;
bool connected = false;
@ -191,22 +233,22 @@ private:
class LoggerWrapper
{
public:
LoggerWrapper(std::string & host_, size_t port_) : log(nullptr), host(host_), port(port_)
LoggerWrapper(Connection & parent_)
: log(nullptr), parent(parent_)
{
}
Logger * get()
{
if (!log)
log = &Logger::get("Connection (" + Poco::Net::SocketAddress(host, port).toString() + ")");
log = &Logger::get("Connection (" + parent.getDescription() + ")");
return log;
}
private:
std::atomic<Logger *> log;
std::string host;
size_t port;
Connection & parent;
};
LoggerWrapper log_wrapper;

View File

@ -54,16 +54,34 @@ public:
typedef PoolBase<Connection> Base;
ConnectionPool(unsigned max_connections_,
const String & host_, UInt16 port_, const String & default_database_,
const String & host_, UInt16 port_,
const String & default_database_,
const String & user_, const String & password_,
const String & client_name_ = "client",
Protocol::Compression::Enum compression_ = Protocol::Compression::Enable,
Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
Poco::Timespan receive_timeout_ = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0),
Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0))
: Base(max_connections_, &Logger::get("ConnectionPool (" + Poco::Net::SocketAddress(host_, port_).toString() + ")")),
: Base(max_connections_, &Logger::get("ConnectionPool (" + host_ + ":" + toString(port_) + ")")),
host(host_), port(port_), default_database(default_database_),
user(user_), password(password_),
user(user_), password(password_), resolved_address(host_, port_),
client_name(client_name_), compression(compression_),
connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_)
{
}
ConnectionPool(unsigned max_connections_,
const String & host_, UInt16 port_, const Poco::Net::SocketAddress & resolved_address_,
const String & default_database_,
const String & user_, const String & password_,
const String & client_name_ = "client",
Protocol::Compression::Enum compression_ = Protocol::Compression::Enable,
Poco::Timespan connect_timeout_ = Poco::Timespan(DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, 0),
Poco::Timespan receive_timeout_ = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0),
Poco::Timespan send_timeout_ = Poco::Timespan(DBMS_DEFAULT_SEND_TIMEOUT_SEC, 0))
: Base(max_connections_, &Logger::get("ConnectionPool (" + host_ + ":" + toString(port_) + ")")),
host(host_), port(port_), default_database(default_database_),
user(user_), password(password_), resolved_address(resolved_address_),
client_name(client_name_), compression(compression_),
connect_timeout(connect_timeout_), receive_timeout(receive_timeout_), send_timeout(send_timeout_)
{
@ -89,7 +107,8 @@ protected:
ConnectionPtr allocObject() override
{
return new Connection(
host, port, default_database, user, password,
host, port, resolved_address,
default_database, user, password,
client_name, compression,
connect_timeout, receive_timeout, send_timeout);
}
@ -101,6 +120,11 @@ private:
String user;
String password;
/** Адрес может быть заранее отрезолвен и передан в конструктор. Тогда поля host и port имеют смысл только для логгирования.
* Иначе адрес резолвится в конструкторе. То есть, DNS балансировка не поддерживается.
*/
Poco::Net::SocketAddress resolved_address;
String client_name;
Protocol::Compression::Enum compression; /// Сжимать ли данные при взаимодействии с сервером.

View File

@ -5,6 +5,7 @@
#include <statdaemons/PoolWithFailoverBase.h>
#include <DB/Common/getFQDNOrHostName.h>
#include <DB/Client/ConnectionPool.h>
@ -33,7 +34,7 @@ public:
: Base(nested_pools_, max_tries_, decrease_error_period_,
&Logger::get("ConnectionPoolWithFailover")), default_load_balancing(load_balancing)
{
std::string local_hostname = Poco::Net::DNS::hostName();
const std::string & local_hostname = getFQDNOrHostName();
hostname_differences.resize(nested_pools.size());
for (size_t i = 0; i < nested_pools.size(); ++i)

View File

@ -0,0 +1,8 @@
#pragma once
#include <string>
/** Получить FQDN для локального сервера путём DNS-резолвинга hostname - аналогично вызову утилиты hostname с флагом -f.
* Если не получилось отрезолвить, то вернуть hostname - аналогично вызову утилиты hostname без флагов или uname -n.
*/
const std::string & getFQDNOrHostName();

View File

@ -3,7 +3,6 @@
#include <DB/Core/Progress.h>
#include <DB/Interpreters/Limits.h>
#include <DB/Interpreters/Quota.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/DataStreams/BlockStreamProfileInfo.h>
@ -14,6 +13,8 @@
namespace DB
{
class QuotaForIntervals;
/** Смотрит за тем, как работает источник блоков.
* Позволяет получить информацию для профайлинга:

View File

@ -112,6 +112,7 @@ public:
}
};
/// Получить имя хоста. (Оно - константа, вычисляется один раз за весь запрос.)
class FunctionHostName : public IFunction
{
@ -146,6 +147,7 @@ public:
}
};
class FunctionVisibleWidth : public IFunction
{
public:

View File

@ -101,18 +101,14 @@ namespace DB
* Без проверки, потому что делитель всегда положительный.
*/
template<typename T, typename Enable = void>
struct FastModulo
{
};
struct FastModulo;
template<typename T>
struct FastModulo<T, typename std::enable_if<std::is_integral<T>::value>::type>
{
private:
template<typename InputType, typename Enable = void>
struct Extend
{
};
struct Extend;
template<typename InputType>
struct Extend<InputType,
@ -150,9 +146,6 @@ namespace DB
static inline T compute(T a, const Divisor & divisor)
{
if (divisor.first == 1)
return 0;
U val = static_cast<U>(a);
U rem = val - (val / divisor.second) * static_cast<U>(divisor.first);
return static_cast<T>(rem);
@ -172,9 +165,7 @@ namespace DB
/** Реализация низкоуровневых функций округления для целочисленных значений.
*/
template<typename T, int rounding_mode, ScaleMode scale_mode, typename Enable = void>
struct IntegerRoundingComputation
{
};
struct IntegerRoundingComputation;
template<typename T, int rounding_mode, ScaleMode scale_mode>
struct IntegerRoundingComputation<T, rounding_mode, scale_mode,
@ -266,9 +257,7 @@ namespace DB
};
template<typename T>
struct BaseFloatRoundingComputation
{
};
struct BaseFloatRoundingComputation;
template<>
struct BaseFloatRoundingComputation<Float32>
@ -494,9 +483,7 @@ namespace DB
/** Реализация высокоуровневых функций округления.
*/
template<typename T, int rounding_mode, ScaleMode scale_mode, typename Enable = void>
struct FunctionRoundingImpl
{
};
struct FunctionRoundingImpl;
/** Реализация высокоуровневых функций округления для целочисленных значений.
*/
@ -619,7 +606,7 @@ namespace DB
public:
static inline void apply(const PODArray<T> & in, size_t scale, typename ColumnVector<T>::Container_t & out)
{
::memset(reinterpret_cast<T *>(&out[0]), 0, in.size());
::memset(reinterpret_cast<T *>(&out[0]), 0, in.size() * sizeof(T));
}
static inline T apply(T val, size_t scale)
@ -683,9 +670,7 @@ namespace
* умножения и деления. Поэтому оно называется масштабом.
*/
template<typename T, typename U, typename Enable = void>
struct ScaleForRightType
{
};
struct ScaleForRightType;
template<typename T, typename U>
struct ScaleForRightType<T, U,

View File

@ -14,6 +14,9 @@
#include <DB/Functions/IFunction.h>
#include <statdaemons/ext/range.hpp>
#include <emmintrin.h>
#include <nmmintrin.h>
namespace DB
{
@ -264,15 +267,16 @@ struct LowerUpperImplVectorized
private:
static void array(const UInt8 * src, const UInt8 * src_end, UInt8 * dst)
{
const auto src_end_sse = src_end - (src_end - src) % 16;
const auto bytes_sse = sizeof(__m128i);
const auto src_end_sse = src_end - (src_end - src) % bytes_sse;
const auto flip_case_mask = 1 << 5;
const auto flip_case_mask = 'A' ^ 'a';
const auto v_not_case_lower_bound = _mm_set1_epi8(not_case_lower_bound - 1);
const auto v_not_case_upper_bound = _mm_set1_epi8(not_case_upper_bound + 1);
const auto v_flip_case_mask = _mm_set1_epi8(flip_case_mask);
for (; src < src_end_sse; src += 16, dst += 16)
for (; src < src_end_sse; src += bytes_sse, dst += bytes_sse)
{
/// load 16 sequential 8-bit characters
const auto chars = _mm_loadu_si128(reinterpret_cast<const __m128i *>(src));
@ -292,7 +296,10 @@ private:
}
for (; src < src_end; ++src, ++dst)
*dst = (*src >= not_case_lower_bound && *src <= not_case_upper_bound) ? *src ^ flip_case_mask : *src;
if (*src >= not_case_lower_bound && *src <= not_case_upper_bound)
*dst = *src ^ flip_case_mask;
else
*dst = *src;
}
};
@ -324,66 +331,77 @@ struct LowerUpperUTF8ImplVectorized
private:
static void array(const UInt8 * src, const UInt8 * src_end, UInt8 * dst)
{
auto is_ascii = false;
static const Poco::UTF8Encoding utf8;
if (isCaseASCII(src, src_end, is_ascii))
std::copy(src, src_end, dst);
else if (is_ascii)
LowerUpperImplVectorized<not_case_lower_bound, not_case_upper_bound>::array(src, src_end, dst);
else
UTF8ToCase(src, src_end, dst);
}
const auto bytes_sse = sizeof(__m128i);
auto src_end_sse = src + (src_end - src) * bytes_sse / bytes_sse;
static bool isCaseASCII(const UInt8 * src, const UInt8 * const src_end, bool & is_ascii)
{
const auto src_end_sse = src_end - (src_end - src) % 16;
const auto flip_case_mask = 'A' ^ 'a';
/// SSE2 packed comparison operate on signed types, hence compare (c < 0) instead of (c > 0x7f)
const auto v_zero = _mm_setzero_si128();
const auto v_not_case_lower_bound = _mm_set1_epi8(not_case_lower_bound - 1);
const auto v_not_case_upper_bound = _mm_set1_epi8(not_case_upper_bound + 1);
// const auto v_not_case_range = _mm_set1_epi16((not_case_upper_bound << 8) | not_case_lower_bound);
const auto v_flip_case_mask = _mm_set1_epi8(flip_case_mask);
const auto not_case_a_16 = _mm_set1_epi8('A' - 1);
const auto not_case_z_16 = _mm_set1_epi8('Z' + 1);
const auto zero_16 = _mm_setzero_si128();
auto is_case = true;
for (; src < src_end_sse; src += 16)
for (; src < src_end_sse; src += bytes_sse, dst += bytes_sse)
{
const auto chars = _mm_loadu_si128(reinterpret_cast<const __m128i *>(src));
/// check for ASCII and case
const auto is_not_ascii = _mm_cmplt_epi8(chars, zero_16);
/// check for ASCII
const auto is_not_ascii = _mm_cmplt_epi8(chars, v_zero);
const auto mask_is_not_ascii = _mm_movemask_epi8(is_not_ascii);
if (mask_is_not_ascii != 0)
/// ASCII
if (mask_is_not_ascii == 0)
{
is_ascii = false;
return false;
const auto is_not_case = _mm_and_si128(_mm_cmpgt_epi8(chars, v_not_case_lower_bound),
_mm_cmplt_epi8(chars, v_not_case_upper_bound));
const auto mask_is_not_case = _mm_movemask_epi8(is_not_case);
/// check for case
// const auto is_case_result = _mm_cmpestra(v_not_case_range, 2, chars, 16, _SIDD_UBYTE_OPS | _SIDD_CMP_RANGES);
// if (is_case_result == 0)
/// not in case
if (mask_is_not_case != 0)
{
/// keep `flip_case_mask` only where necessary, zero out elsewhere
const auto xor_mask = _mm_and_si128(v_flip_case_mask, is_not_case);
/// flip case by applying calculated mask
const auto cased_chars = _mm_xor_si128(chars, xor_mask);
/// store result back to destination
_mm_storeu_si128(reinterpret_cast<__m128i *>(dst), cased_chars);
}
else
std::copy(src, src + bytes_sse, dst);
}
else
{
/// UTF-8
const auto end = src + bytes_sse;
while (src < end)
{
if (const auto chars = utf8.convert(to_case(utf8.convert(src)), dst, src_end_sse - src))
{
src += chars;
dst += chars;
}
else
{
++src;
++dst;
}
}
const auto is_not_case = _mm_and_si128(_mm_cmpgt_epi8(chars, not_case_a_16),
_mm_cmplt_epi8(chars, not_case_z_16));
const auto mask_is_not_case = _mm_movemask_epi8(is_not_case);
if (mask_is_not_case != 0)
is_case = false;
const auto diff = src - end;
src_end_sse += diff;
}
}
/// handle remaining symbols
for (; src < src_end; ++src)
if (*src > '\x7f')
{
is_ascii = false;
return false;
}
else if (*src >= 'A' && *src <= 'Z')
is_case = false;
is_ascii = true;
return is_case;
}
static void UTF8ToCase(const UInt8 * src, const UInt8 * src_end, UInt8 * dst)
{
static const Poco::UTF8Encoding utf8;
while (src < src_end)
{
if (const auto chars = utf8.convert(to_case(utf8.convert(src)), dst, src_end - src))

View File

@ -8,6 +8,7 @@
namespace DB
{
/// Cluster содержит пулы соединений до каждого из узлов
/// С локальными узлами соединение не устанавливается, а выполяется запрос напрямую.
/// Поэтому храним только количество локальных узлов
@ -61,8 +62,9 @@ public:
* </replica>
* </shard>
*/
Poco::Net::SocketAddress host_port;
Poco::Net::SocketAddress resolved_address;
String host_name;
UInt16 port;
String user;
String password;
UInt32 replica_num;

View File

@ -72,13 +72,14 @@ struct QuotaForInterval
{
time_t rounded_time;
size_t duration;
time_t offset; /// Смещение интервала, для рандомизации.
QuotaValues max;
QuotaValues used;
QuotaForInterval() : rounded_time() {}
QuotaForInterval(time_t duration_) : duration(duration_) {}
void initFromConfig(const String & config_elem, time_t duration_, Poco::Util::AbstractConfiguration & config);
void initFromConfig(const String & config_elem, time_t duration_, time_t offset_, Poco::Util::AbstractConfiguration & config);
/// Увеличить соответствующее значение.
void addQuery(time_t current_time, const String & quota_name);
@ -131,7 +132,7 @@ public:
return cont.empty();
}
void initFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config);
void initFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config, std::mt19937 & rng);
/// Обновляет максимальные значения значениями из quota.
/// Удаляет интервалы, которых нет в quota, добавляет интревалы, которых нет здесь, но есть в quota.
@ -177,7 +178,7 @@ struct Quota
Quota() : is_keyed(false), keyed_by_ip(false) {}
void loadFromConfig(const String & config_elem, const String & name_, Poco::Util::AbstractConfiguration & config);
void loadFromConfig(const String & config_elem, const String & name_, Poco::Util::AbstractConfiguration & config, std::mt19937 & rng);
QuotaForIntervalsPtr get(const String & quota_key, const String & user_name, const Poco::Net::IPAddress & ip);
};

View File

@ -197,7 +197,7 @@ private:
return true;
}
/* Парсит строку, генерирующую шарды и реплики. Splitter - один из двух символов | или '
/* Парсит строку, генерирующую шарды и реплики. Разделитель - один из двух символов | или ,
* в зависимости от того генерируются шарды или реплики.
* Например:
* host1,host2,... - порождает множество шардов из host1, host2, ...
@ -209,7 +209,7 @@ private:
* abc{1..9}de{f,g,h} - прямое произведение, 27 шардов.
* abc{1..9}de{0|1} - прямое произведение, 9 шардов, в каждом 2 реплики.
*/
std::vector<String> parseDescription(const String & description, size_t l, size_t r, char splitter) const
std::vector<String> parseDescription(const String & description, size_t l, size_t r, char separator) const
{
std::vector<String> res;
std::vector<String> cur;
@ -238,7 +238,7 @@ private:
if (description[m] == '{') ++cnt;
if (description[m] == '}') --cnt;
if (description[m] == '.' && description[m-1] == '.') last_dot = m;
if (description[m] == splitter) have_splitter = true;
if (description[m] == separator) have_splitter = true;
if (cnt == 0) break;
}
if (cnt != 0)
@ -282,13 +282,13 @@ private:
buffer.push_back(cur);
}
} else if (have_splitter) /// Если внутри есть текущий разделитель, то сгенерировать множество получаемых строк
buffer = parseDescription(description, i + 1, m, splitter);
buffer = parseDescription(description, i + 1, m, separator);
else /// Иначе просто скопировать, порождение произойдет при вызове с правильным разделителем
buffer.push_back(description.substr(i, m - i + 1));
/// К текущему множеству строк добавить все возможные полученные продолжения
append(cur, buffer);
i = m;
} else if (description[i] == splitter) {
} else if (description[i] == separator) {
/// Если разделитель, то добавляем в ответ найденные строки
res.insert(res.end(), cur.begin(), cur.end());
cur.clear();

View File

@ -468,7 +468,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(const String & name, const Da
return res;
}
else if (name == "quantileDeterministic")
else if (name == "medianDeterministic" || name == "quantileDeterministic")
{
if (argument_types.size() != 2)
throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -722,6 +722,7 @@ const AggregateFunctionFactory::FunctionNames & AggregateFunctionFactory::getFun
"quantileTimingWeighted",
"quantilesTimingWeighted",
"medianTimingWeighted",
"medianDeterministic",
"quantileDeterministic",
"quantilesDeterministic",
"sequenceMatch",

View File

@ -34,9 +34,9 @@ void Connection::connect()
if (connected)
disconnect();
LOG_TRACE(log_wrapper.get(), "Connecting to " << default_database << "@" << host << ":" << port);
LOG_TRACE(log_wrapper.get(), "Connecting. Database: " << (default_database.empty() ? "(not specified)" : default_database) << ". User: " << user);
socket.connect(Poco::Net::SocketAddress(host, port), connect_timeout);
socket.connect(resolved_address, connect_timeout);
socket.setReceiveTimeout(receive_timeout);
socket.setSendTimeout(send_timeout);
socket.setNoDelay(true);
@ -60,21 +60,21 @@ void Connection::connect()
disconnect();
/// Добавляем в сообщение адрес сервера. Также объект Exception запомнит stack trace. Жаль, что более точный тип исключения теряется.
throw NetException(e.displayText(), "(" + getServerAddress() + ")", ErrorCodes::NETWORK_ERROR);
throw NetException(e.displayText(), "(" + getDescription() + ")", ErrorCodes::NETWORK_ERROR);
}
catch (Poco::TimeoutException & e)
{
disconnect();
/// Добавляем в сообщение адрес сервера. Также объект Exception запомнит stack trace. Жаль, что более точный тип исключения теряется.
throw NetException(e.displayText(), "(" + getServerAddress() + ")", ErrorCodes::SOCKET_TIMEOUT);
throw NetException(e.displayText(), "(" + getDescription() + ")", ErrorCodes::SOCKET_TIMEOUT);
}
}
void Connection::disconnect()
{
//LOG_TRACE(log_wrapper.get(), "Disconnecting (" << getServerAddress() << ")");
//LOG_TRACE(log_wrapper.get(), "Disconnecting");
socket.close();
in = nullptr;
@ -85,7 +85,7 @@ void Connection::disconnect()
void Connection::sendHello()
{
//LOG_TRACE(log_wrapper.get(), "Sending hello (" << getServerAddress() << ")");
//LOG_TRACE(log_wrapper.get(), "Sending hello");
writeVarUInt(Protocol::Client::Hello, *out);
writeStringBinary((DBMS_NAME " ") + client_name, *out);
@ -102,7 +102,7 @@ void Connection::sendHello()
void Connection::receiveHello()
{
//LOG_TRACE(log_wrapper.get(), "Receiving hello (" << getServerAddress() << ")");
//LOG_TRACE(log_wrapper.get(), "Receiving hello");
/// Получить hello пакет.
UInt64 packet_type = 0;
@ -127,7 +127,7 @@ void Connection::receiveHello()
/// Закроем соединение, чтобы не было рассинхронизации.
disconnect();
throw NetException("Unexpected packet from server " + getServerAddress() + " (expected Hello or Exception, got "
throw NetException("Unexpected packet from server " + getDescription() + " (expected Hello or Exception, got "
+ String(Protocol::Server::toString(packet_type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
}
}
@ -192,7 +192,7 @@ struct PingTimeoutSetter
bool Connection::ping()
{
// LOG_TRACE(log_wrapper.get(), "Ping (" << getServerAddress() << ")");
// LOG_TRACE(log_wrapper.get(), "Ping");
PingTimeoutSetter timeout_setter(socket, ping_timeout);
try
@ -219,7 +219,7 @@ bool Connection::ping()
if (pong != Protocol::Server::Pong)
{
throw Exception("Unexpected packet from server " + getServerAddress() + " (expected Pong, got "
throw Exception("Unexpected packet from server " + getDescription() + " (expected Pong, got "
+ String(Protocol::Server::toString(pong)) + ")",
ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
}
@ -242,7 +242,7 @@ void Connection::sendQuery(const String & query, const String & query_id_, UInt6
query_id = query_id_;
//LOG_TRACE(log_wrapper.get(), "Sending query (" << getServerAddress() << ")");
//LOG_TRACE(log_wrapper.get(), "Sending query");
writeVarUInt(Protocol::Client::Query, *out);
@ -281,7 +281,7 @@ void Connection::sendQuery(const String & query, const String & query_id_, UInt6
void Connection::sendCancel()
{
//LOG_TRACE(log_wrapper.get(), "Sending cancel (" << getServerAddress() << ")");
//LOG_TRACE(log_wrapper.get(), "Sending cancel");
writeVarUInt(Protocol::Client::Cancel, *out);
out->next();
@ -290,7 +290,7 @@ void Connection::sendCancel()
void Connection::sendData(const Block & block, const String & name)
{
//LOG_TRACE(log_wrapper.get(), "Sending data (" << getServerAddress() << ")");
//LOG_TRACE(log_wrapper.get(), "Sending data");
if (!block_out)
{
@ -405,7 +405,7 @@ bool Connection::hasReadBufferPendingData() const
Connection::Packet Connection::receivePacket()
{
//LOG_TRACE(log_wrapper.get(), "Receiving packet (" << getServerAddress() << ")");
//LOG_TRACE(log_wrapper.get(), "Receiving packet");
try
{
@ -448,14 +448,14 @@ Connection::Packet Connection::receivePacket()
disconnect();
throw Exception("Unknown packet "
+ toString(res.type)
+ " from server " + getServerAddress(), ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
+ " from server " + getDescription(), ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
}
catch (Exception & e)
{
/// Дописываем в текст исключения адрес сервера, если надо.
if (e.code() != ErrorCodes::UNKNOWN_PACKET_FROM_SERVER)
e.addMessage("while receiving packet from " + getServerAddress());
e.addMessage("while receiving packet from " + getDescription());
throw;
}
@ -464,7 +464,7 @@ Connection::Packet Connection::receivePacket()
Block Connection::receiveData()
{
//LOG_TRACE(log_wrapper.get(), "Receiving data (" << getServerAddress() << ")");
//LOG_TRACE(log_wrapper.get(), "Receiving data");
initBlockInput();
@ -499,25 +499,29 @@ void Connection::initBlockInput()
}
String Connection::getServerAddress() const
void Connection::setDescription()
{
return Poco::Net::SocketAddress(host, port).toString();
description = host + ":" + toString(resolved_address.port());
auto ip_address = resolved_address.host().toString();
if (host != ip_address)
description += ", " + ip_address;
}
SharedPtr<Exception> Connection::receiveException()
{
//LOG_TRACE(log_wrapper.get(), "Receiving exception (" << getServerAddress() << ")");
//LOG_TRACE(log_wrapper.get(), "Receiving exception");
Exception e;
readException(e, *in, "Received from " + getServerAddress());
readException(e, *in, "Received from " + getDescription());
return e.clone();
}
Progress Connection::receiveProgress()
{
//LOG_TRACE(log_wrapper.get(), "Receiving progress (" << getServerAddress() << ")");
//LOG_TRACE(log_wrapper.get(), "Receiving progress");
Progress progress;
progress.read(*in, server_revision);

View File

@ -184,7 +184,7 @@ std::string ParallelReplicas::dumpAddresses() const
const Connection * connection = e.second;
if (connection != nullptr)
{
os << (is_first ? "" : "; ") << connection->getServerAddress();
os << (is_first ? "" : "; ") << connection->getDescription();
is_first = false;
}
}

View File

@ -0,0 +1,25 @@
#include <Poco/Net/DNS.h>
#include <DB/Common/getFQDNOrHostName.h>
namespace
{
std::string getFQDNOrHostNameImpl()
{
try
{
return Poco::Net::DNS::thisHost().name();
}
catch (...)
{
return Poco::Net::DNS::hostName();
}
}
}
const std::string & getFQDNOrHostName()
{
static std::string result = getFQDNOrHostNameImpl();
return result;
}

View File

@ -4,6 +4,7 @@
#include <Poco/Ext/ThreadNumber.h>*/
#include <DB/Columns/ColumnConst.h>
#include <DB/Interpreters/Quota.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>

View File

@ -13,11 +13,8 @@ Cluster::Address::Address(const String & config_prefix)
auto & config = Poco::Util::Application::instance().config();
host_name = config.getString(config_prefix + ".host");
host_port = Poco::Net::SocketAddress(
host_name,
config.getInt(config_prefix + ".port")
);
port = config.getInt(config_prefix + ".port");
resolved_address = Poco::Net::SocketAddress(host_name, port);
user = config.getString(config_prefix + ".user", "default");
password = config.getString(config_prefix + ".password", "");
}
@ -29,9 +26,17 @@ Cluster::Address::Address(const String & host_port_, const String & user_, const
/// Похоже на то, что строка host_port_ содержит порт. Если условие срабатывает - не обязательно значит, что порт есть (пример: [::]).
if (nullptr != strchr(host_port_.c_str(), ':') || !default_port)
host_port = Poco::Net::SocketAddress(host_port_);
{
resolved_address = Poco::Net::SocketAddress(host_port_);
host_name = host_port_.substr(0, host_port_.find(':'));
port = resolved_address.port();
}
else
host_port = Poco::Net::SocketAddress(host_port_, default_port);
{
resolved_address = Poco::Net::SocketAddress(host_port_, default_port);
host_name = host_port_;
port = default_port;
}
}
namespace
@ -41,8 +46,8 @@ namespace
return
escapeForFileName(address.user) +
(address.password.empty() ? "" : (':' + escapeForFileName(address.password))) + '@' +
escapeForFileName(address.host_port.host().toString()) + ':' +
std::to_string(address.host_port.port());
escapeForFileName(address.resolved_address.host().toString()) + ':' +
std::to_string(address.resolved_address.port());
}
}
@ -178,7 +183,8 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
{
replicas.emplace_back(new ConnectionPool(
settings.distributed_connections_pool_size,
replica.host_port.host().toString(), replica.host_port.port(), "", replica.user, replica.password,
replica.host_name, replica.port, replica.resolved_address,
"", replica.user, replica.password,
"server", Protocol::Compression::Enable,
saturate(settings.connect_timeout_with_failover_ms, settings.limits.max_execution_time),
saturate(settings.receive_timeout, settings.limits.max_execution_time),
@ -204,7 +210,8 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
{
pools.emplace_back(new ConnectionPool(
settings.distributed_connections_pool_size,
address.host_port.host().toString(), address.host_port.port(), "", address.user, address.password,
address.host_name, address.port, address.resolved_address,
"", address.user, address.password,
"server", Protocol::Compression::Enable,
saturate(settings.connect_timeout, settings.limits.max_execution_time),
saturate(settings.receive_timeout, settings.limits.max_execution_time),
@ -237,7 +244,8 @@ Cluster::Cluster(const Settings & settings, std::vector<std::vector<String>> nam
{
replicas.emplace_back(new ConnectionPool(
settings.distributed_connections_pool_size,
replica.host_port.host().toString(), replica.host_port.port(), "", replica.user, replica.password,
replica.host_name, replica.port, replica.resolved_address,
"", replica.user, replica.password,
"server", Protocol::Compression::Enable,
saturate(settings.connect_timeout_with_failover_ms, settings.limits.max_execution_time),
saturate(settings.receive_timeout, settings.limits.max_execution_time),
@ -264,7 +272,7 @@ bool Cluster::isLocal(const Address & address)
/// - её порт совпадает с портом, который слушает сервер;
/// - её хост резолвится в набор адресов, один из которых совпадает с одним из адресов сетевых интерфейсов сервера
/// то нужно всегда ходить на этот шард без межпроцессного взаимодействия
return isLocalAddress(address.host_port);
return isLocalAddress(address.resolved_address);
}
}

View File

@ -24,10 +24,11 @@ void QuotaValues::initFromConfig(const String & config_elem, Poco::Util::Abstrac
}
void QuotaForInterval::initFromConfig(const String & config_elem, time_t duration_, Poco::Util::AbstractConfiguration & config)
void QuotaForInterval::initFromConfig(const String & config_elem, time_t duration_, time_t offset_, Poco::Util::AbstractConfiguration & config)
{
rounded_time = 0;
duration = duration_;
offset = offset_;
max.initFromConfig(config_elem, config);
}
@ -95,7 +96,7 @@ void QuotaForInterval::updateTime(time_t current_time)
{
if (current_time >= rounded_time + static_cast<int>(duration))
{
rounded_time = current_time / duration * duration;
rounded_time = (current_time - offset) / duration * duration + offset;
used.clear();
}
}
@ -127,7 +128,7 @@ void QuotaForInterval::check(size_t max_amount, size_t used_amount, time_t curre
}
void QuotaForIntervals::initFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config)
void QuotaForIntervals::initFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config, std::mt19937 & rng)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_elem, config_keys);
@ -139,8 +140,13 @@ void QuotaForIntervals::initFromConfig(const String & config_elem, Poco::Util::A
String interval_config_elem = config_elem + "." + *it;
time_t duration = config.getInt(interval_config_elem + ".duration");
time_t offset = 0;
cont[duration].initFromConfig(interval_config_elem, duration, config);
bool randomize = config.getBool(interval_config_elem + ".randomize", false);
if (randomize)
offset = std::uniform_int_distribution<decltype(duration)>(0, duration - 1)(rng);
cont[duration].initFromConfig(interval_config_elem, duration, offset, config);
}
}
@ -210,7 +216,7 @@ String QuotaForIntervals::toString() const
}
void Quota::loadFromConfig(const String & config_elem, const String & name_, Poco::Util::AbstractConfiguration & config)
void Quota::loadFromConfig(const String & config_elem, const String & name_, Poco::Util::AbstractConfiguration & config, std::mt19937 & rng)
{
name = name_;
@ -226,7 +232,7 @@ void Quota::loadFromConfig(const String & config_elem, const String & name_, Poc
}
QuotaForIntervals new_max(name);
new_max.initFromConfig(config_elem, config);
new_max.initFromConfig(config_elem, config, rng);
if (!(new_max == max))
{
max = new_max;
@ -269,6 +275,8 @@ QuotaForIntervalsPtr Quota::get(const String & quota_key, const String & user_na
void Quotas::loadFromConfig(Poco::Util::AbstractConfiguration & config)
{
std::mt19937 rng;
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys("quotas", config_keys);
@ -286,7 +294,7 @@ void Quotas::loadFromConfig(Poco::Util::AbstractConfiguration & config)
{
if (!cont[*it])
cont[*it] = new Quota();
cont[*it]->loadFromConfig("quotas." + *it, *it, config);
cont[*it]->loadFromConfig("quotas." + *it, *it, config, rng);
}
}

View File

@ -16,7 +16,7 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p
{
Pos begin = pos;
ASTSelectQuery * select_query = new ASTSelectQuery(StringRange(begin, pos));
ASTSelectQuery * select_query = new ASTSelectQuery;
node = select_query;
ParserWhiteSpaceOrComments ws;
@ -312,6 +312,8 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p
ws.ignore(pos, end);
}
select_query->range = StringRange(begin, pos);
select_query->children.push_back(select_query->select_expression_list);
if (select_query->database)
select_query->children.push_back(select_query->database);

View File

@ -17,6 +17,7 @@
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Interpreters/executeQuery.h>
#include <DB/Interpreters/Quota.h>
#include <DB/Common/ExternalTable.h>

View File

@ -18,6 +18,7 @@
#include <condition_variable>
#include <DB/Common/Macros.h>
#include <DB/Common/getFQDNOrHostName.h>
#include <DB/Interpreters/loadMetadata.h>
#include <DB/Storages/StorageSystemNumbers.h>
#include <DB/Storages/StorageSystemTables.h>
@ -485,9 +486,14 @@ int Server::main(const std::vector<std::string> & args)
{
String this_host;
if (config().has("interserver_http_host"))
{
this_host = config().getString("interserver_http_host");
}
else
this_host = Poco::Net::DNS::hostName();
{
this_host = getFQDNOrHostName();
LOG_DEBUG(log, "Configuration parameter 'interserver_http_host' doesn't exist. Will use '" + this_host + "' as replica host.");
}
String port_str = config().getString("interserver_http_port");
int port = parse<int>(port_str);

View File

@ -20,6 +20,7 @@
#include <DB/DataStreams/NativeBlockInputStream.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
#include <DB/Interpreters/executeQuery.h>
#include <DB/Interpreters/Quota.h>
#include <DB/Storages/StorageMemory.h>
@ -225,14 +226,31 @@ void TCPHandler::readData(const Settings & global_settings)
{
while (1)
{
/// Ждём пакета от клиента. При этом, каждые POLL_INTERVAL сек. проверяем, не требуется ли завершить работу.
while (!static_cast<ReadBufferFromPocoSocket &>(*in).poll(global_settings.poll_interval * 1000000) && !Daemon::instance().isCancelled())
;
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
/// Если требуется завершить работу, или клиент отсоединился.
if (Daemon::instance().isCancelled() || in->eof())
/// Ждём пакета от клиента. При этом, каждые POLL_INTERVAL сек. проверяем, не требуется ли завершить работу.
while (1)
{
if (static_cast<ReadBufferFromPocoSocket &>(*in).poll(global_settings.poll_interval * 1000000))
break;
/// Если требуется завершить работу.
if (Daemon::instance().isCancelled())
return;
/** Если ждём данных уже слишком долго.
* Если периодически poll-ить соединение, то receive_timeout у сокета сам по себе не срабатывает.
* Поэтому, добавлена дополнительная проверка.
*/
if (watch.elapsedSeconds() > global_settings.receive_timeout.totalSeconds())
throw Exception("Timeout exceeded while receiving data from client", ErrorCodes::SOCKET_TIMEOUT);
}
/// Если клиент отсоединился.
if (in->eof())
return;
/// Принимаем и обрабатываем данные. А если они закончились, то выходим.
if (!receivePacket())
break;
}

View File

@ -62,8 +62,8 @@ BlockInputStreams StorageSystemClusters::read(
replica_num_column->insert(static_cast<UInt64>(address.replica_num));
host_name_column->insert(address.host_name);
host_address_column->insert(address.host_port.host().toString());
port_column->insert(static_cast<UInt64>(address.host_port.port()));
host_address_column->insert(address.resolved_address.host().toString());
port_column->insert(static_cast<UInt64>(address.port));
user_column->insert(address.user);
};