This commit is contained in:
Evgeniy Gatov 2015-02-09 20:24:21 +03:00
commit ab2ca50d5b
31 changed files with 523 additions and 602 deletions

View File

@ -26,7 +26,7 @@ namespace DB
using Poco::SharedPtr;
class ShardReplicas;
class ParallelReplicas;
/// Поток блоков читающих из таблицы и ее имя
typedef std::pair<BlockInputStreamPtr, std::string> ExternalTableData;
@ -42,7 +42,7 @@ typedef std::vector<ExternalTableData> ExternalTablesData;
*/
class Connection : private boost::noncopyable
{
friend class ShardReplicas;
friend class ParallelReplicas;
public:
Connection(const String & host_, UInt16 port_, const String & default_database_,

View File

@ -0,0 +1,85 @@
#pragma once
#include <DB/Client/Connection.h>
#include <DB/Client/ConnectionPool.h>
namespace DB
{
/** Множество реплик одного шарда.
*/
class ParallelReplicas final
{
public:
/// Принимает готовое соединение.
ParallelReplicas(Connection * connection_, Settings * settings_);
/// Принимает пул, из которого нужно будет достать одно или несколько соединений.
ParallelReplicas(IConnectionPool * pool_, Settings * settings_);
ParallelReplicas(const ParallelReplicas &) = delete;
ParallelReplicas & operator=(const ParallelReplicas &) = delete;
/// Отправить на реплики всё содержимое внешних таблиц.
void sendExternalTablesData(std::vector<ExternalTablesData> & data);
/// Отправить запрос на реплики.
void sendQuery(const String & query, const String & query_id = "",
UInt64 stage = QueryProcessingStage::Complete, bool with_pending_data = false);
/// Получить пакет от какой-нибудь реплики.
Connection::Packet receivePacket();
/// Отменить запросы к репликам
void sendCancel();
/** На каждой реплике читать и пропускать все пакеты до EndOfStream или Exception.
* Возвращает EndOfStream, если не было получено никакого исключения. В противном
* случае возвращает последний полученный пакет типа Exception.
*/
Connection::Packet drain();
/// Получить адреса реплик в виде строки.
std::string dumpAddresses() const;
/// Возвращает количесто реплик.
size_t size() const { return replica_map.size(); }
/// Проверить, есть ли действительные реплики.
bool hasActiveReplicas() const { return active_replica_count > 0; }
private:
/// Реплики хэшированные по id сокета
using ReplicaMap = std::unordered_map<int, Connection *>;
private:
/// Зарегистрировать реплику.
void registerReplica(Connection * connection);
/// Получить реплику, на которой можно прочитать данные.
ReplicaMap::iterator getReplicaForReading();
/** Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах.
* Возвращает одну такую реплику, если она найдётся.
*/
ReplicaMap::iterator waitForReadEvent();
/// Пометить реплику как недействительную.
void invalidateReplica(ReplicaMap::iterator it);
private:
Settings * settings;
ReplicaMap replica_map;
std::vector<ConnectionPool::Entry> pool_entries;
ConnectionPool::Entry pool_entry;
/// Текущее количество действительных соединений к репликам.
size_t active_replica_count;
/// Запрос выполняется параллельно на нескольких репликах.
bool supports_parallel_execution;
/// Отправили запрос
bool sent_query = false;
/// Отменили запрос
bool cancelled = false;
};
}

View File

@ -1,62 +0,0 @@
#pragma once
#include <DB/Client/Connection.h>
#include <DB/Client/ConnectionPool.h>
namespace DB
{
/**
* Множество реплик одного шарда.
*/
class ShardReplicas final
{
public:
ShardReplicas(std::vector<ConnectionPool::Entry> & entries, const Settings & settings_);
~ShardReplicas() = default;
ShardReplicas(const ShardReplicas &) = delete;
ShardReplicas & operator=(const ShardReplicas &) = delete;
/// Отправить на реплики всё содержимое внешних таблиц.
void sendExternalTablesData(std::vector<ExternalTablesData> & data);
/// Отправить запрос на реплики.
void sendQuery(const String & query, const String & query_id = "",
UInt64 stage = QueryProcessingStage::Complete, bool with_pending_data = false);
/// Получить пакет от какой-нибудь реплики.
Connection::Packet receivePacket();
/// Разорвать соединения к репликам
void disconnect();
/// Отменить запросы к репликам
void sendCancel();
/// Для каждой реплики получить оставшиеся пакеты после отмена запроса.
Connection::Packet drain();
/// Получить адреса реплик в виде строки.
std::string dumpAddresses() const;
/// Возвращает количесто реплик.
size_t size() const { return replica_hash.size(); }
private:
/// Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах.
/// Возвращает соединение на такую реплику, если оно найдётся.
Connection ** waitForReadEvent();
private:
/// Реплики хэшированные по id сокета
using ReplicaHash = std::unordered_map<int, Connection *>;
private:
const Settings & settings;
ReplicaHash replica_hash;
size_t active_connection_count = 0;
bool sent_query = false;
bool cancelled = false;
};
}

View File

@ -273,10 +273,8 @@ namespace ErrorCodes
CANNOT_COMPILE_CODE,
INCOMPATIBLE_TYPE_OF_JOIN,
NO_AVAILABLE_REPLICA,
UNEXPECTED_REPLICA,
MISMATCH_REPLICAS_DATA_SOURCES,
STORAGE_DOESNT_SUPPORT_PARALLEL_REPLICAS,
MISSING_RANGE_IN_CHUNK,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -48,7 +48,7 @@ inline bool memequalSSE2Wide(const char * p1, const char * p2, size_t size)
if ( compareSSE2(p1, p2)
&& compareSSE2(p1 + 16, p2 + 16)
&& compareSSE2(p1 + 32, p2 + 32)
&& compareSSE2(p1 + 40, p2 + 40))
&& compareSSE2(p1 + 48, p2 + 48))
{
p1 += 64;
p2 += 64;

View File

@ -8,13 +8,13 @@
#include <DB/Interpreters/Context.h>
#include <DB/Client/ConnectionPool.h>
#include <DB/Client/ShardReplicas.h>
#include <DB/Client/ParallelReplicas.h>
namespace DB
{
/** Позволяет выполнить запрос (SELECT) на удалённом сервере и получить результат.
/** Позволяет выполнить запрос (SELECT) на удалённых репликах одного шарда и получить результат.
*/
class RemoteBlockInputStream : public IProfilingBlockInputStream
{
@ -25,7 +25,6 @@ private:
{
send_settings = true;
settings = *settings_;
use_many_replicas = (pool != nullptr) && UInt64(settings.max_parallel_replicas) > 1;
}
else
send_settings = false;
@ -51,7 +50,7 @@ public:
init(settings_);
}
/// Принимает пул, из которого нужно будет достать соединение.
/// Принимает пул, из которого нужно будет достать одно или несколько соединений.
RemoteBlockInputStream(IConnectionPool * pool_, const String & query_, const Settings * settings_,
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
const Context & context = getDefaultContext())
@ -83,46 +82,39 @@ public:
if (!__sync_bool_compare_and_swap(&is_cancelled, false, true))
return;
if (sent_query && !was_cancelled && !finished && !got_exception_from_server)
if (isQueryInProgress() && !hasThrownException())
{
std::string addresses;
if (use_many_replicas)
addresses = shard_replicas->dumpAddresses();
else
addresses = connection->getServerAddress();
std::string addresses = parallel_replicas->dumpAddresses();
LOG_TRACE(log, "(" + addresses + ") Cancelling query");
/// Если запрошено прервать запрос - попросим удалённый сервер тоже прервать запрос.
if (use_many_replicas)
shard_replicas->sendCancel();
else
connection->sendCancel();
/// Если запрошено прервать запрос - попросим удалённые реплики тоже прервать запрос.
was_cancelled = true;
parallel_replicas->sendCancel();
}
}
~RemoteBlockInputStream() override
{
/** Если прервались в середине цикла общения с сервером, то закрываем соединение,
* чтобы оно не осталось висеть в рассихронизированном состоянии.
/** Если прервались в середине цикла общения с репликами, то прервываем
* все соединения, затем читаем и пропускаем оставшиеся пакеты чтобы
* эти соединения не остались висеть в рассихронизированном состоянии.
*/
if (sent_query && !finished)
if (isQueryInProgress())
{
if (use_many_replicas)
shard_replicas->disconnect();
else
connection->disconnect();
std::string addresses = parallel_replicas->dumpAddresses();
LOG_TRACE(log, "(" + addresses + ") Aborting query");
parallel_replicas->sendCancel();
(void) parallel_replicas->drain();
}
}
protected:
/// Отправить на удаленные сервера все временные таблицы
/// Отправить на удаленные реплики все временные таблицы
void sendExternalTables()
{
size_t count = use_many_replicas ? shard_replicas->size() : 1;
size_t count = parallel_replicas->size();
std::vector<ExternalTablesData> instances;
instances.reserve(count);
@ -144,50 +136,22 @@ protected:
instances.push_back(std::move(res));
}
if (use_many_replicas)
shard_replicas->sendExternalTablesData(instances);
else
connection->sendExternalTablesData(instances[0]);
parallel_replicas->sendExternalTablesData(instances);
}
Block readImpl() override
{
if (!sent_query)
{
if (use_many_replicas)
{
auto entries = pool->getMany(&settings);
if (entries.size() > 1)
shard_replicas = ext::make_unique<ShardReplicas>(entries, settings);
else
{
/// NOTE IConnectionPool::getMany() всегда возвращает как минимум одно соединение.
use_many_replicas = false;
connection = &*entries[0];
}
}
else
{
/// Если надо - достаём соединение из пула.
if (pool)
{
pool_entry = pool->get(send_settings ? &settings : nullptr);
connection = &*pool_entry;
}
}
if (use_many_replicas)
shard_replicas->sendQuery(query, "", stage, true);
else
connection->sendQuery(query, "", stage, send_settings ? &settings : nullptr, true);
createParallelReplicas();
parallel_replicas->sendQuery(query, "", stage, true);
sendExternalTables();
sent_query = true;
}
while (true)
{
Connection::Packet packet = use_many_replicas ? shard_replicas->receivePacket() : connection->receivePacket();
Connection::Packet packet = parallel_replicas->receivePacket();
switch (packet.type)
{
@ -198,13 +162,17 @@ protected:
break; /// Если блок пустой - получим другие пакеты до EndOfStream.
case Protocol::Server::Exception:
got_exception_from_server = true;
got_exception_from_replica = true;
packet.exception->rethrow();
break;
case Protocol::Server::EndOfStream:
if (!parallel_replicas->hasActiveReplicas())
{
finished = true;
return Block();
}
break;
case Protocol::Server::Progress:
/** Используем прогресс с удалённого сервера.
@ -215,7 +183,7 @@ protected:
*/
progressImpl(packet.progress);
if (!was_cancelled && !finished && isCancelled())
if (isQueryInProgress() && isCancelled())
cancel();
break;
@ -233,6 +201,7 @@ protected:
break;
default:
got_unknown_packet_from_replica = true;
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
}
@ -243,10 +212,11 @@ protected:
/** Если одно из:
* - ничего не начинали делать;
* - получили все пакеты до EndOfStream;
* - получили с сервера эксепшен;
* - получили с одной реплики эксепшен;
* - получили с одной реплики неизвестный пакет;
* - то больше читать ничего не нужно.
*/
if (!sent_query || finished || got_exception_from_server)
if (hasNoQueryInProgress() || hasThrownException())
return;
/** Если ещё прочитали не все данные, но они больше не нужны.
@ -256,25 +226,15 @@ protected:
/// Отправим просьбу прервать выполнение запроса, если ещё не отправляли.
if (!was_cancelled)
{
std::string addresses;
if (use_many_replicas)
addresses = shard_replicas->dumpAddresses();
else
addresses = connection->getServerAddress();
std::string addresses = parallel_replicas->dumpAddresses();
LOG_TRACE(log, "(" + addresses + ") Cancelling query because enough data has been read");
was_cancelled = true;
if (use_many_replicas)
shard_replicas->sendCancel();
else
connection->sendCancel();
parallel_replicas->sendCancel();
}
if (use_many_replicas)
{
Connection::Packet packet = shard_replicas->drain();
/// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединениях с репликами.
Connection::Packet packet = parallel_replicas->drain();
switch (packet.type)
{
case Protocol::Server::EndOfStream:
@ -282,52 +242,50 @@ protected:
break;
case Protocol::Server::Exception:
got_exception_from_server = true;
got_exception_from_replica = true;
packet.exception->rethrow();
break;
default:
got_unknown_packet_from_replica = true;
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
}
/// Создать объект для общения с репликами одного шарда, на которых должен выполниться запрос.
void createParallelReplicas()
{
Settings * parallel_replicas_settings = send_settings ? &settings : nullptr;
if (connection != nullptr)
parallel_replicas = ext::make_unique<ParallelReplicas>(connection, parallel_replicas_settings);
else
{
/// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединении с сервером.
while (true)
{
Connection::Packet packet = connection->receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
break;
case Protocol::Server::EndOfStream:
finished = true;
return;
case Protocol::Server::Exception:
got_exception_from_server = true;
packet.exception->rethrow();
break;
default:
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
parallel_replicas = ext::make_unique<ParallelReplicas>(pool, parallel_replicas_settings);
}
/// Возвращает true, если запрос отправлен, а ещё не выполнен.
bool isQueryInProgress() const
{
return sent_query && !finished && !was_cancelled;
}
/// Возвращает true, если никакой запрос не отправлен или один запрос уже выполнен.
bool hasNoQueryInProgress() const
{
return !sent_query || finished;
}
/// Возвращает true, если исключение было выкинуто.
bool hasThrownException() const
{
return got_exception_from_replica || got_unknown_packet_from_replica;
}
private:
IConnectionPool * pool = nullptr;
ConnectionPool::Entry pool_entry;
Connection * connection = nullptr;
std::unique_ptr<ShardReplicas> shard_replicas;
std::unique_ptr<ParallelReplicas> parallel_replicas;
const String query;
bool send_settings;
@ -337,26 +295,31 @@ private:
QueryProcessingStage::Enum stage;
Context context;
bool use_many_replicas = false;
/// Отправили запрос (это делается перед получением первого блока).
bool sent_query = false;
/** Получили все данные от сервера, до пакета EndOfStream.
/** Получили все данные от всех реплик, до пакета EndOfStream.
* Если при уничтожении объекта, ещё не все данные считаны,
* то для того, чтобы не было рассинхронизации, на сервер отправляется просьба прервать выполнение запроса,
* то для того, чтобы не было рассинхронизации, на реплики отправляются просьбы прервать выполнение запроса,
* и после этого считываются все пакеты до EndOfStream.
*/
bool finished = false;
/** На сервер была отправлена просьба прервать выполенение запроса, так как данные больше не нужны.
/** На каждую реплику была отправлена просьба прервать выполнение запроса, так как данные больше не нужны.
* Это может быть из-за того, что данных достаточно (например, при использовании LIMIT),
* или если на стороне клиента произошло исключение.
*/
bool was_cancelled = false;
/// С сервера было получено исключение. В этом случае получать больше пакетов или просить прервать запрос не нужно.
bool got_exception_from_server = false;
/** С одной репилки было получено исключение. В этом случае получать больше пакетов или
* просить прервать запрос на этой реплике не нужно.
*/
bool got_exception_from_replica = false;
/** С одной реплики был получен неизвестный пакет. В этом случае получать больше пакетов или
* просить прервать запрос на этой реплике не нужно.
*/
bool got_unknown_packet_from_replica = false;
Logger * log = &Logger::get("RemoteBlockInputStream");

View File

@ -79,6 +79,12 @@ public:
return bytes + offset();
}
/** Проверить, есть ли данные в буфере. */
bool hasPendingData() const
{
return pos != working_buffer.end();
}
protected:
/// Ссылка на кусок памяти для буфера.
Buffer internal_buffer;

View File

@ -25,7 +25,7 @@ protected:
return false;
/// Первое чтение
if (working_buffer.size() == 0 && (*current)->position() != (*current)->buffer().end())
if (working_buffer.size() == 0 && (*current)->hasPendingData())
{
working_buffer = Buffer((*current)->position(), (*current)->buffer().end());
return true;

View File

@ -40,7 +40,7 @@ public:
/** прочитать следующие данные и заполнить ими буфер; переместить позицию в начало;
* вернуть false в случае конца, true иначе; кинуть исключение, если что-то не так
*/
inline bool next()
bool next()
{
bytes += offset();
bool res = nextImpl();
@ -54,7 +54,7 @@ public:
inline void nextIfAtEnd()
{
if (pos == working_buffer.end())
if (!hasPendingData())
next();
}
@ -68,9 +68,9 @@ public:
*
* При попытке чтения после конца, следует кидать исключение.
*/
inline bool eof()
bool eof()
{
return pos == working_buffer.end() && !next();
return !hasPendingData() && !next();
}
void ignore()
@ -143,12 +143,6 @@ public:
return read(to, n);
}
/** Проверить, есть ли данные в буфере для чтения. */
bool hasPendingData() const
{
return offset() != working_buffer.size();
}
private:
/** Прочитать следующие данные и заполнить ими буфер.
* Вернуть false в случае конца, true иначе.

View File

@ -80,7 +80,7 @@ public:
if (new_pos + (working_buffer.end() - pos) == pos_in_file)
return new_pos;
if (pos != working_buffer.end() && new_pos <= pos_in_file && new_pos >= pos_in_file - static_cast<off_t>(working_buffer.size()))
if (hasPendingData() && new_pos <= pos_in_file && new_pos >= pos_in_file - static_cast<off_t>(working_buffer.size()))
{
/// Остались в пределах буфера.
pos = working_buffer.begin() + (new_pos - (pos_in_file - working_buffer.size()));

View File

@ -56,7 +56,7 @@ public:
inline void nextIfAtEnd()
{
if (pos == working_buffer.end())
if (!hasPendingData())
next();
}

View File

@ -16,7 +16,6 @@
#include <DB/Storages/IStorage.h>
#include <DB/AggregateFunctions/AggregateFunctionFactory.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/Storages/StorageFactory.h>
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
#include <DB/Storages/MergeTree/MergeList.h>
#include <DB/TableFunctions/TableFunctionFactory.h>
@ -86,7 +85,6 @@ struct ContextShared
TableFunctionFactory table_function_factory; /// Табличные функции.
AggregateFunctionFactory aggregate_function_factory; /// Агрегатные функции.
DataTypeFactory data_type_factory; /// Типы данных.
StorageFactory storage_factory; /// Движки таблиц.
FormatFactory format_factory; /// Форматы.
mutable SharedPtr<Dictionaries> dictionaries; /// Словари Метрики. Инициализируются лениво.
Users users; /// Известные пользователи.
@ -259,7 +257,6 @@ public:
const TableFunctionFactory & getTableFunctionFactory() const { return shared->table_function_factory; }
const AggregateFunctionFactory & getAggregateFunctionFactory() const { return shared->aggregate_function_factory; }
const DataTypeFactory & getDataTypeFactory() const { return shared->data_type_factory; }
const StorageFactory & getStorageFactory() const { return shared->storage_factory; }
const FormatFactory & getFormatFactory() const { return shared->format_factory; }
const Dictionaries & getDictionaries() const;

View File

@ -22,8 +22,6 @@ public:
NameAndTypePair getColumn(const String & column_name) const override { return getSource().getColumn(column_name); };
bool hasColumn(const String & column_name) const override { return getSource().hasColumn(column_name); };
bool supportsParallelReplicas() const override { return true; }
BlockInputStreams read(
const Names & column_names,
ASTPtr query,

View File

@ -1,6 +1,7 @@
#pragma once
#include <DB/Storages/IStorage.h>
#include <Yandex/singleton.h>
namespace DB
@ -11,7 +12,7 @@ class Context;
/** Позволяет создать таблицу по имени движка.
*/
class StorageFactory
class StorageFactory : public Singleton<StorageFactory>
{
public:
StoragePtr get(

View File

@ -54,6 +54,7 @@ public:
bool supportsSampling() const override { return data.supportsSampling(); }
bool supportsFinal() const override { return data.supportsFinal(); }
bool supportsPrewhere() const override { return data.supportsPrewhere(); }
bool supportsParallelReplicas() const override { return true; }
const NamesAndTypesList & getColumnsListImpl() const override { return data.getColumnsListNonMaterialized(); }

View File

@ -103,8 +103,6 @@ public:
/// Добавить кусок в очередь кусков, чьи данные нужно проверить в фоновом потоке.
void enqueuePartForCheck(const String & name);
void skipUnreplicated() { process_unreplicated = false; }
MergeTreeData & getData() { return data; }
MergeTreeData * getUnreplicatedData() { return unreplicated_data.get(); }
@ -166,8 +164,6 @@ private:
current_zookeeper = zookeeper;
}
bool process_unreplicated = true;
/// Если true, таблица в офлайновом режиме, и в нее нельзя писать.
bool is_readonly = false;

View File

@ -165,7 +165,7 @@ void Connection::forceConnected()
bool Connection::ping()
{
LOG_TRACE(log_wrapper.get(), "Ping (" << getServerAddress() << ")");
// LOG_TRACE(log_wrapper.get(), "Ping (" << getServerAddress() << ")");
try
{

View File

@ -0,0 +1,268 @@
#include <DB/Client/ParallelReplicas.h>
namespace DB
{
ParallelReplicas::ParallelReplicas(Connection * connection_, Settings * settings_)
: settings(settings_),
active_replica_count(1),
supports_parallel_execution(false)
{
registerReplica(connection_);
}
ParallelReplicas::ParallelReplicas(IConnectionPool * pool_, Settings * settings_)
: settings(settings_)
{
if (pool_ == nullptr)
throw Exception("Null pool specified", ErrorCodes::LOGICAL_ERROR);
bool has_many_replicas = (settings != nullptr) && (settings->max_parallel_replicas > 1);
if (has_many_replicas)
{
pool_entries = pool_->getMany(settings);
active_replica_count = pool_entries.size();
supports_parallel_execution = (active_replica_count > 1);
if (active_replica_count == 0)
throw Exception("No connection available", ErrorCodes::LOGICAL_ERROR);
replica_map.reserve(active_replica_count);
for (auto & entry : pool_entries)
registerReplica(&*entry);
}
else
{
active_replica_count = 1;
supports_parallel_execution = false;
pool_entry = pool_->get(settings);
registerReplica(&*pool_entry);
}
}
void ParallelReplicas::sendExternalTablesData(std::vector<ExternalTablesData> & data)
{
if (!sent_query)
throw Exception("Cannot send external tables data: query not yet sent.", ErrorCodes::LOGICAL_ERROR);
if (data.size() < active_replica_count)
throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);
auto it = data.begin();
for (auto & e : replica_map)
{
Connection * connection = e.second;
if (connection != nullptr)
connection->sendExternalTablesData(*it);
++it;
}
}
void ParallelReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data)
{
if (sent_query)
throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR);
if (supports_parallel_execution)
{
Settings query_settings = *settings;
query_settings.parallel_replicas_count = active_replica_count;
UInt64 offset = 0;
for (auto & e : replica_map)
{
Connection * connection = e.second;
if (connection != nullptr)
{
query_settings.parallel_replica_offset = offset;
connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data);
++offset;
}
}
if (offset > 0)
sent_query = true;
}
else
{
auto it = replica_map.begin();
Connection * connection = it->second;
if (connection != nullptr)
{
connection->sendQuery(query, query_id, stage, settings, with_pending_data);
sent_query = true;
}
}
}
Connection::Packet ParallelReplicas::receivePacket()
{
if (!sent_query)
throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR);
if (!hasActiveReplicas())
throw Exception("No more packets are available.", ErrorCodes::LOGICAL_ERROR);
auto it = getReplicaForReading();
if (it == replica_map.end())
throw Exception("No available replica", ErrorCodes::NO_AVAILABLE_REPLICA);
Connection * connection = it->second;
Connection::Packet packet = connection->receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
break;
case Protocol::Server::EndOfStream:
invalidateReplica(it);
break;
case Protocol::Server::Exception:
default:
connection->disconnect();
invalidateReplica(it);
break;
}
return packet;
}
void ParallelReplicas::sendCancel()
{
if (!sent_query || cancelled)
throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR);
for (auto & e : replica_map)
{
Connection * connection = e.second;
if (connection != nullptr)
connection->sendCancel();
}
cancelled = true;
}
Connection::Packet ParallelReplicas::drain()
{
if (!cancelled)
throw Exception("Cannot drain connections: cancel first.", ErrorCodes::LOGICAL_ERROR);
Connection::Packet res;
res.type = Protocol::Server::EndOfStream;
while (hasActiveReplicas())
{
Connection::Packet packet = receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
case Protocol::Server::EndOfStream:
break;
case Protocol::Server::Exception:
default:
/// Если мы получили исключение или неизвестный пакет, сохраняем его.
res = packet;
break;
}
}
return res;
}
std::string ParallelReplicas::dumpAddresses() const
{
bool is_first = true;
std::ostringstream os;
for (auto & e : replica_map)
{
const Connection * connection = e.second;
if (connection != nullptr)
{
os << (is_first ? "" : "; ") << connection->getServerAddress();
if (is_first) { is_first = false; }
}
}
return os.str();
}
void ParallelReplicas::registerReplica(Connection * connection)
{
if (connection == nullptr)
throw Exception("Invalid connection specified in parameter.", ErrorCodes::LOGICAL_ERROR);
auto res = replica_map.insert(std::make_pair(connection->socket.impl()->sockfd(), connection));
if (!res.second)
throw Exception("Invalid set of connections.", ErrorCodes::LOGICAL_ERROR);
}
ParallelReplicas::ReplicaMap::iterator ParallelReplicas::getReplicaForReading()
{
ReplicaMap::iterator it;
if (supports_parallel_execution)
it = waitForReadEvent();
else
{
it = replica_map.begin();
if (it->second == nullptr)
it = replica_map.end();
}
return it;
}
ParallelReplicas::ReplicaMap::iterator ParallelReplicas::waitForReadEvent()
{
Poco::Net::Socket::SocketList read_list;
read_list.reserve(active_replica_count);
/** Сначала проверяем, есть ли данные, которые уже лежат в буфере
* хоть одного соединения.
*/
for (auto & e : replica_map)
{
Connection * connection = e.second;
if ((connection != nullptr) && connection->hasReadBufferPendingData())
read_list.push_back(connection->socket);
}
/** Если не было найдено никаких данных, то проверяем, есть ли соединения
* готовые для чтения.
*/
if (read_list.empty())
{
Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list;
for (auto & e : replica_map)
{
Connection * connection = e.second;
if (connection != nullptr)
read_list.push_back(connection->socket);
}
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings->poll_interval * 1000000);
if (n == 0)
return replica_map.end();
}
auto & socket = read_list[rand() % read_list.size()];
return replica_map.find(socket.impl()->sockfd());
}
void ParallelReplicas::invalidateReplica(ParallelReplicas::ReplicaMap::iterator it)
{
it->second = nullptr;
--active_replica_count;
}
}

