This commit is contained in:
Evgeniy Gatov 2015-10-17 16:54:20 +03:00
commit fc0973f738
14 changed files with 161 additions and 44 deletions

View File

@ -233,7 +233,13 @@ public:
void getExtremes(Field & min, Field & max) const override
{
throw Exception("Method getExtremes is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
size_t tuple_size = columns.size();
min = Array(tuple_size);
max = Array(tuple_size);
for (size_t i = 0; i < tuple_size; ++i)
columns[i]->getExtremes(min.get<Array &>()[i], max.get<Array &>()[i]);
}

View File

@ -67,7 +67,7 @@ private:
return n;
}
static size_t to_size(size_t n) { return byte_size(std::max(POD_ARRAY_INITIAL_SIZE, round_up_to_power_of_two(n))); }
static size_t to_size(size_t n) { return byte_size(round_up_to_power_of_two(n)); }
void alloc(size_t n)
{

View File

@ -83,7 +83,6 @@ public:
/// Массив шардов. Для каждого шарда - массив адресов реплик (серверов, считающихся идентичными).
typedef std::vector<Addresses> AddressesWithFailover;
public:
const Addresses & getShardsInfo() const { return addresses; }
const AddressesWithFailover & getShardsWithFailoverInfo() const { return addresses_with_failover; }
const Addresses & getLocalShardsInfo() const { return local_addresses; }

View File

@ -166,6 +166,8 @@ struct Settings
/** Для запросов SELECT из реплицируемой таблицы, кидать исключение, если на реплике нет куска, записанного с кворумом; \
* не читать куски, которые ещё не были записаны с кворумом. */ \
M(SettingUInt64, select_sequential_consistency, 0) \
/** Максимальное количество различных шардов и максимальное количество реплик одного шарда в функции remote. */ \
M(SettingUInt64, table_function_remote_max_addresses, 1000) \
/// Всевозможные ограничения на выполнение запроса.
Limits limits;

View File

@ -1,13 +1,13 @@
#pragma once
#include <DB/Parsers/IAST.h>
#include <DB/Parsers/ASTQueryWithOutput.h>
namespace DB
{
struct ASTCheckQuery : public IAST
struct ASTCheckQuery : public ASTQueryWithOutput
{
ASTCheckQuery(StringRange range_ = StringRange()) : IAST(range_) {};
ASTCheckQuery(StringRange range_ = StringRange()) : ASTQueryWithOutput(range_) {};
/** Получить текст, который идентифицирует этот элемент. */
String getID() const override { return ("CheckQuery_" + database + "_" + table); };

View File

@ -100,7 +100,8 @@ private:
virtual const NamesAndTypesList & getColumnsListImpl() const = 0;
using ColumnsListRange = boost::range::joined_range<const NamesAndTypesList, const NamesAndTypesList>;
ColumnsListRange getColumnsListIterator() const;
/// Returns a lazily joined range of table's ordinary and materialized columns, without unnecessary copying
ColumnsListRange getColumnsListRange() const;
};
}

View File

@ -24,9 +24,6 @@ namespace DB
class TableFunctionRemote : public ITableFunction
{
public:
/// Максимальное количество различных шардов и максимальное количество реплик одного шарда
const size_t MAX_ADDRESSES = 1000; /// TODO Перенести в Settings.
std::string getName() const override { return "remote"; }
StoragePtr execute(ASTPtr ast_function, Context & context) const override
@ -109,11 +106,13 @@ public:
if (ASTIdentifier * id = typeid_cast<ASTIdentifier *>(arg.get()))
id->kind = ASTIdentifier::Table;
size_t max_addresses = context.getSettingsRef().table_function_remote_max_addresses;
std::vector<std::vector<String>> names;
std::vector<String> shards = parseDescription(description, 0, description.size(), ',');
std::vector<String> shards = parseDescription(description, 0, description.size(), ',', max_addresses);
for (size_t i = 0; i < shards.size(); ++i)
names.push_back(parseDescription(shards[i], 0, shards[i].size(), '|'));
names.push_back(parseDescription(shards[i], 0, shards[i].size(), '|', max_addresses));
if (names.empty())
throw Exception("Shard list is empty after parsing first argument", ErrorCodes::BAD_ARGUMENTS);
@ -164,7 +163,7 @@ private:
}
/// Декартово произведение двух множеств строк, результат записываем на место первого аргумента
void append(std::vector<String> & to, const std::vector<String> & what) const
void append(std::vector<String> & to, const std::vector<String> & what, size_t max_addresses) const
{
if (what.empty()) return;
if (to.empty())
@ -172,7 +171,7 @@ private:
to = what;
return;
}
if (what.size() * to.size() > MAX_ADDRESSES)
if (what.size() * to.size() > max_addresses)
throw Exception("Storage Distributed, first argument generates too many result addresses",
ErrorCodes::BAD_ARGUMENTS);
std::vector<String> res;
@ -209,7 +208,7 @@ private:
* abc{1..9}de{f,g,h} - прямое произведение, 27 шардов.
* abc{1..9}de{0|1} - прямое произведение, 9 шардов, в каждом 2 реплики.
*/
std::vector<String> parseDescription(const String & description, size_t l, size_t r, char separator) const
std::vector<String> parseDescription(const String & description, size_t l, size_t r, char separator, size_t max_addresses) const
{
std::vector<String> res;
std::vector<String> cur;
@ -263,7 +262,7 @@ private:
throw Exception("Storage Distributed, incorrect argument in braces (left number is greater then right): "
+ description.substr(i, m - i + 1),
ErrorCodes::BAD_ARGUMENTS);
if (right - left + 1 > MAX_ADDRESSES)
if (right - left + 1 > max_addresses)
throw Exception("Storage Distributed, first argument generates too many result addresses",
ErrorCodes::BAD_ARGUMENTS);
bool add_leading_zeroes = false;
@ -282,25 +281,29 @@ private:
buffer.push_back(cur);
}
} else if (have_splitter) /// Если внутри есть текущий разделитель, то сгенерировать множество получаемых строк
buffer = parseDescription(description, i + 1, m, separator);
buffer = parseDescription(description, i + 1, m, separator, max_addresses);
else /// Иначе просто скопировать, порождение произойдет при вызове с правильным разделителем
buffer.push_back(description.substr(i, m - i + 1));
/// К текущему множеству строк добавить все возможные полученные продолжения
append(cur, buffer);
append(cur, buffer, max_addresses);
i = m;
} else if (description[i] == separator) {
}
else if (description[i] == separator)
{
/// Если разделитель, то добавляем в ответ найденные строки
res.insert(res.end(), cur.begin(), cur.end());
cur.clear();
} else {
}
else
{
/// Иначе просто дописываем символ к текущим строкам
std::vector<String> buffer;
buffer.push_back(description.substr(i, 1));
append(cur, buffer);
append(cur, buffer, max_addresses);
}
}
res.insert(res.end(), cur.begin(), cur.end());
if (res.size() > MAX_ADDRESSES)
if (res.size() > max_addresses)
throw Exception("Storage Distributed, first argument generates too many result addresses",
ErrorCodes::BAD_ARGUMENTS);
return res;

