This commit is contained in:
Evgeniy Gatov 2015-02-11 14:39:06 +03:00
commit 391d078eb4
43 changed files with 1124 additions and 681 deletions

View File

@ -4,6 +4,8 @@
#include <Poco/Net/StreamSocket.h>
#include <DB/Common/Throttler.h>
#include <DB/Core/Block.h>
#include <DB/Core/Defines.h>
#include <DB/Core/Progress.h>
@ -33,6 +35,7 @@ typedef std::pair<BlockInputStreamPtr, std::string> ExternalTableData;
/// Вектор пар, описывающих таблицы
typedef std::vector<ExternalTableData> ExternalTablesData;
/** Соединение с сервером БД для использования в клиенте.
* Как использовать - см. Core/Protocol.h
* (Реализацию на стороне сервера - см. Server/TCPHandler.h)
@ -69,6 +72,12 @@ public:
virtual ~Connection() {};
/// Установить ограничитель сетевого трафика. Один ограничитель может использоваться одновременно для нескольких разных соединений.
void setThrottler(const ThrottlerPtr & throttler_)
{
throttler = throttler_;
}
/// Пакет, который может быть получен от сервера.
struct Packet
@ -161,6 +170,11 @@ private:
const DataTypeFactory & data_type_factory;
/** Если не nullptr, то используется, чтобы ограничить сетевой трафик.
* Учитывается только трафик при передаче блоков. Другие пакеты не учитываются.
*/
ThrottlerPtr throttler;
Poco::Timespan connect_timeout;
Poco::Timespan receive_timeout;
Poco::Timespan send_timeout;

View File

@ -1,85 +1,93 @@
#pragma once
#include <DB/Common/Throttler.h>
#include <DB/Client/Connection.h>
#include <DB/Client/ConnectionPool.h>
namespace DB
{
/** Множество реплик одного шарда.
/** Для получения данных сразу из нескольких реплик (соединений) в рамках одного потока.
* В качестве вырожденного случая, может также работать с одним соединением.
*
* Интерфейс почти совпадает с Connection.
*/
class ParallelReplicas final : private boost::noncopyable
{
public:
/// Принимает готовое соединение.
ParallelReplicas(Connection * connection_, Settings * settings_, ThrottlerPtr throttler_);
/// Принимает пул, из которого нужно будет достать одно или несколько соединений.
ParallelReplicas(IConnectionPool * pool_, Settings * settings_, ThrottlerPtr throttler_);
/// Отправить на реплики всё содержимое внешних таблиц.
void sendExternalTablesData(std::vector<ExternalTablesData> & data);
/// Отправить запрос на реплики.
void sendQuery(const String & query, const String & query_id = "",
UInt64 stage = QueryProcessingStage::Complete, bool with_pending_data = false);
/// Получить пакет от какой-нибудь реплики.
Connection::Packet receivePacket();
/// Отменить запросы к репликам
void sendCancel();
/** На каждой реплике читать и пропускать все пакеты до EndOfStream или Exception.
* Возвращает EndOfStream, если не было получено никакого исключения. В противном
* случае возвращает последний полученный пакет типа Exception.
*/
class ParallelReplicas final
{
public:
/// Принимает готовое соединение.
ParallelReplicas(Connection * connection_, Settings * settings_);
Connection::Packet drain();
/// Принимает пул, из которого нужно будет достать одно или несколько соединений.
ParallelReplicas(IConnectionPool * pool_, Settings * settings_);
/// Получить адреса реплик в виде строки.
std::string dumpAddresses() const;
ParallelReplicas(const ParallelReplicas &) = delete;
ParallelReplicas & operator=(const ParallelReplicas &) = delete;
/// Возвращает количесто реплик.
size_t size() const { return replica_map.size(); }
/// Отправить на реплики всё содержимое внешних таблиц.
void sendExternalTablesData(std::vector<ExternalTablesData> & data);
/// Проверить, есть ли действительные реплики.
bool hasActiveReplicas() const { return active_replica_count > 0; }
/// Отправить запрос на реплики.
void sendQuery(const String & query, const String & query_id = "",
UInt64 stage = QueryProcessingStage::Complete, bool with_pending_data = false);
private:
/// Реплики хэшированные по id сокета
using ReplicaMap = std::unordered_map<int, Connection *>;
/// Получить пакет от какой-нибудь реплики.
Connection::Packet receivePacket();
/// Отменить запросы к репликам
void sendCancel();
/// Зарегистрировать реплику.
void registerReplica(Connection * connection);
/** На каждой реплике читать и пропускать все пакеты до EndOfStream или Exception.
* Возвращает EndOfStream, если не было получено никакого исключения. В противном
* случае возвращает последний полученный пакет типа Exception.
*/
Connection::Packet drain();
/// Получить реплику, на которой можно прочитать данные.
ReplicaMap::iterator getReplicaForReading();
/// Получить адреса реплик в виде строки.
std::string dumpAddresses() const;
/** Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах.
* Возвращает одну такую реплику, если она найдётся.
*/
ReplicaMap::iterator waitForReadEvent();
/// Возвращает количесто реплик.
size_t size() const { return replica_map.size(); }
/// Пометить реплику как недействительную.
void invalidateReplica(ReplicaMap::iterator it);
/// Проверить, есть ли действительные реплики.
bool hasActiveReplicas() const { return active_replica_count > 0; }
private:
/// Реплики хэшированные по id сокета
using ReplicaMap = std::unordered_map<int, Connection *>;
Settings * settings;
ReplicaMap replica_map;
private:
/// Зарегистрировать реплику.
void registerReplica(Connection * connection);
/// Если не nullptr, то используется, чтобы ограничить сетевой трафик.
ThrottlerPtr throttler;
/// Получить реплику, на которой можно прочитать данные.
ReplicaMap::iterator getReplicaForReading();
std::vector<ConnectionPool::Entry> pool_entries;
ConnectionPool::Entry pool_entry;
/** Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах.
* Возвращает одну такую реплику, если она найдётся.
*/
ReplicaMap::iterator waitForReadEvent();
/// Текущее количество действительных соединений к репликам.
size_t active_replica_count;
/// Запрос выполняется параллельно на нескольких репликах.
bool supports_parallel_execution;
/// Отправили запрос
bool sent_query = false;
/// Отменили запрос
bool cancelled = false;
};
/// Пометить реплику как недействительную.
void invalidateReplica(ReplicaMap::iterator it);
private:
Settings * settings;
ReplicaMap replica_map;
std::vector<ConnectionPool::Entry> pool_entries;
ConnectionPool::Entry pool_entry;
/// Текущее количество действительных соединений к репликам.
size_t active_replica_count;
/// Запрос выполняется параллельно на нескольких репликах.
bool supports_parallel_execution;
/// Отправили запрос
bool sent_query = false;
/// Отменили запрос
bool cancelled = false;
};
}

View File

@ -0,0 +1,59 @@
#pragma once
#include <mutex>
#include <memory>
#include <statdaemons/Stopwatch.h>
/** Позволяет ограничить скорость чего либо (в штуках в секунду) с помощью sleep.
* Особенности работы:
* - считается только средняя скорость, от момента первого вызова функции add;
* если были периоды с низкой скоростью, то в течение промежутка времени после них, скорость будет выше;
*/
class Throttler
{
public:
Throttler(size_t max_speed_) : max_speed(max_speed_) {}
void add(size_t amount)
{
size_t new_count;
UInt64 elapsed_ns;
{
std::lock_guard<std::mutex> lock(mutex);
if (0 == count)
{
watch.start();
elapsed_ns = 0;
}
else
elapsed_ns = watch.elapsed();
count += amount;
new_count = count;
}
/// Сколько должно было бы пройти времени, если бы скорость была равна max_speed.
UInt64 desired_ns = new_count * 1000000000 / max_speed;
if (desired_ns > elapsed_ns)
{
UInt64 sleep_ns = desired_ns - elapsed_ns;
timespec sleep_ts;
sleep_ts.tv_sec = sleep_ns / 1000000000;
sleep_ts.tv_nsec = sleep_ns % 1000000000;
nanosleep(&sleep_ts, nullptr); /// NOTE Завершается раньше в случае сигнала. Это считается нормальным.
}
}
private:
size_t max_speed;
size_t count = 0;
Stopwatch watch {CLOCK_MONOTONIC_COARSE};
std::mutex mutex;
};
typedef std::shared_ptr<Throttler> ThrottlerPtr;

View File

@ -0,0 +1,27 @@
#pragma once
#include <DB/Core/Types.h>
#include <Poco/Util/Application.h>
#include <Poco/Net/NetworkInterface.h>
#include <Poco/Net/SocketAddress.h>
namespace DB
{
inline bool isLocalAddress(const Poco::Net::SocketAddress & address)
{
const UInt16 clickhouse_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0);
static auto interfaces = Poco::Net::NetworkInterface::list();
if (clickhouse_port == address.port())
{
return interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(),
[&] (const Poco::Net::NetworkInterface & interface) {
return interface.address() == address.host();
});
}
return false;
}
}

View File

@ -5,6 +5,7 @@
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/Common/VirtualColumnUtils.h>
#include <DB/Common/Throttler.h>
#include <DB/Interpreters/Context.h>
#include <DB/Client/ConnectionPool.h>
@ -32,29 +33,29 @@ private:
public:
/// Принимает готовое соединение.
RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_,
RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_, ThrottlerPtr throttler_ = nullptr,
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
const Context & context = getDefaultContext())
: connection(&connection_), query(query_), external_tables(external_tables_), stage(stage_), context(context)
: connection(&connection_), query(query_), throttler(throttler_), external_tables(external_tables_), stage(stage_), context(context)
{
init(settings_);
}
/// Принимает готовое соединение. Захватывает владение соединением из пула.
RemoteBlockInputStream(ConnectionPool::Entry & pool_entry_, const String & query_, const Settings * settings_,
RemoteBlockInputStream(ConnectionPool::Entry & pool_entry_, const String & query_, const Settings * settings_, ThrottlerPtr throttler_ = nullptr,
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
const Context & context = getDefaultContext())
: pool_entry(pool_entry_), connection(&*pool_entry_), query(query_),
: pool_entry(pool_entry_), connection(&*pool_entry_), query(query_), throttler(throttler_),
external_tables(external_tables_), stage(stage_), context(context)
{
init(settings_);
}
/// Принимает пул, из которого нужно будет достать одно или несколько соединений.
RemoteBlockInputStream(IConnectionPool * pool_, const String & query_, const Settings * settings_,
RemoteBlockInputStream(IConnectionPool * pool_, const String & query_, const Settings * settings_, ThrottlerPtr throttler_ = nullptr,
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
const Context & context = getDefaultContext())
: pool(pool_), query(query_), external_tables(external_tables_), stage(stage_), context(context)
: pool(pool_), query(query_), throttler(throttler_), external_tables(external_tables_), stage(stage_), context(context)
{
init(settings_);
}
@ -257,9 +258,9 @@ protected:
{
Settings * parallel_replicas_settings = send_settings ? &settings : nullptr;
if (connection != nullptr)
parallel_replicas = ext::make_unique<ParallelReplicas>(connection, parallel_replicas_settings);
parallel_replicas = std::make_unique<ParallelReplicas>(connection, parallel_replicas_settings, throttler);
else
parallel_replicas = ext::make_unique<ParallelReplicas>(pool, parallel_replicas_settings);
parallel_replicas = std::make_unique<ParallelReplicas>(pool, parallel_replicas_settings, throttler);
}
/// Возвращает true, если запрос отправлен, а ещё не выполнен.
@ -290,6 +291,8 @@ private:
const String query;
bool send_settings;
Settings settings;
/// Если не nullptr, то используется, чтобы ограничить сетевой трафик.
ThrottlerPtr throttler;
/// Временные таблицы, которые необходимо переслать на удаленные сервера.
Tables external_tables;
QueryProcessingStage::Enum stage;

View File

@ -0,0 +1,137 @@
#pragma once
#include <DB/Dictionaries/IDictionary.h>
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/DictionaryStructure.h>
namespace DB
{
class CacheDictionary final : public IDictionary
{
public:
CacheDictionary(const std::string & name, const DictionaryStructure & dict_struct,
DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime,
const std::size_t size)
: name{name}, dict_struct(dict_struct),
source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime),
size{size}
{
if (!this->source_ptr->supportsSelectiveLoad())
throw Exception{
"Source cannot be used with CacheDictionary",
ErrorCodes::UNSUPPORTED_METHOD
};
}
CacheDictionary(const CacheDictionary & other)
: CacheDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.size}
{}
std::string getName() const override { return name; }
std::string getTypeName() const override { return "CacheDictionary"; }
bool isCached() const override { return true; }
DictionaryPtr clone() const override { return std::make_unique<CacheDictionary>(*this); }
const IDictionarySource * getSource() const override { return source_ptr.get(); }
const DictionaryLifetime & getLifetime() const override { return dict_lifetime; }
bool hasHierarchy() const override { return false; }
id_t toParent(const id_t id) const override { return 0; }
#define DECLARE_SAFE_GETTER(TYPE, NAME, LC_TYPE) \
TYPE get##NAME(const std::string & attribute_name, const id_t id) const override\
{\
return {};\
}
DECLARE_SAFE_GETTER(UInt8, UInt8, uint8)
DECLARE_SAFE_GETTER(UInt16, UInt16, uint16)
DECLARE_SAFE_GETTER(UInt32, UInt32, uint32)
DECLARE_SAFE_GETTER(UInt64, UInt64, uint64)
DECLARE_SAFE_GETTER(Int8, Int8, int8)
DECLARE_SAFE_GETTER(Int16, Int16, int16)
DECLARE_SAFE_GETTER(Int32, Int32, int32)
DECLARE_SAFE_GETTER(Int64, Int64, int64)
DECLARE_SAFE_GETTER(Float32, Float32, float32)
DECLARE_SAFE_GETTER(Float64, Float64, float64)
DECLARE_SAFE_GETTER(StringRef, String, string)
#undef DECLARE_SAFE_GETTER
std::size_t getAttributeIndex(const std::string & attribute_name) const override
{
return {};
}
#define DECLARE_TYPE_CHECKER(NAME, LC_NAME)\
bool is##NAME(const std::size_t attribute_idx) const override\
{\
return true;\
}
DECLARE_TYPE_CHECKER(UInt8, uint8)
DECLARE_TYPE_CHECKER(UInt16, uint16)
DECLARE_TYPE_CHECKER(UInt32, uint32)
DECLARE_TYPE_CHECKER(UInt64, uint64)
DECLARE_TYPE_CHECKER(Int8, int8)
DECLARE_TYPE_CHECKER(Int16, int16)
DECLARE_TYPE_CHECKER(Int32, int32)
DECLARE_TYPE_CHECKER(Int64, int64)
DECLARE_TYPE_CHECKER(Float32, float32)
DECLARE_TYPE_CHECKER(Float64, float64)
DECLARE_TYPE_CHECKER(String, string)
#undef DECLARE_TYPE_CHECKER
#define DECLARE_UNSAFE_GETTER(TYPE, NAME, LC_NAME)\
TYPE get##NAME##Unsafe(const std::size_t attribute_idx, const id_t id) const override\
{\
return {};\
}
DECLARE_UNSAFE_GETTER(UInt8, UInt8, uint8)
DECLARE_UNSAFE_GETTER(UInt16, UInt16, uint16)
DECLARE_UNSAFE_GETTER(UInt32, UInt32, uint32)
DECLARE_UNSAFE_GETTER(UInt64, UInt64, uint64)
DECLARE_UNSAFE_GETTER(Int8, Int8, int8)
DECLARE_UNSAFE_GETTER(Int16, Int16, int16)
DECLARE_UNSAFE_GETTER(Int32, Int32, int32)
DECLARE_UNSAFE_GETTER(Int64, Int64, int64)
DECLARE_UNSAFE_GETTER(Float32, Float32, float32)
DECLARE_UNSAFE_GETTER(Float64, Float64, float64)
DECLARE_UNSAFE_GETTER(StringRef, String, string)
#undef DECLARE_UNSAFE_GETTER
private:
const std::string name;
const DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
const std::size_t size;
union item
{
UInt8 uint8_value;
UInt16 uint16_value;
UInt32 uint32_value;
UInt64 uint64_value;
Int8 int8_value;
Int16 int16_value;
Int32 int32_value;
Int64 int64_value;
Float32 float32_value;
Float64 float64_value;
StringRef string_value;
};
struct cell
{
id_t id;
std::vector<item> attrs;
};
std::vector<cell> cells;
};
}