View File

@ -1,227 +0,0 @@
#include <DB/Client/ShardReplicas.h>
#include <boost/concept_check.hpp>
namespace DB
{
ShardReplicas::ShardReplicas(std::vector<ConnectionPool::Entry> & entries, const Settings & settings_) :
settings(settings_),
active_connection_count(entries.size())
{
replica_hash.reserve(entries.size());
for (auto & entry : entries)
{
Connection * connection = &*entry;
if (connection == nullptr)
throw Exception("Invalid connection specified in parameter.");
auto res = replica_hash.insert(std::make_pair(connection->socket.impl()->sockfd(), connection));
if (!res.second)
throw Exception("Invalid set of connections.");
}
}
void ShardReplicas::sendExternalTablesData(std::vector<ExternalTablesData> & data)
{
if (!sent_query)
throw Exception("Cannot send external tables data: query not yet sent.");
if (data.size() < active_connection_count)
throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);
auto it = data.begin();
for (auto & e : replica_hash)
{
Connection * connection = e.second;
if (connection != nullptr)
connection->sendExternalTablesData(*it);
++it;
}
}
void ShardReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data)
{
if (sent_query)
throw Exception("Query already sent.");
Settings query_settings = settings;
query_settings.parallel_replicas_count = replica_hash.size();
UInt64 offset = 0;
for (auto & e : replica_hash)
{
Connection * connection = e.second;
if (connection != nullptr)
{
query_settings.parallel_replica_offset = offset;
connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data);
++offset;
}
}
sent_query = true;
}
Connection::Packet ShardReplicas::receivePacket()
{
if (!sent_query)
throw Exception("Cannot receive packets: no query sent.");
if (active_connection_count == 0)
throw Exception("No more packets are available.");
Connection ** connection = waitForReadEvent();
if (connection == nullptr)
throw Exception("No available replica", ErrorCodes::NO_AVAILABLE_REPLICA);
Connection::Packet packet = (*connection)->receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
break;
case Protocol::Server::EndOfStream:
*connection = nullptr;
--active_connection_count;
if (active_connection_count > 0)
{
Connection::Packet empty_packet;
empty_packet.type = Protocol::Server::Data;
return empty_packet;
}
break;
case Protocol::Server::Exception:
default:
*connection = nullptr;
--active_connection_count;
if (!cancelled)
{
sendCancel();
(void) drain();
}
break;
}
return packet;
}
void ShardReplicas::disconnect()
{
for (auto & e : replica_hash)
{
Connection * & connection = e.second;
if (connection != nullptr)
{
connection->disconnect();
connection = nullptr;
--active_connection_count;
}
}
}
void ShardReplicas::sendCancel()
{
if (!sent_query || cancelled)
throw Exception("Cannot cancel. Either no query sent or already cancelled.");
for (auto & e : replica_hash)
{
Connection * connection = e.second;
if (connection != nullptr)
connection->sendCancel();
}
cancelled = true;
}
Connection::Packet ShardReplicas::drain()
{
if (!cancelled)
throw Exception("Cannot drain connections: cancel first.");
Connection::Packet res;
res.type = Protocol::Server::EndOfStream;
while (active_connection_count > 0)
{
Connection::Packet packet = receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
break;
case Protocol::Server::EndOfStream:
return res;
case Protocol::Server::Exception:
default:
res = packet;
break;
}
}
return res;
}
std::string ShardReplicas::dumpAddresses() const
{
std::ostringstream os;
for (auto & e : replica_hash)
{
char prefix = '\0';
const Connection * connection = e.second;
if (connection != nullptr)
{
os << prefix << connection->getServerAddress();
if (prefix == '\0')
prefix = ';';
}
}
return os.str();
}
Connection ** ShardReplicas::waitForReadEvent()
{
Poco::Net::Socket::SocketList read_list;
read_list.reserve(active_connection_count);
for (auto & e : replica_hash)
{
Connection * connection = e.second;
if ((connection != nullptr) && connection->hasReadBufferPendingData())
read_list.push_back(connection->socket);
}
if (read_list.empty())
{
Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list;
for (auto & e : replica_hash)
{
Connection * connection = e.second;
if (connection != nullptr)
read_list.push_back(connection->socket);
}
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.poll_interval * 1000000);
if (n == 0)
return nullptr;
}
auto & socket = read_list[rand() % read_list.size()];
auto it = replica_hash.find(socket.impl()->sockfd());
if (it == replica_hash.end())
throw Exception("Unexpected replica", ErrorCodes::UNEXPECTED_REPLICA);
return &(it->second);
}
}

