Alexey Milovidov 2015-02-11 23:34:20 +03:00
commit 070f8eb1e2
66 changed files with 1524 additions and 1002 deletions

View File

@ -4,6 +4,8 @@
#include <Poco/Net/StreamSocket.h>
#include <DB/Common/Throttler.h>
#include <DB/Core/Block.h>
#include <DB/Core/Defines.h>
#include <DB/Core/Progress.h>
@ -26,13 +28,14 @@ namespace DB
using Poco::SharedPtr;
class ShardReplicas;
class ParallelReplicas;
/// A stream of blocks being read from a table, together with the table's name
typedef std::pair<BlockInputStreamPtr, std::string> ExternalTableData;
/// A vector of pairs describing tables
typedef std::vector<ExternalTableData> ExternalTablesData;
/** A connection to a database server, for use in the client.
* For usage, see Core/Protocol.h
* (for the server-side implementation, see Server/TCPHandler.h)
@ -42,7 +45,7 @@ typedef std::vector<ExternalTableData> ExternalTablesData;
*/
class Connection : private boost::noncopyable
{
friend class ShardReplicas;
friend class ParallelReplicas;
public:
Connection(const String & host_, UInt16 port_, const String & default_database_,
@ -69,6 +72,12 @@ public:
virtual ~Connection() {};
/// Set the network traffic throttler. A single throttler may be shared by several different connections at once.
void setThrottler(const ThrottlerPtr & throttler_)
{
throttler = throttler_;
}
/// A packet that can be received from the server.
struct Packet
@ -161,6 +170,11 @@ private:
const DataTypeFactory & data_type_factory;
/** If not nullptr, used to limit network traffic.
* Only traffic from block transfers is counted; other packets are not.
*/
ThrottlerPtr throttler;
Poco::Timespan connect_timeout;
Poco::Timespan receive_timeout;
Poco::Timespan send_timeout;

View File

@ -0,0 +1,93 @@
#pragma once
#include <DB/Common/Throttler.h>
#include <DB/Client/Connection.h>
#include <DB/Client/ConnectionPool.h>
namespace DB
{
/** For receiving data from several replicas (connections) at once, within a single thread.
* As a degenerate case, it can also work with a single connection.
*
* The interface almost matches that of Connection.
*/
class ParallelReplicas final : private boost::noncopyable
{
public:
/// Accepts an already established connection.
ParallelReplicas(Connection * connection_, Settings * settings_, ThrottlerPtr throttler_);
/// Accepts a pool from which one or several connections will have to be obtained.
ParallelReplicas(IConnectionPool * pool_, Settings * settings_, ThrottlerPtr throttler_);
/// Send the entire contents of the external tables to the replicas.
void sendExternalTablesData(std::vector<ExternalTablesData> & data);
/// Send the query to the replicas.
void sendQuery(const String & query, const String & query_id = "",
UInt64 stage = QueryProcessingStage::Complete, bool with_pending_data = false);
/// Receive a packet from any one of the replicas.
Connection::Packet receivePacket();
/// Cancel the queries on the replicas.
void sendCancel();
/** On each replica, read and skip all packets up to EndOfStream or Exception.
* Returns EndOfStream if no exception was received; otherwise
* returns the last Exception packet received.
*/
Connection::Packet drain();
/// Get the replica addresses as a string.
std::string dumpAddresses() const;
/// Returns the number of replicas.
size_t size() const { return replica_map.size(); }
/// Check whether any valid replicas remain.
bool hasActiveReplicas() const { return active_replica_count > 0; }
private:
/// Replicas, hashed by socket descriptor
using ReplicaMap = std::unordered_map<int, Connection *>;
/// Register a replica.
void registerReplica(Connection * connection);
/// Get a replica from which data can be read.
ReplicaMap::iterator getReplicaForReading();
/** Check whether there is data that can be read on any of the replicas.
* Returns one such replica, if it can be found.
*/
ReplicaMap::iterator waitForReadEvent();
/// Mark a replica as invalid.
void invalidateReplica(ReplicaMap::iterator it);
Settings * settings;
ReplicaMap replica_map;
/// If not nullptr, used to limit network traffic.
ThrottlerPtr throttler;
std::vector<ConnectionPool::Entry> pool_entries;
ConnectionPool::Entry pool_entry;
/// Current number of valid connections to replicas.
size_t active_replica_count;
/// The query is executed in parallel on several replicas.
bool supports_parallel_execution;
/// The query has been sent.
bool sent_query = false;
/// The query has been cancelled.
bool cancelled = false;
};
}
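
A minimal usage sketch of the new class (the pool, settings and query below are hypothetical; packet handling follows the Packet types used elsewhere in this commit):

#include <DB/Client/ParallelReplicas.h>

void runOnReplicas(DB::IConnectionPool * pool, DB::Settings * settings, ThrottlerPtr throttler)
{
    DB::ParallelReplicas replicas(pool, settings, throttler);
    replicas.sendQuery("SELECT 1");

    while (replicas.hasActiveReplicas())
    {
        DB::Connection::Packet packet = replicas.receivePacket();
        switch (packet.type)
        {
            case DB::Protocol::Server::Data:
                /// process packet.block here
                break;
            case DB::Protocol::Server::Exception:
                packet.exception->rethrow();
                break;
            case DB::Protocol::Server::EndOfStream:
                break;  /// one replica finished; the loop condition covers the rest
            default:
                break;
        }
    }
}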

View File

@ -1,62 +0,0 @@
#pragma once
#include <DB/Client/Connection.h>
#include <DB/Client/ConnectionPool.h>
namespace DB
{
/**
* A set of replicas of a single shard.
*/
class ShardReplicas final
{
public:
ShardReplicas(std::vector<ConnectionPool::Entry> & entries, const Settings & settings_);
~ShardReplicas() = default;
ShardReplicas(const ShardReplicas &) = delete;
ShardReplicas & operator=(const ShardReplicas &) = delete;
/// Send the entire contents of the external tables to the replicas.
void sendExternalTablesData(std::vector<ExternalTablesData> & data);
/// Send the query to the replicas.
void sendQuery(const String & query, const String & query_id = "",
UInt64 stage = QueryProcessingStage::Complete, bool with_pending_data = false);
/// Receive a packet from any one of the replicas.
Connection::Packet receivePacket();
/// Close the connections to the replicas
void disconnect();
/// Cancel the queries on the replicas.
void sendCancel();
/// For each replica, receive the packets remaining after a query has been cancelled.
Connection::Packet drain();
/// Get the replica addresses as a string.
std::string dumpAddresses() const;
/// Returns the number of replicas.
size_t size() const { return replica_hash.size(); }
private:
/// Check whether there is data that can be read on any of the replicas.
/// Returns a connection to such a replica, if one is found.
Connection ** waitForReadEvent();
private:
/// Replicas, hashed by socket descriptor
using ReplicaHash = std::unordered_map<int, Connection *>;
private:
const Settings & settings;
ReplicaHash replica_hash;
size_t active_connection_count = 0;
bool sent_query = false;
bool cancelled = false;
};
}

View File

@ -0,0 +1,59 @@
#pragma once
#include <mutex>
#include <memory>
#include <statdaemons/Stopwatch.h>
/** Allows limiting the rate of something (in units per second) by means of sleep.
* Specifics:
* - only the average rate is considered, measured from the first call to add;
* if there were periods of low rate, then for some time afterwards the rate may run above the limit;
*/
class Throttler
{
public:
Throttler(size_t max_speed_) : max_speed(max_speed_) {}
void add(size_t amount)
{
size_t new_count;
UInt64 elapsed_ns;
{
std::lock_guard<std::mutex> lock(mutex);
if (0 == count)
{
watch.start();
elapsed_ns = 0;
}
else
elapsed_ns = watch.elapsed();
count += amount;
new_count = count;
}
/// How much time would have to have elapsed if the rate were exactly max_speed.
UInt64 desired_ns = new_count * 1000000000 / max_speed;
if (desired_ns > elapsed_ns)
{
UInt64 sleep_ns = desired_ns - elapsed_ns;
timespec sleep_ts;
sleep_ts.tv_sec = sleep_ns / 1000000000;
sleep_ts.tv_nsec = sleep_ns % 1000000000;
nanosleep(&sleep_ts, nullptr); /// NOTE Returns early if interrupted by a signal. This is considered acceptable.
}
}
private:
size_t max_speed;
size_t count = 0;
Stopwatch watch {CLOCK_MONOTONIC_COARSE};
std::mutex mutex;
};
typedef std::shared_ptr<Throttler> ThrottlerPtr;
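
A minimal usage sketch, assuming a hypothetical cap of 10 MiB/s. Each add() accounts for the amount just transferred; whenever the average rate since the first call exceeds max_speed, the call sleeps for the difference:

#include <DB/Common/Throttler.h>

int main()
{
    /// One throttler may be shared by several connections (see Connection::setThrottler).
    ThrottlerPtr throttler = std::make_shared<Throttler>(10 * 1024 * 1024);

    for (size_t i = 0; i < 100; ++i)
        throttler->add(1024 * 1024);    /// pretend a 1 MiB block was just transferred
}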

View File

@ -0,0 +1,27 @@
#pragma once
#include <DB/Core/Types.h>
#include <Poco/Util/Application.h>
#include <Poco/Net/NetworkInterface.h>
#include <Poco/Net/SocketAddress.h>
namespace DB
{
inline bool isLocalAddress(const Poco::Net::SocketAddress & address)
{
const UInt16 clickhouse_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0);
static auto interfaces = Poco::Net::NetworkInterface::list();
if (clickhouse_port == address.port())
{
return interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(),
[&] (const Poco::Net::NetworkInterface & interface) {
return interface.address() == address.host();
});
}
return false;
}
}
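
A sketch of the intended call, assuming it runs inside a Poco::Util::Application (the function reads tcp_port from the application's config); the address below is hypothetical:

#include <DB/Common/isLocalAddress.h>

void example()
{
    Poco::Net::SocketAddress address("127.0.0.1", 9000);
    if (DB::isLocalAddress(address))
    {
        /// The port matches this server's tcp_port and the host is one of
        /// the local interface addresses, so the network hop can be skipped.
    }
}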

View File

@ -273,10 +273,8 @@ namespace ErrorCodes
CANNOT_COMPILE_CODE,
INCOMPATIBLE_TYPE_OF_JOIN,
NO_AVAILABLE_REPLICA,
UNEXPECTED_REPLICA,
MISMATCH_REPLICAS_DATA_SOURCES,
STORAGE_DOESNT_SUPPORT_PARALLEL_REPLICAS,
MISSING_RANGE_IN_CHUNK,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -48,7 +48,7 @@ inline bool memequalSSE2Wide(const char * p1, const char * p2, size_t size)
if ( compareSSE2(p1, p2)
&& compareSSE2(p1 + 16, p2 + 16)
&& compareSSE2(p1 + 32, p2 + 32)
&& compareSSE2(p1 + 40, p2 + 40))
&& compareSSE2(p1 + 48, p2 + 48))
{
p1 += 64;
p2 += 64;
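
This hunk fixes an off-by-eight: the loop advances 64 bytes per iteration, so the four 16-byte SSE2 comparisons must start at offsets 0, 16, 32 and 48; the old offset 40 re-checked bytes 40..55 and never examined bytes 56..63. A standalone sketch of the corrected stride (compare16 is a hypothetical stand-in for the header's compareSSE2):

#include <emmintrin.h>

static inline bool compare16(const char * a, const char * b)
{
    /// 0xFFFF means all 16 bytes compared equal.
    return 0xFFFF == _mm_movemask_epi8(_mm_cmpeq_epi8(
        _mm_loadu_si128(reinterpret_cast<const __m128i *>(a)),
        _mm_loadu_si128(reinterpret_cast<const __m128i *>(b))));
}

bool equal64(const char * p1, const char * p2)
{
    return compare16(p1, p2)
        && compare16(p1 + 16, p2 + 16)
        && compare16(p1 + 32, p2 + 32)
        && compare16(p1 + 48, p2 + 48);
}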

View File

@ -5,16 +5,17 @@
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/Common/VirtualColumnUtils.h>
#include <DB/Common/Throttler.h>
#include <DB/Interpreters/Context.h>
#include <DB/Client/ConnectionPool.h>
#include <DB/Client/ShardReplicas.h>
#include <DB/Client/ParallelReplicas.h>
namespace DB
{
/** Allows executing a (SELECT) query on a remote server and obtaining the result.
/** Allows executing a (SELECT) query on remote replicas of a single shard and obtaining the result.
*/
class RemoteBlockInputStream : public IProfilingBlockInputStream
{
@ -25,7 +26,6 @@ private:
{
send_settings = true;
settings = *settings_;
use_many_replicas = (pool != nullptr) && UInt64(settings.max_parallel_replicas) > 1;
}
else
send_settings = false;
@ -33,29 +33,29 @@ private:
public:
/// Accepts an already established connection.
RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_,
RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_, ThrottlerPtr throttler_ = nullptr,
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
const Context & context = getDefaultContext())
: connection(&connection_), query(query_), external_tables(external_tables_), stage(stage_), context(context)
: connection(&connection_), query(query_), throttler(throttler_), external_tables(external_tables_), stage(stage_), context(context)
{
init(settings_);
}
/// Accepts an already established connection. Takes ownership of the connection from the pool.
RemoteBlockInputStream(ConnectionPool::Entry & pool_entry_, const String & query_, const Settings * settings_,
RemoteBlockInputStream(ConnectionPool::Entry & pool_entry_, const String & query_, const Settings * settings_, ThrottlerPtr throttler_ = nullptr,
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
const Context & context = getDefaultContext())
: pool_entry(pool_entry_), connection(&*pool_entry_), query(query_),
: pool_entry(pool_entry_), connection(&*pool_entry_), query(query_), throttler(throttler_),
external_tables(external_tables_), stage(stage_), context(context)
{
init(settings_);
}
/// Accepts a pool from which a connection will have to be obtained.
RemoteBlockInputStream(IConnectionPool * pool_, const String & query_, const Settings * settings_,
/// Accepts a pool from which one or several connections will have to be obtained.
RemoteBlockInputStream(IConnectionPool * pool_, const String & query_, const Settings * settings_, ThrottlerPtr throttler_ = nullptr,
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
const Context & context = getDefaultContext())
: pool(pool_), query(query_), external_tables(external_tables_), stage(stage_), context(context)
: pool(pool_), query(query_), throttler(throttler_), external_tables(external_tables_), stage(stage_), context(context)
{
init(settings_);
}
@ -83,46 +83,39 @@ public:
if (!__sync_bool_compare_and_swap(&is_cancelled, false, true))
return;
if (sent_query && !was_cancelled && !finished && !got_exception_from_server)
if (isQueryInProgress() && !hasThrownException())
{
std::string addresses;
if (use_many_replicas)
addresses = shard_replicas->dumpAddresses();
else
addresses = connection->getServerAddress();
std::string addresses = parallel_replicas->dumpAddresses();
LOG_TRACE(log, "(" + addresses + ") Cancelling query");
/// If query cancellation was requested, ask the remote server to cancel the query as well.
if (use_many_replicas)
shard_replicas->sendCancel();
else
connection->sendCancel();
/// If query cancellation was requested, ask the remote replicas to cancel the query as well.
was_cancelled = true;
parallel_replicas->sendCancel();
}
}
~RemoteBlockInputStream() override
{
/** If we were interrupted in the middle of the conversation with the server, close the connection
* so that it is not left hanging in a desynchronized state.
/** If we were interrupted in the middle of the conversation with the replicas, cancel
* all the connections, then read and skip the remaining packets so that
* these connections are not left hanging in a desynchronized state.
*/
if (sent_query && !finished)
if (isQueryInProgress())
{
if (use_many_replicas)
shard_replicas->disconnect();
else
connection->disconnect();
std::string addresses = parallel_replicas->dumpAddresses();
LOG_TRACE(log, "(" + addresses + ") Aborting query");
parallel_replicas->sendCancel();
(void) parallel_replicas->drain();
}
}
protected:
/// Send all temporary tables to the remote servers
/// Send all temporary tables to the remote replicas
void sendExternalTables()
{
size_t count = use_many_replicas ? shard_replicas->size() : 1;
size_t count = parallel_replicas->size();
std::vector<ExternalTablesData> instances;
instances.reserve(count);
@ -144,50 +137,22 @@ protected:
instances.push_back(std::move(res));
}
if (use_many_replicas)
shard_replicas->sendExternalTablesData(instances);
else
connection->sendExternalTablesData(instances[0]);
parallel_replicas->sendExternalTablesData(instances);
}
Block readImpl() override
{
if (!sent_query)
{
if (use_many_replicas)
{
auto entries = pool->getMany(&settings);
if (entries.size() > 1)
shard_replicas = ext::make_unique<ShardReplicas>(entries, settings);
else
{
/// NOTE IConnectionPool::getMany() always returns at least one connection.
use_many_replicas = false;
connection = &*entries[0];
}
}
else
{
/// If needed, obtain a connection from the pool.
if (pool)
{
pool_entry = pool->get(send_settings ? &settings : nullptr);
connection = &*pool_entry;
}
}
if (use_many_replicas)
shard_replicas->sendQuery(query, "", stage, true);
else
connection->sendQuery(query, "", stage, send_settings ? &settings : nullptr, true);
createParallelReplicas();
parallel_replicas->sendQuery(query, "", stage, true);
sendExternalTables();
sent_query = true;
}
while (true)
{
Connection::Packet packet = use_many_replicas ? shard_replicas->receivePacket() : connection->receivePacket();
Connection::Packet packet = parallel_replicas->receivePacket();
switch (packet.type)
{
@ -198,13 +163,17 @@ protected:
break; /// If the block is empty, we will receive more packets until EndOfStream.
case Protocol::Server::Exception:
got_exception_from_server = true;
got_exception_from_replica = true;
packet.exception->rethrow();
break;
case Protocol::Server::EndOfStream:
finished = true;
return Block();
if (!parallel_replicas->hasActiveReplicas())
{
finished = true;
return Block();
}
break;
case Protocol::Server::Progress:
/** Use the progress reported by the remote server.
@ -215,7 +184,7 @@ protected:
*/
progressImpl(packet.progress);
if (!was_cancelled && !finished && isCancelled())
if (isQueryInProgress() && isCancelled())
cancel();
break;
@ -233,6 +202,7 @@ protected:
break;
default:
got_unknown_packet_from_replica = true;
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
}
@ -243,10 +213,11 @@ protected:
/** If any one of the following holds:
* - nothing was ever started;
* - all packets up to EndOfStream have been received;
* - an exception has been received from the server;
* - an exception has been received from one replica;
* - an unknown packet has been received from one replica;
* then nothing more needs to be read.
*/
if (!sent_query || finished || got_exception_from_server)
if (hasNoQueryInProgress() || hasThrownException())
return;
/** If not all the data has been read yet, but it is no longer needed.
@ -256,107 +227,102 @@ protected:
/// Send a request to cancel query execution, unless we have already done so.
if (!was_cancelled)
{
std::string addresses;
if (use_many_replicas)
addresses = shard_replicas->dumpAddresses();
else
addresses = connection->getServerAddress();
std::string addresses = parallel_replicas->dumpAddresses();
LOG_TRACE(log, "(" + addresses + ") Cancelling query because enough data has been read");
was_cancelled = true;
if (use_many_replicas)
shard_replicas->sendCancel();
else
connection->sendCancel();
parallel_replicas->sendCancel();
}
if (use_many_replicas)
/// Receive the remaining packets so that the connections to the replicas do not end up desynchronized.
Connection::Packet packet = parallel_replicas->drain();
switch (packet.type)
{
Connection::Packet packet = shard_replicas->drain();
switch (packet.type)
{
case Protocol::Server::EndOfStream:
finished = true;
break;
case Protocol::Server::EndOfStream:
finished = true;
break;
case Protocol::Server::Exception:
got_exception_from_server = true;
packet.exception->rethrow();
break;
case Protocol::Server::Exception:
got_exception_from_replica = true;
packet.exception->rethrow();
break;
default:
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
default:
got_unknown_packet_from_replica = true;
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
}
/// Create an object for communicating with the replicas of the single shard on which the query is to run.
void createParallelReplicas()
{
Settings * parallel_replicas_settings = send_settings ? &settings : nullptr;
if (connection != nullptr)
parallel_replicas = std::make_unique<ParallelReplicas>(connection, parallel_replicas_settings, throttler);
else
{
/// Receive the remaining packets so that the connection to the server does not end up desynchronized.
while (true)
{
Connection::Packet packet = connection->receivePacket();
parallel_replicas = std::make_unique<ParallelReplicas>(pool, parallel_replicas_settings, throttler);
}
switch (packet.type)
{
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
break;
/// Returns true if the query has been sent but has not yet completed.
bool isQueryInProgress() const
{
return sent_query && !finished && !was_cancelled;
}
case Protocol::Server::EndOfStream:
finished = true;
return;
/// Returns true if no query has been sent, or if the query has already completed.
bool hasNoQueryInProgress() const
{
return !sent_query || finished;
}
case Protocol::Server::Exception:
got_exception_from_server = true;
packet.exception->rethrow();
break;
default:
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
}
}
/// Returns true if an exception has been thrown.
bool hasThrownException() const
{
return got_exception_from_replica || got_unknown_packet_from_replica;
}
private:
IConnectionPool * pool = nullptr;
ConnectionPool::Entry pool_entry;
Connection * connection = nullptr;
std::unique_ptr<ShardReplicas> shard_replicas;
std::unique_ptr<ParallelReplicas> parallel_replicas;
const String query;
bool send_settings;
Settings settings;
/// If not nullptr, used to limit network traffic.
ThrottlerPtr throttler;
/// Temporary tables that must be sent to the remote servers.
Tables external_tables;
QueryProcessingStage::Enum stage;
Context context;
bool use_many_replicas = false;
/// The query has been sent (this is done just before the first block is received).
bool sent_query = false;
/** All data has been received from the server, up to the EndOfStream packet.
/** All data has been received from all replicas, up to the EndOfStream packet.
* If, by the time the object is destroyed, not all the data has been read,
* then, to avoid desynchronization, a request to cancel query execution is sent to the server,
* then, to avoid desynchronization, requests to cancel query execution are sent to the replicas,
* after which all packets up to EndOfStream are read.
*/
bool finished = false;
/** A request to cancel query execution was sent to the server, because the data is no longer needed.
/** A request to cancel query execution was sent to each replica, because the data is no longer needed.
* This may happen because enough data has already been received (e.g. when LIMIT is used),
* or because an exception occurred on the client side.
*/
bool was_cancelled = false;
/// An exception was received from the server. In this case, there is no need to receive more packets or to ask to cancel the query.
bool got_exception_from_server = false;
/** An exception was received from one replica. In this case, there is no need to receive
* more packets from that replica or to ask it to cancel the query.
*/
bool got_exception_from_replica = false;
/** An unknown packet was received from one replica. In this case, there is no need to receive
* more packets from that replica or to ask it to cancel the query.
*/
bool got_unknown_packet_from_replica = false;
Logger * log = &Logger::get("RemoteBlockInputStream");
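
A sketch of a call site using the new throttler parameter (the pool, query and 10 MiB/s cap are hypothetical); note that throttler_ now precedes the external-tables and stage arguments:

#include <DB/DataStreams/RemoteBlockInputStream.h>

DB::BlockInputStreamPtr makeRemoteStream(DB::IConnectionPool * pool, const DB::Settings * settings)
{
    ThrottlerPtr throttler = std::make_shared<Throttler>(10 * 1024 * 1024);
    return new DB::RemoteBlockInputStream{pool, "SELECT count() FROM t", settings, throttler};
}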

View File

@ -0,0 +1,137 @@
#pragma once
#include <DB/Dictionaries/IDictionary.h>
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/DictionaryStructure.h>
namespace DB
{
class CacheDictionary final : public IDictionary
{
public:
CacheDictionary(const std::string & name, const DictionaryStructure & dict_struct,
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime,
const std::size_t size)
: name{name}, dict_struct(dict_struct),
source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime),
size{size}
{
if (!this->source_ptr->supportsSelectiveLoad())
throw Exception{
"Source cannot be used with CacheDictionary",
ErrorCodes::UNSUPPORTED_METHOD
};
}
CacheDictionary(const CacheDictionary & other)
: CacheDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.size}
{}
std::string getName() const override { return name; }
std::string getTypeName() const override { return "CacheDictionary"; }
bool isCached() const override { return true; }
DictionaryPtr clone() const override { return std::make_unique<CacheDictionary>(*this); }
const IDictionarySource * getSource() const override { return source_ptr.get(); }
const DictionaryLifetime & getLifetime() const override { return dict_lifetime; }
bool hasHierarchy() const override { return false; }
id_t toParent(const id_t id) const override { return 0; }
#define DECLARE_SAFE_GETTER(TYPE, NAME, LC_TYPE) \
TYPE get##NAME(const std::string & attribute_name, const id_t id) const override\
{\
return {};\
}
DECLARE_SAFE_GETTER(UInt8, UInt8, uint8)
DECLARE_SAFE_GETTER(UInt16, UInt16, uint16)
DECLARE_SAFE_GETTER(UInt32, UInt32, uint32)
DECLARE_SAFE_GETTER(UInt64, UInt64, uint64)
DECLARE_SAFE_GETTER(Int8, Int8, int8)
DECLARE_SAFE_GETTER(Int16, Int16, int16)
DECLARE_SAFE_GETTER(Int32, Int32, int32)
DECLARE_SAFE_GETTER(Int64, Int64, int64)
DECLARE_SAFE_GETTER(Float32, Float32, float32)
DECLARE_SAFE_GETTER(Float64, Float64, float64)
DECLARE_SAFE_GETTER(StringRef, String, string)
#undef DECLARE_SAFE_GETTER
std::size_t getAttributeIndex(const std::string & attribute_name) const override
{
return {};
}
#define DECLARE_TYPE_CHECKER(NAME, LC_NAME)\
bool is##NAME(const std::size_t attribute_idx) const override\
{\
return true;\
}
DECLARE_TYPE_CHECKER(UInt8, uint8)
DECLARE_TYPE_CHECKER(UInt16, uint16)
DECLARE_TYPE_CHECKER(UInt32, uint32)
DECLARE_TYPE_CHECKER(UInt64, uint64)
DECLARE_TYPE_CHECKER(Int8, int8)
DECLARE_TYPE_CHECKER(Int16, int16)
DECLARE_TYPE_CHECKER(Int32, int32)
DECLARE_TYPE_CHECKER(Int64, int64)
DECLARE_TYPE_CHECKER(Float32, float32)
DECLARE_TYPE_CHECKER(Float64, float64)
DECLARE_TYPE_CHECKER(String, string)
#undef DECLARE_TYPE_CHECKER
#define DECLARE_UNSAFE_GETTER(TYPE, NAME, LC_NAME)\
TYPE get##NAME##Unsafe(const std::size_t attribute_idx, const id_t id) const override\
{\
return {};\
}
DECLARE_UNSAFE_GETTER(UInt8, UInt8, uint8)
DECLARE_UNSAFE_GETTER(UInt16, UInt16, uint16)
DECLARE_UNSAFE_GETTER(UInt32, UInt32, uint32)
DECLARE_UNSAFE_GETTER(UInt64, UInt64, uint64)
DECLARE_UNSAFE_GETTER(Int8, Int8, int8)
DECLARE_UNSAFE_GETTER(Int16, Int16, int16)
DECLARE_UNSAFE_GETTER(Int32, Int32, int32)
DECLARE_UNSAFE_GETTER(Int64, Int64, int64)
DECLARE_UNSAFE_GETTER(Float32, Float32, float32)
DECLARE_UNSAFE_GETTER(Float64, Float64, float64)
DECLARE_UNSAFE_GETTER(StringRef, String, string)
#undef DECLARE_UNSAFE_GETTER
private:
const std::string name;
const DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
const std::size_t size;
union item
{
UInt8 uint8_value;
UInt16 uint16_value;
UInt32 uint32_value;
UInt64 uint64_value;
Int8 int8_value;
Int16 int16_value;
Int32 int32_value;
Int64 int64_value;
Float32 float32_value;
Float64 float64_value;
StringRef string_value;
};
struct cell
{
id_t id;
std::vector<item> attrs;
};
std::vector<cell> cells;
};
}

View File

@ -4,6 +4,7 @@
#include <DB/Client/ConnectionPool.h>
#include <DB/DataStreams/RemoteBlockInputStream.h>
#include <DB/Interpreters/executeQuery.h>
#include <DB/Common/isLocalAddress.h>
#include <statdaemons/ext/range.hpp>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Net/NetworkInterface.h>
@ -13,44 +14,50 @@ namespace DB
const auto max_connections = 1;
class ClickhouseDictionarySource final : public IDictionarySource
/** Allows loading dictionaries from a local or remote ClickHouse instance
* @todo use ConnectionPoolWithFailover
* @todo invent a way to keep track of source modifications
*/
class ClickHouseDictionarySource final : public IDictionarySource
{
static const auto max_block_size = 8192;
public:
ClickhouseDictionarySource(const Poco::Util::AbstractConfiguration & config,
ClickHouseDictionarySource(const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block, Context & context)
: host{config.getString(config_prefix + "host")},
port(config.getInt(config_prefix + "port")),
user{config.getString(config_prefix + "user", "")},
password{config.getString(config_prefix + "password", "")},
db{config.getString(config_prefix + "db", "")},
table{config.getString(config_prefix + "table")},
: host{config.getString(config_prefix + ".host")},
port(config.getInt(config_prefix + ".port")),
user{config.getString(config_prefix + ".user", "")},
password{config.getString(config_prefix + ".password", "")},
db{config.getString(config_prefix + ".db", "")},
table{config.getString(config_prefix + ".table")},
sample_block{sample_block}, context(context),
is_local{isLocal(host, port)},
pool{is_local ? nullptr : ext::make_unique<ConnectionPool>(
is_local{isLocalAddress({ host, port })},
pool{is_local ? nullptr : std::make_unique<ConnectionPool>(
max_connections, host, port, db, user, password, context.getDataTypeFactory(),
"ClickhouseDictionarySource")
"ClickHouseDictionarySource")
},
load_all_query{composeLoadAllQuery(sample_block, table)}
{}
ClickhouseDictionarySource(const ClickhouseDictionarySource & other)
/// copy-constructor is provided in order to support cloneability
ClickHouseDictionarySource(const ClickHouseDictionarySource & other)
: host{other.host}, port{other.port}, user{other.user}, password{other.password},
db{other.db}, table{other.table},
sample_block{other.sample_block}, context(other.context),
is_local{other.is_local},
pool{is_local ? nullptr : ext::make_unique<ConnectionPool>(
pool{is_local ? nullptr : std::make_unique<ConnectionPool>(
max_connections, host, port, db, user, password, context.getDataTypeFactory(),
"ClickhouseDictionarySource")},
"ClickHouseDictionarySource")},
load_all_query{other.load_all_query}
{}
BlockInputStreamPtr loadAll() override
{
/** Query to local ClickHouse is marked internal in order to avoid
* the necessity of holding process_list_element shared pointer.
*/
if (is_local)
return executeQuery(load_all_query, context).in;
return executeQuery(load_all_query, context, true).in;
return new RemoteBlockInputStream{pool.get(), load_all_query, nullptr};
}
@ -70,12 +77,13 @@ public:
};
}
/// @todo check update time somehow
bool isModified() const override { return true; }
bool supportsSelectiveLoad() const override { return true; }
DictionarySourcePtr clone() const override { return ext::make_unique<ClickhouseDictionarySource>(*this); }
DictionarySourcePtr clone() const override { return std::make_unique<ClickHouseDictionarySource>(*this); }
private:
/// @todo escape table and column names
static std::string composeLoadAllQuery(const Block & block, const std::string & table)
{
std::string query{"SELECT "};

View File

@ -3,11 +3,11 @@
#include <DB/Core/Block.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <DB/Dictionaries/FileDictionarySource.h>
#include <DB/Dictionaries/MysqlDictionarySource.h>
#include <DB/Dictionaries/ClickhouseDictionarySource.h>
#include <DB/Dictionaries/MySQLDictionarySource.h>
#include <DB/Dictionaries/ClickHouseDictionarySource.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <Yandex/singleton.h>
#include <statdaemons/ext/memory.hpp>
#include <memory>
namespace DB
{
@ -38,6 +38,7 @@ Block createSampleBlock(const DictionaryStructure & dict_struct, const Context &
}
/// creates IDictionarySource instance from config and DictionaryStructure
class DictionarySourceFactory : public Singleton<DictionarySourceFactory>
{
public:
@ -46,25 +47,38 @@ public:
const DictionaryStructure & dict_struct,
Context & context) const
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
if (keys.size() != 1)
throw Exception{
"Element dictionary.source should have exactly one child element",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG
};
auto sample_block = createSampleBlock(dict_struct, context);
if (config.has(config_prefix + "file"))
const auto & source_type = keys.front();
if ("file" == source_type)
{
const auto & filename = config.getString(config_prefix + "file.path");
const auto & format = config.getString(config_prefix + "file.format");
return ext::make_unique<FileDictionarySource>(filename, format, sample_block, context);
const auto filename = config.getString(config_prefix + ".file.path");
const auto format = config.getString(config_prefix + ".file.format");
return std::make_unique<FileDictionarySource>(filename, format, sample_block, context);
}
else if (config.has(config_prefix + "mysql"))
else if ("mysql" == source_type)
{
return ext::make_unique<MysqlDictionarySource>(config, config_prefix + "mysql", sample_block, context);
return std::make_unique<MySQLDictionarySource>(config, config_prefix + ".mysql", sample_block);
}
else if (config.has(config_prefix + "clickhouse"))
else if ("clickhouse" == source_type)
{
return ext::make_unique<ClickhouseDictionarySource>(config, config_prefix + "clickhouse.",
return std::make_unique<ClickHouseDictionarySource>(config, config_prefix + ".clickhouse",
sample_block, context);
}
throw Exception{"unsupported source type"};
throw Exception{
"Unknown dictionary source type: " + source_type,
ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG
};
}
};
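
The factory now requires the source element to contain exactly one child naming the source type. A self-contained sketch of the expected layout and the key-based dispatch, using a hypothetical file source (tag values are illustrative):

#include <iostream>
#include <sstream>
#include <Poco/AutoPtr.h>
#include <Poco/Util/XMLConfiguration.h>

int main()
{
    std::istringstream xml{
        "<dictionary><source>"
        "<file><path>/tmp/os.tsv</path><format>TabSeparated</format></file>"
        "</source></dictionary>"};
    Poco::AutoPtr<Poco::Util::XMLConfiguration> config{new Poco::Util::XMLConfiguration{xml}};

    /// Exactly one key is required under the source prefix; its name selects the branch.
    Poco::Util::AbstractConfiguration::Keys keys;
    config->keys("source", keys);
    std::cout << "source type: " << keys.front() << "\n";    /// prints "file"
}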

View File

@ -9,7 +9,7 @@
namespace DB
{
enum class attribute_type
enum class AttributeType
{
uint8,
uint16,
@ -24,20 +24,20 @@ enum class attribute_type
string
};
inline attribute_type getAttributeTypeByName(const std::string & type)
inline AttributeType getAttributeTypeByName(const std::string & type)
{
static const std::unordered_map<std::string, attribute_type> dictionary{
{ "UInt8", attribute_type::uint8 },
{ "UInt16", attribute_type::uint16 },
{ "UInt32", attribute_type::uint32 },
{ "UInt64", attribute_type::uint64 },
{ "Int8", attribute_type::int8 },
{ "Int16", attribute_type::int16 },
{ "Int32", attribute_type::int32 },
{ "Int64", attribute_type::int64 },
{ "Float32", attribute_type::float32 },
{ "Float64", attribute_type::float64 },
{ "String", attribute_type::string },
static const std::unordered_map<std::string, AttributeType> dictionary{
{ "UInt8", AttributeType::uint8 },
{ "UInt16", AttributeType::uint16 },
{ "UInt32", AttributeType::uint32 },
{ "UInt64", AttributeType::uint64 },
{ "Int8", AttributeType::int8 },
{ "Int16", AttributeType::int16 },
{ "Int32", AttributeType::int32 },
{ "Int64", AttributeType::int64 },
{ "Float32", AttributeType::float32 },
{ "Float64", AttributeType::float64 },
{ "String", AttributeType::string },
};
const auto it = dictionary.find(type);
@ -50,44 +50,53 @@ inline attribute_type getAttributeTypeByName(const std::string & type)
};
}
inline std::string toString(const attribute_type type)
inline std::string toString(const AttributeType type)
{
switch (type)
{
case attribute_type::uint8: return "UInt8";
case attribute_type::uint16: return "UInt16";
case attribute_type::uint32: return "UInt32";
case attribute_type::uint64: return "UInt64";
case attribute_type::int8: return "Int8";
case attribute_type::int16: return "Int16";
case attribute_type::int32: return "Int32";
case attribute_type::int64: return "Int64";
case attribute_type::float32: return "Float32";
case attribute_type::float64: return "Float64";
case attribute_type::string: return "String";
case AttributeType::uint8: return "UInt8";
case AttributeType::uint16: return "UInt16";
case AttributeType::uint32: return "UInt32";
case AttributeType::uint64: return "UInt64";
case AttributeType::int8: return "Int8";
case AttributeType::int16: return "Int16";
case AttributeType::int32: return "Int32";
case AttributeType::int64: return "Int64";
case AttributeType::float32: return "Float32";
case AttributeType::float64: return "Float64";
case AttributeType::string: return "String";
}
throw Exception{
"Unknown attribute_type " + toString(type),
"Unknown attribute_type " + toString(static_cast<int>(type)),
ErrorCodes::ARGUMENT_OUT_OF_BOUND
};
}
/// Min and max lifetimes for a dictionary or its entry
struct DictionaryLifetime
{
std::uint64_t min_sec;
std::uint64_t max_sec;
static DictionaryLifetime fromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
DictionaryLifetime(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
{
const auto & lifetime_min_key = config_prefix + ".min";
const auto has_min = config.has(lifetime_min_key);
const std::uint64_t min_update_time = has_min ? config.getInt(lifetime_min_key) : config.getInt(config_prefix);
const std::uint64_t max_update_time = has_min ? config.getInt(config_prefix + ".max") : min_update_time;
return { min_update_time, max_update_time };
this->min_sec = has_min ? config.getInt(lifetime_min_key) : config.getInt(config_prefix);
this->max_sec = has_min ? config.getInt(config_prefix + ".max") : this->min_sec;
}
};
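
A sketch of constructing the lifetime from config (the XML fragment is hypothetical): a plain <lifetime>300</lifetime> sets both bounds to 300, while a nested min/max pair sets a range:

#include <sstream>
#include <Poco/AutoPtr.h>
#include <Poco/Util/XMLConfiguration.h>
#include <DB/Dictionaries/DictionaryStructure.h>

int main()
{
    std::istringstream xml{"<dictionary><lifetime><min>300</min><max>360</max></lifetime></dictionary>"};
    Poco::AutoPtr<Poco::Util::XMLConfiguration> config{new Poco::Util::XMLConfiguration{xml}};

    DB::DictionaryLifetime lifetime{*config, "lifetime"};
    /// lifetime.min_sec == 300, lifetime.max_sec == 360
}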
/** Holds the description of a single dictionary attribute:
* - name, used for lookup into dictionary and source;
* - type, used in conjunction with DataTypeFactory and getAttributeTypeByName;
* - null_value, used as a default value for non-existent entries in the dictionary,
* decimal representation for numeric attributes;
* - hierarchical, whether this attribute defines a hierarchy;
* - injective, whether the mapping to parent is injective (can be used for optimization of GROUP BY?)
*/
struct DictionaryAttribute
{
std::string name;
@ -97,34 +106,34 @@ struct DictionaryAttribute
bool injective;
};
/// Name of identifier plus list of attributes
struct DictionaryStructure
{
std::string id_name;
std::vector<DictionaryAttribute> attributes;
static DictionaryStructure fromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
DictionaryStructure(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
: id_name{config.getString(config_prefix + ".id.name")}
{
const auto & id_name = config.getString(config_prefix + ".id.name");
if (id_name.empty())
throw Exception{
"No 'id' specified for dictionary",
ErrorCodes::BAD_ARGUMENTS
};
DictionaryStructure result{id_name};
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
auto has_hierarchy = false;
for (const auto & key : keys)
{
if (0 != strncmp(key.data(), "attribute", strlen("attribute")))
continue;
const auto & prefix = config_prefix + '.' + key + '.';
const auto & name = config.getString(prefix + "name");
const auto & type = config.getString(prefix + "type");
const auto & null_value = config.getString(prefix + "null_value");
const auto prefix = config_prefix + '.' + key + '.';
const auto name = config.getString(prefix + "name");
const auto type = config.getString(prefix + "type");
const auto null_value = config.getString(prefix + "null_value");
const auto hierarchical = config.getBool(prefix + "hierarchical", false);
const auto injective = config.getBool(prefix + "injective", false);
if (name.empty() || type.empty())
@ -141,16 +150,16 @@ struct DictionaryStructure
has_hierarchy = has_hierarchy || hierarchical;
result.attributes.emplace_back(DictionaryAttribute{name, type, null_value, hierarchical, injective});
attributes.emplace_back(DictionaryAttribute{
name, type, null_value, hierarchical, injective
});
}
if (result.attributes.empty())
if (attributes.empty())
throw Exception{
"Dictionary has no attributes defined",
ErrorCodes::BAD_ARGUMENTS
};
return result;
}
};

View File

@ -10,13 +10,14 @@
namespace DB
{
/// Allows loading dictionaries from a file with a given format; does not support "random access"
class FileDictionarySource final : public IDictionarySource
{
static const auto max_block_size = 8192;
public:
FileDictionarySource(const std::string & filename, const std::string & format, Block & sample_block,
Context & context)
const Context & context)
: filename{filename}, format{format}, sample_block{sample_block}, context(context),
last_modification{getLastModification()}
{}
@ -29,7 +30,7 @@ public:
BlockInputStreamPtr loadAll() override
{
auto in_ptr = ext::make_unique<ReadBufferFromFile>(filename);
auto in_ptr = std::make_unique<ReadBufferFromFile>(filename);
auto stream = context.getFormatFactory().getInput(
format, *in_ptr, sample_block, max_block_size, context.getDataTypeFactory());
last_modification = getLastModification();
@ -54,8 +55,9 @@ public:
}
bool isModified() const override { return getLastModification() > last_modification; }
bool supportsSelectiveLoad() const override { return false; }
DictionarySourcePtr clone() const override { return ext::make_unique<FileDictionarySource>(*this); }
DictionarySourcePtr clone() const override { return std::make_unique<FileDictionarySource>(*this); }
private:
Poco::Timestamp getLastModification() const { return Poco::File{filename}.getLastModified(); }
@ -63,7 +65,7 @@ private:
const std::string filename;
const std::string format;
Block sample_block;
Context & context;
const Context & context;
Poco::Timestamp last_modification;
};

View File

@ -2,10 +2,8 @@
#include <DB/Dictionaries/IDictionary.h>
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/DictionarySourceFactory.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <statdaemons/ext/range.hpp>
#include <Poco/Util/AbstractConfiguration.h>
#include <vector>
namespace DB
@ -36,9 +34,9 @@ public:
bool isCached() const override { return false; }
DictionaryPtr clone() const override { return ext::make_unique<FlatDictionary>(*this); }
DictionaryPtr clone() const override { return std::make_unique<FlatDictionary>(*this); }
const IDictionarySource * const getSource() const override { return source_ptr.get(); }
const IDictionarySource * getSource() const override { return source_ptr.get(); }
const DictionaryLifetime & getLifetime() const override { return dict_lifetime; }
@ -50,18 +48,18 @@ public:
switch (hierarchical_attribute->type)
{
case attribute_type::uint8: return id < attr->uint8_array->size() ? (*attr->uint8_array)[id] : attr->uint8_null_value;
case attribute_type::uint16: return id < attr->uint16_array->size() ? (*attr->uint16_array)[id] : attr->uint16_null_value;
case attribute_type::uint32: return id < attr->uint32_array->size() ? (*attr->uint32_array)[id] : attr->uint32_null_value;
case attribute_type::uint64: return id < attr->uint64_array->size() ? (*attr->uint64_array)[id] : attr->uint64_null_value;
case attribute_type::int8: return id < attr->int8_array->size() ? (*attr->int8_array)[id] : attr->int8_null_value;
case attribute_type::int16: return id < attr->int16_array->size() ? (*attr->int16_array)[id] : attr->int16_null_value;
case attribute_type::int32: return id < attr->int32_array->size() ? (*attr->int32_array)[id] : attr->int32_null_value;
case attribute_type::int64: return id < attr->int64_array->size() ? (*attr->int64_array)[id] : attr->int64_null_value;
case attribute_type::float32:
case attribute_type::float64:
case attribute_type::string:
break;
case AttributeType::uint8: return id < attr->uint8_array->size() ? (*attr->uint8_array)[id] : attr->uint8_null_value;
case AttributeType::uint16: return id < attr->uint16_array->size() ? (*attr->uint16_array)[id] : attr->uint16_null_value;
case AttributeType::uint32: return id < attr->uint32_array->size() ? (*attr->uint32_array)[id] : attr->uint32_null_value;
case AttributeType::uint64: return id < attr->uint64_array->size() ? (*attr->uint64_array)[id] : attr->uint64_null_value;
case AttributeType::int8: return id < attr->int8_array->size() ? (*attr->int8_array)[id] : attr->int8_null_value;
case AttributeType::int16: return id < attr->int16_array->size() ? (*attr->int16_array)[id] : attr->int16_null_value;
case AttributeType::int32: return id < attr->int32_array->size() ? (*attr->int32_array)[id] : attr->int32_null_value;
case AttributeType::int64: return id < attr->int64_array->size() ? (*attr->int64_array)[id] : attr->int64_null_value;
case AttributeType::float32:
case AttributeType::float64:
case AttributeType::string:
break;
}
throw Exception{
@ -75,7 +73,7 @@ public:
{\
const auto idx = getAttributeIndex(attribute_name);\
const auto & attribute = attributes[idx];\
if (attribute.type != attribute_type::LC_TYPE)\
if (attribute.type != AttributeType::LC_TYPE)\
throw Exception{\
"Type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\
ErrorCodes::TYPE_MISMATCH\
@ -112,7 +110,7 @@ public:
#define DECLARE_TYPE_CHECKER(NAME, LC_NAME)\
bool is##NAME(const std::size_t attribute_idx) const override\
{\
return attributes[attribute_idx].type == attribute_type::LC_NAME;\
return attributes[attribute_idx].type == AttributeType::LC_NAME;\
}
DECLARE_TYPE_CHECKER(UInt8, uint8)
DECLARE_TYPE_CHECKER(UInt16, uint16)
@ -151,7 +149,7 @@ public:
private:
struct attribute_t
{
attribute_type type;
AttributeType type;
UInt8 uint8_null_value;
UInt16 uint16_null_value;
UInt32 uint32_null_value;
@ -195,6 +193,7 @@ private:
void loadData()
{
auto stream = source_ptr->loadAll();
stream->readPrefix();
while (const auto block = stream->read())
{
@ -209,65 +208,67 @@ private:
setAttributeValue(attribute, id_column[row_idx].get<UInt64>(), attribute_column[row_idx]);
}
}
stream->readSuffix();
}
attribute_t createAttributeWithType(const attribute_type type, const std::string & null_value)
attribute_t createAttributeWithType(const AttributeType type, const std::string & null_value)
{
attribute_t attr{type};
switch (type)
{
case attribute_type::uint8:
case AttributeType::uint8:
attr.uint8_null_value = DB::parse<UInt8>(null_value);
attr.uint8_array.reset(new PODArray<UInt8>);
attr.uint8_array->resize_fill(initial_array_size, attr.uint8_null_value);
break;
case attribute_type::uint16:
case AttributeType::uint16:
attr.uint16_null_value = DB::parse<UInt16>(null_value);
attr.uint16_array.reset(new PODArray<UInt16>);
attr.uint16_array->resize_fill(initial_array_size, attr.uint16_null_value);
break;
case attribute_type::uint32:
case AttributeType::uint32:
attr.uint32_null_value = DB::parse<UInt32>(null_value);
attr.uint32_array.reset(new PODArray<UInt32>);
attr.uint32_array->resize_fill(initial_array_size, attr.uint32_null_value);
break;
case attribute_type::uint64:
case AttributeType::uint64:
attr.uint64_null_value = DB::parse<UInt64>(null_value);
attr.uint64_array.reset(new PODArray<UInt64>);
attr.uint64_array->resize_fill(initial_array_size, attr.uint64_null_value);
break;
case attribute_type::int8:
case AttributeType::int8:
attr.int8_null_value = DB::parse<Int8>(null_value);
attr.int8_array.reset(new PODArray<Int8>);
attr.int8_array->resize_fill(initial_array_size, attr.int8_null_value);
break;
case attribute_type::int16:
case AttributeType::int16:
attr.int16_null_value = DB::parse<Int16>(null_value);
attr.int16_array.reset(new PODArray<Int16>);
attr.int16_array->resize_fill(initial_array_size, attr.int16_null_value);
break;
case attribute_type::int32:
case AttributeType::int32:
attr.int32_null_value = DB::parse<Int32>(null_value);
attr.int32_array.reset(new PODArray<Int32>);
attr.int32_array->resize_fill(initial_array_size, attr.int32_null_value);
break;
case attribute_type::int64:
case AttributeType::int64:
attr.int64_null_value = DB::parse<Int64>(null_value);
attr.int64_array.reset(new PODArray<Int64>);
attr.int64_array->resize_fill(initial_array_size, attr.int64_null_value);
break;
case attribute_type::float32:
case AttributeType::float32:
attr.float32_null_value = DB::parse<Float32>(null_value);
attr.float32_array.reset(new PODArray<Float32>);
attr.float32_array->resize_fill(initial_array_size, attr.float32_null_value);
break;
case attribute_type::float64:
case AttributeType::float64:
attr.float64_null_value = DB::parse<Float64>(null_value);
attr.float64_array.reset(new PODArray<Float64>);
attr.float64_array->resize_fill(initial_array_size, attr.float64_null_value);
break;
case attribute_type::string:
case AttributeType::string:
attr.string_null_value = null_value;
attr.string_arena.reset(new Arena);
attr.string_array.reset(new PODArray<StringRef>);
@ -288,77 +289,77 @@ private:
switch (attribute.type)
{
case attribute_type::uint8:
case AttributeType::uint8:
{
if (id >= attribute.uint8_array->size())
attribute.uint8_array->resize_fill(id, attribute.uint8_null_value);
(*attribute.uint8_array)[id] = value.get<UInt64>();
break;
}
case attribute_type::uint16:
case AttributeType::uint16:
{
if (id >= attribute.uint16_array->size())
attribute.uint16_array->resize_fill(id, attribute.uint16_null_value);
(*attribute.uint16_array)[id] = value.get<UInt64>();
break;
}
case attribute_type::uint32:
case AttributeType::uint32:
{
if (id >= attribute.uint32_array->size())
attribute.uint32_array->resize_fill(id, attribute.uint32_null_value);
(*attribute.uint32_array)[id] = value.get<UInt64>();
break;
}
case attribute_type::uint64:
case AttributeType::uint64:
{
if (id >= attribute.uint64_array->size())
attribute.uint64_array->resize_fill(id, attribute.uint64_null_value);
(*attribute.uint64_array)[id] = value.get<UInt64>();
break;
}
case attribute_type::int8:
case AttributeType::int8:
{
if (id >= attribute.int8_array->size())
attribute.int8_array->resize_fill(id, attribute.int8_null_value);
(*attribute.int8_array)[id] = value.get<Int64>();
break;
}
case attribute_type::int16:
case AttributeType::int16:
{
if (id >= attribute.int16_array->size())
attribute.int16_array->resize_fill(id, attribute.int16_null_value);
(*attribute.int16_array)[id] = value.get<Int64>();
break;
}
case attribute_type::int32:
case AttributeType::int32:
{
if (id >= attribute.int32_array->size())
attribute.int32_array->resize_fill(id, attribute.int32_null_value);
(*attribute.int32_array)[id] = value.get<Int64>();
break;
}
case attribute_type::int64:
case AttributeType::int64:
{
if (id >= attribute.int64_array->size())
attribute.int64_array->resize_fill(id, attribute.int64_null_value);
(*attribute.int64_array)[id] = value.get<Int64>();
break;
}
case attribute_type::float32:
case AttributeType::float32:
{
if (id >= attribute.float32_array->size())
attribute.float32_array->resize_fill(id, attribute.float32_null_value);
(*attribute.float32_array)[id] = value.get<Float64>();
break;
}
case attribute_type::float64:
case AttributeType::float64:
{
if (id >= attribute.float64_array->size())
attribute.float64_array->resize_fill(id, attribute.float64_null_value);
(*attribute.float64_array)[id] = value.get<Float64>();
break;
}
case attribute_type::string:
case AttributeType::string:
{
if (id >= attribute.string_array->size())
attribute.string_array->resize_fill(id, attribute.string_null_value);

View File

@ -2,11 +2,10 @@
#include <DB/Dictionaries/IDictionary.h>
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/DictionarySourceFactory.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <DB/Common/HashTable/HashMap.h>
#include <statdaemons/ext/range.hpp>
#include <statdaemons/ext/memory.hpp>
#include <memory>
namespace DB
{
@ -33,9 +32,9 @@ public:
bool isCached() const override { return false; }
DictionaryPtr clone() const override { return ext::make_unique<HashedDictionary>(*this); }
DictionaryPtr clone() const override { return std::make_unique<HashedDictionary>(*this); }
const IDictionarySource * const getSource() const override { return source_ptr.get(); }
const IDictionarySource * getSource() const override { return source_ptr.get(); }
const DictionaryLifetime & getLifetime() const override { return dict_lifetime; }
@ -47,49 +46,49 @@ public:
switch (hierarchical_attribute->type)
{
case attribute_type::uint8:
case AttributeType::uint8:
{
const auto it = attr->uint8_map->find(id);
return it != attr->uint8_map->end() ? it->second : attr->uint8_null_value;
}
case attribute_type::uint16:
case AttributeType::uint16:
{
const auto it = attr->uint16_map->find(id);
return it != attr->uint16_map->end() ? it->second : attr->uint16_null_value;
}
case attribute_type::uint32:
case AttributeType::uint32:
{
const auto it = attr->uint32_map->find(id);
return it != attr->uint32_map->end() ? it->second : attr->uint32_null_value;
}
case attribute_type::uint64:
case AttributeType::uint64:
{
const auto it = attr->uint64_map->find(id);
return it != attr->uint64_map->end() ? it->second : attr->uint64_null_value;
}
case attribute_type::int8:
case AttributeType::int8:
{
const auto it = attr->int8_map->find(id);
return it != attr->int8_map->end() ? it->second : attr->int8_null_value;
}
case attribute_type::int16:
case AttributeType::int16:
{
const auto it = attr->int16_map->find(id);
return it != attr->int16_map->end() ? it->second : attr->int16_null_value;
}
case attribute_type::int32:
case AttributeType::int32:
{
const auto it = attr->int32_map->find(id);
return it != attr->int32_map->end() ? it->second : attr->int32_null_value;
}
case attribute_type::int64:
case AttributeType::int64:
{
const auto it = attr->int64_map->find(id);
return it != attr->int64_map->end() ? it->second : attr->int64_null_value;
}
case attribute_type::float32:
case attribute_type::float64:
case attribute_type::string:
case AttributeType::float32:
case AttributeType::float64:
case AttributeType::string:
break;
};
@ -104,7 +103,7 @@ public:
{\
const auto idx = getAttributeIndex(attribute_name);\
const auto & attribute = attributes[idx];\
if (attribute.type != attribute_type::LC_TYPE)\
if (attribute.type != AttributeType::LC_TYPE)\
throw Exception{\
"Type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\
ErrorCodes::TYPE_MISMATCH\
@ -142,7 +141,7 @@ public:
#define DECLARE_TYPE_CHECKER(NAME, LC_NAME)\
bool is##NAME(const std::size_t attribute_idx) const override\
{\
return attributes[attribute_idx].type == attribute_type::LC_NAME;\
return attributes[attribute_idx].type == AttributeType::LC_NAME;\
}
DECLARE_TYPE_CHECKER(UInt8, uint8)
DECLARE_TYPE_CHECKER(UInt16, uint16)
@ -182,7 +181,7 @@ public:
private:
struct attribute_t
{
attribute_type type;
AttributeType type;
UInt8 uint8_null_value;
UInt16 uint16_null_value;
UInt32 uint32_null_value;
@ -226,6 +225,7 @@ private:
void loadData()
{
auto stream = source_ptr->loadAll();
stream->readPrefix();
while (const auto block = stream->read())
{
@ -240,55 +240,57 @@ private:
setAttributeValue(attribute, id_column[row_idx].get<UInt64>(), attribute_column[row_idx]);
}
}
stream->readSuffix();
}
attribute_t createAttributeWithType(const attribute_type type, const std::string & null_value)
attribute_t createAttributeWithType(const AttributeType type, const std::string & null_value)
{
attribute_t attr{type};
switch (type)
{
case attribute_type::uint8:
case AttributeType::uint8:
attr.uint8_null_value = DB::parse<UInt8>(null_value);
attr.uint8_map.reset(new HashMap<UInt64, UInt8>);
break;
case attribute_type::uint16:
case AttributeType::uint16:
attr.uint16_null_value = DB::parse<UInt16>(null_value);
attr.uint16_map.reset(new HashMap<UInt64, UInt16>);
break;
case attribute_type::uint32:
case AttributeType::uint32:
attr.uint32_null_value = DB::parse<UInt32>(null_value);
attr.uint32_map.reset(new HashMap<UInt64, UInt32>);
break;
case attribute_type::uint64:
case AttributeType::uint64:
attr.uint64_null_value = DB::parse<UInt64>(null_value);
attr.uint64_map.reset(new HashMap<UInt64, UInt64>);
break;
case attribute_type::int8:
case AttributeType::int8:
attr.int8_null_value = DB::parse<Int8>(null_value);
attr.int8_map.reset(new HashMap<UInt64, Int8>);
break;
case attribute_type::int16:
case AttributeType::int16:
attr.int16_null_value = DB::parse<Int16>(null_value);
attr.int16_map.reset(new HashMap<UInt64, Int16>);
break;
case attribute_type::int32:
case AttributeType::int32:
attr.int32_null_value = DB::parse<Int32>(null_value);
attr.int32_map.reset(new HashMap<UInt64, Int32>);
break;
case attribute_type::int64:
case AttributeType::int64:
attr.int64_null_value = DB::parse<Int64>(null_value);
attr.int64_map.reset(new HashMap<UInt64, Int64>);
break;
case attribute_type::float32:
case AttributeType::float32:
attr.float32_null_value = DB::parse<Float32>(null_value);
attr.float32_map.reset(new HashMap<UInt64, Float32>);
break;
case attribute_type::float64:
case AttributeType::float64:
attr.float64_null_value = DB::parse<Float64>(null_value);
attr.float64_map.reset(new HashMap<UInt64, Float64>);
break;
case attribute_type::string:
case AttributeType::string:
attr.string_null_value = null_value;
attr.string_arena.reset(new Arena);
attr.string_map.reset(new HashMap<UInt64, StringRef>);
@ -302,57 +304,57 @@ private:
{
switch (attribute.type)
{
case attribute_type::uint8:
case AttributeType::uint8:
{
attribute.uint8_map->insert({ id, value.get<UInt64>() });
break;
}
case attribute_type::uint16:
case AttributeType::uint16:
{
attribute.uint16_map->insert({ id, value.get<UInt64>() });
break;
}
case attribute_type::uint32:
case AttributeType::uint32:
{
attribute.uint32_map->insert({ id, value.get<UInt64>() });
break;
}
case attribute_type::uint64:
case AttributeType::uint64:
{
attribute.uint64_map->insert({ id, value.get<UInt64>() });
break;
}
case attribute_type::int8:
case AttributeType::int8:
{
attribute.int8_map->insert({ id, value.get<Int64>() });
break;
}
case attribute_type::int16:
case AttributeType::int16:
{
attribute.int16_map->insert({ id, value.get<Int64>() });
break;
}
case attribute_type::int32:
case AttributeType::int32:
{
attribute.int32_map->insert({ id, value.get<Int64>() });
break;
}
case attribute_type::int64:
case AttributeType::int64:
{
attribute.int64_map->insert({ id, value.get<Int64>() });
break;
}
case attribute_type::float32:
case AttributeType::float32:
{
attribute.float32_map->insert({ id, value.get<Float64>() });
break;
}
case attribute_type::float64:
case AttributeType::float64:
{
attribute.float64_map->insert({ id, value.get<Float64>() });
break;
}
case attribute_type::string:
case AttributeType::string:
{
const auto & string = value.get<String>();
const auto string_in_arena = attribute.string_arena->insert(string.data(), string.size());

View File

@ -18,7 +18,7 @@ class DictionaryLifetime;
class IDictionary
{
public:
using id_t = std::uint64_t;
virtual std::string getName() const = 0;
@ -28,7 +28,7 @@ public:
virtual void reload() {}
virtual DictionaryPtr clone() const = 0;
virtual const IDictionarySource * const getSource() const = 0;
virtual const IDictionarySource * getSource() const = 0;
virtual const DictionaryLifetime & getLifetime() const = 0;
@ -89,7 +89,7 @@ public:
virtual Float64 getFloat64Unsafe(std::size_t attribute_idx, id_t id) const = 0;
virtual StringRef getStringUnsafe(std::size_t attribute_idx, id_t id) const = 0;
virtual ~IDictionary() = default;
};
}

View File

@ -9,12 +9,28 @@ namespace DB
class IDictionarySource;
using DictionarySourcePtr = std::unique_ptr<IDictionarySource>;
/** Data-provider interface for external dictionaries,
* abstracts out the data source (file, MySQL, ClickHouse, external program, network request et cetera)
* from the presentation and memory layout (the dictionary itself).
*/
class IDictionarySource
{
public:
/// returns an input stream with all the data available from this source
virtual BlockInputStreamPtr loadAll() = 0;
/** Indicates whether this source supports "random access" loading of data
* loadId and loadIds can only be used if this function returns true.
*/
virtual bool supportsSelectiveLoad() const = 0;
/// returns an input stream with the data for the requested identifier
virtual BlockInputStreamPtr loadId(const std::uint64_t id) = 0;
/// returns an input stream with the data for a collection of identifiers
virtual BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> ids) = 0;
/// indicates whether the source has been modified since last load* operation
virtual bool isModified() const = 0;
virtual DictionarySourcePtr clone() const = 0;
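To make the contract concrete, a minimal hypothetical implementation could serve a single pre-built block. The class below is not part of this commit; OneBlockInputStream is assumed to be available in the tree, and the sketch shows only how the interface composes:

#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/DataStreams/OneBlockInputStream.h>

namespace DB
{

/// Hypothetical source over one fixed block; only loadAll() is supported.
class ConstantDictionarySource final : public IDictionarySource
{
public:
	ConstantDictionarySource(const Block & block) : block{block} {}

	BlockInputStreamPtr loadAll() override { return new OneBlockInputStream{block}; }

	/// No random access, so callers must not use loadId/loadIds.
	bool supportsSelectiveLoad() const override { return false; }
	BlockInputStreamPtr loadId(const std::uint64_t) override { throw Exception{"loadId is not supported"}; }
	BlockInputStreamPtr loadIds(const std::vector<std::uint64_t>) override { throw Exception{"loadIds is not supported"}; }

	/// The data never changes, so periodic reloads are unnecessary.
	bool isModified() const override { return false; }

	DictionarySourcePtr clone() const override { return std::make_unique<ConstantDictionarySource>(*this); }

private:
	Block block;
};

}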


@ -12,10 +12,11 @@
namespace DB
{
class MysqlBlockInputStream final : public IProfilingBlockInputStream
/// Allows processing results of a MySQL query as a sequence of Blocks, simplifies chaining
class MySQLBlockInputStream final : public IProfilingBlockInputStream
{
public:
MysqlBlockInputStream(mysqlxx::Query query, const Block & sample_block, const std::size_t max_block_size)
MySQLBlockInputStream(mysqlxx::Query query, const Block & sample_block, const std::size_t max_block_size)
: query{std::move(query)}, result{query.use()}, sample_block{sample_block}, max_block_size{max_block_size}
{
types.reserve(sample_block.columns());
@ -24,27 +25,27 @@ public:
{
const auto type = sample_block.getByPosition(idx).type.get();
if (typeid_cast<const DataTypeUInt8 *>(type))
types.push_back(attribute_type::uint8);
types.push_back(AttributeType::uint8);
else if (typeid_cast<const DataTypeUInt16 *>(type))
types.push_back(attribute_type::uint16);
types.push_back(AttributeType::uint16);
else if (typeid_cast<const DataTypeUInt32 *>(type))
types.push_back(attribute_type::uint32);
types.push_back(AttributeType::uint32);
else if (typeid_cast<const DataTypeUInt64 *>(type))
types.push_back(attribute_type::uint64);
types.push_back(AttributeType::uint64);
else if (typeid_cast<const DataTypeInt8 *>(type))
types.push_back(attribute_type::int8);
types.push_back(AttributeType::int8);
else if (typeid_cast<const DataTypeInt16 *>(type))
types.push_back(attribute_type::int16);
types.push_back(AttributeType::int16);
else if (typeid_cast<const DataTypeInt32 *>(type))
types.push_back(attribute_type::int32);
types.push_back(AttributeType::int32);
else if (typeid_cast<const DataTypeInt64 *>(type))
types.push_back(attribute_type::int64);
types.push_back(AttributeType::int64);
else if (typeid_cast<const DataTypeFloat32 *>(type))
types.push_back(attribute_type::float32);
types.push_back(AttributeType::float32);
else if (typeid_cast<const DataTypeFloat64 *>(type))
types.push_back(attribute_type::float64);
types.push_back(AttributeType::float64);
else if (typeid_cast<const DataTypeString *>(type))
types.push_back(attribute_type::string);
types.push_back(AttributeType::string);
else
throw Exception{
"Unsupported type " + type->getName(),
@ -53,11 +54,11 @@ public:
}
}
String getName() const override { return "MysqlBlockInputStream"; }
String getName() const override { return "MySQLBlockInputStream"; }
String getID() const override
{
return "Mysql(" + query.str() + ")";
return "MySQL(" + query.str() + ")";
}
private:
@ -67,7 +68,7 @@ private:
if (block.columns() != result.getNumFields())
throw Exception{
"mysqlxx::UserQueryResult contains " + toString(result.getNumFields()) + " columns while " +
"mysqlxx::UseQueryResult contains " + toString(result.getNumFields()) + " columns while " +
toString(block.columns()) + " expected",
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH
};
@ -86,29 +87,29 @@ private:
return rows == 0 ? Block{} : block;
};
static void insertValue(ColumnPtr & column, const mysqlxx::Value & value, const attribute_type type)
static void insertValue(ColumnPtr & column, const mysqlxx::Value & value, const AttributeType type)
{
switch (type)
{
case attribute_type::uint8: column->insert(static_cast<UInt64>(value)); break;
case attribute_type::uint16: column->insert(static_cast<UInt64>(value)); break;
case attribute_type::uint32: column->insert(static_cast<UInt64>(value)); break;
case attribute_type::uint64: column->insert(static_cast<UInt64>(value)); break;
case attribute_type::int8: column->insert(static_cast<Int64>(value)); break;
case attribute_type::int16: column->insert(static_cast<Int64>(value)); break;
case attribute_type::int32: column->insert(static_cast<Int64>(value)); break;
case attribute_type::int64: column->insert(static_cast<Int64>(value)); break;
case attribute_type::float32: column->insert(static_cast<Float64>(value)); break;
case attribute_type::float64: column->insert(static_cast<Float64>(value)); break;
case attribute_type::string: column->insert(value.getString()); break;
case AttributeType::uint8: column->insert(static_cast<UInt64>(value)); break;
case AttributeType::uint16: column->insert(static_cast<UInt64>(value)); break;
case AttributeType::uint32: column->insert(static_cast<UInt64>(value)); break;
case AttributeType::uint64: column->insert(static_cast<UInt64>(value)); break;
case AttributeType::int8: column->insert(static_cast<Int64>(value)); break;
case AttributeType::int16: column->insert(static_cast<Int64>(value)); break;
case AttributeType::int32: column->insert(static_cast<Int64>(value)); break;
case AttributeType::int64: column->insert(static_cast<Int64>(value)); break;
case AttributeType::float32: column->insert(static_cast<Float64>(value)); break;
case AttributeType::float64: column->insert(static_cast<Float64>(value)); break;
case AttributeType::string: column->insert(value.getString()); break;
}
}
mysqlxx::Query query;
mysqlxx::UseQueryResult result;
Block sample_block;
std::size_t max_block_size;
std::vector<attribute_type> types;
const std::size_t max_block_size;
std::vector<AttributeType> types;
};
}
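A sketch of the intended usage, assuming a configured mysqlxx::Pool and a sample_block whose column types match the query (the table name here is made up):

#include <DB/Dictionaries/MySQLBlockInputStream.h>
#include <mysqlxx/Pool.h>
#include <cstddef>

std::size_t countBlocks(mysqlxx::Pool & pool, const DB::Block & sample_block)
{
	DB::BlockInputStreamPtr stream = new DB::MySQLBlockInputStream{
		pool.Get()->query("SELECT id, name FROM test.dict"), sample_block, 8192};

	std::size_t blocks = 0;
	stream->readPrefix();
	while (true)
	{
		const DB::Block block = stream->read();	/// an empty block signals the end
		if (!block.columns())
			break;
		++blocks;
	}
	stream->readSuffix();
	return blocks;
}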


@ -1,39 +1,42 @@
#pragma once
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/MysqlBlockInputStream.h>
#include <DB/Interpreters/Context.h>
#include <DB/Dictionaries/MySQLBlockInputStream.h>
#include <statdaemons/ext/range.hpp>
#include <mysqlxx/Pool.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <strconvert/escape.h>
namespace DB
{
class MysqlDictionarySource final : public IDictionarySource
/// Allows loading dictionaries from a MySQL database
class MySQLDictionarySource final : public IDictionarySource
{
static const auto max_block_size = 8192;
public:
MysqlDictionarySource(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
Block & sample_block, const Context & context)
MySQLDictionarySource(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
Block & sample_block)
: table{config.getString(config_prefix + ".table")},
sample_block{sample_block}, context(context),
sample_block{sample_block},
pool{config, config_prefix},
load_all_query{composeLoadAllQuery(sample_block, table)},
last_modification{getLastModification()}
{}
MysqlDictionarySource(const MysqlDictionarySource & other)
/// copy-constructor is provided in order to support cloneability
MySQLDictionarySource(const MySQLDictionarySource & other)
: table{other.table},
sample_block{other.sample_block}, context(other.context),
sample_block{other.sample_block},
pool{other.pool},
load_all_query{other.load_all_query}, last_modification{other.last_modification}
{}
BlockInputStreamPtr loadAll() override
{
return new MysqlBlockInputStream{pool.Get()->query(load_all_query), sample_block, max_block_size};
last_modification = getLastModification();
return new MySQLBlockInputStream{pool.Get()->query(load_all_query), sample_block, max_block_size};
}
BlockInputStreamPtr loadId(const std::uint64_t id) override
@ -53,8 +56,9 @@ public:
}
bool isModified() const override { return getLastModification() > last_modification; }
bool supportsSelectiveLoad() const override { return true; }
DictionarySourcePtr clone() const override { return ext::make_unique<MysqlDictionarySource>(*this); }
DictionarySourcePtr clone() const override { return std::make_unique<MySQLDictionarySource>(*this); }
private:
mysqlxx::DateTime getLastModification() const
@ -64,20 +68,23 @@ private:
try
{
auto connection = pool.Get();
auto query = connection->query("SHOW TABLE STATUS LIKE '%" + table + "%';");
auto query = connection->query("SHOW TABLE STATUS LIKE '%" + strconvert::escaped_for_like(table) + "%';");
auto result = query.use();
auto row = result.fetch();
const auto & update_time = row[Update_time_idx];
return !update_time.isNull() ? update_time.getDateTime() : mysqlxx::DateTime{std::time(nullptr)};
if (!update_time.isNull())
return update_time.getDateTime();
}
catch (...)
{
tryLogCurrentException("MysqlDictionarySource");
tryLogCurrentException("MySQLDictionarySource");
}
return {};
/// We assume that failure to get the modification time is not an error, therefore return the current time.
return mysqlxx::DateTime{std::time(nullptr)};
}
/// @todo escape table and column names
static std::string composeLoadAllQuery(const Block & block, const std::string & table)
{
std::string query{"SELECT "};
@ -99,7 +106,6 @@ private:
const std::string table;
Block sample_block;
const Context & context;
mutable mysqlxx::PoolWithFailover pool;
const std::string load_all_query;
mysqlxx::DateTime last_modification;
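The diff truncates the body of composeLoadAllQuery. For orientation only, a version of the presumable shape, joining the sample block's column names into a SELECT, might read as below; this is a guess, not the committed code, and per the @todo above it still lacks identifier escaping:

#include <DB/Core/Block.h>
#include <string>

std::string composeLoadAllQuerySketch(const DB::Block & block, const std::string & table)
{
	std::string query{"SELECT "};

	for (std::size_t idx = 0; idx < block.columns(); ++idx)
	{
		if (idx != 0)
			query += ", ";
		query += block.getByPosition(idx).name;	/// @todo escape column names
	}

	query += " FROM " + table + ';';	/// @todo escape the table name
	return query;
}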


@ -7,6 +7,9 @@
namespace DB
{
/** Provides reading from a Buffer, taking exclusive ownership over its lifetime,
* simplifies usage of ReadBufferFromFile (no need to manage buffer lifetime) etc.
*/
class OwningBufferBlockInputStream : public IProfilingBlockInputStream
{
public:
@ -21,9 +24,7 @@ private:
String getName() const override { return "OwningBufferBlockInputStream"; }
String getID() const override {
return "OwningBuffer(" + stream->getID() + ")";
}
String getID() const override { return "OwningBuffer(" + stream->getID() + ")"; }
BlockInputStreamPtr stream;
std::unique_ptr<ReadBuffer> buffer;


@ -1,15 +0,0 @@
#pragma once
#include <memory>
namespace DB
{
template <typename T> struct release
{
void operator()(const T * const ptr) { ptr->release(); }
};
template <typename T> using config_ptr_t = std::unique_ptr<T, release<T>>;
}


@ -14,6 +14,7 @@
#include <statdaemons/ext/range.hpp>
#include <DB/Dictionaries/FlatDictionary.h>
#include <DB/Dictionaries/HashedDictionary.h>
#include <DB/Dictionaries/CacheDictionary.h>
namespace DB
@ -35,6 +36,19 @@ namespace DB
*
* Get an array of region identifiers consisting of the original one and the chain of its parents. The order is implementation defined.
* regionHierarchy, OSHierarchy, SEHierarchy.
*
* Functions that use pluggable (external) dictionaries.
*
* Get the value of an attribute of the given type.
* dictGetType(dictionary, attribute, id),
* Type is a placeholder for the type name; any numeric and string types are currently supported.
* The type must match the actual type of the attribute it was declared with in the dictionary structure.
*
* Get an array of identifiers consisting of the original one and the chain of its parents.
* dictGetHierarchy(dictionary, id).
*
* Whether the first identifier is a descendant of the second.
* dictIsIn(dictionary, child_id, parent_id).
*/
@ -726,10 +740,10 @@ public:
static IFunction * create(const Context & context)
{
return new FunctionDictGetString{context.getDictionaries()};
return new FunctionDictGetString{context.getExternalDictionaries()};
};
FunctionDictGetString(const Dictionaries & dictionaries) : dictionaries(dictionaries) {}
FunctionDictGetString(const ExternalDictionaries & dictionaries) : dictionaries(dictionaries) {}
String getName() const override { return name; }
@ -746,7 +760,8 @@ private:
if (!typeid_cast<const DataTypeString *>(arguments[0].get()))
{
throw Exception{
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
"Illegal type " + arguments[0]->getName() + " of first argument of function " + getName()
+ ", expected a string.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -754,7 +769,8 @@ private:
if (!typeid_cast<const DataTypeString *>(arguments[1].get()))
{
throw Exception{
"Illegal type " + arguments[1]->getName() + " of argument of function " + getName(),
"Illegal type " + arguments[1]->getName() + " of second argument of function " + getName()
+ ", expected a string.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -770,7 +786,8 @@ private:
!typeid_cast<const DataTypeInt64 *>(id_arg))
{
throw Exception{
"Illegal type " + arguments[2]->getName() + " of argument of function " + getName(),
"Illegal type " + arguments[2]->getName() + " of third argument of function " + getName()
+ ", expected an integer.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -787,11 +804,12 @@ private:
ErrorCodes::ILLEGAL_COLUMN
};
auto dict = dictionaries.getExternalDictionary(dict_name_col->getData());
auto dict = dictionaries.getDictionary(dict_name_col->getData());
const auto dict_ptr = dict.get();
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr))
!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<CacheDictionary>(block, arguments, result, dict_ptr))
throw Exception{
"Unsupported dictionary type " + dict_ptr->getTypeName(),
ErrorCodes::UNKNOWN_TYPE
@ -871,7 +889,7 @@ private:
return false;
}
const Dictionaries & dictionaries;
const ExternalDictionaries & dictionaries;
};
@ -911,10 +929,10 @@ public:
static IFunction * create(const Context & context)
{
return new FunctionDictGet{context.getDictionaries()};
return new FunctionDictGet{context.getExternalDictionaries()};
};
FunctionDictGet(const Dictionaries & dictionaries) : dictionaries(dictionaries) {}
FunctionDictGet(const ExternalDictionaries & dictionaries) : dictionaries(dictionaries) {}
String getName() const override { return name; }
@ -931,7 +949,8 @@ private:
if (!typeid_cast<const DataTypeString *>(arguments[0].get()))
{
throw Exception{
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
"Illegal type " + arguments[0]->getName() + " of first argument of function " + getName()
+ ", expected a string.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -939,7 +958,8 @@ private:
if (!typeid_cast<const DataTypeString *>(arguments[1].get()))
{
throw Exception{
"Illegal type " + arguments[1]->getName() + " of argument of function " + getName(),
"Illegal type " + arguments[1]->getName() + " of second argument of function " + getName()
+ ", expected a string.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -955,7 +975,8 @@ private:
!typeid_cast<const DataTypeInt64 *>(id_arg))
{
throw Exception{
"Illegal type " + id_arg->getName() + " of argument of function " + getName(),
"Illegal type " + id_arg->getName() + " of third argument of function " + getName()
+ ", expected an integer.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -972,11 +993,12 @@ private:
ErrorCodes::ILLEGAL_COLUMN
};
auto dict = dictionaries.getExternalDictionary(dict_name_col->getData());
auto dict = dictionaries.getDictionary(dict_name_col->getData());
const auto dict_ptr = dict.get();
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr))
!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<CacheDictionary>(block, arguments, result, dict_ptr))
throw Exception{
"Unsupported dictionary type " + dict_ptr->getTypeName(),
ErrorCodes::UNKNOWN_TYPE
@ -1058,7 +1080,7 @@ private:
return false;
}
const Dictionaries & dictionaries;
const ExternalDictionaries & dictionaries;
};
template <typename DataType>
@ -1084,10 +1106,10 @@ public:
static IFunction * create(const Context & context)
{
return new FunctionDictGetHierarchy{context.getDictionaries()};
return new FunctionDictGetHierarchy{context.getExternalDictionaries()};
};
FunctionDictGetHierarchy(const Dictionaries & dictionaries) : dictionaries(dictionaries) {}
FunctionDictGetHierarchy(const ExternalDictionaries & dictionaries) : dictionaries(dictionaries) {}
String getName() const override { return name; }
@ -1104,7 +1126,8 @@ private:
if (!typeid_cast<const DataTypeString *>(arguments[0].get()))
{
throw Exception{
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
"Illegal type " + arguments[0]->getName() + " of first argument of function " + getName()
+ ", expected a string.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -1120,7 +1143,8 @@ private:
!typeid_cast<const DataTypeInt64 *>(id_arg))
{
throw Exception{
"Illegal type " + id_arg->getName() + " of argument of function " + getName(),
"Illegal type " + id_arg->getName() + " of second argument of function " + getName()
+ ", expected an integer.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -1137,7 +1161,7 @@ private:
ErrorCodes::ILLEGAL_COLUMN
};
auto dict = dictionaries.getExternalDictionary(dict_name_col->getData());
auto dict = dictionaries.getDictionary(dict_name_col->getData());
const auto dict_ptr = dict.get();
if (!dict->hasHierarchy())
@ -1147,7 +1171,8 @@ private:
};
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr))
!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<CacheDictionary>(block, arguments, result, dict_ptr))
throw Exception{
"Unsupported dictionary type " + dict_ptr->getTypeName(),
ErrorCodes::UNKNOWN_TYPE
@ -1234,7 +1259,7 @@ private:
return false;
}
const Dictionaries & dictionaries;
const ExternalDictionaries & dictionaries;
};
@ -1245,10 +1270,10 @@ public:
static IFunction * create(const Context & context)
{
return new FunctionDictIsIn{context.getDictionaries()};
return new FunctionDictIsIn{context.getExternalDictionaries()};
};
FunctionDictIsIn(const Dictionaries & dictionaries) : dictionaries(dictionaries) {}
FunctionDictIsIn(const ExternalDictionaries & dictionaries) : dictionaries(dictionaries) {}
String getName() const override { return name; }
@ -1265,7 +1290,8 @@ private:
if (!typeid_cast<const DataTypeString *>(arguments[0].get()))
{
throw Exception{
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
"Illegal type " + arguments[0]->getName() + " of first argument of function " + getName()
+ ", expected a string.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -1281,7 +1307,8 @@ private:
!typeid_cast<const DataTypeInt64 *>(child_id_arg))
{
throw Exception{
"Illegal type " + child_id_arg->getName() + " of argument of function " + getName(),
"Illegal type " + child_id_arg->getName() + " of second argument of function " + getName()
+ ", expected an integer.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -1297,7 +1324,8 @@ private:
!typeid_cast<const DataTypeInt64 *>(ancestor_id_arg))
{
throw Exception{
"Illegal type " + ancestor_id_arg->getName() + " of argument of function " + getName(),
"Illegal type " + ancestor_id_arg->getName() + " of argument of third function " + getName()
+ ", expected an integer.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -1314,7 +1342,7 @@ private:
ErrorCodes::ILLEGAL_COLUMN
};
auto dict = dictionaries.getExternalDictionary(dict_name_col->getData());
auto dict = dictionaries.getDictionary(dict_name_col->getData());
const auto dict_ptr = dict.get();
if (!dict->hasHierarchy())
@ -1324,7 +1352,8 @@ private:
};
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr))
!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<CacheDictionary>(block, arguments, result, dict_ptr))
throw Exception{
"Unsupported dictionary type " + dict_ptr->getTypeName(),
ErrorCodes::UNKNOWN_TYPE
@ -1470,7 +1499,7 @@ private:
return false;
}
const Dictionaries & dictionaries;
const ExternalDictionaries & dictionaries;
};


@ -267,7 +267,7 @@ public:
throw Exception("Illegal column " + col->getName() + " of first argument of function " + getName() + ". Must be constant string.",
ErrorCodes::ILLEGAL_COLUMN);
re = Regexps::get(col->getData());
re = Regexps::get<false, false>(col->getData());
capture = re->getNumberOfSubpatterns() > 0 ? 1 : 0;
matches.resize(capture + 1);


@ -3,7 +3,7 @@
#include <Poco/Mutex.h>
#include <statdaemons/OptimizedRegularExpression.h>
#include <statdaemons/ext/memory.hpp>
#include <memory>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeString.h>
@ -299,24 +299,24 @@ namespace Regexps
}
template <bool like>
inline Regexp createRegexp(const std::string & pattern) { return pattern; }
inline Regexp createRegexp(const std::string & pattern, int flags) { return {pattern, flags}; }
template <>
inline Regexp createRegexp<true>(const std::string & pattern) { return likePatternToRegexp(pattern); }
inline Regexp createRegexp<true>(const std::string & pattern, int flags) { return {likePatternToRegexp(pattern), flags}; }
template <bool like = false>
template <bool like, bool no_capture>
inline Pointer get(const std::string & pattern)
{
/// C++11 has thread-safe function-local statics on most modern compilers.
static KnownRegexps known_regexps;
static KnownRegexps known_regexps; /// Separate variables for different template parameters.
static std::mutex mutex;
std::lock_guard<std::mutex> lock{mutex};
auto it = known_regexps.find(pattern);
if (known_regexps.end() == it)
it = known_regexps.emplace(pattern, ext::make_unique<Holder>()).first;
it = known_regexps.emplace(pattern, std::make_unique<Holder>()).first;
return it->second->get([&pattern] {
return new Regexp{createRegexp<like>(pattern)};
return new Regexp{createRegexp<like>(pattern, no_capture ? OptimizedRegularExpression::RE_NO_CAPTURE : 0)};
});
}
}
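The caching scheme above, function-local statics per template instantiation guarded by a mutex, can be shown standalone. In this sketch std::regex stands in for OptimizedRegularExpression and the Holder object pool is omitted:

#include <map>
#include <memory>
#include <mutex>
#include <regex>
#include <string>

/// Each <case_insensitive> value instantiates its own statics, so patterns compiled
/// with different flags never collide; this mirrors the <like, no_capture> parameters above.
template <bool case_insensitive>
const std::regex & getCachedRegex(const std::string & pattern)
{
	static std::map<std::string, std::unique_ptr<std::regex>> known;
	static std::mutex mutex;

	std::lock_guard<std::mutex> lock{mutex};
	auto it = known.find(pattern);
	if (known.end() == it)
		it = known.emplace(pattern, std::make_unique<std::regex>(
			pattern, case_insensitive ? std::regex::icase : std::regex::ECMAScript)).first;

	return *it->second;	/// safe to return a reference: entries are never erased
}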
@ -373,7 +373,7 @@ struct MatchImpl
}
else
{
const auto & regexp = Regexps::get<like>(pattern);
const auto & regexp = Regexps::get<like, true>(pattern);
size_t size = offsets.size();
for (size_t i = 0; i < size; ++i)
res[i] = revert ^ regexp->match(reinterpret_cast<const char *>(&data[i != 0 ? offsets[i - 1] : 0]), (i != 0 ? offsets[i] - offsets[i - 1] : offsets[0]) - 1);
@ -382,7 +382,7 @@ struct MatchImpl
static void constant(const std::string & data, const std::string & pattern, UInt8 & res)
{
const auto & regexp = Regexps::get<like>(pattern);
const auto & regexp = Regexps::get<like, true>(pattern);
res = revert ^ regexp->match(data);
}
};
@ -397,7 +397,7 @@ struct ExtractImpl
res_data.reserve(data.size() / 5);
res_offsets.resize(offsets.size());
const auto & regexp = Regexps::get(pattern);
const auto & regexp = Regexps::get<false, false>(pattern);
unsigned capture = regexp->getNumberOfSubpatterns() > 0 ? 1 : 0;
OptimizedRegularExpression::MatchVec matches;


@ -79,6 +79,12 @@ public:
return bytes + offset();
}
/** Check whether there is data available in the buffer. */
bool hasPendingData() const
{
return pos != working_buffer.end();
}
protected:
/// Reference to the piece of memory used by the buffer.
Buffer internal_buffer;


@ -14,23 +14,23 @@ class ConcatReadBuffer : public ReadBuffer
{
public:
typedef std::vector<ReadBuffer *> ReadBuffers;
protected:
ReadBuffers buffers;
ReadBuffers::iterator current;
bool nextImpl()
{
if (buffers.end() == current)
return false;
/// First read
if (working_buffer.size() == 0 && (*current)->position() != (*current)->buffer().end())
if (working_buffer.size() == 0 && (*current)->hasPendingData())
{
working_buffer = Buffer((*current)->position(), (*current)->buffer().end());
return true;
}
if (!(*current)->next())
{
++current;
@ -45,7 +45,7 @@ protected:
return false;
}
}
working_buffer = Buffer((*current)->position(), (*current)->buffer().end());
return true;
}
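A usage sketch, reading two in-memory buffers back as one stream; ReadBufferFromString and a ConcatReadBuffer constructor taking the ReadBuffers vector are assumed to exist in the tree:

#include <DB/IO/ConcatReadBuffer.h>
#include <DB/IO/ReadBufferFromString.h>
#include <string>

std::string concatExample()
{
	std::string s1 = "Hello, ";
	std::string s2 = "world";
	DB::ReadBufferFromString buf1{s1};
	DB::ReadBufferFromString buf2{s2};

	DB::ConcatReadBuffer::ReadBuffers buffers{&buf1, &buf2};
	DB::ConcatReadBuffer concat{buffers};

	std::string res;
	while (!concat.eof())	/// eof() advances to the next underlying buffer when needed
	{
		res.append(concat.position(), concat.buffer().end() - concat.position());
		concat.position() = concat.buffer().end();
	}
	return res;	/// "Hello, world"
}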


@ -40,13 +40,13 @@ public:
/** Read the next data and fill the buffer with it; move the position to the beginning;
* return false at the end, true otherwise; throw an exception if something is wrong.
*/
inline bool next()
bool next()
{
bytes += offset();
bool res = nextImpl();
if (!res)
working_buffer.resize(0);
pos = working_buffer.begin();
return res;
}
@ -54,7 +54,7 @@ public:
inline void nextIfAtEnd()
{
if (pos == working_buffer.end())
if (!hasPendingData())
next();
}
@ -68,9 +68,9 @@ public:
*
* An attempt to read past the end must throw an exception.
*/
inline bool eof()
bool eof()
{
return pos == working_buffer.end() && !next();
return !hasPendingData() && !next();
}
void ignore()
@ -143,12 +143,6 @@ public:
return read(to, n);
}
/** Check whether there is data available for reading in the buffer. */
bool hasPendingData() const
{
return offset() != working_buffer.size();
}
private:
/** Read the next data and fill the buffer with it.
* Return false at the end, true otherwise.
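With hasPendingData() in place, the canonical consumption loop no longer compares pos against working_buffer.end() by hand; a sketch:

#include <DB/IO/ReadBuffer.h>
#include <cstddef>

/// Count newline characters, scanning one filled buffer at a time.
std::size_t countLines(DB::ReadBuffer & in)
{
	std::size_t res = 0;
	while (!in.eof())	/// eof() is now !hasPendingData() && !next()
	{
		for (; in.position() != in.buffer().end(); ++in.position())
			if (*in.position() == '\n')
				++res;
	}
	return res;
}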


@ -80,7 +80,7 @@ public:
if (new_pos + (working_buffer.end() - pos) == pos_in_file)
return new_pos;
if (pos != working_buffer.end() && new_pos <= pos_in_file && new_pos >= pos_in_file - static_cast<off_t>(working_buffer.size()))
if (hasPendingData() && new_pos <= pos_in_file && new_pos >= pos_in_file - static_cast<off_t>(working_buffer.size()))
{
/// Still within the buffer.
pos = working_buffer.begin() + (new_pos - (pos_in_file - working_buffer.size()));


@ -44,7 +44,7 @@ public:
pos = working_buffer.begin();
throw;
}
pos = working_buffer.begin();
}
@ -56,11 +56,11 @@ public:
inline void nextIfAtEnd()
{
if (pos == working_buffer.end())
if (!hasPendingData())
next();
}
void write(const char * from, size_t n)
{
size_t bytes_copied = 0;
@ -82,7 +82,7 @@ public:
*pos = x;
++pos;
}
private:
/** Write the data currently in the buffer (from the start of the buffer up to the current position).
* Throw an exception if something is wrong.


@ -16,7 +16,6 @@
#include <DB/Storages/IStorage.h>
#include <DB/AggregateFunctions/AggregateFunctionFactory.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/Storages/StorageFactory.h>
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
#include <DB/Storages/MergeTree/MergeList.h>
#include <DB/TableFunctions/TableFunctionFactory.h>
@ -24,6 +23,7 @@
#include <DB/Interpreters/Users.h>
#include <DB/Interpreters/Quota.h>
#include <DB/Interpreters/Dictionaries.h>
#include <DB/Interpreters/ExternalDictionaries.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/Interpreters/Cluster.h>
#include <DB/Interpreters/InterserverIOHandler.h>
@ -86,9 +86,9 @@ struct ContextShared
TableFunctionFactory table_function_factory; /// Table functions.
AggregateFunctionFactory aggregate_function_factory; /// Aggregate functions.
DataTypeFactory data_type_factory; /// Data types.
StorageFactory storage_factory; /// Table engines.
FormatFactory format_factory; /// Formats.
mutable SharedPtr<Dictionaries> dictionaries; /// Metrica dictionaries. Initialized lazily.
mutable SharedPtr<ExternalDictionaries> external_dictionaries;
Users users; /// Known users.
Quotas quotas; /// Known resource-usage quotas.
mutable UncompressedCachePtr uncompressed_cache; /// Cache of decompressed blocks.
@ -259,9 +259,9 @@ public:
const TableFunctionFactory & getTableFunctionFactory() const { return shared->table_function_factory; }
const AggregateFunctionFactory & getAggregateFunctionFactory() const { return shared->aggregate_function_factory; }
const DataTypeFactory & getDataTypeFactory() const { return shared->data_type_factory; }
const StorageFactory & getStorageFactory() const { return shared->storage_factory; }
const FormatFactory & getFormatFactory() const { return shared->format_factory; }
const Dictionaries & getDictionaries() const;
const ExternalDictionaries & getExternalDictionaries() const;
InterserverIOHandler & getInterserverIOHandler() { return shared->interserver_io_handler; }


@ -1,13 +1,5 @@
#pragma once
#include <mutex>
#include <thread>
#include <unordered_map>
#include <chrono>
#include <random>
#include <Poco/SharedPtr.h>
#include <Yandex/MultiVersion.h>
#include <Yandex/logger_useful.h>
#include <statdaemons/RegionsHierarchies.h>
@ -18,10 +10,7 @@
namespace DB
{
using Poco::SharedPtr;
class Context;
class IDictionary;
/// Metrica dictionaries that can be used in functions.
@ -31,22 +20,15 @@ private:
MultiVersion<RegionsHierarchies> regions_hierarchies;
MultiVersion<TechDataHierarchy> tech_data_hierarchy;
MultiVersion<RegionsNames> regions_names;
mutable std::mutex external_dictionaries_mutex;
std::unordered_map<std::string, std::shared_ptr<MultiVersion<IDictionary>>> external_dictionaries;
std::unordered_map<std::string, std::chrono::system_clock::time_point> update_times;
std::mt19937 rnd_engine;
Context & context;
/// How often the dictionaries are refreshed, in seconds.
int reload_period;
std::thread reloading_thread;
std::thread reloading_externals_thread;
Poco::Event destroy{false};
Poco::Event destroy;
Logger * log;
Poco::Timestamp dictionaries_last_modified{0};
void handleException() const
@ -122,7 +104,6 @@ private:
}
void reloadExternals();
/// Refreshes every reload_period seconds.
void reloadPeriodically()
@ -136,35 +117,19 @@ private:
}
}
void reloadExternalsPeriodically()
{
const auto check_period = 5 * 1000;
while (true)
{
if (destroy.tryWait(check_period))
return;
reloadExternals();
}
}
public:
/// The dictionaries will be refreshed in a separate thread, every reload_period seconds.
Dictionaries(Context & context, int reload_period_ = 3600)
: context(context), reload_period(reload_period_),
log(&Logger::get("Dictionaries"))
Dictionaries(int reload_period_ = 3600)
: reload_period(reload_period_), log(&Logger::get("Dictionaries"))
{
reloadImpl();
reloadExternals();
reloading_thread = std::thread([this] { reloadPeriodically(); });
reloading_externals_thread = std::thread{&Dictionaries::reloadExternalsPeriodically, this};
}
~Dictionaries()
{
destroy.set();
reloading_thread.join();
reloading_externals_thread.join();
}
MultiVersion<RegionsHierarchies>::Version getRegionsHierarchies() const
@ -181,19 +146,6 @@ public:
{
return regions_names.get();
}
MultiVersion<IDictionary>::Version getExternalDictionary(const std::string & name) const
{
const std::lock_guard<std::mutex> lock{external_dictionaries_mutex};
const auto it = external_dictionaries.find(name);
if (it == std::end(external_dictionaries))
throw Exception{
"No such dictionary: " + name,
ErrorCodes::BAD_ARGUMENTS
};
return it->second->get();
}
};
}


@ -0,0 +1,122 @@
#pragma once
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <Yandex/MultiVersion.h>
#include <Yandex/logger_useful.h>
#include <Poco/Event.h>
#include <time.h>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <chrono>
#include <random>
#include <unistd.h>
namespace DB
{
class Context;
class IDictionary;
/** Manages user-defined dictionaries.
* Monitors configuration file and automatically reloads dictionaries in a separate thread.
* The monitoring thread wakes up every @check_period_sec seconds and checks
* modification time of dictionaries' configuration file. If said time is greater than
* @config_last_modified, the dictionaries are created from scratch using configuration file,
* possibly overriding currently existing dictionaries with the same name (previous versions of
* overridden dictionaries will live as long as there are any users retaining them).
*
* Apart from checking configuration file for modifications, each non-cached dictionary
* has a lifetime of its own and may be updated if its source reports that it has been
* modified. The time of next update is calculated by choosing uniformly a random number
* distributed between lifetime.min_sec and lifetime.max_sec.
* If either of lifetime.min_sec and lifetime.max_sec is zero, such dictionary is never updated.
*/
class ExternalDictionaries
{
private:
static const auto check_period_sec = 5;
mutable std::mutex dictionaries_mutex;
std::unordered_map<std::string, std::shared_ptr<MultiVersion<IDictionary>>> dictionaries;
std::unordered_map<std::string, std::chrono::system_clock::time_point> update_times;
std::mt19937_64 rnd_engine{getSeed()};
Context & context;
std::thread reloading_thread;
Poco::Event destroy;
Logger * log;
Poco::Timestamp config_last_modified{0};
void handleException() const
{
try
{
throw;
}
catch (const Poco::Exception & e)
{
LOG_ERROR(log, "Cannot load exter dictionary! You must resolve this manually. " << e.displayText());
return;
}
catch (...)
{
LOG_ERROR(log, "Cannot load dictionary! You must resolve this manually.");
return;
}
}
void reloadImpl();
void reloadPeriodically()
{
while (true)
{
if (destroy.tryWait(check_period_sec * 1000))
return;
reloadImpl();
}
}
static std::uint64_t getSeed()
{
timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return ts.tv_nsec ^ getpid();
}
public:
/// The dictionaries will be reloaded in a separate thread.
ExternalDictionaries(Context & context)
: context(context), log(&Logger::get("ExternalDictionaries"))
{
reloadImpl();
reloading_thread = std::thread{&ExternalDictionaries::reloadPeriodically, this};
}
~ExternalDictionaries()
{
destroy.set();
reloading_thread.join();
}
MultiVersion<IDictionary>::Version getDictionary(const std::string & name) const
{
const std::lock_guard<std::mutex> lock{dictionaries_mutex};
const auto it = dictionaries.find(name);
if (it == std::end(dictionaries))
throw Exception{
"No such dictionary: " + name,
ErrorCodes::BAD_ARGUMENTS
};
return it->second->get();
}
};
}
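A usage sketch from calling code: obtain a versioned handle and hold it for the duration of the operation, so a background reload cannot destroy the dictionary underneath the caller (the dictionary name here is made up):

#include <DB/Interpreters/ExternalDictionaries.h>
#include <DB/Dictionaries/IDictionary.h>

void lookupExample(const DB::ExternalDictionaries & dictionaries)
{
	/// The MultiVersion::Version handle keeps this version of the dictionary alive
	/// even if the reloading thread swaps in a newer one meanwhile.
	auto dict = dictionaries.getDictionary("regions");
	const auto dict_ptr = dict.get();

	if (dict_ptr->hasHierarchy())
	{
		/// ... call the typed getters declared in IDictionary on dict_ptr ...
	}
}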


@ -87,6 +87,9 @@ struct Limits
\
/** Maximum memory usage while processing a query. 0 means unlimited. */ \
M(SettingUInt64, max_memory_usage, 0) \
\
/** Maximum network data transfer speed, in bytes per second. 0 means unlimited. */ \
M(SettingUInt64, max_network_bandwidth, 0) \
#define DECLARE(TYPE, NAME, DEFAULT) \
TYPE NAME {DEFAULT};
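For reference, passing the new entry through DECLARE yields an initialized member; the preprocessor output is simply:

/// M(SettingUInt64, max_network_bandwidth, 0) expands under DECLARE to:
SettingUInt64 max_network_bandwidth {0};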


@ -16,7 +16,7 @@ inline void evaluateMissingDefaults(Block & block,
if (column_defaults.empty())
return;
ASTPtr default_expr_list{ext::make_unique<ASTExpressionList>().release()};
ASTPtr default_expr_list{std::make_unique<ASTExpressionList>().release()};
for (const auto & column : required_columns)
{


@ -10,7 +10,7 @@
#include <DB/Interpreters/InterpreterInsertQuery.h>
#include <statdaemons/Increment.h>
#include <statdaemons/ext/memory.hpp>
#include <memory>
#include <Yandex/Revision.h>
#include <iostream>


@ -14,7 +14,7 @@
#include <DB/Storages/AlterCommands.h>
#include <Poco/File.h>
#include <Poco/RWLock.h>
#include <statdaemons/ext/memory.hpp>
#include <memory>
namespace DB
@ -129,7 +129,7 @@ public:
*/
TableDataWriteLockPtr lockDataForAlter()
{
auto res = ext::make_unique<Poco::ScopedWriteRWLock>(data_lock);
auto res = std::make_unique<Poco::ScopedWriteRWLock>(data_lock);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
return res;
@ -137,7 +137,7 @@ public:
TableStructureWriteLockPtr lockStructureForAlter()
{
auto res = ext::make_unique<Poco::ScopedWriteRWLock>(structure_lock);
auto res = std::make_unique<Poco::ScopedWriteRWLock>(structure_lock);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
return res;


@ -1,7 +1,7 @@
#pragma once
#include <statdaemons/Stopwatch.h>
#include <statdaemons/ext/memory.hpp>
#include <memory>
#include <list>
#include <mutex>
#include <atomic>
@ -67,7 +67,7 @@ public:
EntryPtr insert(Args &&... args)
{
std::lock_guard<std::mutex> lock{mutex};
return ext::make_unique<Entry>(*this, merges.emplace(merges.end(), std::forward<Args>(args)...));
return std::make_unique<Entry>(*this, merges.emplace(merges.end(), std::forward<Args>(args)...));
}
container_t get() const

View File

@ -9,7 +9,7 @@
#include <DB/Parsers/ASTSubquery.h>
#include <DB/Parsers/formatAST.h>
#include <DB/Common/escapeForFileName.h>
#include <statdaemons/ext/memory.hpp>
#include <memory>
#include <unordered_map>
#include <map>
#include <limits>


@ -22,8 +22,6 @@ public:
NameAndTypePair getColumn(const String & column_name) const override { return getSource().getColumn(column_name); };
bool hasColumn(const String & column_name) const override { return getSource().hasColumn(column_name); };
bool supportsParallelReplicas() const override { return true; }
BlockInputStreams read(
const Names & column_names,
ASTPtr query,


@ -1,6 +1,7 @@
#pragma once
#include <DB/Storages/IStorage.h>
#include <Yandex/singleton.h>
namespace DB
@ -11,7 +12,7 @@ class Context;
/** Позволяет создать таблицу по имени движка.
*/
class StorageFactory
class StorageFactory : public Singleton<StorageFactory>
{
public:
StoragePtr get(


@ -54,6 +54,7 @@ public:
bool supportsSampling() const override { return data.supportsSampling(); }
bool supportsFinal() const override { return data.supportsFinal(); }
bool supportsPrewhere() const override { return data.supportsPrewhere(); }
bool supportsParallelReplicas() const override { return true; }
const NamesAndTypesList & getColumnsListImpl() const override { return data.getColumnsListNonMaterialized(); }


@ -103,8 +103,6 @@ public:
/// Add a part to the queue of parts whose data should be checked in a background thread.
void enqueuePartForCheck(const String & name);
void skipUnreplicated() { process_unreplicated = false; }
MergeTreeData & getData() { return data; }
MergeTreeData * getUnreplicatedData() { return unreplicated_data.get(); }
@ -166,8 +164,6 @@ private:
current_zookeeper = zookeeper;
}
bool process_unreplicated = true;
/// Если true, таблица в офлайновом режиме, и в нее нельзя писать.
bool is_readonly = false;


@ -92,9 +92,8 @@ private:
/// Send to the first shard we come across
BlockInputStreamPtr input{
new RemoteBlockInputStream{
cluster.pools.front().get(), query, &settings,
Tables(), QueryProcessingStage::Complete, context
}
cluster.pools.front().get(), query, &settings, nullptr,
Tables(), QueryProcessingStage::Complete, context}
};
input->readPrefix();


@ -165,7 +165,7 @@ void Connection::forceConnected()
bool Connection::ping()
{
LOG_TRACE(log_wrapper.get(), "Ping (" << getServerAddress() << ")");
// LOG_TRACE(log_wrapper.get(), "Ping (" << getServerAddress() << ")");
try
{
@ -277,15 +277,22 @@ void Connection::sendData(const Block & block, const String & name)
if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
writeStringBinary(name, *out);
size_t prev_bytes = out->count();
block.checkNestedArraysOffsets();
block_out->write(block);
maybe_compressed_out->next();
out->next();
if (throttler)
throttler->add(out->count() - prev_bytes);
}
void Connection::sendPreparedData(ReadBuffer & input, size_t size, const String & name)
{
/// NOTE The throttler is not used in this method (it could be, but that is not important for now).
writeVarUInt(Protocol::Client::Data, *out);
if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
@ -436,8 +443,15 @@ Block Connection::receiveData()
if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
readStringBinary(external_table_name, *in);
size_t prev_bytes = in->count();
/// Read one block from the network
return block_in->read();
Block res = block_in->read();
if (throttler)
throttler->add(in->count() - prev_bytes);
return res;
}
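The accounting above only requires the throttler to expose add(bytes). DB/Common/Throttler.h itself is not part of this diff; a minimal sleep-based implementation of that contract might look like this sketch:

#include <chrono>
#include <cstddef>
#include <mutex>
#include <thread>

/// Sketch: cap the average throughput at max_bytes_per_second by sleeping
/// whenever the observed transfer runs ahead of schedule.
class SimpleThrottler
{
public:
	SimpleThrottler(std::size_t max_bytes_per_second_)
		: max_bytes_per_second(max_bytes_per_second_), start(std::chrono::steady_clock::now()) {}

	void add(std::size_t bytes)
	{
		std::lock_guard<std::mutex> lock{mutex};
		total_bytes += bytes;

		const double elapsed = std::chrono::duration<double>(std::chrono::steady_clock::now() - start).count();
		const double expected = static_cast<double>(total_bytes) / max_bytes_per_second;
		if (expected > elapsed)
			std::this_thread::sleep_for(std::chrono::duration<double>(expected - elapsed));
	}

private:
	const std::size_t max_bytes_per_second;
	const std::chrono::steady_clock::time_point start;
	std::size_t total_bytes = 0;
	std::mutex mutex;
};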


@ -0,0 +1,275 @@
#include <DB/Client/ParallelReplicas.h>
namespace DB
{
ParallelReplicas::ParallelReplicas(Connection * connection_, Settings * settings_, ThrottlerPtr throttler_)
: settings(settings_), throttler(throttler_),
active_replica_count(1),
supports_parallel_execution(false)
{
registerReplica(connection_);
}
ParallelReplicas::ParallelReplicas(IConnectionPool * pool_, Settings * settings_, ThrottlerPtr throttler_)
: settings(settings_), throttler(throttler_)
{
if (pool_ == nullptr)
throw Exception("Null pool specified", ErrorCodes::LOGICAL_ERROR);
bool has_many_replicas = (settings != nullptr) && (settings->max_parallel_replicas > 1);
if (has_many_replicas)
{
pool_entries = pool_->getMany(settings);
active_replica_count = pool_entries.size();
supports_parallel_execution = (active_replica_count > 1);
if (active_replica_count == 0)
throw Exception("No connection available", ErrorCodes::LOGICAL_ERROR);
replica_map.reserve(active_replica_count);
for (auto & entry : pool_entries)
registerReplica(&*entry);
}
else
{
active_replica_count = 1;
supports_parallel_execution = false;
pool_entry = pool_->get(settings);
registerReplica(&*pool_entry);
}
}
void ParallelReplicas::sendExternalTablesData(std::vector<ExternalTablesData> & data)
{
if (!sent_query)
throw Exception("Cannot send external tables data: query not yet sent.", ErrorCodes::LOGICAL_ERROR);
if (data.size() < active_replica_count)
throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);
auto it = data.begin();
for (auto & e : replica_map)
{
Connection * connection = e.second;
if (connection != nullptr)
connection->sendExternalTablesData(*it);
++it;
}
}
void ParallelReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data)
{
if (sent_query)
throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR);
if (supports_parallel_execution)
{
Settings query_settings = *settings;
query_settings.parallel_replicas_count = active_replica_count;
UInt64 offset = 0;
for (auto & e : replica_map)
{
Connection * connection = e.second;
if (connection != nullptr)
{
query_settings.parallel_replica_offset = offset;
connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data);
++offset;
}
}
if (offset > 0)
sent_query = true;
}
else
{
auto it = replica_map.begin();
Connection * connection = it->second;
if (connection != nullptr)
{
connection->sendQuery(query, query_id, stage, settings, with_pending_data);
sent_query = true;
}
}
}
Connection::Packet ParallelReplicas::receivePacket()
{
if (!sent_query)
throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR);
if (!hasActiveReplicas())
throw Exception("No more packets are available.", ErrorCodes::LOGICAL_ERROR);
auto it = getReplicaForReading();
if (it == replica_map.end())
throw Exception("No available replica", ErrorCodes::NO_AVAILABLE_REPLICA);
Connection * connection = it->second;
Connection::Packet packet = connection->receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
break;
case Protocol::Server::EndOfStream:
invalidateReplica(it);
break;
case Protocol::Server::Exception:
default:
connection->disconnect();
invalidateReplica(it);
break;
}
return packet;
}
void ParallelReplicas::sendCancel()
{
if (!sent_query || cancelled)
throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR);
for (auto & e : replica_map)
{
Connection * connection = e.second;
if (connection != nullptr)
connection->sendCancel();
}
cancelled = true;
}
Connection::Packet ParallelReplicas::drain()
{
if (!cancelled)
throw Exception("Cannot drain connections: cancel first.", ErrorCodes::LOGICAL_ERROR);
Connection::Packet res;
res.type = Protocol::Server::EndOfStream;
while (hasActiveReplicas())
{
Connection::Packet packet = receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
case Protocol::Server::EndOfStream:
break;
case Protocol::Server::Exception:
default:
/// If we received an exception or an unknown packet, save it.
res = packet;
break;
}
}
return res;
}
std::string ParallelReplicas::dumpAddresses() const
{
bool is_first = true;
std::ostringstream os;
for (auto & e : replica_map)
{
const Connection * connection = e.second;
if (connection != nullptr)
{
os << (is_first ? "" : "; ") << connection->getServerAddress();
if (is_first) { is_first = false; }
}
}
return os.str();
}
void ParallelReplicas::registerReplica(Connection * connection)
{
if (connection == nullptr)
throw Exception("Invalid connection specified in parameter.", ErrorCodes::LOGICAL_ERROR);
auto res = replica_map.insert(std::make_pair(connection->socket.impl()->sockfd(), connection));
if (!res.second)
throw Exception("Invalid set of connections.", ErrorCodes::LOGICAL_ERROR);
if (throttler)
connection->setThrottler(throttler);
}
ParallelReplicas::ReplicaMap::iterator ParallelReplicas::getReplicaForReading()
{
ReplicaMap::iterator it;
if (supports_parallel_execution)
it = waitForReadEvent();
else
{
it = replica_map.begin();
if (it->second == nullptr)
it = replica_map.end();
}
return it;
}
ParallelReplicas::ReplicaMap::iterator ParallelReplicas::waitForReadEvent()
{
Poco::Net::Socket::SocketList read_list;
read_list.reserve(active_replica_count);
/** First, check whether any of the connections already has data
* pending in its buffer.
*/
for (auto & e : replica_map)
{
Connection * connection = e.second;
if ((connection != nullptr) && connection->hasReadBufferPendingData())
read_list.push_back(connection->socket);
}
/** If no such data was found, check whether there are connections
* ready for reading.
*/
if (read_list.empty())
{
Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list;
for (auto & e : replica_map)
{
Connection * connection = e.second;
if (connection != nullptr)
read_list.push_back(connection->socket);
}
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings->poll_interval * 1000000);
if (n == 0)
return replica_map.end();
}
auto & socket = read_list[rand() % read_list.size()];
return replica_map.find(socket.impl()->sockfd());
}
void ParallelReplicas::invalidateReplica(ParallelReplicas::ReplicaMap::iterator it)
{
it->second = nullptr;
--active_replica_count;
}
}
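End-to-end usage (sketch), mirroring how a remote block stream would drive this class: send the query once, then pump packets until every replica has reported EndOfStream. Packet::exception is assumed to carry the server-side exception, as declared in Connection.h:

#include <DB/Client/ParallelReplicas.h>

void runQuery(DB::IConnectionPool * pool, DB::Settings * settings, DB::ThrottlerPtr throttler)
{
	DB::ParallelReplicas replicas{pool, settings, throttler};
	replicas.sendQuery("SELECT 1");

	while (replicas.hasActiveReplicas())
	{
		DB::Connection::Packet packet = replicas.receivePacket();

		if (packet.type == DB::Protocol::Server::Exception)
			packet.exception->rethrow();	/// surface the remote error locally

		/// Data, Progress, Totals etc. would be dispatched to the consumer here.
	}
}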


@ -1,227 +0,0 @@
#include <DB/Client/ShardReplicas.h>
#include <boost/concept_check.hpp>
namespace DB
{
ShardReplicas::ShardReplicas(std::vector<ConnectionPool::Entry> & entries, const Settings & settings_) :
settings(settings_),
active_connection_count(entries.size())
{
replica_hash.reserve(entries.size());
for (auto & entry : entries)
{
Connection * connection = &*entry;
if (connection == nullptr)
throw Exception("Invalid connection specified in parameter.");
auto res = replica_hash.insert(std::make_pair(connection->socket.impl()->sockfd(), connection));
if (!res.second)
throw Exception("Invalid set of connections.");
}
}
void ShardReplicas::sendExternalTablesData(std::vector<ExternalTablesData> & data)
{
if (!sent_query)
throw Exception("Cannot send external tables data: query not yet sent.");
if (data.size() < active_connection_count)
throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);
auto it = data.begin();
for (auto & e : replica_hash)
{
Connection * connection = e.second;
if (connection != nullptr)
connection->sendExternalTablesData(*it);
++it;
}
}
void ShardReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data)
{
if (sent_query)
throw Exception("Query already sent.");
Settings query_settings = settings;
query_settings.parallel_replicas_count = replica_hash.size();
UInt64 offset = 0;
for (auto & e : replica_hash)
{
Connection * connection = e.second;
if (connection != nullptr)
{
query_settings.parallel_replica_offset = offset;
connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data);
++offset;
}
}
sent_query = true;
}
Connection::Packet ShardReplicas::receivePacket()
{
if (!sent_query)
throw Exception("Cannot receive packets: no query sent.");
if (active_connection_count == 0)
throw Exception("No more packets are available.");
Connection ** connection = waitForReadEvent();
if (connection == nullptr)
throw Exception("No available replica", ErrorCodes::NO_AVAILABLE_REPLICA);
Connection::Packet packet = (*connection)->receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
break;
case Protocol::Server::EndOfStream:
*connection = nullptr;
--active_connection_count;
if (active_connection_count > 0)
{
Connection::Packet empty_packet;
empty_packet.type = Protocol::Server::Data;
return empty_packet;
}
break;
case Protocol::Server::Exception:
default:
*connection = nullptr;
--active_connection_count;
if (!cancelled)
{
sendCancel();
(void) drain();
}
break;
}
return packet;
}
void ShardReplicas::disconnect()
{
for (auto & e : replica_hash)
{
Connection * & connection = e.second;
if (connection != nullptr)
{
connection->disconnect();
connection = nullptr;
--active_connection_count;
}
}
}
void ShardReplicas::sendCancel()
{
if (!sent_query || cancelled)
throw Exception("Cannot cancel. Either no query sent or already cancelled.");
for (auto & e : replica_hash)
{
Connection * connection = e.second;
if (connection != nullptr)
connection->sendCancel();
}
cancelled = true;
}
Connection::Packet ShardReplicas::drain()
{
if (!cancelled)
throw Exception("Cannot drain connections: cancel first.");
Connection::Packet res;
res.type = Protocol::Server::EndOfStream;
while (active_connection_count > 0)
{
Connection::Packet packet = receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
break;
case Protocol::Server::EndOfStream:
return res;
case Protocol::Server::Exception:
default:
res = packet;
break;
}
}
return res;
}
std::string ShardReplicas::dumpAddresses() const
{
std::ostringstream os;
for (auto & e : replica_hash)
{
char prefix = '\0';
const Connection * connection = e.second;
if (connection != nullptr)
{
os << prefix << connection->getServerAddress();
if (prefix == '\0')
prefix = ';';
}
}
return os.str();
}
Connection ** ShardReplicas::waitForReadEvent()
{
Poco::Net::Socket::SocketList read_list;
read_list.reserve(active_connection_count);
for (auto & e : replica_hash)
{
Connection * connection = e.second;
if ((connection != nullptr) && connection->hasReadBufferPendingData())
read_list.push_back(connection->socket);
}
if (read_list.empty())
{
Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list;
for (auto & e : replica_hash)
{
Connection * connection = e.second;
if (connection != nullptr)
read_list.push_back(connection->socket);
}
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.poll_interval * 1000000);
if (n == 0)
return nullptr;
}
auto & socket = read_list[rand() % read_list.size()];
auto it = replica_hash.find(socket.impl()->sockfd());
if (it == replica_hash.end())
throw Exception("Unexpected replica", ErrorCodes::UNEXPECTED_REPLICA);
return &(it->second);
}
}


@ -11,8 +11,7 @@
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/Parsers/ASTExpressionList.h>
#include <DB/Interpreters/ExpressionAnalyzer.h>
#include <statdaemons/ext/memory.hpp>
#include <memory>
#include <DB/Parsers/formatAST.h>


@ -55,7 +55,7 @@ void readString(String & s, ReadBuffer & buf)
s.append(buf.position(), bytes);
buf.position() += bytes;
if (buf.position() != buf.buffer().end())
if (buf.hasPendingData())
return;
}
}
@ -121,7 +121,7 @@ void readEscapedString(DB::String & s, DB::ReadBuffer & buf)
s.append(buf.position(), next_pos - buf.position());
buf.position() += next_pos - buf.position();
if (buf.position() == buf.buffer().end())
if (!buf.hasPendingData())
continue;
if (*buf.position() == '\t' || *buf.position() == '\n')
@ -191,8 +191,8 @@ static void readAnyQuotedString(String & s, ReadBuffer & buf)
s.append(buf.position(), next_pos - buf.position());
buf.position() += next_pos - buf.position();
if (buf.position() == buf.buffer().end())
if (!buf.hasPendingData())
continue;
if (*buf.position() == quote)


@ -0,0 +1,158 @@
/** Reproduces a bug in gcc 4.8.2
* The bug: an exception is not caught.
*
* /usr/bin/c++ -std=c++11 -Wall -O3 ./io_and_exceptions.cpp && ./a.out
*
* Prints:
* terminate called after throwing an instance of 'int'
* Aborted
*
* while it should print nothing.
*
* gcc 4.9 and clang 3.6 are both fine.
*/
typedef unsigned long size_t;
class BufferBase
{
public:
typedef char * Position;
struct Buffer
{
Buffer(Position begin_pos_, Position end_pos_) : begin_pos(begin_pos_), end_pos(end_pos_) {}
inline Position begin() const { return begin_pos; }
inline Position end() const { return end_pos; }
inline size_t size() const { return end_pos - begin_pos; }
inline void resize(size_t size) { end_pos = begin_pos + size; }
private:
Position begin_pos;
Position end_pos;
};
BufferBase(Position ptr, size_t size, size_t offset)
: internal_buffer(ptr, ptr + size), working_buffer(ptr, ptr + size), pos(ptr + offset), bytes(0) {}
void set(Position ptr, size_t size, size_t offset)
{
internal_buffer = Buffer(ptr, ptr + size);
working_buffer = Buffer(ptr, ptr + size);
pos = ptr + offset;
}
inline Buffer & buffer() { return working_buffer; }
inline Position & position() { return pos; };
inline size_t offset() const { return pos - working_buffer.begin(); }
size_t count() const
{
return bytes + offset();
}
protected:
Buffer internal_buffer;
Buffer working_buffer;
Position pos;
size_t bytes;
};
class ReadBuffer : public BufferBase
{
public:
ReadBuffer(Position ptr, size_t size) : BufferBase(ptr, size, 0) { working_buffer.resize(0); }
ReadBuffer(Position ptr, size_t size, size_t offset) : BufferBase(ptr, size, offset) {}
inline bool next()
{
bytes += offset();
bool res = nextImpl();
if (!res)
working_buffer.resize(0);
pos = working_buffer.begin();
return res;
}
virtual ~ReadBuffer() {}
inline bool eof()
{
return pos == working_buffer.end() && !next();
}
private:
virtual bool nextImpl() { return false; };
};
class CompressedReadBuffer : public ReadBuffer
{
private:
bool nextImpl()
{
throw 1;
return true;
}
public:
CompressedReadBuffer() : ReadBuffer(nullptr, 0)
{
}
};
void readIntText(unsigned & x, ReadBuffer & buf)
{
x = 0;
while (!buf.eof())
{
switch (*buf.position())
{
case '+':
break;
case '9':
x *= 10;
break;
default:
return;
}
}
}
unsigned parse(const char * data)
{
unsigned res;
ReadBuffer buf(const_cast<char *>(data), 10, 0);
readIntText(res, buf);
return res;
}
int main()
{
CompressedReadBuffer in;
try
{
while (!in.eof())
;
}
catch (...)
{
}
return 0;
}
void f()
{
parse("123");
}


@ -63,7 +63,7 @@ namespace test
return end;
}
void readEscapedString(DB::String & s, DB::ReadBuffer & buf)
{
s = "";
@ -74,7 +74,7 @@ namespace test
s.append(buf.position(), next_pos - buf.position());
buf.position() += next_pos - buf.position();
if (buf.position() == buf.buffer().end())
if (!buf.hasPendingData())
continue;
if (*buf.position() == '\t' || *buf.position() == '\n')


@ -1,5 +1,6 @@
#include <DB/Interpreters/Cluster.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/Common/isLocalAddress.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
#include <Poco/Net/NetworkInterface.h>
@ -255,19 +256,7 @@ bool Cluster::isLocal(const Address & address)
/// - its port matches the port the server listens on;
/// - its host resolves to a set of addresses one of which matches one of the addresses of the server's network interfaces,
/// then this shard must always be accessed without inter-process communication
const UInt16 clickhouse_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0);
static auto interfaces = Poco::Net::NetworkInterface::list();
if (clickhouse_port == address.host_port.port() &&
interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(),
[&](const Poco::Net::NetworkInterface & interface) { return interface.address() == address.host_port.host(); }))
{
LOG_INFO(&Poco::Util::Application::instance().logger(),
"Replica with address " << address.host_port.toString() << " will be processed as local.");
return true;
}
return false;
return isLocalAddress(address.host_port);
}
}


@ -495,13 +495,24 @@ const Dictionaries & Context::getDictionaries() const
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (!shared->dictionaries)
shared->dictionaries = new Dictionaries;
return *shared->dictionaries;
}
const ExternalDictionaries & Context::getExternalDictionaries() const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (!shared->external_dictionaries)
{
if (!this->global_context)
throw Exception("Logical error: there is no global context", ErrorCodes::LOGICAL_ERROR);
shared->dictionaries = new Dictionaries{ *this->global_context };
shared->external_dictionaries = new ExternalDictionaries{*this->global_context};
}
return *shared->dictionaries;
return *shared->external_dictionaries;
}


@ -2,9 +2,10 @@
#include <DB/Dictionaries/DictionarySourceFactory.h>
#include <DB/Dictionaries/FlatDictionary.h>
#include <DB/Dictionaries/HashedDictionary.h>
#include <DB/Dictionaries/CacheDictionary.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <memory>
#include <Yandex/singleton.h>
#include <statdaemons/ext/memory.hpp>
namespace DB
{
@ -12,39 +13,48 @@ namespace DB
DictionaryPtr DictionaryFactory::create(const std::string & name, Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix, Context & context) const
{
auto dict_struct = DictionaryStructure::fromConfig(config, config_prefix + "structure");
Poco::Util::AbstractConfiguration::Keys keys;
const auto & layout_prefix = config_prefix + ".layout";
config.keys(layout_prefix, keys);
if (keys.size() != 1)
throw Exception{
"Element dictionary.layout should have exactly one child element",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG
};
const DictionaryStructure dict_struct{config, config_prefix + ".structure"};
auto source_ptr = DictionarySourceFactory::instance().create(
config, config_prefix + "source.", dict_struct, context);
config, config_prefix + ".source", dict_struct, context);
const auto dict_lifetime = DictionaryLifetime::fromConfig(config, config_prefix + "lifetime");
const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
const auto & layout_prefix = config_prefix + "layout.";
const auto & layout_type = keys.front();
if (config.has(layout_prefix + "flat"))
if ("flat" == layout_type)
{
return ext::make_unique<FlatDictionary>(name, dict_struct, std::move(source_ptr), dict_lifetime);
return std::make_unique<FlatDictionary>(name, dict_struct, std::move(source_ptr), dict_lifetime);
}
else if (config.has(layout_prefix + "hashed"))
else if ("hashed" == layout_type)
{
return ext::make_unique<HashedDictionary>(name, dict_struct, std::move(source_ptr), dict_lifetime);
return std::make_unique<HashedDictionary>(name, dict_struct, std::move(source_ptr), dict_lifetime);
}
else if (config.has(layout_prefix + "cache"))
else if ("cache" == layout_type)
{
const auto size = config.getInt(layout_prefix + "cache.size", 0);
const auto size = config.getInt(layout_prefix + ".cache.size");
if (size == 0)
throw Exception{
"Dictionary of type 'cache' cannot have size of 0 bytes",
ErrorCodes::TOO_SMALL_BUFFER_SIZE
};
throw Exception{
"Dictionary of type 'cache' is not yet implemented",
ErrorCodes::NOT_IMPLEMENTED
};
return std::make_unique<CacheDictionary>(name, dict_struct, std::move(source_ptr), dict_lifetime, size);
}
throw Exception{"No dictionary type specified", ErrorCodes::BAD_ARGUMENTS};
throw Exception{
"Unknown dictionary layout type: " + layout_type,
ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG
};
};
}
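
The lookup is inverted: instead of probing config.has() for every known layout, the code lists the children of the layout element and dispatches on the single child's name. A stubbed-out sketch of that contract, with std::runtime_error standing in for DB::Exception:

#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

// layout_children plays the role of Poco's config.keys(config_prefix + ".layout", keys).
std::string resolveLayout(const std::vector<std::string> & layout_children)
{
    if (layout_children.size() != 1)
        throw std::runtime_error("Element dictionary.layout should have exactly one child element");
    return layout_children.front();   // e.g. "flat", "hashed" or "cache"
}

int main()
{
    assert(resolveLayout({"hashed"}) == "hashed");
}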

View File

@ -1,8 +1,9 @@
#include <DB/Interpreters/Dictionaries.h>
#include <DB/Interpreters/ExternalDictionaries.h>
#include <DB/Dictionaries/DictionaryFactory.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/config_ptr_t.h>
#include <statdaemons/ext/scope_guard.hpp>
#include <Poco/Util/Application.h>
namespace DB
{
@ -28,24 +29,27 @@ namespace
}
}
void Dictionaries::reloadExternals()
void ExternalDictionaries::reloadImpl()
{
const auto config_path = getDictionariesConfigPath(Poco::Util::Application::instance().config());
const Poco::File config_file{config_path};
if (!config_file.exists())
if (config_path.empty() || !config_file.exists())
{
LOG_WARNING(log, "config file '" + config_path + "' does not exist");
}
else
{
const auto last_modified = config_file.getLastModified();
if (last_modified > dictionaries_last_modified)
if (last_modified > config_last_modified)
{
/// definitions of dictionaries may have changed, recreate all of them
dictionaries_last_modified = last_modified;
config_last_modified = last_modified;
const config_ptr_t<Poco::Util::XMLConfiguration> config{new Poco::Util::XMLConfiguration{config_path}};
const auto config = new Poco::Util::XMLConfiguration{config_path};
SCOPE_EXIT(
config->release();
);
/// get all dictionaries' definitions
Poco::Util::AbstractConfiguration::Keys keys;
@ -62,16 +66,14 @@ void Dictionaries::reloadExternals()
continue;
}
const auto & prefix = key + '.';
const auto & name = config->getString(prefix + "name");
const auto name = config->getString(key + ".name");
if (name.empty())
{
LOG_WARNING(log, "dictionary name cannot be empty");
continue;
}
auto dict_ptr = DictionaryFactory::instance().create(name, *config, prefix, context);
auto dict_ptr = DictionaryFactory::instance().create(name, *config, key, context);
if (!dict_ptr->isCached())
{
const auto & lifetime = dict_ptr->getLifetime();
@ -86,12 +88,12 @@ void Dictionaries::reloadExternals()
}
}
auto it = external_dictionaries.find(name);
auto it = dictionaries.find(name);
/// add new dictionary or update an existing version
if (it == std::end(external_dictionaries))
if (it == std::end(dictionaries))
{
const std::lock_guard<std::mutex> lock{external_dictionaries_mutex};
external_dictionaries.emplace(name, std::make_shared<MultiVersion<IDictionary>>(dict_ptr.release()));
const std::lock_guard<std::mutex> lock{dictionaries_mutex};
dictionaries.emplace(name, std::make_shared<MultiVersion<IDictionary>>(dict_ptr.release()));
}
else
it->second->set(dict_ptr.release());
@ -105,7 +107,7 @@ void Dictionaries::reloadExternals()
}
/// periodic update
for (auto & dictionary : external_dictionaries)
for (auto & dictionary : dictionaries)
{
try
{
@ -125,6 +127,12 @@ void Dictionaries::reloadExternals()
if (std::chrono::system_clock::now() < update_time)
continue;
SCOPE_EXIT(
/// calculate next update time
std::uniform_int_distribution<std::uint64_t> distribution{lifetime.min_sec, lifetime.max_sec};
update_time = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
);
/// check source modified
if (current->getSource()->isModified())
{
@ -132,10 +140,6 @@ void Dictionaries::reloadExternals()
auto new_version = current->clone();
dictionary.second->set(new_version.release());
}
/// calculate next update time
std::uniform_int_distribution<std::uint64_t> distribution{lifetime.min_sec, lifetime.max_sec};
update_time = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
}
}
catch (...)
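
Moving the recalculation into SCOPE_EXIT guarantees that the next update time is set even when isModified() or clone() throws, so a failing dictionary cannot be retried in a hot loop. The computation itself, extracted as a standalone sketch:

#include <chrono>
#include <cstdint>
#include <random>

// As in the SCOPE_EXIT block above: the next update time is drawn uniformly
// from the dictionary's [min_sec, max_sec] lifetime range.
std::chrono::system_clock::time_point nextUpdateTime(
    std::uint64_t min_sec, std::uint64_t max_sec, std::mt19937_64 & rnd_engine)
{
    std::uniform_int_distribution<std::uint64_t> distribution{min_sec, max_sec};
    return std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
}

int main()
{
    std::mt19937_64 rnd_engine{42};
    auto t = nextUpdateTime(300, 360, rnd_engine);
    (void) t;
}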

View File

@ -10,9 +10,6 @@
#include <DB/IO/copyData.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/Parsers/formatAST.h>
#include <DB/Storages/StorageMerge.h>
#include <DB/Storages/StorageMergeTree.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <Poco/FileStream.h>

View File

@ -13,6 +13,7 @@
#include <DB/Parsers/ASTNameTypePair.h>
#include <DB/Parsers/ASTColumnDeclaration.h>
#include <DB/Storages/StorageFactory.h>
#include <DB/Storages/StorageLog.h>
#include <DB/Storages/StorageSystemNumbers.h>
@ -194,7 +195,7 @@ StoragePtr InterpreterCreateQuery::execute(bool assume_metadata_exists)
else
throw Exception("Incorrect CREATE query: required ENGINE.", ErrorCodes::ENGINE_REQUIRED);
res = context.getStorageFactory().get(
res = StorageFactory::instance().get(
storage_name, data_path, table_name, database_name, context,
context.getGlobalContext(), query_ptr, columns,
materialized_columns, alias_columns, column_defaults, create.attach);
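
The factory moves from a per-context accessor to a process-wide singleton. Its instance() is not shown in this diff; the usual shape of such a method is sketched below (the real one may go through Yandex/singleton.h):

// Sketch of a typical instance() for a process-wide factory.
class StorageFactorySketch
{
public:
    static StorageFactorySketch & instance()
    {
        static StorageFactorySketch factory;   // constructed once; thread-safe in C++11
        return factory;
    }

private:
    StorageFactorySketch() = default;
};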

View File

@ -25,7 +25,6 @@
#include <DB/Interpreters/InterpreterSelectQuery.h>
#include <DB/Storages/StorageView.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/TableFunctions/ITableFunction.h>
#include <DB/TableFunctions/TableFunctionFactory.h>
@ -105,12 +104,6 @@ void InterpreterSelectQuery::basicInit(BlockInputStreamPtr input_, const NamesAn
+ " does not support parallel execution on several replicas",
ErrorCodes::STORAGE_DOESNT_SUPPORT_PARALLEL_REPLICAS);
if (StorageReplicatedMergeTree * storage_replicated_merge_tree = typeid_cast<StorageReplicatedMergeTree *>(&*storage))
{
if (settings.parallel_replica_offset > 0)
storage_replicated_merge_tree->skipUnreplicated();
}
table_lock = storage->lockStructure(false);
if (table_column_names.empty())
context.setColumns(storage->getColumnsListNonMaterialized());

View File

@ -8,7 +8,7 @@
#include <Yandex/ErrorHandlers.h>
#include <Yandex/Revision.h>
#include <statdaemons/ConfigProcessor.h>
#include <statdaemons/ext/memory.hpp>
#include <memory>
#include <DB/Interpreters/loadMetadata.h>
#include <DB/Storages/StorageSystemNumbers.h>
@ -491,7 +491,7 @@ int Server::main(const std::vector<std::string> & args)
global_context->setMacros(Macros(config(), "macros"));
std::string users_config_path = config().getString("users_config", config().getString("config-file", "config.xml"));
auto users_config_reloader = ext::make_unique<UsersConfigReloader>(users_config_path, global_context.get());
auto users_config_reloader = std::make_unique<UsersConfigReloader>(users_config_path, global_context.get());
/// Maximum number of concurrently executing queries.
global_context->getProcessList().setMaxSize(config().getInt("max_concurrent_queries", 0));
@ -536,7 +536,7 @@ int Server::main(const std::vector<std::string> & args)
{
const auto profile_events_transmitter = config().getBool("use_graphite", true)
? ext::make_unique<ProfileEventsTransmitter>()
? std::make_unique<ProfileEventsTransmitter>()
: nullptr;
const std::string listen_host = config().getString("listen_host", "::");
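
ext::make_unique from statdaemons was a pre-C++14 stand-in that std::make_unique replaces throughout this commit. For reference, a minimal single-object version of such a stand-in (a sketch, not necessarily what statdaemons/ext/memory.hpp contained):

#include <memory>
#include <utility>

namespace ext_sketch
{
    // Forward the constructor arguments and wrap the result in unique_ptr.
    template <typename T, typename... Args>
    std::unique_ptr<T> make_unique(Args &&... args)
    {
        return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
    }
}

int main()
{
    auto p = ext_sketch::make_unique<int>(42);
    return *p == 42 ? 0 : 1;
}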

View File

@ -324,7 +324,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
{
MarkRanges ranges(1, MarkRange(0, parts[i]->size));
auto input = ext::make_unique<MergeTreeBlockInputStream>(
auto input = std::make_unique<MergeTreeBlockInputStream>(
data.getFullPath() + parts[i]->name + '/', DEFAULT_MERGE_BLOCK_SIZE, union_column_names, data,
parts[i], ranges, false, nullptr, "");
@ -348,19 +348,19 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
switch (data.mode)
{
case MergeTreeData::Ordinary:
merged_stream = ext::make_unique<MergingSortedBlockInputStream>(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
merged_stream = std::make_unique<MergingSortedBlockInputStream>(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Collapsing:
merged_stream = ext::make_unique<CollapsingSortedBlockInputStream>(src_streams, data.getSortDescription(), data.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
merged_stream = std::make_unique<CollapsingSortedBlockInputStream>(src_streams, data.getSortDescription(), data.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Summing:
merged_stream = ext::make_unique<SummingSortedBlockInputStream>(src_streams, data.getSortDescription(), data.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
merged_stream = std::make_unique<SummingSortedBlockInputStream>(src_streams, data.getSortDescription(), data.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Aggregating:
merged_stream = ext::make_unique<AggregatingSortedBlockInputStream>(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
merged_stream = std::make_unique<AggregatingSortedBlockInputStream>(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break;
default:

View File

@ -149,10 +149,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
relative_sample_size = 0;
}
UInt64 parallel_replicas_count = UInt64(settings.parallel_replicas_count);
UInt64 parallel_replica_offset = UInt64(settings.parallel_replica_offset);
if ((parallel_replicas_count > 1) && !data.sampling_expression.isNull() && (relative_sample_size == 0))
if ((settings.parallel_replicas_count > 1) && !data.sampling_expression.isNull() && (relative_sample_size == 0))
relative_sample_size = 1;
if (relative_sample_size != 0)
@ -175,12 +172,11 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
UInt64 sampling_column_value_upper_limit;
UInt64 upper_limit = static_cast<UInt64>(relative_sample_size * sampling_column_max);
if (parallel_replicas_count > 1)
if (settings.parallel_replicas_count > 1)
{
UInt64 step = upper_limit / parallel_replicas_count;
sampling_column_value_lower_limit = parallel_replica_offset * step;
if ((parallel_replica_offset + 1) < parallel_replicas_count)
sampling_column_value_upper_limit = (parallel_replica_offset + 1) * step;
sampling_column_value_lower_limit = (settings.parallel_replica_offset * upper_limit) / settings.parallel_replicas_count;
if ((settings.parallel_replica_offset + 1) < settings.parallel_replicas_count)
sampling_column_value_upper_limit = ((settings.parallel_replica_offset + 1) * upper_limit) / settings.parallel_replicas_count;
else
sampling_column_value_upper_limit = upper_limit + 1;
}
@ -191,25 +187,15 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
}
/// Add a condition so that something more can be pruned when the index is scanned again.
if (!key_condition.addCondition(data.sampling_expression->getColumnName(),
Range::createLeftBounded(sampling_column_value_lower_limit, true)))
throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);
if (sampling_column_value_lower_limit > 0)
if (!key_condition.addCondition(data.sampling_expression->getColumnName(),
Range::createLeftBounded(sampling_column_value_lower_limit, true)))
throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);
if (!key_condition.addCondition(data.sampling_expression->getColumnName(),
Range::createRightBounded(sampling_column_value_upper_limit, false)))
throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);
/// Filtering expression: sampling_expression in [sampling_column_value_lower_limit, sampling_column_value_upper_limit)
ASTPtr lower_filter_args = new ASTExpressionList;
lower_filter_args->children.push_back(data.sampling_expression);
lower_filter_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_lower_limit));
ASTFunctionPtr lower_filter_function = new ASTFunction;
lower_filter_function->name = "greaterOrEquals";
lower_filter_function->arguments = lower_filter_args;
lower_filter_function->children.push_back(lower_filter_function->arguments);
ASTPtr upper_filter_args = new ASTExpressionList;
upper_filter_args->children.push_back(data.sampling_expression);
upper_filter_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_upper_limit));
@ -219,14 +205,33 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
upper_filter_function->arguments = upper_filter_args;
upper_filter_function->children.push_back(upper_filter_function->arguments);
ASTPtr filter_function_args = new ASTExpressionList;
filter_function_args->children.push_back(lower_filter_function);
filter_function_args->children.push_back(upper_filter_function);
if (sampling_column_value_lower_limit > 0)
{
/// Filtering expression: sampling_expression in [sampling_column_value_lower_limit, sampling_column_value_upper_limit)
filter_function = new ASTFunction;
filter_function->name = "and";
filter_function->arguments = filter_function_args;
filter_function->children.push_back(filter_function->arguments);
ASTPtr lower_filter_args = new ASTExpressionList;
lower_filter_args->children.push_back(data.sampling_expression);
lower_filter_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_lower_limit));
ASTFunctionPtr lower_filter_function = new ASTFunction;
lower_filter_function->name = "greaterOrEquals";
lower_filter_function->arguments = lower_filter_args;
lower_filter_function->children.push_back(lower_filter_function->arguments);
ASTPtr filter_function_args = new ASTExpressionList;
filter_function_args->children.push_back(lower_filter_function);
filter_function_args->children.push_back(upper_filter_function);
filter_function = new ASTFunction;
filter_function->name = "and";
filter_function->arguments = filter_function_args;
filter_function->children.push_back(filter_function->arguments);
}
else
{
/// Filtering expression: sampling_expression < sampling_column_value_upper_limit
filter_function = upper_filter_function;
}
filter_expression = ExpressionAnalyzer(filter_function, data.context, data.getColumnsList()).getActions(false);
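
The rewritten arithmetic derives each replica's half-open sampling window directly as (offset * upper_limit) / count rather than offset * (upper_limit / count), so the division remainder is spread across replicas instead of being absorbed entirely by the last one. A standalone demonstration of the split:

#include <cstdint>
#include <iostream>

int main()
{
    const std::uint64_t upper_limit = 1000;   // illustrative sampling key limit
    const std::uint64_t parallel_replicas_count = 3;

    for (std::uint64_t offset = 0; offset < parallel_replicas_count; ++offset)
    {
        const std::uint64_t lower = (offset * upper_limit) / parallel_replicas_count;
        const std::uint64_t upper = (offset + 1 < parallel_replicas_count)
            ? ((offset + 1) * upper_limit) / parallel_replicas_count
            : upper_limit + 1;

        // replica 0: [0, 333), replica 1: [333, 666), replica 2: [666, 1001);
        // as above, the lower-bound condition is only added when lower > 0.
        std::cout << "replica " << offset << ": [" << lower << ", " << upper << ")\n";
    }
}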

View File

@ -120,7 +120,7 @@ struct Stream
/// If the mark must be exactly on a block boundary, then a mark pointing at the end of the previous block
/// suits us just as well as one pointing at the beginning of the next one.
if (uncompressed_hashing_buf.position() == uncompressed_hashing_buf.buffer().end())
if (!uncompressed_hashing_buf.hasPendingData())
{
/// Take the mark pointing at the end of the previous block.
has_alternative_mark = true;

View File

@ -12,7 +12,7 @@
#include <DB/Core/Field.h>
#include <statdaemons/ext/memory.hpp>
#include <memory>
namespace DB
{
@ -149,7 +149,7 @@ BlockInputStreams StorageDistributed::read(
Settings new_settings = settings;
new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.limits.max_execution_time);
size_t result_size = cluster.pools.size() + cluster.getLocalNodesNum();
size_t result_size = (cluster.pools.size() * settings.max_parallel_replicas) + cluster.getLocalNodesNum();
processed_stage = result_size == 1
? QueryProcessingStage::Complete
@ -160,10 +160,15 @@ BlockInputStreams StorageDistributed::read(
query, remote_database, remote_table);
const auto & modified_query = queryToString(modified_query_ast);
/// Network traffic throttling, if required.
ThrottlerPtr throttler;
if (settings.limits.max_network_bandwidth)
throttler.reset(new Throttler(settings.limits.max_network_bandwidth));
/// Loop over the shards.
for (auto & conn_pool : cluster.pools)
res.emplace_back(new RemoteBlockInputStream{
conn_pool, modified_query, &new_settings,
conn_pool, modified_query, &new_settings, throttler,
external_tables, processed_stage, context});
/// Add queries to the local ClickHouse.
@ -234,7 +239,7 @@ bool StorageDistributed::hasColumn(const String & column_name) const
void StorageDistributed::createDirectoryMonitor(const std::string & name)
{
directory_monitors.emplace(name, ext::make_unique<DirectoryMonitor>(*this, name));
directory_monitors.emplace(name, std::make_unique<DirectoryMonitor>(*this, name));
}
void StorageDistributed::createDirectoryMonitors()
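
Neither this file nor the diff shows the internals of the Throttler created above. A plausible minimal sketch of a bandwidth limiter with that interface (an assumption, not the actual DB/Common/Throttler.h): track total bytes since construction and sleep whenever the observed rate would exceed max_speed bytes per second.

#include <chrono>
#include <cstddef>
#include <mutex>
#include <thread>

// Sketch only; the real Throttler may differ.
class ThrottlerSketch
{
public:
    explicit ThrottlerSketch(size_t max_speed_) : max_speed(max_speed_), start(std::chrono::steady_clock::now()) {}

    // Called for each transferred chunk; blocks long enough to keep the average rate under max_speed.
    void add(size_t bytes)
    {
        std::lock_guard<std::mutex> lock(mutex);
        count += bytes;
        const double elapsed_sec = std::chrono::duration<double>(std::chrono::steady_clock::now() - start).count();
        const double expected_sec = static_cast<double>(count) / max_speed;
        if (expected_sec > elapsed_sec)
            std::this_thread::sleep_for(std::chrono::duration<double>(expected_sec - elapsed_sec));
    }

private:
    const size_t max_speed;
    const std::chrono::steady_clock::time_point start;
    size_t count = 0;
    std::mutex mutex;
};

int main()
{
    ThrottlerSketch throttler(1024 * 1024);   // ~1 MiB/s
    throttler.add(4096);                      // well under the limit: no sleep
}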

View File

@ -2027,7 +2027,7 @@ BlockInputStreams StorageReplicatedMergeTree::read(
size_t part_index = 0;
if (process_unreplicated && unreplicated_reader && values.count(0))
if ((settings.parallel_replica_offset == 0) && unreplicated_reader && values.count(0))
{
res = unreplicated_reader->read(real_column_names, query,
context, settings, processed_stage,
@ -2107,7 +2107,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
auto zookeeper = getZooKeeper();
const MergeTreeMergeBlocker merge_blocker{merger};
const auto unreplicated_merge_blocker = unreplicated_merger ?
ext::make_unique<MergeTreeMergeBlocker>(*unreplicated_merger) : nullptr;
std::make_unique<MergeTreeMergeBlocker>(*unreplicated_merger) : nullptr;
LOG_DEBUG(log, "Doing ALTER");

View File

@ -3,48 +3,3 @@
3 2015-03-01 3 foo
4 2015-04-01 4 bar
5 2015-05-01 5 foo
6 2015-06-01 6 bar
7 2015-07-01 7 foo
8 2015-08-01 8 bar
9 2015-09-01 9 foo
10 2015-10-01 10 bar
11 2015-11-01 1 foo
12 2015-12-01 2 bar
13 2015-01-01 3 foo
14 2015-02-01 4 bar
15 2015-03-01 5 foo
16 2015-04-01 6 bar
17 2015-05-01 7 foo
18 2015-06-01 8 bar
19 2015-07-01 9 foo
20 2015-08-01 10 bar
21 2015-09-01 1 foo
22 2015-10-01 2 bar
23 2015-11-01 3 foo
24 2015-12-01 4 bar
25 2015-01-01 5 foo
26 2015-02-01 6 bar
27 2015-03-01 7 foo
28 2015-04-01 8 bar
29 2015-05-01 9 foo
30 2015-06-01 10 bar
31 2015-07-01 1 foo
32 2015-08-01 2 bar
33 2015-09-01 3 foo
34 2015-10-01 4 bar
35 2015-11-01 5 foo
36 2015-12-01 6 bar
37 2015-01-01 7 foo
38 2015-02-01 8 bar
39 2015-03-01 9 foo
40 2015-04-01 10 bar
41 2015-05-01 1 foo
42 2015-06-01 2 bar
43 2015-07-01 3 foo
44 2015-08-01 4 bar
45 2015-09-01 5 foo
46 2015-10-01 6 bar
47 2015-11-01 7 foo
48 2015-12-01 8 bar
49 2015-01-01 9 foo
50 2015-02-01 10 bar

View File

@ -4,57 +4,7 @@ DROP TABLE IF EXISTS test.report;
CREATE TABLE test.report(id UInt32, event_date Date, priority UInt32, description String) ENGINE = MergeTree(event_date, intHash32(id), (id, event_date, intHash32(id)), 8192);
INSERT INTO test.report(id,event_date,priority,description) VALUES(1, '2015-01-01', 1, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(2, '2015-02-01', 2, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(3, '2015-03-01', 3, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(4, '2015-04-01', 4, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(5, '2015-05-01', 5, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(6, '2015-06-01', 6, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(7, '2015-07-01', 7, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(8, '2015-08-01', 8, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(9, '2015-09-01', 9, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(10, '2015-10-01', 10, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(11, '2015-11-01', 1, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(12, '2015-12-01', 2, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(13, '2015-01-01', 3, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(14, '2015-02-01', 4, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(15, '2015-03-01', 5, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(16, '2015-04-01', 6, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(17, '2015-05-01', 7, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(18, '2015-06-01', 8, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(19, '2015-07-01', 9, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(20, '2015-08-01', 10, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(21, '2015-09-01', 1, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(22, '2015-10-01', 2, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(23, '2015-11-01', 3, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(24, '2015-12-01', 4, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(25, '2015-01-01', 5, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(26, '2015-02-01', 6, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(27, '2015-03-01', 7, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(28, '2015-04-01', 8, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(29, '2015-05-01', 9, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(30, '2015-06-01', 10, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(31, '2015-07-01', 1, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(32, '2015-08-01', 2, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(33, '2015-09-01', 3, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(34, '2015-10-01', 4, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(35, '2015-11-01', 5, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(36, '2015-12-01', 6, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(37, '2015-01-01', 7, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(38, '2015-02-01', 8, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(39, '2015-03-01', 9, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(40, '2015-04-01', 10, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(41, '2015-05-01', 1, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(42, '2015-06-01', 2, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(43, '2015-07-01', 3, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(44, '2015-08-01', 4, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(45, '2015-09-01', 5, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(46, '2015-10-01', 6, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(47, '2015-11-01', 7, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(48, '2015-12-01', 8, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES(49, '2015-01-01', 9, 'foo');
INSERT INTO test.report(id,event_date,priority,description) VALUES(50, '2015-02-01', 10, 'bar');
INSERT INTO test.report(id,event_date,priority,description) VALUES (1, '2015-01-01', 1, 'foo')(2, '2015-02-01', 2, 'bar')(3, '2015-03-01', 3, 'foo')(4, '2015-04-01', 4, 'bar')(5, '2015-05-01', 5, 'foo');
SELECT * FROM (SELECT id, event_date, priority, description FROM remote('127.0.0.{1|2}', test, report)) ORDER BY id ASC;
DROP TABLE test.report;

View File

@ -93,6 +93,11 @@ any_runs()
if [[ $(running_processes) -gt 0 ]]; then return 0; else return 1; fi
}
all_runs()
{
[[ $(running_processes) -eq $NUMBER_OF_PROCESSES ]]
}
wait4done()
{
while any_runs; do
@ -108,7 +113,7 @@ start()
echo -n "Start $PROGRAM service: "
if [[ $(running_processes) -eq $NUMBER_OF_PROCESSES ]]; then
if all_runs; then
echo -n "already running "
EXIT_STATUS=1
else
@ -236,7 +241,7 @@ main()
restart
;;
condstart)
any_runs || start
all_runs || start
;;
condstop)
any_runs && stop