View File

@ -4,6 +4,7 @@
#include <DB/Client/ConnectionPool.h>
#include <DB/DataStreams/RemoteBlockInputStream.h>
#include <DB/Interpreters/executeQuery.h>
#include <DB/Common/isLocalAddress.h>
#include <statdaemons/ext/range.hpp>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Net/NetworkInterface.h>
@ -13,44 +14,50 @@ namespace DB
const auto max_connections = 1;
class ClickhouseDictionarySource final : public IDictionarySource
/** Allows loading dictionaries from local or remote ClickHouse instance
* @todo use ConnectionPoolWithFailover
* @todo invent a way to keep track of source modifications
*/
class ClickHouseDictionarySource final : public IDictionarySource
{
static const auto max_block_size = 8192;
public:
ClickhouseDictionarySource(const Poco::Util::AbstractConfiguration & config,
ClickHouseDictionarySource(const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Block & sample_block, Context & context)
: host{config.getString(config_prefix + "host")},
port(config.getInt(config_prefix + "port")),
user{config.getString(config_prefix + "user", "")},
password{config.getString(config_prefix + "password", "")},
db{config.getString(config_prefix + "db", "")},
table{config.getString(config_prefix + "table")},
: host{config.getString(config_prefix + ".host")},
port(config.getInt(config_prefix + ".port")),
user{config.getString(config_prefix + ".user", "")},
password{config.getString(config_prefix + ".password", "")},
db{config.getString(config_prefix + ".db", "")},
table{config.getString(config_prefix + ".table")},
sample_block{sample_block}, context(context),
is_local{isLocal(host, port)},
pool{is_local ? nullptr : ext::make_unique<ConnectionPool>(
is_local{isLocalAddress({ host, port })},
pool{is_local ? nullptr : std::make_unique<ConnectionPool>(
max_connections, host, port, db, user, password, context.getDataTypeFactory(),
"ClickhouseDictionarySource")
"ClickHouseDictionarySource")
},
load_all_query{composeLoadAllQuery(sample_block, table)}
{}
ClickhouseDictionarySource(const ClickhouseDictionarySource & other)
/// copy-constructor is provided in order to support cloneability
ClickHouseDictionarySource(const ClickHouseDictionarySource & other)
: host{other.host}, port{other.port}, user{other.user}, password{other.password},
db{other.db}, table{other.db},
sample_block{other.sample_block}, context(other.context),
is_local{other.is_local},
pool{is_local ? nullptr : ext::make_unique<ConnectionPool>(
pool{is_local ? nullptr : std::make_unique<ConnectionPool>(
max_connections, host, port, db, user, password, context.getDataTypeFactory(),
"ClickhouseDictionarySource")},
"ClickHouseDictionarySource")},
load_all_query{other.load_all_query}
{}
BlockInputStreamPtr loadAll() override
{
/** Query to local ClickHouse is marked internal in order to avoid
* the necessity of holding process_list_element shared pointer.
*/
if (is_local)
return executeQuery(load_all_query, context).in;
return executeQuery(load_all_query, context, true).in;
return new RemoteBlockInputStream{pool.get(), load_all_query, nullptr};
}
@ -70,12 +77,13 @@ public:
};
}
/// @todo check update time somehow
bool isModified() const override { return true; }
bool supportsSelectiveLoad() const override { return true; }
DictionarySourcePtr clone() const override { return ext::make_unique<ClickhouseDictionarySource>(*this); }
DictionarySourcePtr clone() const override { return std::make_unique<ClickHouseDictionarySource>(*this); }
private:
/// @todo escape table and column names
static std::string composeLoadAllQuery(const Block & block, const std::string & table)
{
std::string query{"SELECT "};

View File

@ -3,11 +3,11 @@
#include <DB/Core/Block.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <DB/Dictionaries/FileDictionarySource.h>
#include <DB/Dictionaries/MysqlDictionarySource.h>
#include <DB/Dictionaries/ClickhouseDictionarySource.h>
#include <DB/Dictionaries/MySQLDictionarySource.h>
#include <DB/Dictionaries/ClickHouseDictionarySource.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <Yandex/singleton.h>
#include <statdaemons/ext/memory.hpp>
#include <memory>
namespace DB
{
@ -38,6 +38,7 @@ Block createSampleBlock(const DictionaryStructure & dict_struct, const Context &
}
/// creates IDictionarySource instance from config and DictionaryStructure
class DictionarySourceFactory : public Singleton<DictionarySourceFactory>
{
public:
@ -46,25 +47,38 @@ public:
const DictionaryStructure & dict_struct,
Context & context) const
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
if (keys.size() != 1)
throw Exception{
"Element dictionary.source should have exactly one child element",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG
};
auto sample_block = createSampleBlock(dict_struct, context);
if (config.has(config_prefix + "file"))
const auto & source_type = keys.front();
if ("file" == source_type)
{
const auto & filename = config.getString(config_prefix + "file.path");
const auto & format = config.getString(config_prefix + "file.format");
return ext::make_unique<FileDictionarySource>(filename, format, sample_block, context);
const auto filename = config.getString(config_prefix + ".file.path");
const auto format = config.getString(config_prefix + ".file.format");
return std::make_unique<FileDictionarySource>(filename, format, sample_block, context);
}
else if (config.has(config_prefix + "mysql"))
else if ("mysql" == source_type)
{
return ext::make_unique<MysqlDictionarySource>(config, config_prefix + "mysql", sample_block, context);
return std::make_unique<MySQLDictionarySource>(config, config_prefix + ".mysql", sample_block);
}
else if (config.has(config_prefix + "clickhouse"))
else if ("clickhouse" == source_type)
{
return ext::make_unique<ClickhouseDictionarySource>(config, config_prefix + "clickhouse.",
return std::make_unique<ClickHouseDictionarySource>(config, config_prefix + ".clickhouse",
sample_block, context);
}
throw Exception{"unsupported source type"};
throw Exception{
"Unknown dictionary source type: " + source_type,
ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG
};
}
};

View File

@ -9,7 +9,7 @@
namespace DB
{
enum class attribute_type
enum class AttributeType
{
uint8,
uint16,
@ -24,20 +24,20 @@ enum class attribute_type
string
};
inline attribute_type getAttributeTypeByName(const std::string & type)
inline AttributeType getAttributeTypeByName(const std::string & type)
{
static const std::unordered_map<std::string, attribute_type> dictionary{
{ "UInt8", attribute_type::uint8 },
{ "UInt16", attribute_type::uint16 },
{ "UInt32", attribute_type::uint32 },
{ "UInt64", attribute_type::uint64 },
{ "Int8", attribute_type::int8 },
{ "Int16", attribute_type::int16 },
{ "Int32", attribute_type::int32 },
{ "Int64", attribute_type::int64 },
{ "Float32", attribute_type::float32 },
{ "Float64", attribute_type::float64 },
{ "String", attribute_type::string },
static const std::unordered_map<std::string, AttributeType> dictionary{
{ "UInt8", AttributeType::uint8 },
{ "UInt16", AttributeType::uint16 },
{ "UInt32", AttributeType::uint32 },
{ "UInt64", AttributeType::uint64 },
{ "Int8", AttributeType::int8 },
{ "Int16", AttributeType::int16 },
{ "Int32", AttributeType::int32 },
{ "Int64", AttributeType::int64 },
{ "Float32", AttributeType::float32 },
{ "Float64", AttributeType::float64 },
{ "String", AttributeType::string },
};
const auto it = dictionary.find(type);
@ -50,44 +50,53 @@ inline attribute_type getAttributeTypeByName(const std::string & type)
};
}
inline std::string toString(const attribute_type type)
inline std::string toString(const AttributeType type)
{
switch (type)
{
case attribute_type::uint8: return "UInt8";
case attribute_type::uint16: return "UInt16";
case attribute_type::uint32: return "UInt32";
case attribute_type::uint64: return "UInt64";
case attribute_type::int8: return "Int8";
case attribute_type::int16: return "Int16";
case attribute_type::int32: return "Int32";
case attribute_type::int64: return "Int64";
case attribute_type::float32: return "Float32";
case attribute_type::float64: return "Float64";
case attribute_type::string: return "String";
case AttributeType::uint8: return "UInt8";
case AttributeType::uint16: return "UInt16";
case AttributeType::uint32: return "UInt32";
case AttributeType::uint64: return "UInt64";
case AttributeType::int8: return "Int8";
case AttributeType::int16: return "Int16";
case AttributeType::int32: return "Int32";
case AttributeType::int64: return "Int64";
case AttributeType::float32: return "Float32";
case AttributeType::float64: return "Float64";
case AttributeType::string: return "String";
}
throw Exception{
"Unknown attribute_type " + toString(type),
"Unknown attribute_type " + toString(static_cast<int>(type)),
ErrorCodes::ARGUMENT_OUT_OF_BOUND
};
}
/// Min and max lifetimes for a dictionary or it's entry
struct DictionaryLifetime
{
std::uint64_t min_sec;
std::uint64_t max_sec;
static DictionaryLifetime fromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
DictionaryLifetime(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
{
const auto & lifetime_min_key = config_prefix + ".min";
const auto has_min = config.has(lifetime_min_key);
const std::uint64_t min_update_time = has_min ? config.getInt(lifetime_min_key) : config.getInt(config_prefix);
const std::uint64_t max_update_time = has_min ? config.getInt(config_prefix + ".max") : min_update_time;
return { min_update_time, max_update_time };
this->min_sec = has_min ? config.getInt(lifetime_min_key) : config.getInt(config_prefix);
this->max_sec = has_min ? config.getInt(config_prefix + ".max") : this->min_sec;
}
};
/** Holds the description of a single dictionary attribute:
* - name, used for lookup into dictionary and source;
* - type, used in conjunction with DataTypeFactory and getAttributeTypeByname;
* - null_value, used as a default value for non-existent entries in the dictionary,
* decimal representation for numeric attributes;
* - hierarchical, whether this attribute defines a hierarchy;
* - injective, whether the mapping to parent is injective (can be used for optimization of GROUP BY?)
*/
struct DictionaryAttribute
{
std::string name;
@ -97,34 +106,34 @@ struct DictionaryAttribute
bool injective;
};
/// Name of identifier plus list of attributes
struct DictionaryStructure
{
std::string id_name;
std::vector<DictionaryAttribute> attributes;
static DictionaryStructure fromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
DictionaryStructure(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
: id_name{config.getString(config_prefix + ".id.name")}
{
const auto & id_name = config.getString(config_prefix + ".id.name");
if (id_name.empty())
throw Exception{
"No 'id' specified for dictionary",
ErrorCodes::BAD_ARGUMENTS
};
DictionaryStructure result{id_name};
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
auto has_hierarchy = false;
for (const auto & key : keys)
{
if (0 != strncmp(key.data(), "attribute", strlen("attribute")))
continue;
const auto & prefix = config_prefix + '.' + key + '.';
const auto & name = config.getString(prefix + "name");
const auto & type = config.getString(prefix + "type");
const auto & null_value = config.getString(prefix + "null_value");
const auto prefix = config_prefix + '.' + key + '.';
const auto name = config.getString(prefix + "name");
const auto type = config.getString(prefix + "type");
const auto null_value = config.getString(prefix + "null_value");
const auto hierarchical = config.getBool(prefix + "hierarchical", false);
const auto injective = config.getBool(prefix + "injective", false);
if (name.empty() || type.empty())
@ -141,16 +150,16 @@ struct DictionaryStructure
has_hierarchy = has_hierarchy || hierarchical;
result.attributes.emplace_back(DictionaryAttribute{name, type, null_value, hierarchical, injective});
attributes.emplace_back(DictionaryAttribute{
name, type, null_value, hierarchical, injective
});
}
if (result.attributes.empty())
if (attributes.empty())
throw Exception{
"Dictionary has no attributes defined",
ErrorCodes::BAD_ARGUMENTS
};
return result;
}
};

View File

@ -10,13 +10,14 @@
namespace DB
{
/// Allows loading dictionaries from a file with given format, does not support "random access"
class FileDictionarySource final : public IDictionarySource
{
static const auto max_block_size = 8192;
public:
FileDictionarySource(const std::string & filename, const std::string & format, Block & sample_block,
Context & context)
const Context & context)
: filename{filename}, format{format}, sample_block{sample_block}, context(context),
last_modification{getLastModification()}
{}
@ -29,7 +30,7 @@ public:
BlockInputStreamPtr loadAll() override
{
auto in_ptr = ext::make_unique<ReadBufferFromFile>(filename);
auto in_ptr = std::make_unique<ReadBufferFromFile>(filename);
auto stream = context.getFormatFactory().getInput(
format, *in_ptr, sample_block, max_block_size, context.getDataTypeFactory());
last_modification = getLastModification();
@ -54,8 +55,9 @@ public:
}
bool isModified() const override { return getLastModification() > last_modification; }
bool supportsSelectiveLoad() const override { return false; }
DictionarySourcePtr clone() const override { return ext::make_unique<FileDictionarySource>(*this); }
DictionarySourcePtr clone() const override { return std::make_unique<FileDictionarySource>(*this); }
private:
Poco::Timestamp getLastModification() const { return Poco::File{filename}.getLastModified(); }
@ -63,7 +65,7 @@ private:
const std::string filename;
const std::string format;
Block sample_block;
Context & context;
const Context & context;
Poco::Timestamp last_modification;
};

View File

@ -2,10 +2,8 @@
#include <DB/Dictionaries/IDictionary.h>
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/DictionarySourceFactory.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <statdaemons/ext/range.hpp>
#include <Poco/Util/AbstractConfiguration.h>
#include <vector>
namespace DB
@ -36,9 +34,9 @@ public:
bool isCached() const override { return false; }
DictionaryPtr clone() const override { return ext::make_unique<FlatDictionary>(*this); }
DictionaryPtr clone() const override { return std::make_unique<FlatDictionary>(*this); }
const IDictionarySource * const getSource() const override { return source_ptr.get(); }
const IDictionarySource * getSource() const override { return source_ptr.get(); }
const DictionaryLifetime & getLifetime() const override { return dict_lifetime; }
@ -50,18 +48,18 @@ public:
switch (hierarchical_attribute->type)
{
case attribute_type::uint8: return id < attr->uint8_array->size() ? (*attr->uint8_array)[id] : attr->uint8_null_value;
case attribute_type::uint16: return id < attr->uint16_array->size() ? (*attr->uint16_array)[id] : attr->uint16_null_value;
case attribute_type::uint32: return id < attr->uint32_array->size() ? (*attr->uint32_array)[id] : attr->uint32_null_value;
case attribute_type::uint64: return id < attr->uint64_array->size() ? (*attr->uint64_array)[id] : attr->uint64_null_value;
case attribute_type::int8: return id < attr->int8_array->size() ? (*attr->int8_array)[id] : attr->int8_null_value;
case attribute_type::int16: return id < attr->int16_array->size() ? (*attr->int16_array)[id] : attr->int16_null_value;
case attribute_type::int32: return id < attr->int32_array->size() ? (*attr->int32_array)[id] : attr->int32_null_value;
case attribute_type::int64: return id < attr->int64_array->size() ? (*attr->int64_array)[id] : attr->int64_null_value;
case attribute_type::float32:
case attribute_type::float64:
case attribute_type::string:
break;
case AttributeType::uint8: return id < attr->uint8_array->size() ? (*attr->uint8_array)[id] : attr->uint8_null_value;
case AttributeType::uint16: return id < attr->uint16_array->size() ? (*attr->uint16_array)[id] : attr->uint16_null_value;
case AttributeType::uint32: return id < attr->uint32_array->size() ? (*attr->uint32_array)[id] : attr->uint32_null_value;
case AttributeType::uint64: return id < attr->uint64_array->size() ? (*attr->uint64_array)[id] : attr->uint64_null_value;
case AttributeType::int8: return id < attr->int8_array->size() ? (*attr->int8_array)[id] : attr->int8_null_value;
case AttributeType::int16: return id < attr->int16_array->size() ? (*attr->int16_array)[id] : attr->int16_null_value;
case AttributeType::int32: return id < attr->int32_array->size() ? (*attr->int32_array)[id] : attr->int32_null_value;
case AttributeType::int64: return id < attr->int64_array->size() ? (*attr->int64_array)[id] : attr->int64_null_value;
case AttributeType::float32:
case AttributeType::float64:
case AttributeType::string:
break;
}
throw Exception{
@ -75,7 +73,7 @@ public:
{\
const auto idx = getAttributeIndex(attribute_name);\
const auto & attribute = attributes[idx];\
if (attribute.type != attribute_type::LC_TYPE)\
if (attribute.type != AttributeType::LC_TYPE)\
throw Exception{\
"Type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\
ErrorCodes::TYPE_MISMATCH\
@ -112,7 +110,7 @@ public:
#define DECLARE_TYPE_CHECKER(NAME, LC_NAME)\
bool is##NAME(const std::size_t attribute_idx) const override\
{\
return attributes[attribute_idx].type == attribute_type::LC_NAME;\
return attributes[attribute_idx].type == AttributeType::LC_NAME;\
}
DECLARE_TYPE_CHECKER(UInt8, uint8)
DECLARE_TYPE_CHECKER(UInt16, uint16)
@ -151,7 +149,7 @@ public:
private:
struct attribute_t
{
attribute_type type;
AttributeType type;
UInt8 uint8_null_value;
UInt16 uint16_null_value;
UInt32 uint32_null_value;
@ -195,6 +193,7 @@ private:
void loadData()
{
auto stream = source_ptr->loadAll();
stream->readPrefix();
while (const auto block = stream->read())
{
@ -209,65 +208,67 @@ private:
setAttributeValue(attribute, id_column[row_idx].get<UInt64>(), attribute_column[row_idx]);
}
}
stream->readSuffix();
}
attribute_t createAttributeWithType(const attribute_type type, const std::string & null_value)
attribute_t createAttributeWithType(const AttributeType type, const std::string & null_value)
{
attribute_t attr{type};
switch (type)
{
case attribute_type::uint8:
case AttributeType::uint8:
attr.uint8_null_value = DB::parse<UInt8>(null_value);
attr.uint8_array.reset(new PODArray<UInt8>);
attr.uint8_array->resize_fill(initial_array_size, attr.uint8_null_value);
break;
case attribute_type::uint16:
case AttributeType::uint16:
attr.uint16_null_value = DB::parse<UInt16>(null_value);
attr.uint16_array.reset(new PODArray<UInt16>);
attr.uint16_array->resize_fill(initial_array_size, attr.uint16_null_value);
break;
case attribute_type::uint32:
case AttributeType::uint32:
attr.uint32_null_value = DB::parse<UInt32>(null_value);
attr.uint32_array.reset(new PODArray<UInt32>);
attr.uint32_array->resize_fill(initial_array_size, attr.uint32_null_value);
break;
case attribute_type::uint64:
case AttributeType::uint64:
attr.uint64_null_value = DB::parse<UInt64>(null_value);
attr.uint64_array.reset(new PODArray<UInt64>);
attr.uint64_array->resize_fill(initial_array_size, attr.uint64_null_value);
break;
case attribute_type::int8:
case AttributeType::int8:
attr.int8_null_value = DB::parse<Int8>(null_value);
attr.int8_array.reset(new PODArray<Int8>);
attr.int8_array->resize_fill(initial_array_size, attr.int8_null_value);
break;
case attribute_type::int16:
case AttributeType::int16:
attr.int16_null_value = DB::parse<Int16>(null_value);
attr.int16_array.reset(new PODArray<Int16>);
attr.int16_array->resize_fill(initial_array_size, attr.int16_null_value);
break;
case attribute_type::int32:
case AttributeType::int32:
attr.int32_null_value = DB::parse<Int32>(null_value);
attr.int32_array.reset(new PODArray<Int32>);
attr.int32_array->resize_fill(initial_array_size, attr.int32_null_value);
break;
case attribute_type::int64:
case AttributeType::int64:
attr.int64_null_value = DB::parse<Int64>(null_value);
attr.int64_array.reset(new PODArray<Int64>);
attr.int64_array->resize_fill(initial_array_size, attr.int64_null_value);
break;
case attribute_type::float32:
case AttributeType::float32:
attr.float32_null_value = DB::parse<Float32>(null_value);
attr.float32_array.reset(new PODArray<Float32>);
attr.float32_array->resize_fill(initial_array_size, attr.float32_null_value);
break;
case attribute_type::float64:
case AttributeType::float64:
attr.float64_null_value = DB::parse<Float64>(null_value);
attr.float64_array.reset(new PODArray<Float64>);
attr.float64_array->resize_fill(initial_array_size, attr.float64_null_value);
break;
case attribute_type::string:
case AttributeType::string:
attr.string_null_value = null_value;
attr.string_arena.reset(new Arena);
attr.string_array.reset(new PODArray<StringRef>);
@ -288,77 +289,77 @@ private:
switch (attribute.type)
{
case attribute_type::uint8:
case AttributeType::uint8:
{
if (id >= attribute.uint8_array->size())
attribute.uint8_array->resize_fill(id, attribute.uint8_null_value);
(*attribute.uint8_array)[id] = value.get<UInt64>();
break;
}
case attribute_type::uint16:
case AttributeType::uint16:
{
if (id >= attribute.uint16_array->size())
attribute.uint16_array->resize_fill(id, attribute.uint16_null_value);
(*attribute.uint16_array)[id] = value.get<UInt64>();
break;
}
case attribute_type::uint32:
case AttributeType::uint32:
{
if (id >= attribute.uint32_array->size())
attribute.uint32_array->resize_fill(id, attribute.uint32_null_value);
(*attribute.uint32_array)[id] = value.get<UInt64>();
break;
}
case attribute_type::uint64:
case AttributeType::uint64:
{
if (id >= attribute.uint64_array->size())
attribute.uint64_array->resize_fill(id, attribute.uint64_null_value);
(*attribute.uint64_array)[id] = value.get<UInt64>();
break;
}
case attribute_type::int8:
case AttributeType::int8:
{
if (id >= attribute.int8_array->size())
attribute.int8_array->resize_fill(id, attribute.int8_null_value);
(*attribute.int8_array)[id] = value.get<Int64>();
break;
}
case attribute_type::int16:
case AttributeType::int16:
{
if (id >= attribute.int16_array->size())
attribute.int16_array->resize_fill(id, attribute.int16_null_value);
(*attribute.int16_array)[id] = value.get<Int64>();
break;
}
case attribute_type::int32:
case AttributeType::int32:
{
if (id >= attribute.int32_array->size())
attribute.int32_array->resize_fill(id, attribute.int32_null_value);
(*attribute.int32_array)[id] = value.get<Int64>();
break;
}
case attribute_type::int64:
case AttributeType::int64:
{
if (id >= attribute.int64_array->size())
attribute.int64_array->resize_fill(id, attribute.int64_null_value);
(*attribute.int64_array)[id] = value.get<Int64>();
break;
}
case attribute_type::float32:
case AttributeType::float32:
{
if (id >= attribute.float32_array->size())
attribute.float32_array->resize_fill(id, attribute.float32_null_value);
(*attribute.float32_array)[id] = value.get<Float64>();
break;
}
case attribute_type::float64:
case AttributeType::float64:
{
if (id >= attribute.float64_array->size())
attribute.float64_array->resize_fill(id, attribute.float64_null_value);
(*attribute.float64_array)[id] = value.get<Float64>();
break;
}
case attribute_type::string:
case AttributeType::string:
{
if (id >= attribute.string_array->size())
attribute.string_array->resize_fill(id, attribute.string_null_value);

View File

@ -2,11 +2,10 @@
#include <DB/Dictionaries/IDictionary.h>
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/DictionarySourceFactory.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <DB/Common/HashTable/HashMap.h>
#include <statdaemons/ext/range.hpp>
#include <statdaemons/ext/memory.hpp>
#include <memory>
namespace DB
{
@ -33,9 +32,9 @@ public:
bool isCached() const override { return false; }
DictionaryPtr clone() const override { return ext::make_unique<HashedDictionary>(*this); }
DictionaryPtr clone() const override { return std::make_unique<HashedDictionary>(*this); }
const IDictionarySource * const getSource() const override { return source_ptr.get(); }
const IDictionarySource * getSource() const override { return source_ptr.get(); }
const DictionaryLifetime & getLifetime() const override { return dict_lifetime; }
@ -47,49 +46,49 @@ public:
switch (hierarchical_attribute->type)
{
case attribute_type::uint8:
case AttributeType::uint8:
{
const auto it = attr->uint8_map->find(id);
return it != attr->uint8_map->end() ? it->second : attr->uint8_null_value;
}
case attribute_type::uint16:
case AttributeType::uint16:
{
const auto it = attr->uint16_map->find(id);
return it != attr->uint16_map->end() ? it->second : attr->uint16_null_value;
}
case attribute_type::uint32:
case AttributeType::uint32:
{
const auto it = attr->uint32_map->find(id);
return it != attr->uint32_map->end() ? it->second : attr->uint32_null_value;
}
case attribute_type::uint64:
case AttributeType::uint64:
{
const auto it = attr->uint64_map->find(id);
return it != attr->uint64_map->end() ? it->second : attr->uint64_null_value;
}
case attribute_type::int8:
case AttributeType::int8:
{
const auto it = attr->int8_map->find(id);
return it != attr->int8_map->end() ? it->second : attr->int8_null_value;
}
case attribute_type::int16:
case AttributeType::int16:
{
const auto it = attr->int16_map->find(id);
return it != attr->int16_map->end() ? it->second : attr->int16_null_value;
}
case attribute_type::int32:
case AttributeType::int32:
{
const auto it = attr->int32_map->find(id);
return it != attr->int32_map->end() ? it->second : attr->int32_null_value;
}
case attribute_type::int64:
case AttributeType::int64:
{
const auto it = attr->int64_map->find(id);
return it != attr->int64_map->end() ? it->second : attr->int64_null_value;
}
case attribute_type::float32:
case attribute_type::float64:
case attribute_type::string:
case AttributeType::float32:
case AttributeType::float64:
case AttributeType::string:
break;
};
@ -104,7 +103,7 @@ public:
{\
const auto idx = getAttributeIndex(attribute_name);\
const auto & attribute = attributes[idx];\
if (attribute.type != attribute_type::LC_TYPE)\
if (attribute.type != AttributeType::LC_TYPE)\
throw Exception{\
"Type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\
ErrorCodes::TYPE_MISMATCH\
@ -142,7 +141,7 @@ public:
#define DECLARE_TYPE_CHECKER(NAME, LC_NAME)\
bool is##NAME(const std::size_t attribute_idx) const override\
{\
return attributes[attribute_idx].type == attribute_type::LC_NAME;\
return attributes[attribute_idx].type == AttributeType::LC_NAME;\
}
DECLARE_TYPE_CHECKER(UInt8, uint8)
DECLARE_TYPE_CHECKER(UInt16, uint16)
@ -182,7 +181,7 @@ public:
private:
struct attribute_t
{
attribute_type type;
AttributeType type;
UInt8 uint8_null_value;
UInt16 uint16_null_value;
UInt32 uint32_null_value;
@ -226,6 +225,7 @@ private:
void loadData()
{
auto stream = source_ptr->loadAll();
stream->readPrefix();
while (const auto block = stream->read())
{
@ -240,55 +240,57 @@ private:
setAttributeValue(attribute, id_column[row_idx].get<UInt64>(), attribute_column[row_idx]);
}
}
stream->readSuffix();
}
attribute_t createAttributeWithType(const attribute_type type, const std::string & null_value)
attribute_t createAttributeWithType(const AttributeType type, const std::string & null_value)
{
attribute_t attr{type};
switch (type)
{
case attribute_type::uint8:
case AttributeType::uint8:
attr.uint8_null_value = DB::parse<UInt8>(null_value);
attr.uint8_map.reset(new HashMap<UInt64, UInt8>);
break;
case attribute_type::uint16:
case AttributeType::uint16:
attr.uint16_null_value = DB::parse<UInt16>(null_value);
attr.uint16_map.reset(new HashMap<UInt64, UInt16>);
break;
case attribute_type::uint32:
case AttributeType::uint32:
attr.uint32_null_value = DB::parse<UInt32>(null_value);
attr.uint32_map.reset(new HashMap<UInt64, UInt32>);
break;
case attribute_type::uint64:
case AttributeType::uint64:
attr.uint64_null_value = DB::parse<UInt64>(null_value);
attr.uint64_map.reset(new HashMap<UInt64, UInt64>);
break;
case attribute_type::int8:
case AttributeType::int8:
attr.int8_null_value = DB::parse<Int8>(null_value);
attr.int8_map.reset(new HashMap<UInt64, Int8>);
break;
case attribute_type::int16:
case AttributeType::int16:
attr.int16_null_value = DB::parse<Int16>(null_value);
attr.int16_map.reset(new HashMap<UInt64, Int16>);
break;
case attribute_type::int32:
case AttributeType::int32:
attr.int32_null_value = DB::parse<Int32>(null_value);
attr.int32_map.reset(new HashMap<UInt64, Int32>);
break;
case attribute_type::int64:
case AttributeType::int64:
attr.int64_null_value = DB::parse<Int64>(null_value);
attr.int64_map.reset(new HashMap<UInt64, Int64>);
break;
case attribute_type::float32:
case AttributeType::float32:
attr.float32_null_value = DB::parse<Float32>(null_value);
attr.float32_map.reset(new HashMap<UInt64, Float32>);
break;
case attribute_type::float64:
case AttributeType::float64:
attr.float64_null_value = DB::parse<Float64>(null_value);
attr.float64_map.reset(new HashMap<UInt64, Float64>);
break;
case attribute_type::string:
case AttributeType::string:
attr.string_null_value = null_value;
attr.string_arena.reset(new Arena);
attr.string_map.reset(new HashMap<UInt64, StringRef>);
@ -302,57 +304,57 @@ private:
{
switch (attribute.type)
{
case attribute_type::uint8:
case AttributeType::uint8:
{
attribute.uint8_map->insert({ id, value.get<UInt64>() });
break;
}
case attribute_type::uint16:
case AttributeType::uint16:
{
attribute.uint16_map->insert({ id, value.get<UInt64>() });
break;
}
case attribute_type::uint32:
case AttributeType::uint32:
{
attribute.uint32_map->insert({ id, value.get<UInt64>() });
break;
}
case attribute_type::uint64:
case AttributeType::uint64:
{
attribute.uint64_map->insert({ id, value.get<UInt64>() });
break;
}
case attribute_type::int8:
case AttributeType::int8:
{
attribute.int8_map->insert({ id, value.get<Int64>() });
break;
}
case attribute_type::int16:
case AttributeType::int16:
{
attribute.int16_map->insert({ id, value.get<Int64>() });
break;
}
case attribute_type::int32:
case AttributeType::int32:
{
attribute.int32_map->insert({ id, value.get<Int64>() });
break;
}
case attribute_type::int64:
case AttributeType::int64:
{
attribute.int64_map->insert({ id, value.get<Int64>() });
break;
}
case attribute_type::float32:
case AttributeType::float32:
{
attribute.float32_map->insert({ id, value.get<Float64>() });
break;
}
case attribute_type::float64:
case AttributeType::float64:
{
attribute.float64_map->insert({ id, value.get<Float64>() });
break;
}
case attribute_type::string:
case AttributeType::string:
{
const auto & string = value.get<String>();
const auto string_in_arena = attribute.string_arena->insert(string.data(), string.size());

View File

@ -18,7 +18,7 @@ class DictionaryLifetime;
class IDictionary
{
public:
using id_t = std::uint64_t;
using id_t = std::uint64_t;
virtual std::string getName() const = 0;
@ -28,7 +28,7 @@ public:
virtual void reload() {}
virtual DictionaryPtr clone() const = 0;
virtual const IDictionarySource * const getSource() const = 0;
virtual const IDictionarySource * getSource() const = 0;
virtual const DictionaryLifetime & getLifetime() const = 0;
@ -89,7 +89,7 @@ public:
virtual Float64 getFloat64Unsafe(std::size_t attribute_idx, id_t id) const = 0;
virtual StringRef getStringUnsafe(std::size_t attribute_idx, id_t id) const = 0;
virtual ~IDictionary() = default;
virtual ~IDictionary() = default;
};
}

View File

@ -9,12 +9,28 @@ namespace DB
class IDictionarySource;
using DictionarySourcePtr = std::unique_ptr<IDictionarySource>;
/** Data-provider interface for external dictionaries,
* abstracts out the data source (file, MySQL, ClickHouse, external program, network request et cetera)
* from the presentation and memory layout (the dictionary itself).
*/
class IDictionarySource
{
public:
/// returns an input stream with all the data available from this source
virtual BlockInputStreamPtr loadAll() = 0;
/** Indicates whether this source supports "random access" loading of data
* loadId and loadIds can only be used if this function returns true.
*/
virtual bool supportsSelectiveLoad() const = 0;
/// returns an input stream with the data for the requested identifier
virtual BlockInputStreamPtr loadId(const std::uint64_t id) = 0;
/// returns an input stream with the data for a collection of identifiers
virtual BlockInputStreamPtr loadIds(const std::vector<std::uint64_t> ids) = 0;
/// indicates whether the source has been modified since last load* operation
virtual bool isModified() const = 0;
virtual DictionarySourcePtr clone() const = 0;

View File

@ -12,10 +12,11 @@
namespace DB
{
class MysqlBlockInputStream final : public IProfilingBlockInputStream
/// Allows processing results of a MySQL query as a sequence of Blocks, simplifies chaining
class MySQLBlockInputStream final : public IProfilingBlockInputStream
{
public:
MysqlBlockInputStream(mysqlxx::Query query, const Block & sample_block, const std::size_t max_block_size)
MySQLBlockInputStream(mysqlxx::Query query, const Block & sample_block, const std::size_t max_block_size)
: query{std::move(query)}, result{query.use()}, sample_block{sample_block}, max_block_size{max_block_size}
{
types.reserve(sample_block.columns());
@ -24,27 +25,27 @@ public:
{
const auto type = sample_block.getByPosition(idx).type.get();
if (typeid_cast<const DataTypeUInt8 *>(type))
types.push_back(attribute_type::uint8);
types.push_back(AttributeType::uint8);
else if (typeid_cast<const DataTypeUInt16 *>(type))
types.push_back(attribute_type::uint16);
types.push_back(AttributeType::uint16);
else if (typeid_cast<const DataTypeUInt32 *>(type))
types.push_back(attribute_type::uint32);
types.push_back(AttributeType::uint32);
else if (typeid_cast<const DataTypeUInt64 *>(type))
types.push_back(attribute_type::uint64);
types.push_back(AttributeType::uint64);
else if (typeid_cast<const DataTypeInt8 *>(type))
types.push_back(attribute_type::int8);
types.push_back(AttributeType::int8);
else if (typeid_cast<const DataTypeInt16 *>(type))
types.push_back(attribute_type::int16);
types.push_back(AttributeType::int16);
else if (typeid_cast<const DataTypeInt32 *>(type))
types.push_back(attribute_type::int32);
types.push_back(AttributeType::int32);
else if (typeid_cast<const DataTypeInt64 *>(type))
types.push_back(attribute_type::int64);
types.push_back(AttributeType::int64);
else if (typeid_cast<const DataTypeFloat32 *>(type))
types.push_back(attribute_type::float32);
types.push_back(AttributeType::float32);
else if (typeid_cast<const DataTypeInt64 *>(type))
types.push_back(attribute_type::float64);
types.push_back(AttributeType::float64);
else if (typeid_cast<const DataTypeString *>(type))
types.push_back(attribute_type::string);
types.push_back(AttributeType::string);
else
throw Exception{
"Unsupported type " + type->getName(),
@ -53,11 +54,11 @@ public:
}
}
String getName() const override { return "MysqlBlockInputStream"; }
String getName() const override { return "MySQLBlockInputStream"; }
String getID() const override
{
return "Mysql(" + query.str() + ")";
return "MySQL(" + query.str() + ")";
}
private:
@ -67,7 +68,7 @@ private:
if (block.columns() != result.getNumFields())
throw Exception{
"mysqlxx::UserQueryResult contains " + toString(result.getNumFields()) + " columns while " +
"mysqlxx::UseQueryResult contains " + toString(result.getNumFields()) + " columns while " +
toString(block.columns()) + " expected",
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH
};
@ -86,29 +87,29 @@ private:
return rows == 0 ? Block{} : block;
};
static void insertValue(ColumnPtr & column, const mysqlxx::Value & value, const attribute_type type)
static void insertValue(ColumnPtr & column, const mysqlxx::Value & value, const AttributeType type)
{
switch (type)
{
case attribute_type::uint8: column->insert(static_cast<UInt64>(value)); break;
case attribute_type::uint16: column->insert(static_cast<UInt64>(value)); break;
case attribute_type::uint32: column->insert(static_cast<UInt64>(value)); break;
case attribute_type::uint64: column->insert(static_cast<UInt64>(value)); break;
case attribute_type::int8: column->insert(static_cast<Int64>(value)); break;
case attribute_type::int16: column->insert(static_cast<Int64>(value)); break;
case attribute_type::int32: column->insert(static_cast<Int64>(value)); break;
case attribute_type::int64: column->insert(static_cast<Int64>(value)); break;
case attribute_type::float32: column->insert(static_cast<Float64>(value)); break;
case attribute_type::float64: column->insert(static_cast<Float64>(value)); break;
case attribute_type::string: column->insert(value.getString()); break;
case AttributeType::uint8: column->insert(static_cast<UInt64>(value)); break;
case AttributeType::uint16: column->insert(static_cast<UInt64>(value)); break;
case AttributeType::uint32: column->insert(static_cast<UInt64>(value)); break;
case AttributeType::uint64: column->insert(static_cast<UInt64>(value)); break;
case AttributeType::int8: column->insert(static_cast<Int64>(value)); break;
case AttributeType::int16: column->insert(static_cast<Int64>(value)); break;
case AttributeType::int32: column->insert(static_cast<Int64>(value)); break;
case AttributeType::int64: column->insert(static_cast<Int64>(value)); break;
case AttributeType::float32: column->insert(static_cast<Float64>(value)); break;
case AttributeType::float64: column->insert(static_cast<Float64>(value)); break;
case AttributeType::string: column->insert(value.getString()); break;
}
}
mysqlxx::Query query;
mysqlxx::UseQueryResult result;
Block sample_block;
std::size_t max_block_size;
std::vector<attribute_type> types;
const std::size_t max_block_size;
std::vector<AttributeType> types;
};
}

View File

@ -1,39 +1,42 @@
#pragma once
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/MysqlBlockInputStream.h>
#include <DB/Interpreters/Context.h>
#include <DB/Dictionaries/MySQLBlockInputStream.h>
#include <statdaemons/ext/range.hpp>
#include <mysqlxx/Pool.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <strconvert/escape.h>
namespace DB
{
class MysqlDictionarySource final : public IDictionarySource
/// Allows loading dictionaries from a MySQL database
class MySQLDictionarySource final : public IDictionarySource
{
static const auto max_block_size = 8192;
public:
MysqlDictionarySource(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
Block & sample_block, const Context & context)
MySQLDictionarySource(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
Block & sample_block)
: table{config.getString(config_prefix + ".table")},
sample_block{sample_block}, context(context),
sample_block{sample_block},
pool{config, config_prefix},
load_all_query{composeLoadAllQuery(sample_block, table)},
last_modification{getLastModification()}
{}
MysqlDictionarySource(const MysqlDictionarySource & other)
/// copy-constructor is provided in order to support cloneability
MySQLDictionarySource(const MySQLDictionarySource & other)
: table{other.table},
sample_block{other.sample_block}, context(other.context),
sample_block{other.sample_block},
pool{other.pool},
load_all_query{other.load_all_query}, last_modification{other.last_modification}
{}
BlockInputStreamPtr loadAll() override
{
return new MysqlBlockInputStream{pool.Get()->query(load_all_query), sample_block, max_block_size};
last_modification = getLastModification();
return new MySQLBlockInputStream{pool.Get()->query(load_all_query), sample_block, max_block_size};
}
BlockInputStreamPtr loadId(const std::uint64_t id) override
@ -53,8 +56,9 @@ public:
}
bool isModified() const override { return getLastModification() > last_modification; }
bool supportsSelectiveLoad() const override { return true; }
DictionarySourcePtr clone() const override { return ext::make_unique<MysqlDictionarySource>(*this); }
DictionarySourcePtr clone() const override { return std::make_unique<MySQLDictionarySource>(*this); }
private:
mysqlxx::DateTime getLastModification() const
@ -64,20 +68,23 @@ private:
try
{
auto connection = pool.Get();
auto query = connection->query("SHOW TABLE STATUS LIKE '%" + table + "%';");
auto query = connection->query("SHOW TABLE STATUS LIKE '%" + strconvert::escaped_for_like(table) + "%';");
auto result = query.use();
auto row = result.fetch();
const auto & update_time = row[Update_time_idx];
return !update_time.isNull() ? update_time.getDateTime() : mysqlxx::DateTime{std::time(nullptr)};
if (!update_time.isNull())
return update_time.getDateTime();
}
catch (...)
{
tryLogCurrentException("MysqlDictionarySource");
tryLogCurrentException("MySQLDictionarySource");
}
return {};
/// we suppose failure to get modification time is not an error, therefore return current time
return mysqlxx::DateTime{std::time(nullptr)};
}
/// @todo escape table and column names
static std::string composeLoadAllQuery(const Block & block, const std::string & table)
{
std::string query{"SELECT "};
@ -99,7 +106,6 @@ private:
const std::string table;
Block sample_block;
const Context & context;
mutable mysqlxx::PoolWithFailover pool;
const std::string load_all_query;
mysqlxx::DateTime last_modification;

View File

@ -7,6 +7,9 @@
namespace DB
{
/** Provides reading from a Buffer, taking exclusive ownership over it's lifetime,
* simplifies usage of ReadBufferFromFile (no need to manage buffer lifetime) etc.
*/
class OwningBufferBlockInputStream : public IProfilingBlockInputStream
{
public:
@ -21,9 +24,7 @@ private:
String getName() const override { return "OwningBufferBlockInputStream"; }
String getID() const override {
return "OwningBuffer(" + stream->getID() + ")";
}
String getID() const override { return "OwningBuffer(" + stream->getID() + ")"; }
BlockInputStreamPtr stream;
std::unique_ptr<ReadBuffer> buffer;

View File

@ -1,15 +0,0 @@
#pragma once
#include <memory>
namespace DB
{
template <typename T> struct release
{
void operator()(const T * const ptr) { ptr->release(); }
};
template <typename T> using config_ptr_t = std::unique_ptr<T, release<T>>;
}

View File

@ -14,6 +14,7 @@
#include <statdaemons/ext/range.hpp>
#include <DB/Dictionaries/FlatDictionary.h>
#include <DB/Dictionaries/HashedDictionary.h>
#include <DB/Dictionaries/CacheDictionary.h>
namespace DB
@ -35,6 +36,19 @@ namespace DB
*
* Получить массив идентификаторов регионов, состоящий из исходного и цепочки родителей. Порядок implementation defined.
* regionHierarchy, OSHierarchy, SEHierarchy.
*
* Функции, использующие подключаемые (внешние) словари.
*
* Получить значение аттрибута заданного типа.
* dictGetType(dictionary, attribute, id),
* Type - placeholder для имени типа, в данный момент поддерживаются любые числовые и строковой типы.
* Тип должен соответствовать реальному типу аттрибута, с которым он был объявлен в структуре словаря.
*
* Получить массив идентификаторов, состоящий из исходного и цепочки родителей.
* dictGetHierarchy(dictionary, id).
*
* Является ли первы йидентификатор потомком второго.
* dictIsIn(dictionary, child_id, parent_id).
*/
@ -726,10 +740,10 @@ public:
static IFunction * create(const Context & context)
{
return new FunctionDictGetString{context.getDictionaries()};
return new FunctionDictGetString{context.getExternalDictionaries()};
};
FunctionDictGetString(const Dictionaries & dictionaries) : dictionaries(dictionaries) {}
FunctionDictGetString(const ExternalDictionaries & dictionaries) : dictionaries(dictionaries) {}
String getName() const override { return name; }
@ -746,7 +760,8 @@ private:
if (!typeid_cast<const DataTypeString *>(arguments[0].get()))
{
throw Exception{
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
"Illegal type " + arguments[0]->getName() + " of first argument of function " + getName()
+ ", expected a string.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -754,7 +769,8 @@ private:
if (!typeid_cast<const DataTypeString *>(arguments[1].get()))
{
throw Exception{
"Illegal type " + arguments[1]->getName() + " of argument of function " + getName(),
"Illegal type " + arguments[1]->getName() + " of second argument of function " + getName()
+ ", expected a string.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -770,7 +786,8 @@ private:
!typeid_cast<const DataTypeInt64 *>(id_arg))
{
throw Exception{
"Illegal type " + arguments[2]->getName() + " of argument of function " + getName(),
"Illegal type " + arguments[2]->getName() + " of third argument of function " + getName()
+ ", expected an integer.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -787,11 +804,12 @@ private:
ErrorCodes::ILLEGAL_COLUMN
};
auto dict = dictionaries.getExternalDictionary(dict_name_col->getData());
auto dict = dictionaries.getDictionary(dict_name_col->getData());
const auto dict_ptr = dict.get();
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr))
!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<CacheDictionary>(block, arguments, result, dict_ptr))
throw Exception{
"Unsupported dictionary type " + dict_ptr->getTypeName(),
ErrorCodes::UNKNOWN_TYPE
@ -871,7 +889,7 @@ private:
return false;
}
const Dictionaries & dictionaries;
const ExternalDictionaries & dictionaries;
};
@ -911,10 +929,10 @@ public:
static IFunction * create(const Context & context)
{
return new FunctionDictGet{context.getDictionaries()};
return new FunctionDictGet{context.getExternalDictionaries()};
};
FunctionDictGet(const Dictionaries & dictionaries) : dictionaries(dictionaries) {}
FunctionDictGet(const ExternalDictionaries & dictionaries) : dictionaries(dictionaries) {}
String getName() const override { return name; }
@ -931,7 +949,8 @@ private:
if (!typeid_cast<const DataTypeString *>(arguments[0].get()))
{
throw Exception{
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
"Illegal type " + arguments[0]->getName() + " of first argument of function " + getName()
+ ", expected a string.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -939,7 +958,8 @@ private:
if (!typeid_cast<const DataTypeString *>(arguments[1].get()))
{
throw Exception{
"Illegal type " + arguments[1]->getName() + " of argument of function " + getName(),
"Illegal type " + arguments[1]->getName() + " of second argument of function " + getName()
+ ", expected a string.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -955,7 +975,8 @@ private:
!typeid_cast<const DataTypeInt64 *>(id_arg))
{
throw Exception{
"Illegal type " + id_arg->getName() + " of argument of function " + getName(),
"Illegal type " + id_arg->getName() + " of third argument of function " + getName()
+ ", expected an integer.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -972,11 +993,12 @@ private:
ErrorCodes::ILLEGAL_COLUMN
};
auto dict = dictionaries.getExternalDictionary(dict_name_col->getData());
auto dict = dictionaries.getDictionary(dict_name_col->getData());
const auto dict_ptr = dict.get();
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr))
!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<CacheDictionary>(block, arguments, result, dict_ptr))
throw Exception{
"Unsupported dictionary type " + dict_ptr->getTypeName(),
ErrorCodes::UNKNOWN_TYPE
@ -1058,7 +1080,7 @@ private:
return false;
}
const Dictionaries & dictionaries;
const ExternalDictionaries & dictionaries;
};
template <typename DataType>
@ -1084,10 +1106,10 @@ public:
static IFunction * create(const Context & context)
{
return new FunctionDictGetHierarchy{context.getDictionaries()};
return new FunctionDictGetHierarchy{context.getExternalDictionaries()};
};
FunctionDictGetHierarchy(const Dictionaries & dictionaries) : dictionaries(dictionaries) {}
FunctionDictGetHierarchy(const ExternalDictionaries & dictionaries) : dictionaries(dictionaries) {}
String getName() const override { return name; }
@ -1104,7 +1126,8 @@ private:
if (!typeid_cast<const DataTypeString *>(arguments[0].get()))
{
throw Exception{
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
"Illegal type " + arguments[0]->getName() + " of first argument of function " + getName()
+ ", expected a string.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -1120,7 +1143,8 @@ private:
!typeid_cast<const DataTypeInt64 *>(id_arg))
{
throw Exception{
"Illegal type " + id_arg->getName() + " of argument of function " + getName(),
"Illegal type " + id_arg->getName() + " of second argument of function " + getName()
+ ", expected an integer.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -1137,7 +1161,7 @@ private:
ErrorCodes::ILLEGAL_COLUMN
};
auto dict = dictionaries.getExternalDictionary(dict_name_col->getData());
auto dict = dictionaries.getDictionary(dict_name_col->getData());
const auto dict_ptr = dict.get();
if (!dict->hasHierarchy())
@ -1147,7 +1171,8 @@ private:
};
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr))
!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<CacheDictionary>(block, arguments, result, dict_ptr))
throw Exception{
"Unsupported dictionary type " + dict_ptr->getTypeName(),
ErrorCodes::UNKNOWN_TYPE
@ -1234,7 +1259,7 @@ private:
return false;
}
const Dictionaries & dictionaries;
const ExternalDictionaries & dictionaries;
};
@ -1245,10 +1270,10 @@ public:
static IFunction * create(const Context & context)
{
return new FunctionDictIsIn{context.getDictionaries()};
return new FunctionDictIsIn{context.getExternalDictionaries()};
};
FunctionDictIsIn(const Dictionaries & dictionaries) : dictionaries(dictionaries) {}
FunctionDictIsIn(const ExternalDictionaries & dictionaries) : dictionaries(dictionaries) {}
String getName() const override { return name; }
@ -1265,7 +1290,8 @@ private:
if (!typeid_cast<const DataTypeString *>(arguments[0].get()))
{
throw Exception{
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
"Illegal type " + arguments[0]->getName() + " of first argument of function " + getName()
+ ", expected a string.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -1281,7 +1307,8 @@ private:
!typeid_cast<const DataTypeInt64 *>(child_id_arg))
{
throw Exception{
"Illegal type " + child_id_arg->getName() + " of argument of function " + getName(),
"Illegal type " + child_id_arg->getName() + " of second argument of function " + getName()
+ ", expected an integer.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -1297,7 +1324,8 @@ private:
!typeid_cast<const DataTypeInt64 *>(ancestor_id_arg))
{
throw Exception{
"Illegal type " + ancestor_id_arg->getName() + " of argument of function " + getName(),
"Illegal type " + ancestor_id_arg->getName() + " of argument of third function " + getName()
+ ", expected an integer.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
};
}
@ -1314,7 +1342,7 @@ private:
ErrorCodes::ILLEGAL_COLUMN
};
auto dict = dictionaries.getExternalDictionary(dict_name_col->getData());
auto dict = dictionaries.getDictionary(dict_name_col->getData());
const auto dict_ptr = dict.get();
if (!dict->hasHierarchy())
@ -1324,7 +1352,8 @@ private:
};
if (!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<FlatDictionary>(block, arguments, result, dict_ptr))
!executeDispatch<HashedDictionary>(block, arguments, result, dict_ptr) &&
!executeDispatch<CacheDictionary>(block, arguments, result, dict_ptr))
throw Exception{
"Unsupported dictionary type " + dict_ptr->getTypeName(),
ErrorCodes::UNKNOWN_TYPE
@ -1470,7 +1499,7 @@ private:
return false;
}
const Dictionaries & dictionaries;
const ExternalDictionaries & dictionaries;
};

View File

@ -267,7 +267,7 @@ public:
throw Exception("Illegal column " + col->getName() + " of first argument of function " + getName() + ". Must be constant string.",
ErrorCodes::ILLEGAL_COLUMN);
re = Regexps::get(col->getData());
re = Regexps::get<false, false>(col->getData());
capture = re->getNumberOfSubpatterns() > 0 ? 1 : 0;
matches.resize(capture + 1);

View File

@ -3,7 +3,7 @@
#include <Poco/Mutex.h>
#include <statdaemons/OptimizedRegularExpression.h>
#include <statdaemons/ext/memory.hpp>
#include <memory>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeString.h>
@ -299,24 +299,24 @@ namespace Regexps
}
template <bool like>
inline Regexp createRegexp(const std::string & pattern) { return pattern; }
inline Regexp createRegexp(const std::string & pattern, int flags) { return {pattern, flags}; }
template <>
inline Regexp createRegexp<true>(const std::string & pattern) { return likePatternToRegexp(pattern); }
inline Regexp createRegexp<true>(const std::string & pattern, int flags) { return {likePatternToRegexp(pattern), flags}; }
template <bool like = false>
template <bool like, bool no_capture>
inline Pointer get(const std::string & pattern)
{
/// C++11 has thread-safe function-local statics on most modern compilers.
static KnownRegexps known_regexps;
static KnownRegexps known_regexps; /// Разные переменные для разных параметров шаблона.
static std::mutex mutex;
std::lock_guard<std::mutex> lock{mutex};
auto it = known_regexps.find(pattern);
if (known_regexps.end() == it)
it = known_regexps.emplace(pattern, ext::make_unique<Holder>()).first;
it = known_regexps.emplace(pattern, std::make_unique<Holder>()).first;
return it->second->get([&pattern] {
return new Regexp{createRegexp<like>(pattern)};
return new Regexp{createRegexp<like>(pattern, no_capture ? OptimizedRegularExpression::RE_NO_CAPTURE : 0)};
});
}
}
@ -373,7 +373,7 @@ struct MatchImpl
}
else
{
const auto & regexp = Regexps::get<like>(pattern);
const auto & regexp = Regexps::get<like, true>(pattern);
size_t size = offsets.size();
for (size_t i = 0; i < size; ++i)
res[i] = revert ^ regexp->match(reinterpret_cast<const char *>(&data[i != 0 ? offsets[i - 1] : 0]), (i != 0 ? offsets[i] - offsets[i - 1] : offsets[0]) - 1);
@ -382,7 +382,7 @@ struct MatchImpl
static void constant(const std::string & data, const std::string & pattern, UInt8 & res)
{
const auto & regexp = Regexps::get<like>(pattern);
const auto & regexp = Regexps::get<like, true>(pattern);
res = revert ^ regexp->match(data);
}
};
@ -397,7 +397,7 @@ struct ExtractImpl
res_data.reserve(data.size() / 5);
res_offsets.resize(offsets.size());
const auto & regexp = Regexps::get(pattern);
const auto & regexp = Regexps::get<false, false>(pattern);
unsigned capture = regexp->getNumberOfSubpatterns() > 0 ? 1 : 0;
OptimizedRegularExpression::MatchVec matches;

View File

@ -23,6 +23,7 @@
#include <DB/Interpreters/Users.h>
#include <DB/Interpreters/Quota.h>
#include <DB/Interpreters/Dictionaries.h>
#include <DB/Interpreters/ExternalDictionaries.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/Interpreters/Cluster.h>
#include <DB/Interpreters/InterserverIOHandler.h>
@ -87,6 +88,7 @@ struct ContextShared
DataTypeFactory data_type_factory; /// Типы данных.
FormatFactory format_factory; /// Форматы.
mutable SharedPtr<Dictionaries> dictionaries; /// Словари Метрики. Инициализируются лениво.
mutable SharedPtr<ExternalDictionaries> external_dictionaries;
Users users; /// Известные пользователи.
Quotas quotas; /// Известные квоты на использование ресурсов.
mutable UncompressedCachePtr uncompressed_cache; /// Кэш разжатых блоков.
@ -259,6 +261,7 @@ public:
const DataTypeFactory & getDataTypeFactory() const { return shared->data_type_factory; }
const FormatFactory & getFormatFactory() const { return shared->format_factory; }
const Dictionaries & getDictionaries() const;
const ExternalDictionaries & getExternalDictionaries() const;
InterserverIOHandler & getInterserverIOHandler() { return shared->interserver_io_handler; }

View File

@ -1,13 +1,5 @@
#pragma once
#include <mutex>
#include <thread>
#include <unordered_map>
#include <chrono>
#include <random>
#include <Poco/SharedPtr.h>
#include <Yandex/MultiVersion.h>
#include <Yandex/logger_useful.h>
#include <statdaemons/RegionsHierarchies.h>
@ -18,10 +10,7 @@
namespace DB
{
using Poco::SharedPtr;
class Context;
class IDictionary;
/// Словари Метрики, которые могут использоваться в функциях.
@ -31,22 +20,15 @@ private:
MultiVersion<RegionsHierarchies> regions_hierarchies;
MultiVersion<TechDataHierarchy> tech_data_hierarchy;
MultiVersion<RegionsNames> regions_names;
mutable std::mutex external_dictionaries_mutex;
std::unordered_map<std::string, std::shared_ptr<MultiVersion<IDictionary>>> external_dictionaries;
std::unordered_map<std::string, std::chrono::system_clock::time_point> update_times;
std::mt19937 rnd_engine;
Context & context;
/// Периодичность обновления справочников, в секундах.
int reload_period;
std::thread reloading_thread;
std::thread reloading_externals_thread;
Poco::Event destroy{false};
Poco::Event destroy;
Logger * log;
Poco::Timestamp dictionaries_last_modified{0};
void handleException() const
@ -122,7 +104,6 @@ private:
}
void reloadExternals();
/// Обновляет каждые reload_period секунд.
void reloadPeriodically()
@ -136,35 +117,19 @@ private:
}
}
void reloadExternalsPeriodically()
{
const auto check_period = 5 * 1000;
while (true)
{
if (destroy.tryWait(check_period))
return;
reloadExternals();
}
}
public:
/// Справочники будут обновляться в отдельном потоке, каждые reload_period секунд.
Dictionaries(Context & context, int reload_period_ = 3600)
: context(context), reload_period(reload_period_),
log(&Logger::get("Dictionaries"))
Dictionaries(int reload_period_ = 3600)
: reload_period(reload_period_), log(&Logger::get("Dictionaries"))
{
reloadImpl();
reloadExternals();
reloading_thread = std::thread([this] { reloadPeriodically(); });
reloading_externals_thread = std::thread{&Dictionaries::reloadExternalsPeriodically, this};
}
~Dictionaries()
{
destroy.set();
reloading_thread.join();
reloading_externals_thread.join();
}
MultiVersion<RegionsHierarchies>::Version getRegionsHierarchies() const
@ -181,19 +146,6 @@ public:
{
return regions_names.get();
}
MultiVersion<IDictionary>::Version getExternalDictionary(const std::string & name) const
{
const std::lock_guard<std::mutex> lock{external_dictionaries_mutex};
const auto it = external_dictionaries.find(name);
if (it == std::end(external_dictionaries))
throw Exception{
"No such dictionary: " + name,
ErrorCodes::BAD_ARGUMENTS
};
return it->second->get();
}
};
}

View File

@ -0,0 +1,122 @@
#pragma once
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <Yandex/MultiVersion.h>
#include <Yandex/logger_useful.h>
#include <Poco/Event.h>
#include <time.h>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <chrono>
#include <random>
#include <unistd.h>
namespace DB
{
class Context;
class IDictionary;
/** Manages user-defined dictionaries.
* Monitors configuration file and automatically reloads dictionaries in a separate thread.
* The monitoring thread wakes up every @check_period_sec seconds and checks
* modification time of dictionaries' configuration file. If said time is greater than
* @config_last_modified, the dictionaries are created from scratch using configuration file,
* possibly overriding currently existing dictionaries with the same name (previous versions of
* overridden dictionaries will live as long as there are any users retaining them).
*
* Apart from checking configuration file for modifications, each non-cached dictionary
* has a lifetime of its own and may be updated if it's source reports that it has been
* modified. The time of next update is calculated by choosing uniformly a random number
* distributed between lifetime.min_sec and lifetime.max_sec.
* If either of lifetime.min_sec and lifetime.max_sec is zero, such dictionary is never updated.
*/
class ExternalDictionaries
{
private:
static const auto check_period_sec = 5;
mutable std::mutex dictionaries_mutex;
std::unordered_map<std::string, std::shared_ptr<MultiVersion<IDictionary>>> dictionaries;
std::unordered_map<std::string, std::chrono::system_clock::time_point> update_times;
std::mt19937_64 rnd_engine{getSeed()};
Context & context;
std::thread reloading_thread;
Poco::Event destroy;
Logger * log;
Poco::Timestamp config_last_modified{0};
void handleException() const
{
try
{
throw;
}
catch (const Poco::Exception & e)
{
LOG_ERROR(log, "Cannot load exter dictionary! You must resolve this manually. " << e.displayText());
return;
}
catch (...)
{
LOG_ERROR(log, "Cannot load dictionary! You must resolve this manually.");
return;
}
}
void reloadImpl();
void reloadPeriodically()
{
while (true)
{
if (destroy.tryWait(check_period_sec * 1000))
return;
reloadImpl();
}
}
static std::uint64_t getSeed()
{
timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return ts.tv_nsec ^ getpid();
}
public:
/// Справочники будут обновляться в отдельном потоке, каждые reload_period секунд.
ExternalDictionaries(Context & context)
: context(context), log(&Logger::get("ExternalDictionaries"))
{
reloadImpl();
reloading_thread = std::thread{&ExternalDictionaries::reloadPeriodically, this};
}
~ExternalDictionaries()
{
destroy.set();
reloading_thread.join();
}
MultiVersion<IDictionary>::Version getDictionary(const std::string & name) const
{
const std::lock_guard<std::mutex> lock{dictionaries_mutex};
const auto it = dictionaries.find(name);
if (it == std::end(dictionaries))
throw Exception{
"No such dictionary: " + name,
ErrorCodes::BAD_ARGUMENTS
};
return it->second->get();
}
};
}

View File

@ -87,6 +87,9 @@ struct Limits
\
/** Максимальное использование памяти при обработке запроса. 0 - не ограничено. */ \
M(SettingUInt64, max_memory_usage, 0) \
\
/** Максимальная скорость обмена данными по сети в байтах в секунду. 0 - не ограничена. */ \
M(SettingUInt64, max_network_bandwidth, 0) \
#define DECLARE(TYPE, NAME, DEFAULT) \
TYPE NAME {DEFAULT};

View File

@ -16,7 +16,7 @@ inline void evaluateMissingDefaults(Block & block,
if (column_defaults.empty())
return;
ASTPtr default_expr_list{ext::make_unique<ASTExpressionList>().release()};
ASTPtr default_expr_list{std::make_unique<ASTExpressionList>().release()};
for (const auto & column : required_columns)
{

View File

@ -10,7 +10,7 @@
#include <DB/Interpreters/InterpreterInsertQuery.h>
#include <statdaemons/Increment.h>
#include <statdaemons/ext/memory.hpp>
#include <memory>
#include <Yandex/Revision.h>
#include <iostream>

View File

@ -14,7 +14,7 @@
#include <DB/Storages/AlterCommands.h>
#include <Poco/File.h>
#include <Poco/RWLock.h>
#include <statdaemons/ext/memory.hpp>
#include <memory>
namespace DB
@ -129,7 +129,7 @@ public:
*/
TableDataWriteLockPtr lockDataForAlter()
{
auto res = ext::make_unique<Poco::ScopedWriteRWLock>(data_lock);
auto res = std::make_unique<Poco::ScopedWriteRWLock>(data_lock);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
return res;
@ -137,7 +137,7 @@ public:
TableStructureWriteLockPtr lockStructureForAlter()
{
auto res = ext::make_unique<Poco::ScopedWriteRWLock>(structure_lock);
auto res = std::make_unique<Poco::ScopedWriteRWLock>(structure_lock);
if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
return res;

View File

@ -1,7 +1,7 @@
#pragma once
#include <statdaemons/Stopwatch.h>
#include <statdaemons/ext/memory.hpp>
#include <memory>
#include <list>
#include <mutex>
#include <atomic>
@ -67,7 +67,7 @@ public:
EntryPtr insert(Args &&... args)
{
std::lock_guard<std::mutex> lock{mutex};
return ext::make_unique<Entry>(*this, merges.emplace(merges.end(), std::forward<Args>(args)...));
return std::make_unique<Entry>(*this, merges.emplace(merges.end(), std::forward<Args>(args)...));
}
container_t get() const

View File

@ -9,7 +9,7 @@
#include <DB/Parsers/ASTSubquery.h>
#include <DB/Parsers/formatAST.h>
#include <DB/Common/escapeForFileName.h>
#include <statdaemons/ext/memory.hpp>
#include <memory>
#include <unordered_map>
#include <map>
#include <limits>

View File

@ -92,9 +92,8 @@ private:
/// Отправляем на первый попавшийся шард
BlockInputStreamPtr input{
new RemoteBlockInputStream{
cluster.pools.front().get(), query, &settings,
Tables(), QueryProcessingStage::Complete, context
}
cluster.pools.front().get(), query, &settings, nullptr,
Tables(), QueryProcessingStage::Complete, context}
};
input->readPrefix();

View File

@ -277,15 +277,22 @@ void Connection::sendData(const Block & block, const String & name)
if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
writeStringBinary(name, *out);
size_t prev_bytes = out->count();
block.checkNestedArraysOffsets();
block_out->write(block);
maybe_compressed_out->next();
out->next();
if (throttler)
throttler->add(out->count() - prev_bytes);
}
void Connection::sendPreparedData(ReadBuffer & input, size_t size, const String & name)
{
/// NOTE В этом методе не используется throttler (хотя можно использовать, но это пока не важно).
writeVarUInt(Protocol::Client::Data, *out);
if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
@ -436,8 +443,15 @@ Block Connection::receiveData()
if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
readStringBinary(external_table_name, *in);
size_t prev_bytes = in->count();
/// Прочитать из сети один блок
return block_in->read();
Block res = block_in->read();
if (throttler)
throttler->add(in->count() - prev_bytes);
return res;
}

View File

@ -2,112 +2,163 @@
namespace DB
{
ParallelReplicas::ParallelReplicas(Connection * connection_, Settings * settings_)
: settings(settings_),
active_replica_count(1),
supports_parallel_execution(false)
ParallelReplicas::ParallelReplicas(Connection * connection_, Settings * settings_, ThrottlerPtr throttler_)
: settings(settings_), throttler(throttler_),
active_replica_count(1),
supports_parallel_execution(false)
{
registerReplica(connection_);
}
ParallelReplicas::ParallelReplicas(IConnectionPool * pool_, Settings * settings_, ThrottlerPtr throttler_)
: settings(settings_), throttler(throttler_)
{
if (pool_ == nullptr)
throw Exception("Null pool specified", ErrorCodes::LOGICAL_ERROR);
bool has_many_replicas = (settings != nullptr) && (settings->max_parallel_replicas > 1);
if (has_many_replicas)
{
registerReplica(connection_);
pool_entries = pool_->getMany(settings);
active_replica_count = pool_entries.size();
supports_parallel_execution = (active_replica_count > 1);
if (active_replica_count == 0)
throw Exception("No connection available", ErrorCodes::LOGICAL_ERROR);
replica_map.reserve(active_replica_count);
for (auto & entry : pool_entries)
registerReplica(&*entry);
}
ParallelReplicas::ParallelReplicas(IConnectionPool * pool_, Settings * settings_)
: settings(settings_)
else
{
if (pool_ == nullptr)
throw Exception("Null pool specified", ErrorCodes::LOGICAL_ERROR);
active_replica_count = 1;
supports_parallel_execution = false;
bool has_many_replicas = (settings != nullptr) && (settings->max_parallel_replicas > 1);
if (has_many_replicas)
{
pool_entries = pool_->getMany(settings);
active_replica_count = pool_entries.size();
supports_parallel_execution = (active_replica_count > 1);
if (active_replica_count == 0)
throw Exception("No connection available", ErrorCodes::LOGICAL_ERROR);
replica_map.reserve(active_replica_count);
for (auto & entry : pool_entries)
registerReplica(&*entry);
}
else
{
active_replica_count = 1;
supports_parallel_execution = false;
pool_entry = pool_->get(settings);
registerReplica(&*pool_entry);
}
pool_entry = pool_->get(settings);
registerReplica(&*pool_entry);
}
}
void ParallelReplicas::sendExternalTablesData(std::vector<ExternalTablesData> & data)
void ParallelReplicas::sendExternalTablesData(std::vector<ExternalTablesData> & data)
{
if (!sent_query)
throw Exception("Cannot send external tables data: query not yet sent.", ErrorCodes::LOGICAL_ERROR);
if (data.size() < active_replica_count)
throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);
auto it = data.begin();
for (auto & e : replica_map)
{
if (!sent_query)
throw Exception("Cannot send external tables data: query not yet sent.", ErrorCodes::LOGICAL_ERROR);
Connection * connection = e.second;
if (connection != nullptr)
connection->sendExternalTablesData(*it);
++it;
}
}
if (data.size() < active_replica_count)
throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);
void ParallelReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data)
{
if (sent_query)
throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR);
if (supports_parallel_execution)
{
Settings query_settings = *settings;
query_settings.parallel_replicas_count = active_replica_count;
UInt64 offset = 0;
auto it = data.begin();
for (auto & e : replica_map)
{
Connection * connection = e.second;
if (connection != nullptr)
connection->sendExternalTablesData(*it);
++it;
}
}
void ParallelReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data)
{
if (sent_query)
throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR);
if (supports_parallel_execution)
{
Settings query_settings = *settings;
query_settings.parallel_replicas_count = active_replica_count;
UInt64 offset = 0;
for (auto & e : replica_map)
{
Connection * connection = e.second;
if (connection != nullptr)
{
query_settings.parallel_replica_offset = offset;
connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data);
++offset;
}
}
if (offset > 0)
sent_query = true;
}
else
{
auto it = replica_map.begin();
Connection * connection = it->second;
if (connection != nullptr)
{
connection->sendQuery(query, query_id, stage, settings, with_pending_data);
sent_query = true;
query_settings.parallel_replica_offset = offset;
connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data);
++offset;
}
}
if (offset > 0)
sent_query = true;
}
Connection::Packet ParallelReplicas::receivePacket()
else
{
if (!sent_query)
throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR);
if (!hasActiveReplicas())
throw Exception("No more packets are available.", ErrorCodes::LOGICAL_ERROR);
auto it = getReplicaForReading();
if (it == replica_map.end())
throw Exception("No available replica", ErrorCodes::NO_AVAILABLE_REPLICA);
auto it = replica_map.begin();
Connection * connection = it->second;
Connection::Packet packet = connection->receivePacket();
if (connection != nullptr)
{
connection->sendQuery(query, query_id, stage, settings, with_pending_data);
sent_query = true;
}
}
}
Connection::Packet ParallelReplicas::receivePacket()
{
if (!sent_query)
throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR);
if (!hasActiveReplicas())
throw Exception("No more packets are available.", ErrorCodes::LOGICAL_ERROR);
auto it = getReplicaForReading();
if (it == replica_map.end())
throw Exception("No available replica", ErrorCodes::NO_AVAILABLE_REPLICA);
Connection * connection = it->second;
Connection::Packet packet = connection->receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
break;
case Protocol::Server::EndOfStream:
invalidateReplica(it);
break;
case Protocol::Server::Exception:
default:
connection->disconnect();
invalidateReplica(it);
break;
}
return packet;
}
void ParallelReplicas::sendCancel()
{
if (!sent_query || cancelled)
throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR);
for (auto & e : replica_map)
{
Connection * connection = e.second;
if (connection != nullptr)
connection->sendCancel();
}
cancelled = true;
}
Connection::Packet ParallelReplicas::drain()
{
if (!cancelled)
throw Exception("Cannot drain connections: cancel first.", ErrorCodes::LOGICAL_ERROR);
Connection::Packet res;
res.type = Protocol::Server::EndOfStream;
while (hasActiveReplicas())
{
Connection::Packet packet = receivePacket();
switch (packet.type)
{
@ -116,153 +167,109 @@ namespace DB
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
break;
case Protocol::Server::EndOfStream:
invalidateReplica(it);
break;
case Protocol::Server::Exception:
default:
connection->disconnect();
invalidateReplica(it);
/// Если мы получили исключение или неизвестный пакет, сохраняем его.
res = packet;
break;
}
return packet;
}
void ParallelReplicas::sendCancel()
return res;
}
std::string ParallelReplicas::dumpAddresses() const
{
bool is_first = true;
std::ostringstream os;
for (auto & e : replica_map)
{
if (!sent_query || cancelled)
throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR);
const Connection * connection = e.second;
if (connection != nullptr)
{
os << (is_first ? "" : "; ") << connection->getServerAddress();
if (is_first) { is_first = false; }
}
}
return os.str();
}
void ParallelReplicas::registerReplica(Connection * connection)
{
if (connection == nullptr)
throw Exception("Invalid connection specified in parameter.", ErrorCodes::LOGICAL_ERROR);
auto res = replica_map.insert(std::make_pair(connection->socket.impl()->sockfd(), connection));
if (!res.second)
throw Exception("Invalid set of connections.", ErrorCodes::LOGICAL_ERROR);
if (throttler)
connection->setThrottler(throttler);
}
ParallelReplicas::ReplicaMap::iterator ParallelReplicas::getReplicaForReading()
{
ReplicaMap::iterator it;
if (supports_parallel_execution)
it = waitForReadEvent();
else
{
it = replica_map.begin();
if (it->second == nullptr)
it = replica_map.end();
}
return it;
}
ParallelReplicas::ReplicaMap::iterator ParallelReplicas::waitForReadEvent()
{
Poco::Net::Socket::SocketList read_list;
read_list.reserve(active_replica_count);
/** Сначала проверяем, есть ли данные, которые уже лежат в буфере
* хоть одного соединения.
*/
for (auto & e : replica_map)
{
Connection * connection = e.second;
if ((connection != nullptr) && connection->hasReadBufferPendingData())
read_list.push_back(connection->socket);
}
/** Если не было найдено никаких данных, то проверяем, есть ли соединения
* готовые для чтения.
*/
if (read_list.empty())
{
Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list;
for (auto & e : replica_map)
{
Connection * connection = e.second;
if (connection != nullptr)
connection->sendCancel();
}
cancelled = true;
}
Connection::Packet ParallelReplicas::drain()
{
if (!cancelled)
throw Exception("Cannot drain connections: cancel first.", ErrorCodes::LOGICAL_ERROR);
Connection::Packet res;
res.type = Protocol::Server::EndOfStream;
while (hasActiveReplicas())
{
Connection::Packet packet = receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
case Protocol::Server::EndOfStream:
break;
case Protocol::Server::Exception:
default:
/// Если мы получили исключение или неизвестный пакет, сохраняем его.
res = packet;
break;
}
}
return res;
}
std::string ParallelReplicas::dumpAddresses() const
{
bool is_first = true;
std::ostringstream os;
for (auto & e : replica_map)
{
const Connection * connection = e.second;
if (connection != nullptr)
{
os << (is_first ? "" : "; ") << connection->getServerAddress();
if (is_first) { is_first = false; }
}
}
return os.str();
}
void ParallelReplicas::registerReplica(Connection * connection)
{
if (connection == nullptr)
throw Exception("Invalid connection specified in parameter.", ErrorCodes::LOGICAL_ERROR);
auto res = replica_map.insert(std::make_pair(connection->socket.impl()->sockfd(), connection));
if (!res.second)
throw Exception("Invalid set of connections.", ErrorCodes::LOGICAL_ERROR);
}
ParallelReplicas::ReplicaMap::iterator ParallelReplicas::getReplicaForReading()
{
ReplicaMap::iterator it;
if (supports_parallel_execution)
it = waitForReadEvent();
else
{
it = replica_map.begin();
if (it->second == nullptr)
it = replica_map.end();
}
return it;
}
ParallelReplicas::ReplicaMap::iterator ParallelReplicas::waitForReadEvent()
{
Poco::Net::Socket::SocketList read_list;
read_list.reserve(active_replica_count);
/** Сначала проверяем, есть ли данные, которые уже лежат в буфере
* хоть одного соединения.
*/
for (auto & e : replica_map)
{
Connection * connection = e.second;
if ((connection != nullptr) && connection->hasReadBufferPendingData())
read_list.push_back(connection->socket);
}
/** Если не было найдено никаких данных, то проверяем, есть ли соединения
* готовые для чтения.
*/
if (read_list.empty())
{
Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list;
for (auto & e : replica_map)
{
Connection * connection = e.second;
if (connection != nullptr)
read_list.push_back(connection->socket);
}
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings->poll_interval * 1000000);
if (n == 0)
return replica_map.end();
}
auto & socket = read_list[rand() % read_list.size()];
return replica_map.find(socket.impl()->sockfd());
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings->poll_interval * 1000000);
if (n == 0)
return replica_map.end();
}
void ParallelReplicas::invalidateReplica(ParallelReplicas::ReplicaMap::iterator it)
{
it->second = nullptr;
--active_replica_count;
}
auto & socket = read_list[rand() % read_list.size()];
return replica_map.find(socket.impl()->sockfd());
}
void ParallelReplicas::invalidateReplica(ParallelReplicas::ReplicaMap::iterator it)
{
it->second = nullptr;
--active_replica_count;
}
}