View File

@ -11,7 +11,6 @@
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/Parsers/ASTExpressionList.h>
#include <DB/Interpreters/ExpressionAnalyzer.h>
#include <statdaemons/ext/memory.hpp>
#include <DB/Parsers/formatAST.h>

View File

@ -55,7 +55,7 @@ void readString(String & s, ReadBuffer & buf)
s.append(buf.position(), bytes);
buf.position() += bytes;
if (buf.position() != buf.buffer().end())
if (buf.hasPendingData())
return;
}
}
@ -121,7 +121,7 @@ void readEscapedString(DB::String & s, DB::ReadBuffer & buf)
s.append(buf.position(), next_pos - buf.position());
buf.position() += next_pos - buf.position();
if (buf.position() == buf.buffer().end())
if (!buf.hasPendingData())
continue;
if (*buf.position() == '\t' || *buf.position() == '\n')
@ -192,7 +192,7 @@ static void readAnyQuotedString(String & s, ReadBuffer & buf)
s.append(buf.position(), next_pos - buf.position());
buf.position() += next_pos - buf.position();
if (buf.position() == buf.buffer().end())
if (!buf.hasPendingData())
continue;
if (*buf.position() == quote)

View File

@ -74,7 +74,7 @@ namespace test
s.append(buf.position(), next_pos - buf.position());
buf.position() += next_pos - buf.position();
if (buf.position() == buf.buffer().end())
if (!buf.hasPendingData())
continue;
if (*buf.position() == '\t' || *buf.position() == '\n')