View File

@ -1,6 +1,7 @@
#include <DB/Interpreters/Cluster.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/Common/isLocalAddress.h>
#include <DB/Common/SimpleCache.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
@ -8,17 +9,42 @@ namespace DB
{
/// Для кэширования DNS запросов.
static Poco::Net::SocketAddress resolveSocketAddressImpl1(const String & host, UInt16 port)
{
return Poco::Net::SocketAddress(host, port);
}
static Poco::Net::SocketAddress resolveSocketAddressImpl2(const String & host_and_port)
{
return Poco::Net::SocketAddress(host_and_port);
}
static Poco::Net::SocketAddress resolveSocketAddress(const String & host, UInt16 port)
{
static SimpleCache<decltype(resolveSocketAddressImpl1), &resolveSocketAddressImpl1> cache;
return cache(host, port);
}
static Poco::Net::SocketAddress resolveSocketAddress(const String & host_and_port)
{
static SimpleCache<decltype(resolveSocketAddressImpl2), &resolveSocketAddressImpl2> cache;
return cache(host_and_port);
}
Cluster::Address::Address(const String & config_prefix)
{
auto & config = Poco::Util::Application::instance().config();
host_name = config.getString(config_prefix + ".host");
port = config.getInt(config_prefix + ".port");
resolved_address = Poco::Net::SocketAddress(host_name, port);
resolved_address = resolveSocketAddress(host_name, port);
user = config.getString(config_prefix + ".user", "default");
password = config.getString(config_prefix + ".password", "");
}
Cluster::Address::Address(const String & host_port_, const String & user_, const String & password_)
: user(user_), password(password_)
{
@ -27,18 +53,19 @@ Cluster::Address::Address(const String & host_port_, const String & user_, const
/// Похоже на то, что строка host_port_ содержит порт. Если условие срабатывает - не обязательно значит, что порт есть (пример: [::]).
if (nullptr != strchr(host_port_.c_str(), ':') || !default_port)
{
resolved_address = Poco::Net::SocketAddress(host_port_);
resolved_address = resolveSocketAddress(host_port_);
host_name = host_port_.substr(0, host_port_.find(':'));
port = resolved_address.port();
}
else
{
resolved_address = Poco::Net::SocketAddress(host_port_, default_port);
resolved_address = resolveSocketAddress(host_port_, default_port);
host_name = host_port_;
port = default_port;
}
}
namespace
{
inline std::string addressToDirName(const Cluster::Address & address)
@ -67,6 +94,8 @@ Clusters::Clusters(const Settings & settings, const String & config_name)
Cluster::Cluster(const Settings & settings, const String & cluster_name)
{
/// Создать кластер.
Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(cluster_name, config_keys);
@ -161,22 +190,25 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
++current_shard_num;
}
/// Создать соответствующие пулы соединений.
if (!addresses_with_failover.empty() && !addresses.empty())
throw Exception("There must be either 'node' or 'shard' elements in config", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
if (addresses_with_failover.size())
if (!addresses_with_failover.empty())
{
for (const auto & shard : addresses_with_failover)
{
ConnectionPools replicas;
replicas.reserve(shard.size());
bool has_local_replics = false;
bool has_local_replica = false;
for (const auto & replica : shard)
{
if (isLocal(replica))
{
has_local_replics = true;
has_local_replica = true;
local_addresses.push_back(replica);
break;
}
@ -193,13 +225,13 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
}
}
if (has_local_replics)
if (has_local_replica)
++local_nodes_num;
else
pools.emplace_back(new ConnectionPoolWithFailover(replicas, settings.load_balancing, settings.connections_with_failover_max_tries));
}
}
else if (addresses.size())
else if (!addresses.empty())
{
for (const auto & address : addresses)
{
@ -234,15 +266,13 @@ Cluster::Cluster(const Settings & settings, std::vector<std::vector<String>> nam
Addresses current;
for (auto & replica : shard)
current.emplace_back(replica, username, password);
addresses_with_failover.emplace_back(current);
}
for (const auto & shard : addresses_with_failover)
{
ConnectionPools replicas;
replicas.reserve(shard.size());
replicas.reserve(current.size());
for (const auto & replica : shard)
for (const auto & replica : current)
{
replicas.emplace_back(new ConnectionPool(
settings.distributed_connections_pool_size,

View File

@ -11,6 +11,7 @@ bool ParserCheckQuery::parseImpl(IParser::Pos & pos, IParser::Pos end, ASTPtr &
ParserWhiteSpaceOrComments ws;
ParserString s_check("CHECK", true, true);
ParserString s_table("TABLE", true, true);
ParserString s_format("FORMAT", true, true);
ParserString s_dot(".");
ParserIdentifier table_parser;
@ -46,6 +47,22 @@ bool ParserCheckQuery::parseImpl(IParser::Pos & pos, IParser::Pos end, ASTPtr &
query->table = typeid_cast<ASTIdentifier &>(*table).name;
}
ws.ignore(pos, end);
/// FORMAT format_name
if (s_format.ignore(pos, end, max_parsed_pos, expected))
{
ws.ignore(pos, end);
ParserIdentifier format_p;
if (!format_p.parse(pos, end, query->format, max_parsed_pos, expected))
return false;
typeid_cast<ASTIdentifier &>(*query->format).kind = ASTIdentifier::Format;
ws.ignore(pos, end);
}
node = query;
return true;
}

View File

@ -6,6 +6,8 @@
#include <DB/Parsers/ASTNameTypePair.h>
#include <DB/Interpreters/Context.h>
#include <ext/map.hpp>
#include <ext/identity.hpp>
#include <ext/collection_cast.hpp>
namespace DB
@ -13,13 +15,11 @@ namespace DB
NamesAndTypesList ITableDeclaration::getColumnsList() const
{
const auto & range = getColumnsListIterator();
return { std::begin(range), std::end(range) };
return ext::collection_cast<NamesAndTypesList>(getColumnsListRange());
}
ITableDeclaration::ColumnsListRange ITableDeclaration::getColumnsListIterator() const
ITableDeclaration::ColumnsListRange ITableDeclaration::getColumnsListRange() const
{
return boost::join(getColumnsListImpl(), materialized_columns);
}
@ -27,7 +27,7 @@ ITableDeclaration::ColumnsListRange ITableDeclaration::getColumnsListIterator()
bool ITableDeclaration::hasRealColumn(const String & column_name) const
{
for (auto & it : getColumnsListIterator())
for (auto & it : getColumnsListRange())
if (it.name == column_name)
return true;
return false;
@ -36,13 +36,13 @@ bool ITableDeclaration::hasRealColumn(const String & column_name) const
Names ITableDeclaration::getColumnNamesList() const
{
return ext::map<Names>(getColumnsListIterator(), [] (const auto & it) { return it.name; });
return ext::map<Names>(getColumnsListRange(), [] (const auto & it) { return it.name; });
}
NameAndTypePair ITableDeclaration::getRealColumn(const String & column_name) const
{
for (auto & it : getColumnsListIterator())
for (auto & it : getColumnsListRange())
if (it.name == column_name)
return it;
throw Exception("There is no column " + column_name + " in table.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
@ -79,7 +79,7 @@ NameAndTypePair ITableDeclaration::getColumn(const String & column_name) const
const DataTypePtr ITableDeclaration::getDataTypeByName(const String & column_name) const
{
for (const auto & column : getColumnsListIterator())
for (const auto & column : getColumnsListRange())
if (column.name == column_name)
return column.type;
@ -91,7 +91,7 @@ Block ITableDeclaration::getSampleBlock() const
{
Block res;
for (const auto & col : getColumnsListIterator())
for (const auto & col : getColumnsListRange())
res.insert({ col.type->createColumn(), col.type, col.name });
return res;

View File

@ -0,0 +1,13 @@
0 (0,'2015-01-01')
1 (1,'2015-01-02')
2 (2,'2015-01-03')
3 (3,'2015-01-04')
4 (4,'2015-01-05')
5 (5,'2015-01-06')
6 (6,'2015-01-07')
7 (7,'2015-01-08')
8 (8,'2015-01-09')
9 (9,'2015-01-10')
0 (0,'2015-01-01')
9 (9,'2015-01-10')

View File

@ -0,0 +1 @@
SELECT number, (number, toDate('2015-01-01') + number) FROM system.numbers LIMIT 10 SETTINGS extremes = 1;

View File

@ -0,0 +1,23 @@
#pragma once
namespace ext
{
/** \brief Returns collection of specified container-type.
* Retains stored value_type, constructs resulting collection using iterator range. */
template <template <typename...> class ResultCollection, typename Collection>
auto collection_cast(const Collection & collection)
{
using value_type = typename Collection::value_type;
return ResultCollection<value_type>(std::begin(collection), std::end(collection));
};
/** \brief Returns collection of specified type.
* Performs implicit conversion of between source and result value_type, if available and required. */
template <typename ResultCollection, typename Collection>
auto collection_cast(const Collection & collection)
{
return ResultCollection(std::begin(collection), std::end(collection));
}
}

View File

@ -0,0 +1,22 @@
#pragma once
namespace ext
{
/// \brief Identity function for use with other algorithms as a pass-through.
class identity
{
/** \brief Function pointer type template for converting identity to a function pointer.
* Presumably useless, provided for completeness. */
template <typename T> using function_ptr_t = T &&(*)(T &&);
/** \brief Implementation of identity as a non-instance member function for taking function pointer. */
template <typename T> static T && invoke(T && t) { return std::forward<T>(t); }
public:
/** \brief Returns the value passed as a sole argument using perfect forwarding. */
template <typename T> T && operator()(T && t) const { return std::forward<T>(t); }
/** \brief Allows conversion of identity instance to a function pointer. */
template <typename T> operator function_ptr_t<T>() const { return &invoke; };
};
}