View File

@ -11,7 +11,7 @@
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/Parsers/ASTExpressionList.h>
#include <statdaemons/ext/memory.hpp>
#include <memory>
#include <DB/Parsers/formatAST.h>

View File

@ -1,5 +1,6 @@
#include <DB/Interpreters/Cluster.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/Common/isLocalAddress.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
#include <Poco/Net/NetworkInterface.h>
@ -255,19 +256,7 @@ bool Cluster::isLocal(const Address & address)
/// - её порт совпадает с портом, который слушает сервер;
/// - её хост резолвится в набор адресов, один из которых совпадает с одним из адресов сетевых интерфейсов сервера
/// то нужно всегда ходить на этот шард без межпроцессного взаимодействия
const UInt16 clickhouse_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0);
static auto interfaces = Poco::Net::NetworkInterface::list();
if (clickhouse_port == address.host_port.port() &&
interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(),
[&](const Poco::Net::NetworkInterface & interface) { return interface.address() == address.host_port.host(); }))
{
LOG_INFO(&Poco::Util::Application::instance().logger(),
"Replica with address " << address.host_port.toString() << " will be processed as local.");
return true;
}
return false;
return isLocalAddress(address.host_port);
}
}

View File

@ -495,13 +495,24 @@ const Dictionaries & Context::getDictionaries() const
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (!shared->dictionaries)
shared->dictionaries = new Dictionaries;
return *shared->dictionaries;
}
const ExternalDictionaries & Context::getExternalDictionaries() const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (!shared->external_dictionaries)
{
if (!this->global_context)
throw Exception("Logical error: there is no global context", ErrorCodes::LOGICAL_ERROR);
shared->dictionaries = new Dictionaries{ *this->global_context };
shared->external_dictionaries = new ExternalDictionaries{*this->global_context};
}
return *shared->dictionaries;
return *shared->external_dictionaries;
}