View File

@ -3,6 +3,7 @@
#include <DB/Dictionaries/DictionaryStructure.h>
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/config_ptr_t.h>
#include <statdaemons/ext/scope_guard.hpp>
namespace DB
{
@ -33,7 +34,7 @@ void Dictionaries::reloadExternals()
const auto config_path = getDictionariesConfigPath(Poco::Util::Application::instance().config());
const Poco::File config_file{config_path};
if (!config_file.exists())
if (config_path.empty() || !config_file.exists())
{
LOG_WARNING(log, "config file '" + config_path + "' does not exist");
}
@ -125,6 +126,12 @@ void Dictionaries::reloadExternals()
if (std::chrono::system_clock::now() < update_time)
continue;
scope_exit({
/// calculate next update time
std::uniform_int_distribution<std::uint64_t> distribution{lifetime.min_sec, lifetime.max_sec};
update_time = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
});
/// check source modified
if (current->getSource()->isModified())
{
@ -132,10 +139,6 @@ void Dictionaries::reloadExternals()
auto new_version = current->clone();
dictionary.second->set(new_version.release());
}
/// calculate next update time
std::uniform_int_distribution<std::uint64_t> distribution{lifetime.min_sec, lifetime.max_sec};
update_time = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
}
}
catch (...)

View File