View File

@ -2,9 +2,10 @@
#include <DB/Dictionaries/DictionarySourceFactory.h>
#include <DB/Dictionaries/FlatDictionary.h>
#include <DB/Dictionaries/HashedDictionary.h>
#include <DB/Dictionaries/CacheDictionary.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <memory>
#include <Yandex/singleton.h>
#include <statdaemons/ext/memory.hpp>
namespace DB
{
@ -12,39 +13,48 @@ namespace DB
DictionaryPtr DictionaryFactory::create(const std::string & name, Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix, Context & context) const
{
auto dict_struct = DictionaryStructure::fromConfig(config, config_prefix + "structure");
Poco::Util::AbstractConfiguration::Keys keys;
const auto & layout_prefix = config_prefix + ".layout";
config.keys(layout_prefix, keys);
if (keys.size() != 1)
throw Exception{
"Element dictionary.layout should have exactly one child element",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG
};
const DictionaryStructure dict_struct{config, config_prefix + ".structure"};
auto source_ptr = DictionarySourceFactory::instance().create(
config, config_prefix + "source.", dict_struct, context);
config, config_prefix + ".source", dict_struct, context);
const auto dict_lifetime = DictionaryLifetime::fromConfig(config, config_prefix + "lifetime");
const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
const auto & layout_prefix = config_prefix + "layout.";
const auto & layout_type = keys.front();
if (config.has(layout_prefix + "flat"))
if ("flat" == layout_type)
{
return ext::make_unique<FlatDictionary>(name, dict_struct, std::move(source_ptr), dict_lifetime);
return std::make_unique<FlatDictionary>(name, dict_struct, std::move(source_ptr), dict_lifetime);
}
else if (config.has(layout_prefix + "hashed"))
else if ("hashed" == layout_type)
{
return ext::make_unique<HashedDictionary>(name, dict_struct, std::move(source_ptr), dict_lifetime);
return std::make_unique<HashedDictionary>(name, dict_struct, std::move(source_ptr), dict_lifetime);
}
else if (config.has(layout_prefix + "cache"))
else if ("cache" == layout_type)
{
const auto size = config.getInt(layout_prefix + "cache.size", 0);
const auto size = config.getInt(layout_prefix + ".cache.size");
if (size == 0)
throw Exception{
"Dictionary of type 'cache' cannot have size of 0 bytes",
ErrorCodes::TOO_SMALL_BUFFER_SIZE
};
throw Exception{
"Dictionary of type 'cache' is not yet implemented",
ErrorCodes::NOT_IMPLEMENTED
};
return std::make_unique<CacheDictionary>(name, dict_struct, std::move(source_ptr), dict_lifetime, size);
}
throw Exception{"No dictionary type specified", ErrorCodes::BAD_ARGUMENTS};
throw Exception{
"Unknown dictionary layout type: " + layout_type,
ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG
};
};
}