@ -10,9 +10,6 @@
#include <DB/IO/copyData.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/Parsers/formatAST.h>
#include <DB/Storages/StorageMerge.h>
#include <DB/Storages/StorageMergeTree.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <Poco/FileStream.h>

View File

@ -13,6 +13,7 @@
#include <DB/Parsers/ASTNameTypePair.h>
#include <DB/Parsers/ASTColumnDeclaration.h>
#include <DB/Storages/StorageFactory.h>
#include <DB/Storages/StorageLog.h>
#include <DB/Storages/StorageSystemNumbers.h>
@ -194,7 +195,7 @@ StoragePtr InterpreterCreateQuery::execute(bool assume_metadata_exists)
else
throw Exception("Incorrect CREATE query: required ENGINE.", ErrorCodes::ENGINE_REQUIRED);
res = context.getStorageFactory().get(
res = StorageFactory::instance().get(
storage_name, data_path, table_name, database_name, context,
context.getGlobalContext(), query_ptr, columns,
materialized_columns, alias_columns, column_defaults, create.attach);

View File

@ -25,7 +25,6 @@
#include <DB/Interpreters/InterpreterSelectQuery.h>
#include <DB/Storages/StorageView.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/TableFunctions/ITableFunction.h>
#include <DB/TableFunctions/TableFunctionFactory.h>
@ -105,12 +104,6 @@ void InterpreterSelectQuery::basicInit(BlockInputStreamPtr input_, const NamesAn
+ " does not support parallel execution on several replicas",
ErrorCodes::STORAGE_DOESNT_SUPPORT_PARALLEL_REPLICAS);
if (StorageReplicatedMergeTree * storage_replicated_merge_tree = typeid_cast<StorageReplicatedMergeTree *>(&*storage))
{
if (settings.parallel_replica_offset > 0)
storage_replicated_merge_tree->skipUnreplicated();
}
table_lock = storage->lockStructure(false);
if (table_column_names.empty())
context.setColumns(storage->getColumnsListNonMaterialized());

View File

@ -149,10 +149,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
relative_sample_size = 0;
}
UInt64 parallel_replicas_count = UInt64(settings.parallel_replicas_count);
UInt64 parallel_replica_offset = UInt64(settings.parallel_replica_offset);
if ((parallel_replicas_count > 1) && !data.sampling_expression.isNull() && (relative_sample_size == 0))
if ((settings.parallel_replicas_count > 1) && !data.sampling_expression.isNull() && (relative_sample_size == 0))
relative_sample_size = 1;
if (relative_sample_size != 0)
@ -175,12 +172,11 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
UInt64 sampling_column_value_upper_limit;
UInt64 upper_limit = static_cast<UInt64>(relative_sample_size * sampling_column_max);
if (parallel_replicas_count > 1)
if (settings.parallel_replicas_count > 1)
{
UInt64 step = upper_limit / parallel_replicas_count;
sampling_column_value_lower_limit = parallel_replica_offset * step;
if ((parallel_replica_offset + 1) < parallel_replicas_count)
sampling_column_value_upper_limit = (parallel_replica_offset + 1) * step;
sampling_column_value_lower_limit = (settings.parallel_replica_offset * upper_limit) / settings.parallel_replicas_count;
if ((settings.parallel_replica_offset + 1) < settings.parallel_replicas_count)
sampling_column_value_upper_limit = ((settings.parallel_replica_offset + 1) * upper_limit) / settings.parallel_replicas_count;
else
sampling_column_value_upper_limit = upper_limit + 1;
}
@ -191,6 +187,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
}
/// Добавим условие, чтобы отсечь еще что-нибудь при повторном просмотре индекса.
if (sampling_column_value_lower_limit > 0)
if (!key_condition.addCondition(data.sampling_expression->getColumnName(),
Range::createLeftBounded(sampling_column_value_lower_limit, true)))
throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);
@ -199,6 +196,17 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
Range::createRightBounded(sampling_column_value_upper_limit, false)))
throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);
ASTPtr upper_filter_args = new ASTExpressionList;
upper_filter_args->children.push_back(data.sampling_expression);
upper_filter_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_upper_limit));
ASTFunctionPtr upper_filter_function = new ASTFunction;
upper_filter_function->name = "less";
upper_filter_function->arguments = upper_filter_args;
upper_filter_function->children.push_back(upper_filter_function->arguments);
if (sampling_column_value_lower_limit > 0)
{
/// Выражение для фильтрации: sampling_expression in [sampling_column_value_lower_limit, sampling_column_value_upper_limit)
ASTPtr lower_filter_args = new ASTExpressionList;
@ -210,15 +218,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
lower_filter_function->arguments = lower_filter_args;
lower_filter_function->children.push_back(lower_filter_function->arguments);
ASTPtr upper_filter_args = new ASTExpressionList;
upper_filter_args->children.push_back(data.sampling_expression);
upper_filter_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_upper_limit));
ASTFunctionPtr upper_filter_function = new ASTFunction;
upper_filter_function->name = "less";
upper_filter_function->arguments = upper_filter_args;
upper_filter_function->children.push_back(upper_filter_function->arguments);
ASTPtr filter_function_args = new ASTExpressionList;
filter_function_args->children.push_back(lower_filter_function);
filter_function_args->children.push_back(upper_filter_function);
@ -227,6 +226,12 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
filter_function->name = "and";
filter_function->arguments = filter_function_args;
filter_function->children.push_back(filter_function->arguments);
}
else
{
/// Выражение для фильтрации: sampling_expression < sampling_column_value_upper_limit
filter_function = upper_filter_function;
}
filter_expression = ExpressionAnalyzer(filter_function, data.context, data.getColumnsList()).getActions(false);

View File

@ -120,7 +120,7 @@ struct Stream
/// Если засечка должна быть ровно на границе блоков, нам подходит и засечка, указывающая на конец предыдущего блока,
/// и на начало следующего.
if (uncompressed_hashing_buf.position() == uncompressed_hashing_buf.buffer().end())
if (!uncompressed_hashing_buf.hasPendingData())
{
/// Получим засечку, указывающую на конец предыдущего блока.
has_alternative_mark = true;

View File

@ -2027,7 +2027,7 @@ BlockInputStreams StorageReplicatedMergeTree::read(
size_t part_index = 0;
if (process_unreplicated && unreplicated_reader && values.count(0))
if ((settings.parallel_replica_offset == 0) && unreplicated_reader && values.count(0))
{
res = unreplicated_reader->read(real_column_names, query,
context, settings, processed_stage,

View File

@ -3,48 +3,3 @@
3 2015-03-01 3 foo
4 2015-04-01 4 bar
5 2015-05-01 5 foo
6 2015-06-01 6 bar
7 2015-07-01 7 foo
8 2015-08-01 8 bar
9 2015-09-01 9 foo
10 2015-10-01 10 bar
11 2015-11-01 1 foo
12 2015-12-01 2 bar
13 2015-01-01 3 foo
14 2015-02-01 4 bar
15 2015-03-01 5 foo
16 2015-04-01 6 bar
17 2015-05-01 7 foo
18 2015-06-01 8 bar
19 2015-07-01 9 foo
20 2015-08-01 10 bar
21 2015-09-01 1 foo
22 2015-10-01 2 bar
23 2015-11-01 3 foo
24 2015-12-01 4 bar
25 2015-01-01 5 foo
26 2015-02-01 6 bar
27 2015-03-01 7 foo
28 2015-04-01 8 bar
29 2015-05-01 9 foo
30 2015-06-01 10 bar
31 2015-07-01 1 foo
32 2015-08-01 2 bar
33 2015-09-01 3 foo
34 2015-10-01 4 bar
35 2015-11-01 5 foo
36 2015-12-01 6 bar
37 2015-01-01 7 foo
38 2015-02-01 8 bar
39 2015-03-01 9 foo
40 2015-04-01 10 bar
41 2015-05-01 1 foo
42 2015-06-01 2 bar
43 2015-07-01 3 foo
44 2015-08-01 4 bar
45 2015-09-01 5 foo
46 2015-10-01 6 bar
47 2015-11-01 7 foo
48 2015-12-01 8 bar
49 2015-01-01 9 foo
50 2015-02-01 10 bar

View File

@ -4,57 +4,7 @@ DROP TABLE IF EXISTS test.report;
CREATE TABLE test.report(id UInt32, event_date Date, priority UInt32, description String) ENGINE = MergeTree(event_date, intHash32(id), (id, event_date, intHash32(id)), 8192);
INSERT INTO test.report(id,event_date,priority,description) VALUES(1, '2015-01-01', 1, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(2, '2015-02-01', 2, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(3, '2015-03-01', 3, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(4, '2015-04-01', 4, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(5, '2015-05-01', 5, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(6, '2015-06-01', 6, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(7, '2015-07-01', 7, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(8, '2015-08-01', 8, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(9, '2015-09-01', 9, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(10, '2015-10-01', 10, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(11, '2015-11-01', 1, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(12, '2015-12-01', 2, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(13, '2015-01-01', 3, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(14, '2015-02-01', 4, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(15, '2015-03-01', 5, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(16, '2015-04-01', 6, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(17, '2015-05-01', 7, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(18, '2015-06-01', 8, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(19, '2015-07-01', 9, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(20, '2015-08-01', 10, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(21, '2015-09-01', 1, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(22, '2015-10-01', 2, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(23, '2015-11-01', 3, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(24, '2015-12-01', 4, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(25, '2015-01-01', 5, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(26, '2015-02-01', 6, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(27, '2015-03-01', 7, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(28, '2015-04-01', 8, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(29, '2015-05-01', 9, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(30, '2015-06-01', 10, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(31, '2015-07-01', 1, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(32, '2015-08-01', 2, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(33, '2015-09-01', 3, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(34, '2015-10-01', 4, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(35, '2015-11-01', 5, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(36, '2015-12-01', 6, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(37, '2015-01-01', 7, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(38, '2015-02-01', 8, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(39, '2015-03-01', 9, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(40, '2015-04-01', 10, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(41, '2015-05-01', 1, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(42, '2015-06-01', 2, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(43, '2015-07-01', 3, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(44, '2015-08-01', 4, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(45, '2015-09-01', 5, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(46, '2015-10-01', 6, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(47, '2015-11-01', 7, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(48, '2015-12-01', 8, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(49, '2015-01-01', 9, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(50, '2015-02-01', 10, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES (1, '2015-01-01', 1, 'foo')(2, '2015-02-01', 2, 'bar')(3, '2015-03-01', 3, 'foo')(4, '2015-04-01', 4, 'bar')(5, '2015-05-01', 5, 'foo');
SELECT * FROM (SELECT id, event_date, priority, description FROM remote('127.0.0.{1|2}', test, report)) ORDER BY id ASC;
DROP TABLE test.report;