View File

@ -1,9 +1,9 @@
#include <DB/Interpreters/Dictionaries.h>
#include <DB/Interpreters/ExternalDictionaries.h>
#include <DB/Dictionaries/DictionaryFactory.h>
#include <DB/Dictionaries/DictionaryStructure.h>
#include <DB/Dictionaries/IDictionarySource.h>
#include <DB/Dictionaries/config_ptr_t.h>
#include <statdaemons/ext/scope_guard.hpp>
#include <Poco/Util/Application.h>
namespace DB
{
@ -29,7 +29,7 @@ namespace
}
}
void Dictionaries::reloadExternals()
void ExternalDictionaries::reloadImpl()
{
const auto config_path = getDictionariesConfigPath(Poco::Util::Application::instance().config());
const Poco::File config_file{config_path};
@ -41,12 +41,15 @@ void Dictionaries::reloadExternals()
else
{
const auto last_modified = config_file.getLastModified();
if (last_modified > dictionaries_last_modified)
if (last_modified > config_last_modified)
{
/// definitions of dictionaries may have changed, recreate all of them
dictionaries_last_modified = last_modified;
config_last_modified = last_modified;
const config_ptr_t<Poco::Util::XMLConfiguration> config{new Poco::Util::XMLConfiguration{config_path}};
const auto config = new Poco::Util::XMLConfiguration{config_path};
SCOPE_EXIT(
config->release();
);
/// get all dictionaries' definitions
Poco::Util::AbstractConfiguration::Keys keys;
@ -63,16 +66,14 @@ void Dictionaries::reloadExternals()
continue;
}
const auto & prefix = key + '.';
const auto & name = config->getString(prefix + "name");
const auto name = config->getString(key + ".name");
if (name.empty())
{
LOG_WARNING(log, "dictionary name cannot be empty");
continue;
}
auto dict_ptr = DictionaryFactory::instance().create(name, *config, prefix, context);
auto dict_ptr = DictionaryFactory::instance().create(name, *config, key, context);
if (!dict_ptr->isCached())
{
const auto & lifetime = dict_ptr->getLifetime();
@ -87,12 +88,12 @@ void Dictionaries::reloadExternals()
}
}
auto it = external_dictionaries.find(name);
auto it = dictionaries.find(name);
/// add new dictionary or update an existing version
if (it == std::end(external_dictionaries))
if (it == std::end(dictionaries))
{
const std::lock_guard<std::mutex> lock{external_dictionaries_mutex};
external_dictionaries.emplace(name, std::make_shared<MultiVersion<IDictionary>>(dict_ptr.release()));
const std::lock_guard<std::mutex> lock{dictionaries_mutex};
dictionaries.emplace(name, std::make_shared<MultiVersion<IDictionary>>(dict_ptr.release()));
}
else
it->second->set(dict_ptr.release());
@ -106,7 +107,7 @@ void Dictionaries::reloadExternals()
}
/// periodic update
for (auto & dictionary : external_dictionaries)
for (auto & dictionary : dictionaries)
{
try
{
@ -126,11 +127,11 @@ void Dictionaries::reloadExternals()
if (std::chrono::system_clock::now() < update_time)
continue;
scope_exit({
SCOPE_EXIT(
/// calculate next update time
std::uniform_int_distribution<std::uint64_t> distribution{lifetime.min_sec, lifetime.max_sec};
update_time = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
});
);
/// check source modified
if (current->getSource()->isModified())

View File

@ -8,7 +8,7 @@
#include <Yandex/ErrorHandlers.h>
#include <Yandex/Revision.h>
#include <statdaemons/ConfigProcessor.h>
#include <statdaemons/ext/memory.hpp>
#include <memory>
#include <DB/Interpreters/loadMetadata.h>
#include <DB/Storages/StorageSystemNumbers.h>
@ -491,7 +491,7 @@ int Server::main(const std::vector<std::string> & args)
global_context->setMacros(Macros(config(), "macros"));
std::string users_config_path = config().getString("users_config", config().getString("config-file", "config.xml"));
auto users_config_reloader = ext::make_unique<UsersConfigReloader>(users_config_path, global_context.get());
auto users_config_reloader = std::make_unique<UsersConfigReloader>(users_config_path, global_context.get());
/// Максимальное количество одновременно выполняющихся запросов.
global_context->getProcessList().setMaxSize(config().getInt("max_concurrent_queries", 0));
@ -536,7 +536,7 @@ int Server::main(const std::vector<std::string> & args)
{
const auto profile_events_transmitter = config().getBool("use_graphite", true)
? ext::make_unique<ProfileEventsTransmitter>()
? std::make_unique<ProfileEventsTransmitter>()
: nullptr;
const std::string listen_host = config().getString("listen_host", "::");

View File

@ -324,7 +324,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
{
MarkRanges ranges(1, MarkRange(0, parts[i]->size));
auto input = ext::make_unique<MergeTreeBlockInputStream>(
auto input = std::make_unique<MergeTreeBlockInputStream>(
data.getFullPath() + parts[i]->name + '/', DEFAULT_MERGE_BLOCK_SIZE, union_column_names, data,
parts[i], ranges, false, nullptr, "");
@ -348,19 +348,19 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
switch (data.mode)
{
case MergeTreeData::Ordinary:
merged_stream = ext::make_unique<MergingSortedBlockInputStream>(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
merged_stream = std::make_unique<MergingSortedBlockInputStream>(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Collapsing:
merged_stream = ext::make_unique<CollapsingSortedBlockInputStream>(src_streams, data.getSortDescription(), data.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
merged_stream = std::make_unique<CollapsingSortedBlockInputStream>(src_streams, data.getSortDescription(), data.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Summing:
merged_stream = ext::make_unique<SummingSortedBlockInputStream>(src_streams, data.getSortDescription(), data.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
merged_stream = std::make_unique<SummingSortedBlockInputStream>(src_streams, data.getSortDescription(), data.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Aggregating:
merged_stream = ext::make_unique<AggregatingSortedBlockInputStream>(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
merged_stream = std::make_unique<AggregatingSortedBlockInputStream>(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break;
default:

View File

@ -12,7 +12,7 @@
#include <DB/Core/Field.h>
#include <statdaemons/ext/memory.hpp>
#include <memory>
namespace DB
{
@ -160,10 +160,15 @@ BlockInputStreams StorageDistributed::read(
query, remote_database, remote_table);
const auto & modified_query = queryToString(modified_query_ast);
/// Ограничение сетевого трафика, если нужно.
ThrottlerPtr throttler;
if (settings.limits.max_network_bandwidth)
throttler.reset(new Throttler(settings.limits.max_network_bandwidth));
/// Цикл по шардам.
for (auto & conn_pool : cluster.pools)
res.emplace_back(new RemoteBlockInputStream{
conn_pool, modified_query, &new_settings,
conn_pool, modified_query, &new_settings, throttler,
external_tables, processed_stage, context});
/// Добавляем запросы к локальному ClickHouse.
@ -234,7 +239,7 @@ bool StorageDistributed::hasColumn(const String & column_name) const
void StorageDistributed::createDirectoryMonitor(const std::string & name)
{
directory_monitors.emplace(name, ext::make_unique<DirectoryMonitor>(*this, name));
directory_monitors.emplace(name, std::make_unique<DirectoryMonitor>(*this, name));
}
void StorageDistributed::createDirectoryMonitors()

View File

@ -2107,7 +2107,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
auto zookeeper = getZooKeeper();
const MergeTreeMergeBlocker merge_blocker{merger};
const auto unreplicated_merge_blocker = unreplicated_merger ?
ext::make_unique<MergeTreeMergeBlocker>(*unreplicated_merger) : nullptr;
std::make_unique<MergeTreeMergeBlocker>(*unreplicated_merger) : nullptr;
LOG_DEBUG(log, "Doing ALTER");

View File

@ -93,6 +93,11 @@ any_runs()
if [[ $(running_processes) -gt 0 ]]; then return 0; else return 1; fi
}
all_runs()
{
[[ $(running_processes) -eq $NUMBER_OF_PROCESSES ]]
}
wait4done()
{
while any_runs; do
@ -108,7 +113,7 @@ start()
echo -n "Start $PROGRAM service: "
if [[ $(running_processes) -eq $NUMBER_OF_PROCESSES ]]; then
if all_runs; then
echo -n "already running "
EXIT_STATUS=1
else
@ -236,7 +241,7 @@ main()
restart
;;
condstart)
any_runs || start
all_runs || start
;;
condstop)
any_runs && stop