diff --git a/dbms/include/DB/Client/Connection.h b/dbms/include/DB/Client/Connection.h
index ae38abc2839..2d968532101 100644
--- a/dbms/include/DB/Client/Connection.h
+++ b/dbms/include/DB/Client/Connection.h
@@ -4,6 +4,8 @@
 #include
+#include <DB/Common/Throttler.h>
+
 #include
 #include
 #include
@@ -26,13 +28,14 @@ namespace DB

 using Poco::SharedPtr;

-class ShardReplicas;
+class ParallelReplicas;

 /// A stream of blocks reading from a table, and the table's name.
 typedef std::pair<String, BlockInputStreamPtr> ExternalTableData;
 /// A vector of pairs describing tables.
 typedef std::vector<ExternalTableData> ExternalTablesData;
+
 /** A connection with a database server, for use in a client.
   * How to use it - see Core/Protocol.h
   * (For the server-side implementation - see Server/TCPHandler.h)
@@ -42,7 +45,7 @@ typedef std::vector<ExternalTableData> ExternalTablesData;
   */
 class Connection : private boost::noncopyable
 {
-    friend class ShardReplicas;
+    friend class ParallelReplicas;

 public:
     Connection(const String & host_, UInt16 port_, const String & default_database_,
@@ -69,6 +72,12 @@ public:

     virtual ~Connection() {};

+    /// Set the network traffic throttler. A single throttler may be used by several different connections at the same time.
+    void setThrottler(const ThrottlerPtr & throttler_)
+    {
+        throttler = throttler_;
+    }
+
     /// A packet that can be received from the server.
     struct Packet
@@ -161,6 +170,11 @@ private:
     const DataTypeFactory & data_type_factory;

+    /** If not nullptr, it is used to limit network traffic.
+      * Only the traffic of transferring blocks is accounted for. Other packets are not counted.
+      */
+    ThrottlerPtr throttler;
+
     Poco::Timespan connect_timeout;
     Poco::Timespan receive_timeout;
     Poco::Timespan send_timeout;
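A note on how the new setThrottler() hook is meant to be used: the limiter lives outside the connection, so one Throttler can be shared by every connection taking part in a query, and the cap then applies to their combined block traffic. A minimal sketch, not part of the patch; the connection list and the 10 MB/s figure are assumed purely for illustration:

    #include <DB/Client/Connection.h>
    #include <DB/Common/Throttler.h>

    ThrottlerPtr throttler = std::make_shared<Throttler>(10 * 1024 * 1024); /// ~10 MB/s combined

    for (DB::Connection * connection : connections)  /// `connections` is assumed given
        connection->setThrottler(throttler);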
diff --git a/dbms/include/DB/Client/ParallelReplicas.h b/dbms/include/DB/Client/ParallelReplicas.h
new file mode 100644
index 00000000000..4dec86bf592
--- /dev/null
+++ b/dbms/include/DB/Client/ParallelReplicas.h
@@ -0,0 +1,93 @@
+#pragma once
+
+#include <DB/Common/Throttler.h>
+#include <DB/Client/Connection.h>
+#include <DB/Client/ConnectionPool.h>
+
+
+namespace DB
+{
+
+
+/** For reading data from several replicas (connections) at once, within a single thread.
+  * As a degenerate case, it can also work with a single connection.
+  *
+  * The interface almost matches that of Connection.
+  */
+class ParallelReplicas final : private boost::noncopyable
+{
+public:
+    /// Accepts a ready connection.
+    ParallelReplicas(Connection * connection_, Settings * settings_, ThrottlerPtr throttler_);
+
+    /// Accepts a pool from which one or several connections will need to be obtained.
+    ParallelReplicas(IConnectionPool * pool_, Settings * settings_, ThrottlerPtr throttler_);
+
+    /// Send the entire contents of the external tables to the replicas.
+    void sendExternalTablesData(std::vector<ExternalTablesData> & data);
+
+    /// Send the query to the replicas.
+    void sendQuery(const String & query, const String & query_id = "",
+        UInt64 stage = QueryProcessingStage::Complete, bool with_pending_data = false);
+
+    /// Receive a packet from any one of the replicas.
+    Connection::Packet receivePacket();
+
+    /// Cancel the queries on the replicas.
+    void sendCancel();
+
+    /** On each replica, read and skip all packets up to EndOfStream or Exception.
+      * Returns EndOfStream if no exception was received. Otherwise
+      * returns the last received packet of type Exception.
+      */
+    Connection::Packet drain();
+
+    /// Get the addresses of the replicas as a string.
+    std::string dumpAddresses() const;
+
+    /// Returns the number of replicas.
+    size_t size() const { return replica_map.size(); }

+    /// Check whether there are any valid replicas.
+    bool hasActiveReplicas() const { return active_replica_count > 0; }
+
+private:
+    /// Replicas hashed by the id of the socket.
+    using ReplicaMap = std::unordered_map<int, Connection *>;
+
+
+    /// Register a replica.
+    void registerReplica(Connection * connection);
+
+    /// Get a replica from which data can be read.
+    ReplicaMap::iterator getReplicaForReading();
+
+    /** Check whether there is data that can be read on any of the replicas.
+      * Returns one such replica if it is found.
+      */
+    ReplicaMap::iterator waitForReadEvent();
+
+    /// Mark a replica as invalid.
+    void invalidateReplica(ReplicaMap::iterator it);
+
+
+    Settings * settings;
+    ReplicaMap replica_map;
+
+    /// If not nullptr, it is used to limit network traffic.
+    ThrottlerPtr throttler;
+
+    std::vector<ConnectionPool::Entry> pool_entries;
+    ConnectionPool::Entry pool_entry;
+
+    /// The current number of valid connections to replicas.
+    size_t active_replica_count;
+    /// The query is executed in parallel on several replicas.
+    bool supports_parallel_execution;
+    /// The query has been sent.
+    bool sent_query = false;
+    /// The query has been cancelled.
+    bool cancelled = false;
+};
+
+}
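For orientation, a hypothetical read loop against this interface, mirroring what RemoteBlockInputStream does later in this diff. This is a sketch only: `pool`, `settings` and `throttler` are assumed to exist, and the query text is a placeholder.

    #include <DB/Client/ParallelReplicas.h>

    DB::ParallelReplicas replicas(pool, &settings, throttler);
    replicas.sendQuery("SELECT 1");

    while (replicas.hasActiveReplicas())
    {
        DB::Connection::Packet packet = replicas.receivePacket(); /// from whichever replica has data ready

        if (packet.type == DB::Protocol::Server::Exception)
            packet.exception->rethrow();
        /// EndOfStream from one replica leaves the others active;
        /// the loop ends only once every replica has finished.
    }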
diff --git a/dbms/include/DB/Client/ShardReplicas.h b/dbms/include/DB/Client/ShardReplicas.h
deleted file mode 100644
index afca1781917..00000000000
--- a/dbms/include/DB/Client/ShardReplicas.h
+++ /dev/null
@@ -1,62 +0,0 @@
-#pragma once
-
-#include <DB/Client/Connection.h>
-#include <DB/Client/ConnectionPool.h>
-
-namespace DB
-{
-    /**
-      * A set of replicas of one shard.
-      */
-    class ShardReplicas final
-    {
-    public:
-        ShardReplicas(std::vector<ConnectionPool::Entry> & entries, const Settings & settings_);
-
-        ~ShardReplicas() = default;
-
-        ShardReplicas(const ShardReplicas &) = delete;
-        ShardReplicas & operator=(const ShardReplicas &) = delete;
-
-        /// Send the entire contents of the external tables to the replicas.
-        void sendExternalTablesData(std::vector<ExternalTablesData> & data);
-
-        /// Send the query to the replicas.
-        void sendQuery(const String & query, const String & query_id = "",
-            UInt64 stage = QueryProcessingStage::Complete, bool with_pending_data = false);
-
-        /// Receive a packet from any one of the replicas.
-        Connection::Packet receivePacket();
-
-        /// Close the connections to the replicas.
-        void disconnect();
-
-        /// Cancel the queries on the replicas.
-        void sendCancel();
-
-        /// For each replica, receive the packets remaining after the query was cancelled.
-        Connection::Packet drain();
-
-        /// Get the addresses of the replicas as a string.
-        std::string dumpAddresses() const;
-
-        /// Returns the number of replicas.
-        size_t size() const { return replica_hash.size(); }
-
-    private:
-        /// Check whether there is data that can be read on any of the replicas.
-        /// Returns a connection to such a replica, if one is found.
-        Connection ** waitForReadEvent();
-
-    private:
-        /// Replicas hashed by the id of the socket.
-        using ReplicaHash = std::unordered_map<int, Connection *>;
-
-    private:
-        const Settings & settings;
-        ReplicaHash replica_hash;
-        size_t active_connection_count = 0;
-        bool sent_query = false;
-        bool cancelled = false;
-    };
-}
diff --git a/dbms/include/DB/Common/Throttler.h b/dbms/include/DB/Common/Throttler.h
new file mode 100644
index 00000000000..7f0e5848f0d
--- /dev/null
+++ b/dbms/include/DB/Common/Throttler.h
@@ -0,0 +1,59 @@
+#pragma once
+
+#include <mutex>
+#include <memory>
+#include <statdaemons/Stopwatch.h>
+
+
+/** Allows limiting the rate of something (in units per second) by means of sleep.
+  * Specifics of operation:
+  * - only the average rate is counted, starting from the moment of the first call to add();
+  *   if there were periods with a low rate, then for some interval of time afterwards, the rate will be higher;
+  */
+class Throttler
+{
+public:
+    Throttler(size_t max_speed_) : max_speed(max_speed_) {}
+
+    void add(size_t amount)
+    {
+        size_t new_count;
+        UInt64 elapsed_ns;
+
+        {
+            std::lock_guard<std::mutex> lock(mutex);
+
+            if (0 == count)
+            {
+                watch.start();
+                elapsed_ns = 0;
+            }
+            else
+                elapsed_ns = watch.elapsed();
+
+            count += amount;
+            new_count = count;
+        }
+
+        /// How much time would have passed if the rate had been equal to max_speed.
+        UInt64 desired_ns = new_count * 1000000000 / max_speed;
+
+        if (desired_ns > elapsed_ns)
+        {
+            UInt64 sleep_ns = desired_ns - elapsed_ns;
+            timespec sleep_ts;
+            sleep_ts.tv_sec = sleep_ns / 1000000000;
+            sleep_ts.tv_nsec = sleep_ns % 1000000000;
+            nanosleep(&sleep_ts, nullptr);    /// NOTE Returns early in case of a signal. This is considered normal.
+        }
+    }
+
+private:
+    size_t max_speed;
+    size_t count = 0;
+    Stopwatch watch {CLOCK_MONOTONIC_COARSE};
+    std::mutex mutex;
+};
+
+
+typedef std::shared_ptr<Throttler> ThrottlerPtr;
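To make add() concrete: it computes how long the running total should have taken at max_speed and sleeps off the difference, so only the average rate is bounded. For example, with max_speed = 1000000 units/s, after add() calls totalling 500000 units at elapsed_ns = 0.2 s, desired_ns = 500000 * 1000000000 / 1000000 = 0.5 s, and the current call sleeps for the remaining 0.3 s. A hypothetical caller, with the block source and consumer assumed:

    #include <DB/Common/Throttler.h>

    Throttler throttler(1000000);  /// at most ~1e6 bytes per second, on average

    while (const auto block = stream.read())  /// `stream` is an assumed block input stream
    {
        throttler.add(block.bytes());  /// may sleep to hold the average rate under the limit
        process(block);                /// assumed consumer
    }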
diff --git a/dbms/include/DB/Common/isLocalAddress.h b/dbms/include/DB/Common/isLocalAddress.h
new file mode 100644
index 00000000000..62318b34ba6
--- /dev/null
+++ b/dbms/include/DB/Common/isLocalAddress.h
@@ -0,0 +1,27 @@
+#pragma once
+
+#include <Poco/Util/Application.h>
+#include <Poco/Net/NetworkInterface.h>
+#include <Poco/Net/SocketAddress.h>
+#include <DB/Core/Types.h>
+
+namespace DB
+{
+
+inline bool isLocalAddress(const Poco::Net::SocketAddress & address)
+{
+    const UInt16 clickhouse_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0);
+    static auto interfaces = Poco::Net::NetworkInterface::list();
+
+    if (clickhouse_port == address.port())
+    {
+        return interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(),
+            [&] (const Poco::Net::NetworkInterface & interface) {
+                return interface.address() == address.host();
+            });
+    }
+
+    return false;
+}
+
+}
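A caller that knows a replica's address can use this to bypass the network entirely, as ClickHouseDictionarySource does further down in this diff. A sketch, with arbitrary address values:

    #include <DB/Common/isLocalAddress.h>

    Poco::Net::SocketAddress address("127.0.0.1", 9000);

    /// True only when the port equals this server's own tcp_port and the host
    /// is bound to one of the local network interfaces.
    if (DB::isLocalAddress(address))
        /* read local tables directly instead of opening a connection */;

Note that the interface list is captured in a function-local static, so it is enumerated once per process; interfaces that appear later are not seen.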
diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h
index 969a65e1d0a..cc18deee7ec 100644
--- a/dbms/include/DB/Core/ErrorCodes.h
+++ b/dbms/include/DB/Core/ErrorCodes.h
@@ -273,10 +273,8 @@ namespace ErrorCodes
         CANNOT_COMPILE_CODE,
         INCOMPATIBLE_TYPE_OF_JOIN,
         NO_AVAILABLE_REPLICA,
-        UNEXPECTED_REPLICA,
         MISMATCH_REPLICAS_DATA_SOURCES,
         STORAGE_DOESNT_SUPPORT_PARALLEL_REPLICAS,
-        MISSING_RANGE_IN_CHUNK,

         POCO_EXCEPTION = 1000,
         STD_EXCEPTION,
diff --git a/dbms/include/DB/Core/StringRef.h b/dbms/include/DB/Core/StringRef.h
index e7ca944e33a..4ae264e4f3f 100644
--- a/dbms/include/DB/Core/StringRef.h
+++ b/dbms/include/DB/Core/StringRef.h
@@ -48,7 +48,7 @@ inline bool memequalSSE2Wide(const char * p1, const char * p2, size_t size)
         if (   compareSSE2(p1,      p2)
             && compareSSE2(p1 + 16, p2 + 16)
             && compareSSE2(p1 + 32, p2 + 32)
-            && compareSSE2(p1 + 40, p2 + 40))
+            && compareSSE2(p1 + 48, p2 + 48))
         {
             p1 += 64;
             p2 += 64;
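The StringRef.h change is a genuine correctness fix rather than a rename: the fast path compares 64 bytes per iteration with four 16-byte SSE2 comparisons, so the offsets must tile the block as 0, 16, 32, 48. With the old `p1 + 40`, the ranges [32, 48) and [40, 56) overlapped while bytes [56, 64) were never compared at all, so two blocks differing only there compared as equal. A check that failed before the fix (sketch):

    #include <cassert>
    #include <DB/Core/StringRef.h>

    char a[64] = {};
    char b[64] = {};
    b[60] = 1;  /// differs only inside the previously unchecked tail [56, 64)

    assert(!memequalSSE2Wide(a, b, 64));  /// holds only with the 40 -> 48 fix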
diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h
index 6b65dc88d55..0acafa906c2 100644
--- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h
+++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h
@@ -5,16 +5,17 @@
 #include
 #include
 #include
+#include <DB/Common/Throttler.h>
 #include
 #include
-#include <DB/Client/ShardReplicas.h>
+#include <DB/Client/ParallelReplicas.h>


 namespace DB
 {

-/** Allows executing a query (SELECT) on a remote server, and receiving the result.
+/** Allows executing a query (SELECT) on remote replicas of one shard, and receiving the result.
  */
 class RemoteBlockInputStream : public IProfilingBlockInputStream
 {
@@ -25,7 +26,6 @@ private:
         {
             send_settings = true;
             settings = *settings_;
-            use_many_replicas = (pool != nullptr) && UInt64(settings.max_parallel_replicas) > 1;
         }
         else
             send_settings = false;
     }

@@ -33,29 +33,29 @@ public:
     /// Accepts a ready connection.
-    RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_,
+    RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_, ThrottlerPtr throttler_ = nullptr,
         const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
         const Context & context = getDefaultContext())
-        : connection(&connection_), query(query_), external_tables(external_tables_), stage(stage_), context(context)
+        : connection(&connection_), query(query_), throttler(throttler_), external_tables(external_tables_), stage(stage_), context(context)
     {
         init(settings_);
     }

     /// Accepts a ready connection. Takes ownership of the connection from the pool.
-    RemoteBlockInputStream(ConnectionPool::Entry & pool_entry_, const String & query_, const Settings * settings_,
+    RemoteBlockInputStream(ConnectionPool::Entry & pool_entry_, const String & query_, const Settings * settings_, ThrottlerPtr throttler_ = nullptr,
         const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
         const Context & context = getDefaultContext())
-        : pool_entry(pool_entry_), connection(&*pool_entry_), query(query_),
+        : pool_entry(pool_entry_), connection(&*pool_entry_), query(query_), throttler(throttler_),
         external_tables(external_tables_), stage(stage_), context(context)
     {
         init(settings_);
     }

-    /// Accepts a pool from which a connection will need to be obtained.
-    RemoteBlockInputStream(IConnectionPool * pool_, const String & query_, const Settings * settings_,
+    /// Accepts a pool from which one or several connections will need to be obtained.
+    RemoteBlockInputStream(IConnectionPool * pool_, const String & query_, const Settings * settings_, ThrottlerPtr throttler_ = nullptr,
         const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
         const Context & context = getDefaultContext())
-        : pool(pool_), query(query_), external_tables(external_tables_), stage(stage_), context(context)
+        : pool(pool_), query(query_), throttler(throttler_), external_tables(external_tables_), stage(stage_), context(context)
     {
         init(settings_);
     }

@@ -83,46 +83,39 @@ public:
         if (!__sync_bool_compare_and_swap(&is_cancelled, false, true))
             return;

-        if (sent_query && !was_cancelled && !finished && !got_exception_from_server)
+        if (isQueryInProgress() && !hasThrownException())
         {
-            std::string addresses;
-            if (use_many_replicas)
-                addresses = shard_replicas->dumpAddresses();
-            else
-                addresses = connection->getServerAddress();
-
+            std::string addresses = parallel_replicas->dumpAddresses();
             LOG_TRACE(log, "(" + addresses + ") Cancelling query");

-            /// If cancelling the query was requested - ask the remote server to cancel the query as well.
-            if (use_many_replicas)
-                shard_replicas->sendCancel();
-            else
-                connection->sendCancel();
-
+            /// If cancelling the query was requested - ask the remote replicas to cancel the query as well.
             was_cancelled = true;
+            parallel_replicas->sendCancel();
         }
     }

     ~RemoteBlockInputStream() override
     {
-        /** If we were interrupted in the middle of the conversation with the server, close the connection,
-          * so that it is not left hanging in a desynchronized state.
+        /** If we were interrupted in the middle of the conversation with the replicas, cancel
+          * all the connections, then read and skip the remaining packets, so that
+          * these connections are not left hanging in a desynchronized state.
          */
-        if (sent_query && !finished)
+        if (isQueryInProgress())
         {
-            if (use_many_replicas)
-                shard_replicas->disconnect();
-            else
-                connection->disconnect();
+            std::string addresses = parallel_replicas->dumpAddresses();
+            LOG_TRACE(log, "(" + addresses + ") Aborting query");
+
+            parallel_replicas->sendCancel();
+            (void) parallel_replicas->drain();
         }
     }

 protected:
-    /// Send all the temporary tables to the remote servers.
+    /// Send all the temporary tables to the remote replicas.
     void sendExternalTables()
     {
-        size_t count = use_many_replicas ? shard_replicas->size() : 1;
+        size_t count = parallel_replicas->size();

        std::vector<ExternalTablesData> instances;
        instances.reserve(count);
@@ -144,50 +137,22 @@ protected:
             instances.push_back(std::move(res));
         }

-        if (use_many_replicas)
-            shard_replicas->sendExternalTablesData(instances);
-        else
-            connection->sendExternalTablesData(instances[0]);
+        parallel_replicas->sendExternalTablesData(instances);
     }

     Block readImpl() override
     {
         if (!sent_query)
         {
-            if (use_many_replicas)
-            {
-                auto entries = pool->getMany(&settings);
-                if (entries.size() > 1)
-                    shard_replicas = ext::make_unique<ShardReplicas>(entries, settings);
-                else
-                {
-                    /// NOTE IConnectionPool::getMany() always returns at least one connection.
-                    use_many_replicas = false;
-                    connection = &*entries[0];
-                }
-            }
-            else
-            {
-                /// If needed - take a connection from the pool.
-                if (pool)
-                {
-                    pool_entry = pool->get(send_settings ? &settings : nullptr);
-                    connection = &*pool_entry;
-                }
-            }
-
-            if (use_many_replicas)
-                shard_replicas->sendQuery(query, "", stage, true);
-            else
-                connection->sendQuery(query, "", stage, send_settings ? &settings : nullptr, true);
-
+            createParallelReplicas();
+            parallel_replicas->sendQuery(query, "", stage, true);
             sendExternalTables();
             sent_query = true;
         }

         while (true)
         {
-            Connection::Packet packet = use_many_replicas ? shard_replicas->receivePacket() : connection->receivePacket();
+            Connection::Packet packet = parallel_replicas->receivePacket();

             switch (packet.type)
             {
@@ -198,13 +163,17 @@ protected:
                     break;    /// If the block is empty - receive other packets up to EndOfStream.

                 case Protocol::Server::Exception:
-                    got_exception_from_server = true;
+                    got_exception_from_replica = true;
                     packet.exception->rethrow();
                     break;

                 case Protocol::Server::EndOfStream:
-                    finished = true;
-                    return Block();
+                    if (!parallel_replicas->hasActiveReplicas())
+                    {
+                        finished = true;
+                        return Block();
+                    }
+                    break;

                 case Protocol::Server::Progress:
                     /** Use the progress from the remote server.
                       */
                     progressImpl(packet.progress);

-                    if (!was_cancelled && !finished && isCancelled())
+                    if (isQueryInProgress() && isCancelled())
                         cancel();
                     break;

@@ -233,6 +202,7 @@
                     break;

                 default:
+                    got_unknown_packet_from_replica = true;
                     throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
             }
         }
     }

@@ -243,10 +213,11 @@
         /** If any of:
           * - nothing was ever started;
           * - all packets up to EndOfStream were received;
-          * - an exception was received from the server;
+          * - an exception was received from one replica;
+          * - an unknown packet was received from one replica;
          * - then nothing more needs to be read.
          */
-        if (!sent_query || finished || got_exception_from_server)
+        if (hasNoQueryInProgress() || hasThrownException())
             return;

        /** If not all the data has been read yet, but it is no longer needed.
@@ -256,107 +227,102 @@ protected:
         /// Send a request to cancel the query, if one was not sent already.
         if (!was_cancelled)
         {
-            std::string addresses;
-            if (use_many_replicas)
-                addresses = shard_replicas->dumpAddresses();
-            else
-                addresses = connection->getServerAddress();
-
+            std::string addresses = parallel_replicas->dumpAddresses();
             LOG_TRACE(log, "(" + addresses + ") Cancelling query because enough data has been read");

             was_cancelled = true;
-
-            if (use_many_replicas)
-                shard_replicas->sendCancel();
-            else
-                connection->sendCancel();
+            parallel_replicas->sendCancel();
         }

-        if (use_many_replicas)
+        /// Receive the remaining packets, so that the connections to the replicas are not left desynchronized.
+        Connection::Packet packet = parallel_replicas->drain();
+        switch (packet.type)
         {
-            Connection::Packet packet = shard_replicas->drain();
-            switch (packet.type)
-            {
-                case Protocol::Server::EndOfStream:
-                    finished = true;
-                    break;
+            case Protocol::Server::EndOfStream:
+                finished = true;
+                break;

-                case Protocol::Server::Exception:
-                    got_exception_from_server = true;
-                    packet.exception->rethrow();
-                    break;
+            case Protocol::Server::Exception:
+                got_exception_from_replica = true;
+                packet.exception->rethrow();
+                break;

-                default:
-                    throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
-            }
+            default:
+                got_unknown_packet_from_replica = true;
+                throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
         }
+    }
+
+    /// Create the object for communicating with the replicas of one shard, on which the query is to be executed.
+    void createParallelReplicas()
+    {
+        Settings * parallel_replicas_settings = send_settings ? &settings : nullptr;
+        if (connection != nullptr)
+            parallel_replicas = std::make_unique<ParallelReplicas>(connection, parallel_replicas_settings, throttler);
         else
-        {
-            /// Receive the remaining packets, so that the connection to the server is not left desynchronized.
-            while (true)
-            {
-                Connection::Packet packet = connection->receivePacket();
+            parallel_replicas = std::make_unique<ParallelReplicas>(pool, parallel_replicas_settings, throttler);
+    }

-                switch (packet.type)
-                {
-                    case Protocol::Server::Data:
-                    case Protocol::Server::Progress:
-                    case Protocol::Server::ProfileInfo:
-                    case Protocol::Server::Totals:
-                    case Protocol::Server::Extremes:
-                        break;
+    /// Returns true if the query has been sent but has not completed yet.
+    bool isQueryInProgress() const
+    {
+        return sent_query && !finished && !was_cancelled;
+    }

-                    case Protocol::Server::EndOfStream:
-                        finished = true;
-                        return;
+    /// Returns true if no query has been sent, or if one query has already completed.
+    bool hasNoQueryInProgress() const
+    {
+        return !sent_query || finished;
+    }

-                    case Protocol::Server::Exception:
-                        got_exception_from_server = true;
-                        packet.exception->rethrow();
-                        break;
-
-                    default:
-                        throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
-                }
-            }
-        }
+    /// Returns true if an exception has been thrown.
+    bool hasThrownException() const
+    {
+        return got_exception_from_replica || got_unknown_packet_from_replica;
     }

private:
     IConnectionPool * pool = nullptr;
+    ConnectionPool::Entry pool_entry;
     Connection * connection = nullptr;
-
-    std::unique_ptr<ShardReplicas> shard_replicas;
+    std::unique_ptr<ParallelReplicas> parallel_replicas;

     const String query;
     bool send_settings;
     Settings settings;
+    /// If not nullptr, it is used to limit network traffic.
+    ThrottlerPtr throttler;
     /// Temporary tables that need to be sent to the remote servers.
     Tables external_tables;
     QueryProcessingStage::Enum stage;
     Context context;

-    bool use_many_replicas = false;
-
     /// The query has been sent (this is done before the first block is received).
     bool sent_query = false;

-    /** All the data from the server was received, up to the EndOfStream packet.
+    /** All the data from all the replicas was received, up to the EndOfStream packet.
       * If, by the time the object is destroyed, not all the data has been read yet,
-      * then, to avoid desynchronization, a request to cancel the query is sent to the server,
+      * then, to avoid desynchronization, requests to cancel the query are sent to the replicas,
       * and after that all the packets are read up to EndOfStream.
      */
     bool finished = false;

-    /** A request to cancel the query was sent to the server, because the data is no longer needed.
+    /** A request to cancel the query was sent to every replica, because the data is no longer needed.
       * This may be because the data is sufficient (for example, when using LIMIT),
       * or because an exception occurred on the client side.
      */
     bool was_cancelled = false;

-    /// An exception was received from the server. In this case, there is no need to receive more packets or to ask to cancel the query.
-    bool got_exception_from_server = false;
+    /** An exception was received from one replica. In this case, there is no need to receive more packets
+      * or to ask to cancel the query on that replica.
+      */
+    bool got_exception_from_replica = false;
+
+    /** An unknown packet was received from one replica. In this case, there is no need to receive more packets
+      * or to ask to cancel the query on that replica.
+      */
+    bool got_unknown_packet_from_replica = false;

     Logger * log = &Logger::get("RemoteBlockInputStream");
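With these changes RemoteBlockInputStream no longer branches on use_many_replicas: all communication goes through one ParallelReplicas object, whether it wraps a single connection or several. A hypothetical caller only sees the extra throttler argument (a sketch; `pool`, `settings` and the query and limit values are assumed):

    #include <DB/DataStreams/RemoteBlockInputStream.h>

    ThrottlerPtr throttler = std::make_shared<Throttler>(100 * 1024 * 1024);  /// example limit

    DB::RemoteBlockInputStream stream(pool, "SELECT count() FROM hits", &settings, throttler);

    while (const auto block = stream.read())
        ;  /// blocks may now come from several replicas of the shard

When settings.max_parallel_replicas > 1, the pool's getMany() hands out several connections and the same loop transparently merges their streams; cancellation and draining follow the state helpers introduced above.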
diff --git a/dbms/include/DB/Dictionaries/CacheDictionary.h b/dbms/include/DB/Dictionaries/CacheDictionary.h
new file mode 100644
index 00000000000..6959e99682d
--- /dev/null
+++ b/dbms/include/DB/Dictionaries/CacheDictionary.h
@@ -0,0 +1,137 @@
+#pragma once
+
+#include <DB/Dictionaries/IDictionary.h>
+#include <DB/Dictionaries/IDictionarySource.h>
+#include <DB/Dictionaries/DictionaryStructure.h>
+
+namespace DB
+{
+
+class CacheDictionary final : public IDictionary
+{
+public:
+    CacheDictionary(const std::string & name, const DictionaryStructure & dict_struct,
+        DictionarySourcePtr source_ptr, const DictionaryLifetime dict_lifetime,
+        const std::size_t size)
+        : name{name}, dict_struct(dict_struct),
+        source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime),
+        size{size}
+    {
+        if (!this->source_ptr->supportsSelectiveLoad())
+            throw Exception{
+                "Source cannot be used with CacheDictionary",
+                ErrorCodes::UNSUPPORTED_METHOD
+            };
+    }
+
+    CacheDictionary(const CacheDictionary & other)
+        : CacheDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.size}
+    {}
+
+    std::string getName() const override { return name; }
+
+    std::string getTypeName() const override { return "CacheDictionary"; }
+
+    bool isCached() const override { return true; }
+
+    DictionaryPtr clone() const override { return std::make_unique<CacheDictionary>(*this); }
+
+    const IDictionarySource * getSource() const override { return source_ptr.get(); }
+
+    const DictionaryLifetime & getLifetime() const override { return dict_lifetime; }
+
+    bool hasHierarchy() const override { return false; }
+
+    id_t toParent(const id_t id) const override { return 0; }
+
+#define DECLARE_SAFE_GETTER(TYPE, NAME, LC_TYPE) \
+    TYPE get##NAME(const std::string & attribute_name, const id_t id) const override\
+    {\
+        return {};\
+    }
+    DECLARE_SAFE_GETTER(UInt8, UInt8, uint8)
+    DECLARE_SAFE_GETTER(UInt16, UInt16, uint16)
+    DECLARE_SAFE_GETTER(UInt32, UInt32, uint32)
+    DECLARE_SAFE_GETTER(UInt64, UInt64, uint64)
+    DECLARE_SAFE_GETTER(Int8, Int8, int8)
+    DECLARE_SAFE_GETTER(Int16, Int16, int16)
+    DECLARE_SAFE_GETTER(Int32, Int32, int32)
+    DECLARE_SAFE_GETTER(Int64, Int64, int64)
+    DECLARE_SAFE_GETTER(Float32, Float32, float32)
+    DECLARE_SAFE_GETTER(Float64, Float64, float64)
+    DECLARE_SAFE_GETTER(StringRef, String, string)
+#undef DECLARE_SAFE_GETTER
+
+    std::size_t getAttributeIndex(const std::string & attribute_name) const override
+    {
+        return {};
+    }
+
+#define DECLARE_TYPE_CHECKER(NAME, LC_NAME)\
+    bool is##NAME(const std::size_t attribute_idx) const override\
+    {\
+        return true;\
+    }
+    DECLARE_TYPE_CHECKER(UInt8, uint8)
+    DECLARE_TYPE_CHECKER(UInt16, uint16)
+    DECLARE_TYPE_CHECKER(UInt32, uint32)
+    DECLARE_TYPE_CHECKER(UInt64, uint64)
+    DECLARE_TYPE_CHECKER(Int8, int8)
+    DECLARE_TYPE_CHECKER(Int16, int16)
+    DECLARE_TYPE_CHECKER(Int32, int32)
+    DECLARE_TYPE_CHECKER(Int64, int64)
+    DECLARE_TYPE_CHECKER(Float32, float32)
+    DECLARE_TYPE_CHECKER(Float64, float64)
+    DECLARE_TYPE_CHECKER(String, string)
+#undef DECLARE_TYPE_CHECKER
+
+#define DECLARE_UNSAFE_GETTER(TYPE, NAME, LC_NAME)\
+    TYPE get##NAME##Unsafe(const std::size_t attribute_idx, const id_t id) const override\
+    {\
+        return {};\
+    }
+    DECLARE_UNSAFE_GETTER(UInt8, UInt8, uint8)
+    DECLARE_UNSAFE_GETTER(UInt16, UInt16, uint16)
+    DECLARE_UNSAFE_GETTER(UInt32, UInt32, uint32)
+    DECLARE_UNSAFE_GETTER(UInt64, UInt64, uint64)
+    DECLARE_UNSAFE_GETTER(Int8, Int8, int8)
+    DECLARE_UNSAFE_GETTER(Int16, Int16, int16)
+    DECLARE_UNSAFE_GETTER(Int32, Int32, int32)
+    DECLARE_UNSAFE_GETTER(Int64, Int64, int64)
+    DECLARE_UNSAFE_GETTER(Float32, Float32, float32)
+    DECLARE_UNSAFE_GETTER(Float64, Float64, float64)
+    DECLARE_UNSAFE_GETTER(StringRef, String, string)
+#undef DECLARE_UNSAFE_GETTER
+
+private:
+    const std::string name;
+    const DictionaryStructure dict_struct;
+    const DictionarySourcePtr source_ptr;
+    const DictionaryLifetime dict_lifetime;
+    const std::size_t size;
+
+    union item
+    {
+        UInt8 uint8_value;
+        UInt16 uint16_value;
+        UInt32 uint32_value;
+        UInt64 uint64_value;
+        Int8 int8_value;
+        Int16 int16_value;
+        Int32 int32_value;
+        Int64 int64_value;
+        Float32 float32_value;
+        Float64 float64_value;
+        StringRef string_value;
+    };
+
+    struct cell
+    {
+        id_t id;
+        std::vector<item> attrs;
+    };
+
+    std::vector<cell> cells;
+};
+
+}
diff --git a/dbms/include/DB/Dictionaries/ClickhouseDictionarySource.h b/dbms/include/DB/Dictionaries/ClickHouseDictionarySource.h
similarity index 64%
rename from dbms/include/DB/Dictionaries/ClickhouseDictionarySource.h
rename to dbms/include/DB/Dictionaries/ClickHouseDictionarySource.h
index 10846fdf26e..d56f9472e2d 100644
--- a/dbms/include/DB/Dictionaries/ClickhouseDictionarySource.h
+++ b/dbms/include/DB/Dictionaries/ClickHouseDictionarySource.h
@@ -4,6 +4,7 @@
 #include
 #include
 #include
+#include <DB/Common/isLocalAddress.h>
 #include
 #include
 #include
@@ -13,44 +14,50 @@ namespace DB

 const auto max_connections = 1;

-class ClickhouseDictionarySource final : public IDictionarySource
+/** Allows loading dictionaries from a local or remote ClickHouse instance
+* @todo use ConnectionPoolWithFailover
+* @todo invent a way to keep track of source modifications
+*/
+class ClickHouseDictionarySource final : public IDictionarySource
 {
-    static const auto max_block_size = 8192;
-
 public:
-    ClickhouseDictionarySource(const Poco::Util::AbstractConfiguration & config,
+    ClickHouseDictionarySource(const Poco::Util::AbstractConfiguration & config,
         const std::string & config_prefix,
         Block & sample_block, Context & context)
-        : host{config.getString(config_prefix + "host")},
-        port(config.getInt(config_prefix + "port")),
-        user{config.getString(config_prefix + "user", "")},
-        password{config.getString(config_prefix + "password", "")},
-        db{config.getString(config_prefix + "db", "")},
-        table{config.getString(config_prefix + "table")},
+        : host{config.getString(config_prefix + ".host")},
+        port(config.getInt(config_prefix + ".port")),
+        user{config.getString(config_prefix + ".user", "")},
+        password{config.getString(config_prefix + ".password", "")},
+        db{config.getString(config_prefix + ".db", "")},
+        table{config.getString(config_prefix + ".table")},
         sample_block{sample_block}, context(context),
-        is_local{isLocal(host, port)},
-        pool{is_local ? nullptr : ext::make_unique<ConnectionPool>(
+        is_local{isLocalAddress({ host, port })},
+        pool{is_local ? nullptr : std::make_unique<ConnectionPool>(
             max_connections, host, port, db, user, password, context.getDataTypeFactory(),
-            "ClickhouseDictionarySource")
+            "ClickHouseDictionarySource")
         },
         load_all_query{composeLoadAllQuery(sample_block, table)}
     {}

-    ClickhouseDictionarySource(const ClickhouseDictionarySource & other)
+    /// copy-constructor is provided in order to support cloneability
+    ClickHouseDictionarySource(const ClickHouseDictionarySource & other)
         : host{other.host}, port{other.port}, user{other.user}, password{other.password},
         db{other.db}, table{other.table},
         sample_block{other.sample_block}, context(other.context),
         is_local{other.is_local},
-        pool{is_local ? nullptr : ext::make_unique<ConnectionPool>(
+        pool{is_local ? nullptr : std::make_unique<ConnectionPool>(
             max_connections, host, port, db, user, password, context.getDataTypeFactory(),
-            "ClickhouseDictionarySource")},
+            "ClickHouseDictionarySource")},
         load_all_query{other.load_all_query}
     {}

     BlockInputStreamPtr loadAll() override
     {
+        /** Query to local ClickHouse is marked internal in order to avoid
+          * the necessity of holding process_list_element shared pointer.
+          */
         if (is_local)
-            return executeQuery(load_all_query, context).in;
+            return executeQuery(load_all_query, context, true).in;
         return new RemoteBlockInputStream{pool.get(), load_all_query, nullptr};
     }

@@ -70,12 +77,13 @@ public:
         };
     }

-    /// @todo check update time somehow
     bool isModified() const override { return true; }
+    bool supportsSelectiveLoad() const override { return true; }

-    DictionarySourcePtr clone() const override { return ext::make_unique<ClickhouseDictionarySource>(*this); }
+    DictionarySourcePtr clone() const override { return std::make_unique<ClickHouseDictionarySource>(*this); }

 private:
+    /// @todo escape table and column names
     static std::string composeLoadAllQuery(const Block & block, const std::string & table)
     {
         std::string query{"SELECT "};
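composeLoadAllQuery (truncated by the hunk above) assembles the load statement from the sample block's column names. For a dictionary with id `id` and attributes `name` and `parent_id` stored in table `dict`, the composed text would look like the following (an illustration; actual column order follows the sample block):

    SELECT id, name, parent_id FROM dict

loadAll() then runs it either in-process through executeQuery() in the is_local case, or over the connection pool through RemoteBlockInputStream. As the @todo notes, table and column names are not escaped yet, so identifiers that require quoting would break the query.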
diff --git a/dbms/include/DB/Dictionaries/DictionarySourceFactory.h b/dbms/include/DB/Dictionaries/DictionarySourceFactory.h
index e46c8e8fe5c..810f87e5d46 100644
--- a/dbms/include/DB/Dictionaries/DictionarySourceFactory.h
+++ b/dbms/include/DB/Dictionaries/DictionarySourceFactory.h
@@ -3,11 +3,11 @@
 #include
 #include
 #include
-#include <DB/Dictionaries/ClickhouseDictionarySource.h>
-#include <DB/Dictionaries/MysqlDictionarySource.h>
+#include <DB/Dictionaries/ClickHouseDictionarySource.h>
+#include <DB/Dictionaries/MySQLDictionarySource.h>
 #include
 #include
-#include <statdaemons/ext/memory.hpp>
+#include <memory>

 namespace DB
 {
@@ -38,6 +38,7 @@ Block createSampleBlock(const DictionaryStructure & dict_struct, const Context &
 }

+/// creates IDictionarySource instance from config and DictionaryStructure
 class DictionarySourceFactory : public Singleton<DictionarySourceFactory>
 {
 public:
@@ -46,25 +47,38 @@ public:
         const DictionaryStructure & dict_struct, Context & context) const
     {
+        Poco::Util::AbstractConfiguration::Keys keys;
+        config.keys(config_prefix, keys);
+        if (keys.size() != 1)
+            throw Exception{
+                "Element dictionary.source should have exactly one child element",
+                ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG
+            };
+
         auto sample_block = createSampleBlock(dict_struct, context);

-        if (config.has(config_prefix + "file"))
+        const auto & source_type = keys.front();
+
+        if ("file" == source_type)
         {
-            const auto & filename = config.getString(config_prefix + "file.path");
-            const auto & format = config.getString(config_prefix + "file.format");
-            return ext::make_unique<FileDictionarySource>(filename, format, sample_block, context);
+            const auto filename = config.getString(config_prefix + ".file.path");
+            const auto format = config.getString(config_prefix + ".file.format");
+            return std::make_unique<FileDictionarySource>(filename, format, sample_block, context);
         }
-        else if (config.has(config_prefix + "mysql"))
+        else if ("mysql" == source_type)
         {
-            return ext::make_unique<MysqlDictionarySource>(config, config_prefix + "mysql", sample_block, context);
+            return std::make_unique<MySQLDictionarySource>(config, config_prefix + ".mysql", sample_block);
         }
-        else if (config.has(config_prefix + "clickhouse"))
+        else if ("clickhouse" == source_type)
         {
-            return ext::make_unique<ClickhouseDictionarySource>(config, config_prefix + "clickhouse.",
+            return std::make_unique<ClickHouseDictionarySource>(config, config_prefix + ".clickhouse",
                 sample_block, context);
         }

-        throw Exception{"unsupported source type"};
+        throw Exception{
+            "Unknown dictionary source type: " + source_type,
+            ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG
+        };
     }
 };
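Since the factory derives from Singleton, callers are expected to go through its shared instance. A hypothetical call site, assuming the Singleton base exposes the usual instance() accessor and that the leading parameters elided by the hunk above are the configuration and its prefix:

    auto source = DictionarySourceFactory::instance().create(
        config, "dictionary.source", dict_struct, context);  /// prefix value illustrative

    auto stream = source->loadAll();  /// IDictionarySource interface, defined later in this diff

The exactly-one-child check above means the source element must contain precisely one of file, mysql or clickhouse.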
diff --git a/dbms/include/DB/Dictionaries/DictionaryStructure.h b/dbms/include/DB/Dictionaries/DictionaryStructure.h
index cd552784551..15ed7946e1f 100644
--- a/dbms/include/DB/Dictionaries/DictionaryStructure.h
+++ b/dbms/include/DB/Dictionaries/DictionaryStructure.h
@@ -9,7 +9,7 @@
 namespace DB
 {

-enum class attribute_type
+enum class AttributeType
 {
     uint8,
     uint16,
@@ -24,20 +24,20 @@ enum class attribute_type
     string
 };

-inline attribute_type getAttributeTypeByName(const std::string & type)
+inline AttributeType getAttributeTypeByName(const std::string & type)
 {
-    static const std::unordered_map<std::string, attribute_type> dictionary{
-        { "UInt8", attribute_type::uint8 },
-        { "UInt16", attribute_type::uint16 },
-        { "UInt32", attribute_type::uint32 },
-        { "UInt64", attribute_type::uint64 },
-        { "Int8", attribute_type::int8 },
-        { "Int16", attribute_type::int16 },
-        { "Int32", attribute_type::int32 },
-        { "Int64", attribute_type::int64 },
-        { "Float32", attribute_type::float32 },
-        { "Float64", attribute_type::float64 },
-        { "String", attribute_type::string },
+    static const std::unordered_map<std::string, AttributeType> dictionary{
+        { "UInt8", AttributeType::uint8 },
+        { "UInt16", AttributeType::uint16 },
+        { "UInt32", AttributeType::uint32 },
+        { "UInt64", AttributeType::uint64 },
+        { "Int8", AttributeType::int8 },
+        { "Int16", AttributeType::int16 },
+        { "Int32", AttributeType::int32 },
+        { "Int64", AttributeType::int64 },
+        { "Float32", AttributeType::float32 },
+        { "Float64", AttributeType::float64 },
+        { "String", AttributeType::string },
     };

     const auto it = dictionary.find(type);
@@ -50,44 +50,53 @@ inline attribute_type getAttributeTypeByName(const std::string & type)
     };
 }

-inline std::string toString(const attribute_type type)
+inline std::string toString(const AttributeType type)
 {
     switch (type)
     {
-        case attribute_type::uint8: return "UInt8";
-        case attribute_type::uint16: return "UInt16";
-        case attribute_type::uint32: return "UInt32";
-        case attribute_type::uint64: return "UInt64";
-        case attribute_type::int8: return "Int8";
-        case attribute_type::int16: return "Int16";
-        case attribute_type::int32: return "Int32";
-        case attribute_type::int64: return "Int64";
-        case attribute_type::float32: return "Float32";
-        case attribute_type::float64: return "Float64";
-        case attribute_type::string: return "String";
+        case AttributeType::uint8: return "UInt8";
+        case AttributeType::uint16: return "UInt16";
+        case AttributeType::uint32: return "UInt32";
+        case AttributeType::uint64: return "UInt64";
+        case AttributeType::int8: return "Int8";
+        case AttributeType::int16: return "Int16";
+        case AttributeType::int32: return "Int32";
+        case AttributeType::int64: return "Int64";
+        case AttributeType::float32: return "Float32";
+        case AttributeType::float64: return "Float64";
+        case AttributeType::string: return "String";
     }

     throw Exception{
-        "Unknown attribute_type " + toString(type),
+        "Unknown attribute_type " + toString(static_cast<int>(type)),
         ErrorCodes::ARGUMENT_OUT_OF_BOUND
     };
 }
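These two helpers are inverses on the eleven supported names, and the dictionary code round-trips configuration strings through them. A sketch:

    DB::AttributeType type = DB::getAttributeTypeByName("UInt64");  /// AttributeType::uint64
    std::string name = DB::toString(type);                          /// "UInt64" again

An unsupported name such as "Date" is not in the map above, so the lookup fails and getAttributeTypeByName falls through to the exception at the end of the function (its body is elided by the hunk).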
+/// Min and max lifetimes for a dictionary or its entry
 struct DictionaryLifetime
 {
     std::uint64_t min_sec;
     std::uint64_t max_sec;

-    static DictionaryLifetime fromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
+    DictionaryLifetime(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
     {
         const auto & lifetime_min_key = config_prefix + ".min";
         const auto has_min = config.has(lifetime_min_key);
-        const std::uint64_t min_update_time = has_min ? config.getInt(lifetime_min_key) : config.getInt(config_prefix);
-        const std::uint64_t max_update_time = has_min ? config.getInt(config_prefix + ".max") : min_update_time;
-        return { min_update_time, max_update_time };
+
+        this->min_sec = has_min ? config.getInt(lifetime_min_key) : config.getInt(config_prefix);
+        this->max_sec = has_min ? config.getInt(config_prefix + ".max") : this->min_sec;
     }
 };

+/** Holds the description of a single dictionary attribute:
+*    - name, used for lookup into dictionary and source;
+*    - type, used in conjunction with DataTypeFactory and getAttributeTypeByName;
+*    - null_value, used as a default value for non-existent entries in the dictionary,
+*        decimal representation for numeric attributes;
+*    - hierarchical, whether this attribute defines a hierarchy;
+*    - injective, whether the mapping to parent is injective (can be used for optimization of GROUP BY?)
+*/
 struct DictionaryAttribute
 {
     std::string name;
@@ -97,34 +106,34 @@ struct DictionaryAttribute
     bool injective;
 };

+/// Name of identifier plus list of attributes
 struct DictionaryStructure
 {
     std::string id_name;
     std::vector<DictionaryAttribute> attributes;

-    static DictionaryStructure fromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
+    DictionaryStructure(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
+        : id_name{config.getString(config_prefix + ".id.name")}
     {
-        const auto & id_name = config.getString(config_prefix + ".id.name");
         if (id_name.empty())
             throw Exception{
                 "No 'id' specified for dictionary",
                 ErrorCodes::BAD_ARGUMENTS
             };

-        DictionaryStructure result{id_name};
-
         Poco::Util::AbstractConfiguration::Keys keys;
         config.keys(config_prefix, keys);
         auto has_hierarchy = false;
+
         for (const auto & key : keys)
         {
             if (0 != strncmp(key.data(), "attribute", strlen("attribute")))
                 continue;

-            const auto & prefix = config_prefix + '.' + key + '.';
-            const auto & name = config.getString(prefix + "name");
-            const auto & type = config.getString(prefix + "type");
-            const auto & null_value = config.getString(prefix + "null_value");
+            const auto prefix = config_prefix + '.'
+ key + '.'; + const auto name = config.getString(prefix + "name"); + const auto type = config.getString(prefix + "type"); + const auto null_value = config.getString(prefix + "null_value"); const auto hierarchical = config.getBool(prefix + "hierarchical", false); const auto injective = config.getBool(prefix + "injective", false); if (name.empty() || type.empty()) @@ -141,16 +150,16 @@ struct DictionaryStructure has_hierarchy = has_hierarchy || hierarchical; - result.attributes.emplace_back(DictionaryAttribute{name, type, null_value, hierarchical, injective}); + attributes.emplace_back(DictionaryAttribute{ + name, type, null_value, hierarchical, injective + }); } - if (result.attributes.empty()) + if (attributes.empty()) throw Exception{ "Dictionary has no attributes defined", ErrorCodes::BAD_ARGUMENTS }; - - return result; } }; diff --git a/dbms/include/DB/Dictionaries/FileDictionarySource.h b/dbms/include/DB/Dictionaries/FileDictionarySource.h index ac90f103727..52659058b36 100644 --- a/dbms/include/DB/Dictionaries/FileDictionarySource.h +++ b/dbms/include/DB/Dictionaries/FileDictionarySource.h @@ -10,13 +10,14 @@ namespace DB { +/// Allows loading dictionaries from a file with given format, does not support "random access" class FileDictionarySource final : public IDictionarySource { static const auto max_block_size = 8192; public: FileDictionarySource(const std::string & filename, const std::string & format, Block & sample_block, - Context & context) + const Context & context) : filename{filename}, format{format}, sample_block{sample_block}, context(context), last_modification{getLastModification()} {} @@ -29,7 +30,7 @@ public: BlockInputStreamPtr loadAll() override { - auto in_ptr = ext::make_unique(filename); + auto in_ptr = std::make_unique(filename); auto stream = context.getFormatFactory().getInput( format, *in_ptr, sample_block, max_block_size, context.getDataTypeFactory()); last_modification = getLastModification(); @@ -54,8 +55,9 @@ public: } bool isModified() const override { return getLastModification() > last_modification; } + bool supportsSelectiveLoad() const override { return false; } - DictionarySourcePtr clone() const override { return ext::make_unique(*this); } + DictionarySourcePtr clone() const override { return std::make_unique(*this); } private: Poco::Timestamp getLastModification() const { return Poco::File{filename}.getLastModified(); } @@ -63,7 +65,7 @@ private: const std::string filename; const std::string format; Block sample_block; - Context & context; + const Context & context; Poco::Timestamp last_modification; }; diff --git a/dbms/include/DB/Dictionaries/FlatDictionary.h b/dbms/include/DB/Dictionaries/FlatDictionary.h index c0604ccd701..3b018a061ec 100644 --- a/dbms/include/DB/Dictionaries/FlatDictionary.h +++ b/dbms/include/DB/Dictionaries/FlatDictionary.h @@ -2,10 +2,8 @@ #include #include -#include #include #include -#include #include namespace DB @@ -36,9 +34,9 @@ public: bool isCached() const override { return false; } - DictionaryPtr clone() const override { return ext::make_unique(*this); } + DictionaryPtr clone() const override { return std::make_unique(*this); } - const IDictionarySource * const getSource() const override { return source_ptr.get(); } + const IDictionarySource * getSource() const override { return source_ptr.get(); } const DictionaryLifetime & getLifetime() const override { return dict_lifetime; } @@ -50,18 +48,18 @@ public: switch (hierarchical_attribute->type) { - case attribute_type::uint8: return id < attr->uint8_array->size() ? 
(*attr->uint8_array)[id] : attr->uint8_null_value; - case attribute_type::uint16: return id < attr->uint16_array->size() ? (*attr->uint16_array)[id] : attr->uint16_null_value; - case attribute_type::uint32: return id < attr->uint32_array->size() ? (*attr->uint32_array)[id] : attr->uint32_null_value; - case attribute_type::uint64: return id < attr->uint64_array->size() ? (*attr->uint64_array)[id] : attr->uint64_null_value; - case attribute_type::int8: return id < attr->int8_array->size() ? (*attr->int8_array)[id] : attr->int8_null_value; - case attribute_type::int16: return id < attr->int16_array->size() ? (*attr->int16_array)[id] : attr->int16_null_value; - case attribute_type::int32: return id < attr->int32_array->size() ? (*attr->int32_array)[id] : attr->int32_null_value; - case attribute_type::int64: return id < attr->int64_array->size() ? (*attr->int64_array)[id] : attr->int64_null_value; - case attribute_type::float32: - case attribute_type::float64: - case attribute_type::string: - break; + case AttributeType::uint8: return id < attr->uint8_array->size() ? (*attr->uint8_array)[id] : attr->uint8_null_value; + case AttributeType::uint16: return id < attr->uint16_array->size() ? (*attr->uint16_array)[id] : attr->uint16_null_value; + case AttributeType::uint32: return id < attr->uint32_array->size() ? (*attr->uint32_array)[id] : attr->uint32_null_value; + case AttributeType::uint64: return id < attr->uint64_array->size() ? (*attr->uint64_array)[id] : attr->uint64_null_value; + case AttributeType::int8: return id < attr->int8_array->size() ? (*attr->int8_array)[id] : attr->int8_null_value; + case AttributeType::int16: return id < attr->int16_array->size() ? (*attr->int16_array)[id] : attr->int16_null_value; + case AttributeType::int32: return id < attr->int32_array->size() ? (*attr->int32_array)[id] : attr->int32_null_value; + case AttributeType::int64: return id < attr->int64_array->size() ? 
(*attr->int64_array)[id] : attr->int64_null_value; + case AttributeType::float32: + case AttributeType::float64: + case AttributeType::string: + break; } throw Exception{ @@ -75,7 +73,7 @@ public: {\ const auto idx = getAttributeIndex(attribute_name);\ const auto & attribute = attributes[idx];\ - if (attribute.type != attribute_type::LC_TYPE)\ + if (attribute.type != AttributeType::LC_TYPE)\ throw Exception{\ "Type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\ ErrorCodes::TYPE_MISMATCH\ @@ -112,7 +110,7 @@ public: #define DECLARE_TYPE_CHECKER(NAME, LC_NAME)\ bool is##NAME(const std::size_t attribute_idx) const override\ {\ - return attributes[attribute_idx].type == attribute_type::LC_NAME;\ + return attributes[attribute_idx].type == AttributeType::LC_NAME;\ } DECLARE_TYPE_CHECKER(UInt8, uint8) DECLARE_TYPE_CHECKER(UInt16, uint16) @@ -151,7 +149,7 @@ public: private: struct attribute_t { - attribute_type type; + AttributeType type; UInt8 uint8_null_value; UInt16 uint16_null_value; UInt32 uint32_null_value; @@ -195,6 +193,7 @@ private: void loadData() { auto stream = source_ptr->loadAll(); + stream->readPrefix(); while (const auto block = stream->read()) { @@ -209,65 +208,67 @@ private: setAttributeValue(attribute, id_column[row_idx].get(), attribute_column[row_idx]); } } + + stream->readSuffix(); } - attribute_t createAttributeWithType(const attribute_type type, const std::string & null_value) + attribute_t createAttributeWithType(const AttributeType type, const std::string & null_value) { attribute_t attr{type}; switch (type) { - case attribute_type::uint8: + case AttributeType::uint8: attr.uint8_null_value = DB::parse(null_value); attr.uint8_array.reset(new PODArray); attr.uint8_array->resize_fill(initial_array_size, attr.uint8_null_value); break; - case attribute_type::uint16: + case AttributeType::uint16: attr.uint16_null_value = DB::parse(null_value); attr.uint16_array.reset(new PODArray); attr.uint16_array->resize_fill(initial_array_size, attr.uint16_null_value); break; - case attribute_type::uint32: + case AttributeType::uint32: attr.uint32_null_value = DB::parse(null_value); attr.uint32_array.reset(new PODArray); attr.uint32_array->resize_fill(initial_array_size, attr.uint32_null_value); break; - case attribute_type::uint64: + case AttributeType::uint64: attr.uint64_null_value = DB::parse(null_value); attr.uint64_array.reset(new PODArray); attr.uint64_array->resize_fill(initial_array_size, attr.uint64_null_value); break; - case attribute_type::int8: + case AttributeType::int8: attr.int8_null_value = DB::parse(null_value); attr.int8_array.reset(new PODArray); attr.int8_array->resize_fill(initial_array_size, attr.int8_null_value); break; - case attribute_type::int16: + case AttributeType::int16: attr.int16_null_value = DB::parse(null_value); attr.int16_array.reset(new PODArray); attr.int16_array->resize_fill(initial_array_size, attr.int16_null_value); break; - case attribute_type::int32: + case AttributeType::int32: attr.int32_null_value = DB::parse(null_value); attr.int32_array.reset(new PODArray); attr.int32_array->resize_fill(initial_array_size, attr.int32_null_value); break; - case attribute_type::int64: + case AttributeType::int64: attr.int64_null_value = DB::parse(null_value); attr.int64_array.reset(new PODArray); attr.int64_array->resize_fill(initial_array_size, attr.int64_null_value); break; - case attribute_type::float32: + case AttributeType::float32: attr.float32_null_value = DB::parse(null_value); attr.float32_array.reset(new PODArray); 
attr.float32_array->resize_fill(initial_array_size, attr.float32_null_value); break; - case attribute_type::float64: + case AttributeType::float64: attr.float64_null_value = DB::parse(null_value); attr.float64_array.reset(new PODArray); attr.float64_array->resize_fill(initial_array_size, attr.float64_null_value); break; - case attribute_type::string: + case AttributeType::string: attr.string_null_value = null_value; attr.string_arena.reset(new Arena); attr.string_array.reset(new PODArray); @@ -288,77 +289,77 @@ private: switch (attribute.type) { - case attribute_type::uint8: + case AttributeType::uint8: { if (id >= attribute.uint8_array->size()) attribute.uint8_array->resize_fill(id, attribute.uint8_null_value); (*attribute.uint8_array)[id] = value.get(); break; } - case attribute_type::uint16: + case AttributeType::uint16: { if (id >= attribute.uint16_array->size()) attribute.uint16_array->resize_fill(id, attribute.uint16_null_value); (*attribute.uint16_array)[id] = value.get(); break; } - case attribute_type::uint32: + case AttributeType::uint32: { if (id >= attribute.uint32_array->size()) attribute.uint32_array->resize_fill(id, attribute.uint32_null_value); (*attribute.uint32_array)[id] = value.get(); break; } - case attribute_type::uint64: + case AttributeType::uint64: { if (id >= attribute.uint64_array->size()) attribute.uint64_array->resize_fill(id, attribute.uint64_null_value); (*attribute.uint64_array)[id] = value.get(); break; } - case attribute_type::int8: + case AttributeType::int8: { if (id >= attribute.int8_array->size()) attribute.int8_array->resize_fill(id, attribute.int8_null_value); (*attribute.int8_array)[id] = value.get(); break; } - case attribute_type::int16: + case AttributeType::int16: { if (id >= attribute.int16_array->size()) attribute.int16_array->resize_fill(id, attribute.int16_null_value); (*attribute.int16_array)[id] = value.get(); break; } - case attribute_type::int32: + case AttributeType::int32: { if (id >= attribute.int32_array->size()) attribute.int32_array->resize_fill(id, attribute.int32_null_value); (*attribute.int32_array)[id] = value.get(); break; } - case attribute_type::int64: + case AttributeType::int64: { if (id >= attribute.int64_array->size()) attribute.int64_array->resize_fill(id, attribute.int64_null_value); (*attribute.int64_array)[id] = value.get(); break; } - case attribute_type::float32: + case AttributeType::float32: { if (id >= attribute.float32_array->size()) attribute.float32_array->resize_fill(id, attribute.float32_null_value); (*attribute.float32_array)[id] = value.get(); break; } - case attribute_type::float64: + case AttributeType::float64: { if (id >= attribute.float64_array->size()) attribute.float64_array->resize_fill(id, attribute.float64_null_value); (*attribute.float64_array)[id] = value.get(); break; } - case attribute_type::string: + case AttributeType::string: { if (id >= attribute.string_array->size()) attribute.string_array->resize_fill(id, attribute.string_null_value); diff --git a/dbms/include/DB/Dictionaries/HashedDictionary.h b/dbms/include/DB/Dictionaries/HashedDictionary.h index 4c83f6395ca..34015f36087 100644 --- a/dbms/include/DB/Dictionaries/HashedDictionary.h +++ b/dbms/include/DB/Dictionaries/HashedDictionary.h @@ -2,11 +2,10 @@ #include #include -#include #include #include #include -#include +#include namespace DB { @@ -33,9 +32,9 @@ public: bool isCached() const override { return false; } - DictionaryPtr clone() const override { return ext::make_unique(*this); } + DictionaryPtr clone() const override { 
return std::make_unique(*this); } - const IDictionarySource * const getSource() const override { return source_ptr.get(); } + const IDictionarySource * getSource() const override { return source_ptr.get(); } const DictionaryLifetime & getLifetime() const override { return dict_lifetime; } @@ -47,49 +46,49 @@ public: switch (hierarchical_attribute->type) { - case attribute_type::uint8: + case AttributeType::uint8: { const auto it = attr->uint8_map->find(id); return it != attr->uint8_map->end() ? it->second : attr->uint8_null_value; } - case attribute_type::uint16: + case AttributeType::uint16: { const auto it = attr->uint16_map->find(id); return it != attr->uint16_map->end() ? it->second : attr->uint16_null_value; } - case attribute_type::uint32: + case AttributeType::uint32: { const auto it = attr->uint32_map->find(id); return it != attr->uint32_map->end() ? it->second : attr->uint32_null_value; } - case attribute_type::uint64: + case AttributeType::uint64: { const auto it = attr->uint64_map->find(id); return it != attr->uint64_map->end() ? it->second : attr->uint64_null_value; } - case attribute_type::int8: + case AttributeType::int8: { const auto it = attr->int8_map->find(id); return it != attr->int8_map->end() ? it->second : attr->int8_null_value; } - case attribute_type::int16: + case AttributeType::int16: { const auto it = attr->int16_map->find(id); return it != attr->int16_map->end() ? it->second : attr->int16_null_value; } - case attribute_type::int32: + case AttributeType::int32: { const auto it = attr->int32_map->find(id); return it != attr->int32_map->end() ? it->second : attr->int32_null_value; } - case attribute_type::int64: + case AttributeType::int64: { const auto it = attr->int64_map->find(id); return it != attr->int64_map->end() ? it->second : attr->int64_null_value; } - case attribute_type::float32: - case attribute_type::float64: - case attribute_type::string: + case AttributeType::float32: + case AttributeType::float64: + case AttributeType::string: break; }; @@ -104,7 +103,7 @@ public: {\ const auto idx = getAttributeIndex(attribute_name);\ const auto & attribute = attributes[idx];\ - if (attribute.type != attribute_type::LC_TYPE)\ + if (attribute.type != AttributeType::LC_TYPE)\ throw Exception{\ "Type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type),\ ErrorCodes::TYPE_MISMATCH\ @@ -142,7 +141,7 @@ public: #define DECLARE_TYPE_CHECKER(NAME, LC_NAME)\ bool is##NAME(const std::size_t attribute_idx) const override\ {\ - return attributes[attribute_idx].type == attribute_type::LC_NAME;\ + return attributes[attribute_idx].type == AttributeType::LC_NAME;\ } DECLARE_TYPE_CHECKER(UInt8, uint8) DECLARE_TYPE_CHECKER(UInt16, uint16) @@ -182,7 +181,7 @@ public: private: struct attribute_t { - attribute_type type; + AttributeType type; UInt8 uint8_null_value; UInt16 uint16_null_value; UInt32 uint32_null_value; @@ -226,6 +225,7 @@ private: void loadData() { auto stream = source_ptr->loadAll(); + stream->readPrefix(); while (const auto block = stream->read()) { @@ -240,55 +240,57 @@ private: setAttributeValue(attribute, id_column[row_idx].get(), attribute_column[row_idx]); } } + + stream->readSuffix(); } - attribute_t createAttributeWithType(const attribute_type type, const std::string & null_value) + attribute_t createAttributeWithType(const AttributeType type, const std::string & null_value) { attribute_t attr{type}; switch (type) { - case attribute_type::uint8: + case AttributeType::uint8: attr.uint8_null_value = DB::parse(null_value); 
attr.uint8_map.reset(new HashMap); break; - case attribute_type::uint16: + case AttributeType::uint16: attr.uint16_null_value = DB::parse(null_value); attr.uint16_map.reset(new HashMap); break; - case attribute_type::uint32: + case AttributeType::uint32: attr.uint32_null_value = DB::parse(null_value); attr.uint32_map.reset(new HashMap); break; - case attribute_type::uint64: + case AttributeType::uint64: attr.uint64_null_value = DB::parse(null_value); attr.uint64_map.reset(new HashMap); break; - case attribute_type::int8: + case AttributeType::int8: attr.int8_null_value = DB::parse(null_value); attr.int8_map.reset(new HashMap); break; - case attribute_type::int16: + case AttributeType::int16: attr.int16_null_value = DB::parse(null_value); attr.int16_map.reset(new HashMap); break; - case attribute_type::int32: + case AttributeType::int32: attr.int32_null_value = DB::parse(null_value); attr.int32_map.reset(new HashMap); break; - case attribute_type::int64: + case AttributeType::int64: attr.int64_null_value = DB::parse(null_value); attr.int64_map.reset(new HashMap); break; - case attribute_type::float32: + case AttributeType::float32: attr.float32_null_value = DB::parse(null_value); attr.float32_map.reset(new HashMap); break; - case attribute_type::float64: + case AttributeType::float64: attr.float64_null_value = DB::parse(null_value); attr.float64_map.reset(new HashMap); break; - case attribute_type::string: + case AttributeType::string: attr.string_null_value = null_value; attr.string_arena.reset(new Arena); attr.string_map.reset(new HashMap); @@ -302,57 +304,57 @@ private: { switch (attribute.type) { - case attribute_type::uint8: + case AttributeType::uint8: { attribute.uint8_map->insert({ id, value.get() }); break; } - case attribute_type::uint16: + case AttributeType::uint16: { attribute.uint16_map->insert({ id, value.get() }); break; } - case attribute_type::uint32: + case AttributeType::uint32: { attribute.uint32_map->insert({ id, value.get() }); break; } - case attribute_type::uint64: + case AttributeType::uint64: { attribute.uint64_map->insert({ id, value.get() }); break; } - case attribute_type::int8: + case AttributeType::int8: { attribute.int8_map->insert({ id, value.get() }); break; } - case attribute_type::int16: + case AttributeType::int16: { attribute.int16_map->insert({ id, value.get() }); break; } - case attribute_type::int32: + case AttributeType::int32: { attribute.int32_map->insert({ id, value.get() }); break; } - case attribute_type::int64: + case AttributeType::int64: { attribute.int64_map->insert({ id, value.get() }); break; } - case attribute_type::float32: + case AttributeType::float32: { attribute.float32_map->insert({ id, value.get() }); break; } - case attribute_type::float64: + case AttributeType::float64: { attribute.float64_map->insert({ id, value.get() }); break; } - case attribute_type::string: + case AttributeType::string: { const auto & string = value.get(); const auto string_in_arena = attribute.string_arena->insert(string.data(), string.size()); diff --git a/dbms/include/DB/Dictionaries/IDictionary.h b/dbms/include/DB/Dictionaries/IDictionary.h index 088c9901912..09361caa6cf 100644 --- a/dbms/include/DB/Dictionaries/IDictionary.h +++ b/dbms/include/DB/Dictionaries/IDictionary.h @@ -18,7 +18,7 @@ class DictionaryLifetime; class IDictionary { public: - using id_t = std::uint64_t; + using id_t = std::uint64_t; virtual std::string getName() const = 0; @@ -28,7 +28,7 @@ public: virtual void reload() {} virtual DictionaryPtr clone() const = 0; - virtual const 
IDictionarySource * const getSource() const = 0; + virtual const IDictionarySource * getSource() const = 0; virtual const DictionaryLifetime & getLifetime() const = 0; @@ -89,7 +89,7 @@ public: virtual Float64 getFloat64Unsafe(std::size_t attribute_idx, id_t id) const = 0; virtual StringRef getStringUnsafe(std::size_t attribute_idx, id_t id) const = 0; - virtual ~IDictionary() = default; + virtual ~IDictionary() = default; }; } diff --git a/dbms/include/DB/Dictionaries/IDictionarySource.h b/dbms/include/DB/Dictionaries/IDictionarySource.h index 50404cc682c..3a0eea143db 100644 --- a/dbms/include/DB/Dictionaries/IDictionarySource.h +++ b/dbms/include/DB/Dictionaries/IDictionarySource.h @@ -9,12 +9,28 @@ namespace DB class IDictionarySource; using DictionarySourcePtr = std::unique_ptr; +/** Data-provider interface for external dictionaries, +* abstracts out the data source (file, MySQL, ClickHouse, external program, network request et cetera) +* from the presentation and memory layout (the dictionary itself). +*/ class IDictionarySource { public: + /// returns an input stream with all the data available from this source virtual BlockInputStreamPtr loadAll() = 0; + + /** Indicates whether this source supports "random access" loading of data + * loadId and loadIds can only be used if this function returns true. + */ + virtual bool supportsSelectiveLoad() const = 0; + + /// returns an input stream with the data for the requested identifier virtual BlockInputStreamPtr loadId(const std::uint64_t id) = 0; + + /// returns an input stream with the data for a collection of identifiers virtual BlockInputStreamPtr loadIds(const std::vector ids) = 0; + + /// indicates whether the source has been modified since last load* operation virtual bool isModified() const = 0; virtual DictionarySourcePtr clone() const = 0; diff --git a/dbms/include/DB/Dictionaries/MysqlBlockInputStream.h b/dbms/include/DB/Dictionaries/MySQLBlockInputStream.h similarity index 54% rename from dbms/include/DB/Dictionaries/MysqlBlockInputStream.h rename to dbms/include/DB/Dictionaries/MySQLBlockInputStream.h index 120a02ac47f..26942ad9b28 100644 --- a/dbms/include/DB/Dictionaries/MysqlBlockInputStream.h +++ b/dbms/include/DB/Dictionaries/MySQLBlockInputStream.h @@ -12,10 +12,11 @@ namespace DB { -class MysqlBlockInputStream final : public IProfilingBlockInputStream +/// Allows processing results of a MySQL query as a sequence of Blocks, simplifies chaining +class MySQLBlockInputStream final : public IProfilingBlockInputStream { public: - MysqlBlockInputStream(mysqlxx::Query query, const Block & sample_block, const std::size_t max_block_size) + MySQLBlockInputStream(mysqlxx::Query query, const Block & sample_block, const std::size_t max_block_size) : query{std::move(query)}, result{query.use()}, sample_block{sample_block}, max_block_size{max_block_size} { types.reserve(sample_block.columns()); @@ -24,27 +25,27 @@ public: { const auto type = sample_block.getByPosition(idx).type.get(); if (typeid_cast(type)) - types.push_back(attribute_type::uint8); + types.push_back(AttributeType::uint8); else if (typeid_cast(type)) - types.push_back(attribute_type::uint16); + types.push_back(AttributeType::uint16); else if (typeid_cast(type)) - types.push_back(attribute_type::uint32); + types.push_back(AttributeType::uint32); else if (typeid_cast(type)) - types.push_back(attribute_type::uint64); + types.push_back(AttributeType::uint64); else if (typeid_cast(type)) - types.push_back(attribute_type::int8); + types.push_back(AttributeType::int8); else if 
(typeid_cast(type)) - types.push_back(attribute_type::int16); + types.push_back(AttributeType::int16); else if (typeid_cast(type)) - types.push_back(attribute_type::int32); + types.push_back(AttributeType::int32); else if (typeid_cast(type)) - types.push_back(attribute_type::int64); + types.push_back(AttributeType::int64); else if (typeid_cast(type)) - types.push_back(attribute_type::float32); + types.push_back(AttributeType::float32); else if (typeid_cast(type)) - types.push_back(attribute_type::float64); + types.push_back(AttributeType::float64); else if (typeid_cast(type)) - types.push_back(attribute_type::string); + types.push_back(AttributeType::string); else throw Exception{ "Unsupported type " + type->getName(), @@ -53,11 +54,11 @@ public: } } - String getName() const override { return "MysqlBlockInputStream"; } + String getName() const override { return "MySQLBlockInputStream"; } String getID() const override { - return "Mysql(" + query.str() + ")"; + return "MySQL(" + query.str() + ")"; } private: @@ -67,7 +68,7 @@ private: if (block.columns() != result.getNumFields()) throw Exception{ - "mysqlxx::UserQueryResult contains " + toString(result.getNumFields()) + " columns while " + + "mysqlxx::UseQueryResult contains " + toString(result.getNumFields()) + " columns while " + toString(block.columns()) + " expected", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH }; @@ -86,29 +87,29 @@ private: return rows == 0 ? Block{} : block; }; - static void insertValue(ColumnPtr & column, const mysqlxx::Value & value, const attribute_type type) + static void insertValue(ColumnPtr & column, const mysqlxx::Value & value, const AttributeType type) { switch (type) { - case attribute_type::uint8: column->insert(static_cast(value)); break; - case attribute_type::uint16: column->insert(static_cast(value)); break; - case attribute_type::uint32: column->insert(static_cast(value)); break; - case attribute_type::uint64: column->insert(static_cast(value)); break; - case attribute_type::int8: column->insert(static_cast(value)); break; - case attribute_type::int16: column->insert(static_cast(value)); break; - case attribute_type::int32: column->insert(static_cast(value)); break; - case attribute_type::int64: column->insert(static_cast(value)); break; - case attribute_type::float32: column->insert(static_cast(value)); break; - case attribute_type::float64: column->insert(static_cast(value)); break; - case attribute_type::string: column->insert(value.getString()); break; + case AttributeType::uint8: column->insert(static_cast(value)); break; + case AttributeType::uint16: column->insert(static_cast(value)); break; + case AttributeType::uint32: column->insert(static_cast(value)); break; + case AttributeType::uint64: column->insert(static_cast(value)); break; + case AttributeType::int8: column->insert(static_cast(value)); break; + case AttributeType::int16: column->insert(static_cast(value)); break; + case AttributeType::int32: column->insert(static_cast(value)); break; + case AttributeType::int64: column->insert(static_cast(value)); break; + case AttributeType::float32: column->insert(static_cast(value)); break; + case AttributeType::float64: column->insert(static_cast(value)); break; + case AttributeType::string: column->insert(value.getString()); break; } } mysqlxx::Query query; mysqlxx::UseQueryResult result; Block sample_block; - std::size_t max_block_size; - std::vector types; + const std::size_t max_block_size; + std::vector types; }; } diff --git a/dbms/include/DB/Dictionaries/MysqlDictionarySource.h 
b/dbms/include/DB/Dictionaries/MySQLDictionarySource.h similarity index 64% rename from dbms/include/DB/Dictionaries/MysqlDictionarySource.h rename to dbms/include/DB/Dictionaries/MySQLDictionarySource.h index e97382cd223..dd1f49aca75 100644 --- a/dbms/include/DB/Dictionaries/MysqlDictionarySource.h +++ b/dbms/include/DB/Dictionaries/MySQLDictionarySource.h @@ -1,39 +1,42 @@ #pragma once #include -#include -#include +#include #include #include #include +#include namespace DB { -class MysqlDictionarySource final : public IDictionarySource +/// Allows loading dictionaries from a MySQL database +class MySQLDictionarySource final : public IDictionarySource { static const auto max_block_size = 8192; public: - MysqlDictionarySource(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, - Block & sample_block, const Context & context) + MySQLDictionarySource(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, + Block & sample_block) : table{config.getString(config_prefix + ".table")}, - sample_block{sample_block}, context(context), + sample_block{sample_block}, pool{config, config_prefix}, load_all_query{composeLoadAllQuery(sample_block, table)}, last_modification{getLastModification()} {} - MysqlDictionarySource(const MysqlDictionarySource & other) + /// copy-constructor is provided in order to support cloneability + MySQLDictionarySource(const MySQLDictionarySource & other) : table{other.table}, - sample_block{other.sample_block}, context(other.context), + sample_block{other.sample_block}, pool{other.pool}, load_all_query{other.load_all_query}, last_modification{other.last_modification} {} BlockInputStreamPtr loadAll() override { - return new MysqlBlockInputStream{pool.Get()->query(load_all_query), sample_block, max_block_size}; + last_modification = getLastModification(); + return new MySQLBlockInputStream{pool.Get()->query(load_all_query), sample_block, max_block_size}; } BlockInputStreamPtr loadId(const std::uint64_t id) override @@ -53,8 +56,9 @@ public: } bool isModified() const override { return getLastModification() > last_modification; } + bool supportsSelectiveLoad() const override { return true; } - DictionarySourcePtr clone() const override { return ext::make_unique(*this); } + DictionarySourcePtr clone() const override { return std::make_unique(*this); } private: mysqlxx::DateTime getLastModification() const @@ -64,20 +68,23 @@ private: try { auto connection = pool.Get(); - auto query = connection->query("SHOW TABLE STATUS LIKE '%" + table + "%';"); + auto query = connection->query("SHOW TABLE STATUS LIKE '%" + strconvert::escaped_for_like(table) + "%';"); auto result = query.use(); auto row = result.fetch(); const auto & update_time = row[Update_time_idx]; - return !update_time.isNull() ? update_time.getDateTime() : mysqlxx::DateTime{std::time(nullptr)}; + if (!update_time.isNull()) + return update_time.getDateTime(); } catch (...) 
{ - tryLogCurrentException("MysqlDictionarySource"); + tryLogCurrentException("MySQLDictionarySource"); } - return {}; + /// we suppose failure to get modification time is not an error, therefore return current time + return mysqlxx::DateTime{std::time(nullptr)}; } + /// @todo escape table and column names static std::string composeLoadAllQuery(const Block & block, const std::string & table) { std::string query{"SELECT "}; @@ -99,7 +106,6 @@ private: const std::string table; Block sample_block; - const Context & context; mutable mysqlxx::PoolWithFailover pool; const std::string load_all_query; mysqlxx::DateTime last_modification; diff --git a/dbms/include/DB/Dictionaries/OwningBufferBlockInputStream.h b/dbms/include/DB/Dictionaries/OwningBufferBlockInputStream.h index b50d4c322a7..cc4c066f9f9 100644 --- a/dbms/include/DB/Dictionaries/OwningBufferBlockInputStream.h +++ b/dbms/include/DB/Dictionaries/OwningBufferBlockInputStream.h @@ -7,6 +7,9 @@ namespace DB { +/** Provides reading from a Buffer, taking exclusive ownership over it's lifetime, +* simplifies usage of ReadBufferFromFile (no need to manage buffer lifetime) etc. +*/ class OwningBufferBlockInputStream : public IProfilingBlockInputStream { public: @@ -21,9 +24,7 @@ private: String getName() const override { return "OwningBufferBlockInputStream"; } - String getID() const override { - return "OwningBuffer(" + stream->getID() + ")"; - } + String getID() const override { return "OwningBuffer(" + stream->getID() + ")"; } BlockInputStreamPtr stream; std::unique_ptr buffer; diff --git a/dbms/include/DB/Dictionaries/config_ptr_t.h b/dbms/include/DB/Dictionaries/config_ptr_t.h deleted file mode 100644 index f5f29792cc7..00000000000 --- a/dbms/include/DB/Dictionaries/config_ptr_t.h +++ /dev/null @@ -1,15 +0,0 @@ -#pragma once - -#include - -namespace DB -{ - -template struct release -{ - void operator()(const T * const ptr) { ptr->release(); } -}; - -template using config_ptr_t = std::unique_ptr>; - -} diff --git a/dbms/include/DB/Functions/FunctionsDictionaries.h b/dbms/include/DB/Functions/FunctionsDictionaries.h index c5e2a95989f..8bfef963177 100644 --- a/dbms/include/DB/Functions/FunctionsDictionaries.h +++ b/dbms/include/DB/Functions/FunctionsDictionaries.h @@ -14,6 +14,7 @@ #include #include #include +#include namespace DB @@ -35,6 +36,19 @@ namespace DB * * Получить массив идентификаторов регионов, состоящий из исходного и цепочки родителей. Порядок implementation defined. * regionHierarchy, OSHierarchy, SEHierarchy. + * + * Функции, использующие подключаемые (внешние) словари. + * + * Получить значение аттрибута заданного типа. + * dictGetType(dictionary, attribute, id), + * Type - placeholder для имени типа, в данный момент поддерживаются любые числовые и строковой типы. + * Тип должен соответствовать реальному типу аттрибута, с которым он был объявлен в структуре словаря. + * + * Получить массив идентификаторов, состоящий из исходного и цепочки родителей. + * dictGetHierarchy(dictionary, id). + * + * Является ли первы йидентификатор потомком второго. + * dictIsIn(dictionary, child_id, parent_id). 
*/ @@ -726,10 +740,10 @@ public: static IFunction * create(const Context & context) { - return new FunctionDictGetString{context.getDictionaries()}; + return new FunctionDictGetString{context.getExternalDictionaries()}; }; - FunctionDictGetString(const Dictionaries & dictionaries) : dictionaries(dictionaries) {} + FunctionDictGetString(const ExternalDictionaries & dictionaries) : dictionaries(dictionaries) {} String getName() const override { return name; } @@ -746,7 +760,8 @@ private: if (!typeid_cast(arguments[0].get())) { throw Exception{ - "Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), + "Illegal type " + arguments[0]->getName() + " of first argument of function " + getName() + + ", expected a string.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } @@ -754,7 +769,8 @@ private: if (!typeid_cast(arguments[1].get())) { throw Exception{ - "Illegal type " + arguments[1]->getName() + " of argument of function " + getName(), + "Illegal type " + arguments[1]->getName() + " of second argument of function " + getName() + + ", expected a string.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } @@ -770,7 +786,8 @@ private: !typeid_cast(id_arg)) { throw Exception{ - "Illegal type " + arguments[2]->getName() + " of argument of function " + getName(), + "Illegal type " + arguments[2]->getName() + " of third argument of function " + getName() + + ", expected an integer.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } @@ -787,11 +804,12 @@ private: ErrorCodes::ILLEGAL_COLUMN }; - auto dict = dictionaries.getExternalDictionary(dict_name_col->getData()); + auto dict = dictionaries.getDictionary(dict_name_col->getData()); const auto dict_ptr = dict.get(); if (!executeDispatch(block, arguments, result, dict_ptr) && - !executeDispatch(block, arguments, result, dict_ptr)) + !executeDispatch(block, arguments, result, dict_ptr) && + !executeDispatch(block, arguments, result, dict_ptr)) throw Exception{ "Unsupported dictionary type " + dict_ptr->getTypeName(), ErrorCodes::UNKNOWN_TYPE @@ -871,7 +889,7 @@ private: return false; } - const Dictionaries & dictionaries; + const ExternalDictionaries & dictionaries; }; @@ -911,10 +929,10 @@ public: static IFunction * create(const Context & context) { - return new FunctionDictGet{context.getDictionaries()}; + return new FunctionDictGet{context.getExternalDictionaries()}; }; - FunctionDictGet(const Dictionaries & dictionaries) : dictionaries(dictionaries) {} + FunctionDictGet(const ExternalDictionaries & dictionaries) : dictionaries(dictionaries) {} String getName() const override { return name; } @@ -931,7 +949,8 @@ private: if (!typeid_cast(arguments[0].get())) { throw Exception{ - "Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), + "Illegal type " + arguments[0]->getName() + " of first argument of function " + getName() + + ", expected a string.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } @@ -939,7 +958,8 @@ private: if (!typeid_cast(arguments[1].get())) { throw Exception{ - "Illegal type " + arguments[1]->getName() + " of argument of function " + getName(), + "Illegal type " + arguments[1]->getName() + " of second argument of function " + getName() + + ", expected a string.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } @@ -955,7 +975,8 @@ private: !typeid_cast(id_arg)) { throw Exception{ - "Illegal type " + id_arg->getName() + " of argument of function " + getName(), + "Illegal type " + id_arg->getName() + " of third argument of function " + getName() + + ", expected an integer.", 
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } @@ -972,11 +993,12 @@ private: ErrorCodes::ILLEGAL_COLUMN }; - auto dict = dictionaries.getExternalDictionary(dict_name_col->getData()); + auto dict = dictionaries.getDictionary(dict_name_col->getData()); const auto dict_ptr = dict.get(); if (!executeDispatch(block, arguments, result, dict_ptr) && - !executeDispatch(block, arguments, result, dict_ptr)) + !executeDispatch(block, arguments, result, dict_ptr) && + !executeDispatch(block, arguments, result, dict_ptr)) throw Exception{ "Unsupported dictionary type " + dict_ptr->getTypeName(), ErrorCodes::UNKNOWN_TYPE @@ -1058,7 +1080,7 @@ private: return false; } - const Dictionaries & dictionaries; + const ExternalDictionaries & dictionaries; }; template @@ -1084,10 +1106,10 @@ public: static IFunction * create(const Context & context) { - return new FunctionDictGetHierarchy{context.getDictionaries()}; + return new FunctionDictGetHierarchy{context.getExternalDictionaries()}; }; - FunctionDictGetHierarchy(const Dictionaries & dictionaries) : dictionaries(dictionaries) {} + FunctionDictGetHierarchy(const ExternalDictionaries & dictionaries) : dictionaries(dictionaries) {} String getName() const override { return name; } @@ -1104,7 +1126,8 @@ private: if (!typeid_cast(arguments[0].get())) { throw Exception{ - "Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), + "Illegal type " + arguments[0]->getName() + " of first argument of function " + getName() + + ", expected a string.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } @@ -1120,7 +1143,8 @@ private: !typeid_cast(id_arg)) { throw Exception{ - "Illegal type " + id_arg->getName() + " of argument of function " + getName(), + "Illegal type " + id_arg->getName() + " of second argument of function " + getName() + + ", expected an integer.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } @@ -1137,7 +1161,7 @@ private: ErrorCodes::ILLEGAL_COLUMN }; - auto dict = dictionaries.getExternalDictionary(dict_name_col->getData()); + auto dict = dictionaries.getDictionary(dict_name_col->getData()); const auto dict_ptr = dict.get(); if (!dict->hasHierarchy()) @@ -1147,7 +1171,8 @@ private: }; if (!executeDispatch(block, arguments, result, dict_ptr) && - !executeDispatch(block, arguments, result, dict_ptr)) + !executeDispatch(block, arguments, result, dict_ptr) && + !executeDispatch(block, arguments, result, dict_ptr)) throw Exception{ "Unsupported dictionary type " + dict_ptr->getTypeName(), ErrorCodes::UNKNOWN_TYPE @@ -1234,7 +1259,7 @@ private: return false; } - const Dictionaries & dictionaries; + const ExternalDictionaries & dictionaries; }; @@ -1245,10 +1270,10 @@ public: static IFunction * create(const Context & context) { - return new FunctionDictIsIn{context.getDictionaries()}; + return new FunctionDictIsIn{context.getExternalDictionaries()}; }; - FunctionDictIsIn(const Dictionaries & dictionaries) : dictionaries(dictionaries) {} + FunctionDictIsIn(const ExternalDictionaries & dictionaries) : dictionaries(dictionaries) {} String getName() const override { return name; } @@ -1265,7 +1290,8 @@ private: if (!typeid_cast(arguments[0].get())) { throw Exception{ - "Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), + "Illegal type " + arguments[0]->getName() + " of first argument of function " + getName() + + ", expected a string.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } @@ -1281,7 +1307,8 @@ private: !typeid_cast(child_id_arg)) { throw Exception{ - "Illegal type " + child_id_arg->getName() + " of argument of 
function " + getName(), + "Illegal type " + child_id_arg->getName() + " of second argument of function " + getName() + + ", expected an integer.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } @@ -1297,7 +1324,8 @@ private: !typeid_cast(ancestor_id_arg)) { throw Exception{ - "Illegal type " + ancestor_id_arg->getName() + " of argument of function " + getName(), + "Illegal type " + ancestor_id_arg->getName() + " of argument of third function " + getName() + + ", expected an integer.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT }; } @@ -1314,7 +1342,7 @@ private: ErrorCodes::ILLEGAL_COLUMN }; - auto dict = dictionaries.getExternalDictionary(dict_name_col->getData()); + auto dict = dictionaries.getDictionary(dict_name_col->getData()); const auto dict_ptr = dict.get(); if (!dict->hasHierarchy()) @@ -1324,7 +1352,8 @@ private: }; if (!executeDispatch(block, arguments, result, dict_ptr) && - !executeDispatch(block, arguments, result, dict_ptr)) + !executeDispatch(block, arguments, result, dict_ptr) && + !executeDispatch(block, arguments, result, dict_ptr)) throw Exception{ "Unsupported dictionary type " + dict_ptr->getTypeName(), ErrorCodes::UNKNOWN_TYPE @@ -1470,7 +1499,7 @@ private: return false; } - const Dictionaries & dictionaries; + const ExternalDictionaries & dictionaries; }; diff --git a/dbms/include/DB/Functions/FunctionsStringArray.h b/dbms/include/DB/Functions/FunctionsStringArray.h index 629d215bad0..12f8f994b94 100644 --- a/dbms/include/DB/Functions/FunctionsStringArray.h +++ b/dbms/include/DB/Functions/FunctionsStringArray.h @@ -267,7 +267,7 @@ public: throw Exception("Illegal column " + col->getName() + " of first argument of function " + getName() + ". Must be constant string.", ErrorCodes::ILLEGAL_COLUMN); - re = Regexps::get(col->getData()); + re = Regexps::get(col->getData()); capture = re->getNumberOfSubpatterns() > 0 ? 1 : 0; matches.resize(capture + 1); diff --git a/dbms/include/DB/Functions/FunctionsStringSearch.h b/dbms/include/DB/Functions/FunctionsStringSearch.h index c8ed6b852f6..ffc95a8928d 100644 --- a/dbms/include/DB/Functions/FunctionsStringSearch.h +++ b/dbms/include/DB/Functions/FunctionsStringSearch.h @@ -3,7 +3,7 @@ #include #include -#include +#include #include #include @@ -299,24 +299,24 @@ namespace Regexps } template - inline Regexp createRegexp(const std::string & pattern) { return pattern; } + inline Regexp createRegexp(const std::string & pattern, int flags) { return {pattern, flags}; } template <> - inline Regexp createRegexp(const std::string & pattern) { return likePatternToRegexp(pattern); } + inline Regexp createRegexp(const std::string & pattern, int flags) { return {likePatternToRegexp(pattern), flags}; } - template + template inline Pointer get(const std::string & pattern) { /// C++11 has thread-safe function-local statics on most modern compilers. - static KnownRegexps known_regexps; + static KnownRegexps known_regexps; /// Разные переменные для разных параметров шаблона. static std::mutex mutex; std::lock_guard lock{mutex}; auto it = known_regexps.find(pattern); if (known_regexps.end() == it) - it = known_regexps.emplace(pattern, ext::make_unique()).first; + it = known_regexps.emplace(pattern, std::make_unique()).first; return it->second->get([&pattern] { - return new Regexp{createRegexp(pattern)}; + return new Regexp{createRegexp(pattern, no_capture ? 
OptimizedRegularExpression::RE_NO_CAPTURE : 0)}; }); } } @@ -373,7 +373,7 @@ struct MatchImpl } else { - const auto & regexp = Regexps::get(pattern); + const auto & regexp = Regexps::get(pattern); size_t size = offsets.size(); for (size_t i = 0; i < size; ++i) res[i] = revert ^ regexp->match(reinterpret_cast(&data[i != 0 ? offsets[i - 1] : 0]), (i != 0 ? offsets[i] - offsets[i - 1] : offsets[0]) - 1); @@ -382,7 +382,7 @@ struct MatchImpl static void constant(const std::string & data, const std::string & pattern, UInt8 & res) { - const auto & regexp = Regexps::get(pattern); + const auto & regexp = Regexps::get(pattern); res = revert ^ regexp->match(data); } }; @@ -397,7 +397,7 @@ struct ExtractImpl res_data.reserve(data.size() / 5); res_offsets.resize(offsets.size()); - const auto & regexp = Regexps::get(pattern); + const auto & regexp = Regexps::get(pattern); unsigned capture = regexp->getNumberOfSubpatterns() > 0 ? 1 : 0; OptimizedRegularExpression::MatchVec matches; diff --git a/dbms/include/DB/IO/BufferBase.h b/dbms/include/DB/IO/BufferBase.h index d0d2b0f9cb1..a3afde8eb99 100644 --- a/dbms/include/DB/IO/BufferBase.h +++ b/dbms/include/DB/IO/BufferBase.h @@ -79,6 +79,12 @@ public: return bytes + offset(); } + /** Проверить, есть ли данные в буфере. */ + bool hasPendingData() const + { + return pos != working_buffer.end(); + } + protected: /// Ссылка на кусок памяти для буфера. Buffer internal_buffer; diff --git a/dbms/include/DB/IO/ConcatReadBuffer.h b/dbms/include/DB/IO/ConcatReadBuffer.h index f4f27f6562e..1f56b06fbdb 100644 --- a/dbms/include/DB/IO/ConcatReadBuffer.h +++ b/dbms/include/DB/IO/ConcatReadBuffer.h @@ -14,23 +14,23 @@ class ConcatReadBuffer : public ReadBuffer { public: typedef std::vector ReadBuffers; - + protected: ReadBuffers buffers; ReadBuffers::iterator current; - + bool nextImpl() { if (buffers.end() == current) return false; - + /// Первое чтение - if (working_buffer.size() == 0 && (*current)->position() != (*current)->buffer().end()) + if (working_buffer.size() == 0 && (*current)->hasPendingData()) { working_buffer = Buffer((*current)->position(), (*current)->buffer().end()); return true; } - + if (!(*current)->next()) { ++current; @@ -45,7 +45,7 @@ protected: return false; } } - + working_buffer = Buffer((*current)->position(), (*current)->buffer().end()); return true; } diff --git a/dbms/include/DB/IO/ReadBuffer.h b/dbms/include/DB/IO/ReadBuffer.h index d3175042fc4..4e97039fce9 100644 --- a/dbms/include/DB/IO/ReadBuffer.h +++ b/dbms/include/DB/IO/ReadBuffer.h @@ -40,13 +40,13 @@ public: /** прочитать следующие данные и заполнить ими буфер; переместить позицию в начало; * вернуть false в случае конца, true иначе; кинуть исключение, если что-то не так */ - inline bool next() + bool next() { bytes += offset(); bool res = nextImpl(); if (!res) working_buffer.resize(0); - + pos = working_buffer.begin(); return res; } @@ -54,7 +54,7 @@ public: inline void nextIfAtEnd() { - if (pos == working_buffer.end()) + if (!hasPendingData()) next(); } @@ -68,9 +68,9 @@ public: * * При попытке чтения после конца, следует кидать исключение. */ - inline bool eof() + bool eof() { - return pos == working_buffer.end() && !next(); + return !hasPendingData() && !next(); } void ignore() @@ -143,12 +143,6 @@ public: return read(to, n); } - /** Проверить, есть ли данные в буфере для чтения. */ - bool hasPendingData() const - { - return offset() != working_buffer.size(); - } - private: /** Прочитать следующие данные и заполнить ими буфер. * Вернуть false в случае конца, true иначе. 
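
The buffer hunks above all reduce to one idiom: hasPendingData() is hoisted into BufferBase, and eof()/nextIfAtEnd() are expressed through it instead of comparing pos with working_buffer.end() at every call site. Below is a minimal, self-contained sketch of that idiom; the names (ReadBufferSketch, StringChunkReader) are invented for illustration, this is not the actual DB::BufferBase/ReadBuffer API.

#include <cstddef>
#include <cstring>
#include <iostream>

class ReadBufferSketch
{
public:
    /// True if the working buffer still holds unread bytes; performs no I/O.
    bool hasPendingData() const { return pos != end; }

    /// Refill the working buffer; returns false at end of data.
    bool next()
    {
        if (!nextImpl())
        {
            end = pos; /// empty working buffer, like working_buffer.resize(0)
            return false;
        }
        return true;
    }

    /// eof() expressed through hasPendingData(), as in the hunks above.
    bool eof() { return !hasPendingData() && !next(); }

    char read() { return *pos++; }

    virtual ~ReadBufferSketch() {}

protected:
    const char * pos = nullptr;
    const char * end = nullptr;

private:
    /// Must point pos/end at the freshly filled buffer; false means no more data.
    virtual bool nextImpl() = 0;
};

/// Toy source: serves a constant string in 4-byte chunks.
class StringChunkReader : public ReadBufferSketch
{
public:
    explicit StringChunkReader(const char * s) : data(s), left(std::strlen(s)) {}

private:
    bool nextImpl() override
    {
        if (left == 0)
            return false;
        std::size_t chunk = left < 4 ? left : 4;
        pos = data;
        end = data + chunk;
        data += chunk;
        left -= chunk;
        return true;
    }

    const char * data;
    std::size_t left;
};

int main()
{
    StringChunkReader in("hello, buffers");
    while (!in.eof())
        std::cout << in.read();
    std::cout << '\n';
}

Note that eof() is not a passive check: once the working buffer is exhausted it calls next(), which may perform I/O, exactly as in the real ReadBuffer.
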
diff --git a/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h b/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h index ea89708910e..1a2febf61b5 100644 --- a/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h +++ b/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h @@ -80,7 +80,7 @@ public: if (new_pos + (working_buffer.end() - pos) == pos_in_file) return new_pos; - if (pos != working_buffer.end() && new_pos <= pos_in_file && new_pos >= pos_in_file - static_cast(working_buffer.size())) + if (hasPendingData() && new_pos <= pos_in_file && new_pos >= pos_in_file - static_cast(working_buffer.size())) { /// Остались в пределах буфера. pos = working_buffer.begin() + (new_pos - (pos_in_file - working_buffer.size())); diff --git a/dbms/include/DB/IO/WriteBuffer.h b/dbms/include/DB/IO/WriteBuffer.h index c57575068b7..54f38242efc 100644 --- a/dbms/include/DB/IO/WriteBuffer.h +++ b/dbms/include/DB/IO/WriteBuffer.h @@ -44,7 +44,7 @@ public: pos = working_buffer.begin(); throw; } - + pos = working_buffer.begin(); } @@ -56,11 +56,11 @@ public: inline void nextIfAtEnd() { - if (pos == working_buffer.end()) + if (!hasPendingData()) next(); } - + void write(const char * from, size_t n) { size_t bytes_copied = 0; @@ -82,7 +82,7 @@ public: *pos = x; ++pos; } - + private: /** Записать данные, находящиеся в буфере (от начала буфера до текущей позиции). * Кинуть исключение, если что-то не так. diff --git a/dbms/include/DB/Interpreters/Context.h b/dbms/include/DB/Interpreters/Context.h index c3f08b52fcd..18a228bef83 100644 --- a/dbms/include/DB/Interpreters/Context.h +++ b/dbms/include/DB/Interpreters/Context.h @@ -16,7 +16,6 @@ #include #include #include -#include #include #include #include @@ -24,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -86,9 +86,9 @@ struct ContextShared TableFunctionFactory table_function_factory; /// Табличные функции. AggregateFunctionFactory aggregate_function_factory; /// Агрегатные функции. DataTypeFactory data_type_factory; /// Типы данных. - StorageFactory storage_factory; /// Движки таблиц. FormatFactory format_factory; /// Форматы. mutable SharedPtr dictionaries; /// Словари Метрики. Инициализируются лениво. + mutable SharedPtr external_dictionaries; Users users; /// Известные пользователи. Quotas quotas; /// Известные квоты на использование ресурсов. mutable UncompressedCachePtr uncompressed_cache; /// Кэш разжатых блоков. 
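
The new external_dictionaries member of ContextShared above is created lazily, under the shared mutex, the first time it is requested, mirroring the existing dictionaries member (the accessor, Context::getExternalDictionaries(), appears in dbms/src/Interpreters/Context.cpp later in this diff). Here is a minimal sketch of that idiom with invented stand-in types (ExternalDictionariesStub, ContextSketch); the real code uses Poco::SharedPtr and Poco::ScopedLock and additionally checks that a global context exists.

#include <memory>
#include <mutex>

/// Stand-in for DB::ExternalDictionaries; the real class is introduced later in this diff.
struct ExternalDictionariesStub {};

class ContextSketch
{
public:
    /// Lazily created on first call; every later call returns the same instance.
    const ExternalDictionariesStub & getExternalDictionaries() const
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (!external_dictionaries)
            external_dictionaries.reset(new ExternalDictionariesStub);
        return *external_dictionaries;
    }

private:
    mutable std::mutex mutex; /// guards the lazy creation
    mutable std::unique_ptr<ExternalDictionariesStub> external_dictionaries;
};

int main()
{
    ContextSketch context;
    const auto & dicts = context.getExternalDictionaries();
    (void) dicts;
}

Both the mutex and the pointer are mutable so that the lazy construction can happen inside a const accessor, which is also how ContextShared declares them.
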
@@ -259,9 +259,9 @@ public: const TableFunctionFactory & getTableFunctionFactory() const { return shared->table_function_factory; } const AggregateFunctionFactory & getAggregateFunctionFactory() const { return shared->aggregate_function_factory; } const DataTypeFactory & getDataTypeFactory() const { return shared->data_type_factory; } - const StorageFactory & getStorageFactory() const { return shared->storage_factory; } const FormatFactory & getFormatFactory() const { return shared->format_factory; } const Dictionaries & getDictionaries() const; + const ExternalDictionaries & getExternalDictionaries() const; InterserverIOHandler & getInterserverIOHandler() { return shared->interserver_io_handler; } diff --git a/dbms/include/DB/Interpreters/Dictionaries.h b/dbms/include/DB/Interpreters/Dictionaries.h index e943d12fe61..94e12484daa 100644 --- a/dbms/include/DB/Interpreters/Dictionaries.h +++ b/dbms/include/DB/Interpreters/Dictionaries.h @@ -1,13 +1,5 @@ #pragma once -#include -#include -#include -#include -#include - -#include - #include #include #include @@ -18,10 +10,7 @@ namespace DB { -using Poco::SharedPtr; - class Context; -class IDictionary; /// Словари Метрики, которые могут использоваться в функциях. @@ -31,22 +20,15 @@ private: MultiVersion regions_hierarchies; MultiVersion tech_data_hierarchy; MultiVersion regions_names; - mutable std::mutex external_dictionaries_mutex; - std::unordered_map>> external_dictionaries; - std::unordered_map update_times; - std::mt19937 rnd_engine; - Context & context; /// Периодичность обновления справочников, в секундах. int reload_period; std::thread reloading_thread; - std::thread reloading_externals_thread; - Poco::Event destroy{false}; + Poco::Event destroy; Logger * log; - Poco::Timestamp dictionaries_last_modified{0}; void handleException() const @@ -122,7 +104,6 @@ private: } - void reloadExternals(); /// Обновляет каждые reload_period секунд. void reloadPeriodically() @@ -136,35 +117,19 @@ private: } } - void reloadExternalsPeriodically() - { - const auto check_period = 5 * 1000; - while (true) - { - if (destroy.tryWait(check_period)) - return; - - reloadExternals(); - } - } - public: /// Справочники будут обновляться в отдельном потоке, каждые reload_period секунд. 
- Dictionaries(Context & context, int reload_period_ = 3600) - : context(context), reload_period(reload_period_), - log(&Logger::get("Dictionaries")) + Dictionaries(int reload_period_ = 3600) + : reload_period(reload_period_), log(&Logger::get("Dictionaries")) { reloadImpl(); - reloadExternals(); reloading_thread = std::thread([this] { reloadPeriodically(); }); - reloading_externals_thread = std::thread{&Dictionaries::reloadExternalsPeriodically, this}; } ~Dictionaries() { destroy.set(); reloading_thread.join(); - reloading_externals_thread.join(); } MultiVersion::Version getRegionsHierarchies() const @@ -181,19 +146,6 @@ public: { return regions_names.get(); } - - MultiVersion::Version getExternalDictionary(const std::string & name) const - { - const std::lock_guard lock{external_dictionaries_mutex}; - const auto it = external_dictionaries.find(name); - if (it == std::end(external_dictionaries)) - throw Exception{ - "No such dictionary: " + name, - ErrorCodes::BAD_ARGUMENTS - }; - - return it->second->get(); - } }; } diff --git a/dbms/include/DB/Interpreters/ExternalDictionaries.h b/dbms/include/DB/Interpreters/ExternalDictionaries.h new file mode 100644 index 00000000000..171d1a39242 --- /dev/null +++ b/dbms/include/DB/Interpreters/ExternalDictionaries.h @@ -0,0 +1,122 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +class Context; +class IDictionary; + +/** Manages user-defined dictionaries. +* Monitors configuration file and automatically reloads dictionaries in a separate thread. +* The monitoring thread wakes up every @check_period_sec seconds and checks +* modification time of dictionaries' configuration file. If said time is greater than +* @config_last_modified, the dictionaries are created from scratch using configuration file, +* possibly overriding currently existing dictionaries with the same name (previous versions of +* overridden dictionaries will live as long as there are any users retaining them). +* +* Apart from checking configuration file for modifications, each non-cached dictionary +* has a lifetime of its own and may be updated if it's source reports that it has been +* modified. The time of next update is calculated by choosing uniformly a random number +* distributed between lifetime.min_sec and lifetime.max_sec. +* If either of lifetime.min_sec and lifetime.max_sec is zero, such dictionary is never updated. +*/ +class ExternalDictionaries +{ +private: + static const auto check_period_sec = 5; + + mutable std::mutex dictionaries_mutex; + std::unordered_map>> dictionaries; + std::unordered_map update_times; + std::mt19937_64 rnd_engine{getSeed()}; + + Context & context; + + std::thread reloading_thread; + Poco::Event destroy; + + Logger * log; + + Poco::Timestamp config_last_modified{0}; + + void handleException() const + { + try + { + throw; + } + catch (const Poco::Exception & e) + { + LOG_ERROR(log, "Cannot load exter dictionary! You must resolve this manually. " << e.displayText()); + return; + } + catch (...) + { + LOG_ERROR(log, "Cannot load dictionary! 
You must resolve this manually."); + return; + } + } + + void reloadImpl(); + + void reloadPeriodically() + { + while (true) + { + if (destroy.tryWait(check_period_sec * 1000)) + return; + + reloadImpl(); + } + } + + static std::uint64_t getSeed() + { + timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return ts.tv_nsec ^ getpid(); + } + +public: + /// Справочники будут обновляться в отдельном потоке, каждые reload_period секунд. + ExternalDictionaries(Context & context) + : context(context), log(&Logger::get("ExternalDictionaries")) + { + reloadImpl(); + reloading_thread = std::thread{&ExternalDictionaries::reloadPeriodically, this}; + } + + ~ExternalDictionaries() + { + destroy.set(); + reloading_thread.join(); + } + + MultiVersion::Version getDictionary(const std::string & name) const + { + const std::lock_guard lock{dictionaries_mutex}; + const auto it = dictionaries.find(name); + if (it == std::end(dictionaries)) + throw Exception{ + "No such dictionary: " + name, + ErrorCodes::BAD_ARGUMENTS + }; + + return it->second->get(); + } +}; + +} diff --git a/dbms/include/DB/Interpreters/Limits.h b/dbms/include/DB/Interpreters/Limits.h index 6f6ee03b483..7383ee4e4d8 100644 --- a/dbms/include/DB/Interpreters/Limits.h +++ b/dbms/include/DB/Interpreters/Limits.h @@ -87,6 +87,9 @@ struct Limits \ /** Максимальное использование памяти при обработке запроса. 0 - не ограничено. */ \ M(SettingUInt64, max_memory_usage, 0) \ + \ + /** Максимальная скорость обмена данными по сети в байтах в секунду. 0 - не ограничена. */ \ + M(SettingUInt64, max_network_bandwidth, 0) \ #define DECLARE(TYPE, NAME, DEFAULT) \ TYPE NAME {DEFAULT}; diff --git a/dbms/include/DB/Interpreters/evaluateMissingDefaults.h b/dbms/include/DB/Interpreters/evaluateMissingDefaults.h index 5473b847f71..420cda457c7 100644 --- a/dbms/include/DB/Interpreters/evaluateMissingDefaults.h +++ b/dbms/include/DB/Interpreters/evaluateMissingDefaults.h @@ -16,7 +16,7 @@ inline void evaluateMissingDefaults(Block & block, if (column_defaults.empty()) return; - ASTPtr default_expr_list{ext::make_unique().release()}; + ASTPtr default_expr_list{std::make_unique().release()}; for (const auto & column : required_columns) { diff --git a/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h b/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h index d3e57785d42..8d1e0b17f8b 100644 --- a/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h +++ b/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h @@ -10,7 +10,7 @@ #include #include -#include +#include #include #include diff --git a/dbms/include/DB/Storages/IStorage.h b/dbms/include/DB/Storages/IStorage.h index 815464cffcb..aeed983161d 100644 --- a/dbms/include/DB/Storages/IStorage.h +++ b/dbms/include/DB/Storages/IStorage.h @@ -14,7 +14,7 @@ #include #include #include -#include +#include namespace DB @@ -129,7 +129,7 @@ public: */ TableDataWriteLockPtr lockDataForAlter() { - auto res = ext::make_unique(data_lock); + auto res = std::make_unique(data_lock); if (is_dropped) throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED); return res; @@ -137,7 +137,7 @@ public: TableStructureWriteLockPtr lockStructureForAlter() { - auto res = ext::make_unique(structure_lock); + auto res = std::make_unique(structure_lock); if (is_dropped) throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED); return res; diff --git a/dbms/include/DB/Storages/MergeTree/MergeList.h b/dbms/include/DB/Storages/MergeTree/MergeList.h index 
138db6ba944..533d8c46b1d 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeList.h +++ b/dbms/include/DB/Storages/MergeTree/MergeList.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include #include #include #include @@ -67,7 +67,7 @@ public: EntryPtr insert(Args &&... args) { std::lock_guard lock{mutex}; - return ext::make_unique(*this, merges.emplace(merges.end(), std::forward(args)...)); + return std::make_unique(*this, merges.emplace(merges.end(), std::forward(args)...)); } container_t get() const diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeWhereOptimizer.h b/dbms/include/DB/Storages/MergeTree/MergeTreeWhereOptimizer.h index fa272831634..8ec5657b756 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeWhereOptimizer.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeWhereOptimizer.h @@ -9,7 +9,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/dbms/include/DB/Storages/StorageChunkRef.h b/dbms/include/DB/Storages/StorageChunkRef.h index d4fc86972d3..df2be812652 100644 --- a/dbms/include/DB/Storages/StorageChunkRef.h +++ b/dbms/include/DB/Storages/StorageChunkRef.h @@ -22,8 +22,6 @@ public: NameAndTypePair getColumn(const String & column_name) const override { return getSource().getColumn(column_name); }; bool hasColumn(const String & column_name) const override { return getSource().hasColumn(column_name); }; - bool supportsParallelReplicas() const override { return true; } - BlockInputStreams read( const Names & column_names, ASTPtr query, diff --git a/dbms/include/DB/Storages/StorageFactory.h b/dbms/include/DB/Storages/StorageFactory.h index cc6aa390379..832fa0ead11 100644 --- a/dbms/include/DB/Storages/StorageFactory.h +++ b/dbms/include/DB/Storages/StorageFactory.h @@ -1,6 +1,7 @@ #pragma once #include +#include namespace DB @@ -11,7 +12,7 @@ class Context; /** Позволяет создать таблицу по имени движка. */ -class StorageFactory +class StorageFactory : public Singleton { public: StoragePtr get( diff --git a/dbms/include/DB/Storages/StorageMergeTree.h b/dbms/include/DB/Storages/StorageMergeTree.h index a2c4f68ee26..e33c2f7f0c4 100644 --- a/dbms/include/DB/Storages/StorageMergeTree.h +++ b/dbms/include/DB/Storages/StorageMergeTree.h @@ -54,6 +54,7 @@ public: bool supportsSampling() const override { return data.supportsSampling(); } bool supportsFinal() const override { return data.supportsFinal(); } bool supportsPrewhere() const override { return data.supportsPrewhere(); } + bool supportsParallelReplicas() const override { return true; } const NamesAndTypesList & getColumnsListImpl() const override { return data.getColumnsListNonMaterialized(); } diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index b2944827e7a..9f9f83b4c1c 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -103,8 +103,6 @@ public: /// Добавить кусок в очередь кусков, чьи данные нужно проверить в фоновом потоке. void enqueuePartForCheck(const String & name); - void skipUnreplicated() { process_unreplicated = false; } - MergeTreeData & getData() { return data; } MergeTreeData * getUnreplicatedData() { return unreplicated_data.get(); } @@ -166,8 +164,6 @@ private: current_zookeeper = zookeeper; } - bool process_unreplicated = true; - /// Если true, таблица в офлайновом режиме, и в нее нельзя писать. 
bool is_readonly = false; diff --git a/dbms/include/DB/TableFunctions/TableFunctionRemote.h b/dbms/include/DB/TableFunctions/TableFunctionRemote.h index 8ce917e30c7..5bd461fe87e 100644 --- a/dbms/include/DB/TableFunctions/TableFunctionRemote.h +++ b/dbms/include/DB/TableFunctions/TableFunctionRemote.h @@ -92,9 +92,8 @@ private: /// Отправляем на первый попавшийся шард BlockInputStreamPtr input{ new RemoteBlockInputStream{ - cluster.pools.front().get(), query, &settings, - Tables(), QueryProcessingStage::Complete, context - } + cluster.pools.front().get(), query, &settings, nullptr, + Tables(), QueryProcessingStage::Complete, context} }; input->readPrefix(); diff --git a/dbms/src/Client/Connection.cpp b/dbms/src/Client/Connection.cpp index 059c27f5c8e..9b498318b1a 100644 --- a/dbms/src/Client/Connection.cpp +++ b/dbms/src/Client/Connection.cpp @@ -165,7 +165,7 @@ void Connection::forceConnected() bool Connection::ping() { - LOG_TRACE(log_wrapper.get(), "Ping (" << getServerAddress() << ")"); + // LOG_TRACE(log_wrapper.get(), "Ping (" << getServerAddress() << ")"); try { @@ -277,15 +277,22 @@ void Connection::sendData(const Block & block, const String & name) if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) writeStringBinary(name, *out); + size_t prev_bytes = out->count(); + block.checkNestedArraysOffsets(); block_out->write(block); maybe_compressed_out->next(); out->next(); + + if (throttler) + throttler->add(out->count() - prev_bytes); } void Connection::sendPreparedData(ReadBuffer & input, size_t size, const String & name) { + /// NOTE В этом методе не используется throttler (хотя можно использовать, но это пока не важно). + writeVarUInt(Protocol::Client::Data, *out); if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) @@ -436,8 +443,15 @@ Block Connection::receiveData() if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) readStringBinary(external_table_name, *in); + size_t prev_bytes = in->count(); + /// Прочитать из сети один блок - return block_in->read(); + Block res = block_in->read(); + + if (throttler) + throttler->add(in->count() - prev_bytes); + + return res; } diff --git a/dbms/src/Client/ParallelReplicas.cpp b/dbms/src/Client/ParallelReplicas.cpp new file mode 100644 index 00000000000..7e8d52baa6f --- /dev/null +++ b/dbms/src/Client/ParallelReplicas.cpp @@ -0,0 +1,275 @@ +#include + +namespace DB +{ + +ParallelReplicas::ParallelReplicas(Connection * connection_, Settings * settings_, ThrottlerPtr throttler_) + : settings(settings_), throttler(throttler_), + active_replica_count(1), + supports_parallel_execution(false) +{ + registerReplica(connection_); +} + +ParallelReplicas::ParallelReplicas(IConnectionPool * pool_, Settings * settings_, ThrottlerPtr throttler_) + : settings(settings_), throttler(throttler_) +{ + if (pool_ == nullptr) + throw Exception("Null pool specified", ErrorCodes::LOGICAL_ERROR); + + bool has_many_replicas = (settings != nullptr) && (settings->max_parallel_replicas > 1); + if (has_many_replicas) + { + pool_entries = pool_->getMany(settings); + active_replica_count = pool_entries.size(); + supports_parallel_execution = (active_replica_count > 1); + + if (active_replica_count == 0) + throw Exception("No connection available", ErrorCodes::LOGICAL_ERROR); + + replica_map.reserve(active_replica_count); + for (auto & entry : pool_entries) + registerReplica(&*entry); + } + else + { + active_replica_count = 1; + supports_parallel_execution = false; + + pool_entry = pool_->get(settings); + registerReplica(&*pool_entry); + } 
+} + +void ParallelReplicas::sendExternalTablesData(std::vector & data) +{ + if (!sent_query) + throw Exception("Cannot send external tables data: query not yet sent.", ErrorCodes::LOGICAL_ERROR); + + if (data.size() < active_replica_count) + throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES); + + auto it = data.begin(); + for (auto & e : replica_map) + { + Connection * connection = e.second; + if (connection != nullptr) + connection->sendExternalTablesData(*it); + ++it; + } +} + +void ParallelReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data) +{ + if (sent_query) + throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR); + + if (supports_parallel_execution) + { + Settings query_settings = *settings; + query_settings.parallel_replicas_count = active_replica_count; + UInt64 offset = 0; + + for (auto & e : replica_map) + { + Connection * connection = e.second; + if (connection != nullptr) + { + query_settings.parallel_replica_offset = offset; + connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data); + ++offset; + } + } + + if (offset > 0) + sent_query = true; + } + else + { + auto it = replica_map.begin(); + Connection * connection = it->second; + if (connection != nullptr) + { + connection->sendQuery(query, query_id, stage, settings, with_pending_data); + sent_query = true; + } + } +} + +Connection::Packet ParallelReplicas::receivePacket() +{ + if (!sent_query) + throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR); + if (!hasActiveReplicas()) + throw Exception("No more packets are available.", ErrorCodes::LOGICAL_ERROR); + + auto it = getReplicaForReading(); + if (it == replica_map.end()) + throw Exception("No available replica", ErrorCodes::NO_AVAILABLE_REPLICA); + + Connection * connection = it->second; + Connection::Packet packet = connection->receivePacket(); + + switch (packet.type) + { + case Protocol::Server::Data: + case Protocol::Server::Progress: + case Protocol::Server::ProfileInfo: + case Protocol::Server::Totals: + case Protocol::Server::Extremes: + break; + + case Protocol::Server::EndOfStream: + invalidateReplica(it); + break; + + case Protocol::Server::Exception: + default: + connection->disconnect(); + invalidateReplica(it); + break; + } + + return packet; +} + +void ParallelReplicas::sendCancel() +{ + if (!sent_query || cancelled) + throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR); + + for (auto & e : replica_map) + { + Connection * connection = e.second; + if (connection != nullptr) + connection->sendCancel(); + } + + cancelled = true; +} + +Connection::Packet ParallelReplicas::drain() +{ + if (!cancelled) + throw Exception("Cannot drain connections: cancel first.", ErrorCodes::LOGICAL_ERROR); + + Connection::Packet res; + res.type = Protocol::Server::EndOfStream; + + while (hasActiveReplicas()) + { + Connection::Packet packet = receivePacket(); + + switch (packet.type) + { + case Protocol::Server::Data: + case Protocol::Server::Progress: + case Protocol::Server::ProfileInfo: + case Protocol::Server::Totals: + case Protocol::Server::Extremes: + case Protocol::Server::EndOfStream: + break; + + case Protocol::Server::Exception: + default: + /// Если мы получили исключение или неизвестный пакет, сохраняем его. 
+ res = packet; + break; + } + } + + return res; +} + +std::string ParallelReplicas::dumpAddresses() const +{ + bool is_first = true; + std::ostringstream os; + for (auto & e : replica_map) + { + const Connection * connection = e.second; + if (connection != nullptr) + { + os << (is_first ? "" : "; ") << connection->getServerAddress(); + if (is_first) { is_first = false; } + } + } + + return os.str(); +} + + +void ParallelReplicas::registerReplica(Connection * connection) +{ + if (connection == nullptr) + throw Exception("Invalid connection specified in parameter.", ErrorCodes::LOGICAL_ERROR); + auto res = replica_map.insert(std::make_pair(connection->socket.impl()->sockfd(), connection)); + if (!res.second) + throw Exception("Invalid set of connections.", ErrorCodes::LOGICAL_ERROR); + + if (throttler) + connection->setThrottler(throttler); +} + + +ParallelReplicas::ReplicaMap::iterator ParallelReplicas::getReplicaForReading() +{ + ReplicaMap::iterator it; + + if (supports_parallel_execution) + it = waitForReadEvent(); + else + { + it = replica_map.begin(); + if (it->second == nullptr) + it = replica_map.end(); + } + + return it; +} + +ParallelReplicas::ReplicaMap::iterator ParallelReplicas::waitForReadEvent() +{ + Poco::Net::Socket::SocketList read_list; + read_list.reserve(active_replica_count); + + /** Сначала проверяем, есть ли данные, которые уже лежат в буфере + * хоть одного соединения. + */ + for (auto & e : replica_map) + { + Connection * connection = e.second; + if ((connection != nullptr) && connection->hasReadBufferPendingData()) + read_list.push_back(connection->socket); + } + + /** Если не было найдено никаких данных, то проверяем, есть ли соединения + * готовые для чтения. + */ + if (read_list.empty()) + { + Poco::Net::Socket::SocketList write_list; + Poco::Net::Socket::SocketList except_list; + + for (auto & e : replica_map) + { + Connection * connection = e.second; + if (connection != nullptr) + read_list.push_back(connection->socket); + } + int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings->poll_interval * 1000000); + if (n == 0) + return replica_map.end(); + } + + auto & socket = read_list[rand() % read_list.size()]; + return replica_map.find(socket.impl()->sockfd()); +} + +void ParallelReplicas::invalidateReplica(ParallelReplicas::ReplicaMap::iterator it) +{ + it->second = nullptr; + --active_replica_count; +} + +} diff --git a/dbms/src/Client/ShardReplicas.cpp b/dbms/src/Client/ShardReplicas.cpp deleted file mode 100644 index e9a7eae0335..00000000000 --- a/dbms/src/Client/ShardReplicas.cpp +++ /dev/null @@ -1,227 +0,0 @@ -#include -#include - -namespace DB -{ - ShardReplicas::ShardReplicas(std::vector & entries, const Settings & settings_) : - settings(settings_), - active_connection_count(entries.size()) - { - replica_hash.reserve(entries.size()); - - for (auto & entry : entries) - { - Connection * connection = &*entry; - if (connection == nullptr) - throw Exception("Invalid connection specified in parameter."); - auto res = replica_hash.insert(std::make_pair(connection->socket.impl()->sockfd(), connection)); - if (!res.second) - throw Exception("Invalid set of connections."); - } - } - - void ShardReplicas::sendExternalTablesData(std::vector & data) - { - if (!sent_query) - throw Exception("Cannot send external tables data: query not yet sent."); - - if (data.size() < active_connection_count) - throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES); - - auto it = data.begin(); - for (auto 
& e : replica_hash) - { - Connection * connection = e.second; - if (connection != nullptr) - connection->sendExternalTablesData(*it); - ++it; - } - } - - void ShardReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data) - { - if (sent_query) - throw Exception("Query already sent."); - - Settings query_settings = settings; - query_settings.parallel_replicas_count = replica_hash.size(); - UInt64 offset = 0; - - for (auto & e : replica_hash) - { - Connection * connection = e.second; - if (connection != nullptr) - { - query_settings.parallel_replica_offset = offset; - connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data); - ++offset; - } - } - - sent_query = true; - } - - Connection::Packet ShardReplicas::receivePacket() - { - if (!sent_query) - throw Exception("Cannot receive packets: no query sent."); - if (active_connection_count == 0) - throw Exception("No more packets are available."); - - Connection ** connection = waitForReadEvent(); - if (connection == nullptr) - throw Exception("No available replica", ErrorCodes::NO_AVAILABLE_REPLICA); - - Connection::Packet packet = (*connection)->receivePacket(); - - switch (packet.type) - { - case Protocol::Server::Data: - case Protocol::Server::Progress: - case Protocol::Server::ProfileInfo: - case Protocol::Server::Totals: - case Protocol::Server::Extremes: - break; - - case Protocol::Server::EndOfStream: - *connection = nullptr; - --active_connection_count; - if (active_connection_count > 0) - { - Connection::Packet empty_packet; - empty_packet.type = Protocol::Server::Data; - return empty_packet; - } - break; - - case Protocol::Server::Exception: - default: - *connection = nullptr; - --active_connection_count; - if (!cancelled) - { - sendCancel(); - (void) drain(); - } - break; - } - - return packet; - } - - void ShardReplicas::disconnect() - { - for (auto & e : replica_hash) - { - Connection * & connection = e.second; - if (connection != nullptr) - { - connection->disconnect(); - connection = nullptr; - --active_connection_count; - } - } - } - - void ShardReplicas::sendCancel() - { - if (!sent_query || cancelled) - throw Exception("Cannot cancel. 
Either no query sent or already cancelled."); - - for (auto & e : replica_hash) - { - Connection * connection = e.second; - if (connection != nullptr) - connection->sendCancel(); - } - - cancelled = true; - } - - Connection::Packet ShardReplicas::drain() - { - if (!cancelled) - throw Exception("Cannot drain connections: cancel first."); - - Connection::Packet res; - res.type = Protocol::Server::EndOfStream; - - while (active_connection_count > 0) - { - Connection::Packet packet = receivePacket(); - - switch (packet.type) - { - case Protocol::Server::Data: - case Protocol::Server::Progress: - case Protocol::Server::ProfileInfo: - case Protocol::Server::Totals: - case Protocol::Server::Extremes: - break; - - case Protocol::Server::EndOfStream: - return res; - - case Protocol::Server::Exception: - default: - res = packet; - break; - } - } - - return res; - } - - std::string ShardReplicas::dumpAddresses() const - { - std::ostringstream os; - for (auto & e : replica_hash) - { - char prefix = '\0'; - const Connection * connection = e.second; - if (connection != nullptr) - { - os << prefix << connection->getServerAddress(); - if (prefix == '\0') - prefix = ';'; - } - } - - return os.str(); - } - - Connection ** ShardReplicas::waitForReadEvent() - { - Poco::Net::Socket::SocketList read_list; - read_list.reserve(active_connection_count); - - for (auto & e : replica_hash) - { - Connection * connection = e.second; - if ((connection != nullptr) && connection->hasReadBufferPendingData()) - read_list.push_back(connection->socket); - } - - if (read_list.empty()) - { - Poco::Net::Socket::SocketList write_list; - Poco::Net::Socket::SocketList except_list; - - for (auto & e : replica_hash) - { - Connection * connection = e.second; - if (connection != nullptr) - read_list.push_back(connection->socket); - } - int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.poll_interval * 1000000); - if (n == 0) - return nullptr; - } - - auto & socket = read_list[rand() % read_list.size()]; - auto it = replica_hash.find(socket.impl()->sockfd()); - if (it == replica_hash.end()) - throw Exception("Unexpected replica", ErrorCodes::UNEXPECTED_REPLICA); - return &(it->second); - } -} diff --git a/dbms/src/Core/Block.cpp b/dbms/src/Core/Block.cpp index d96d8c974e2..2ceef5e7e52 100644 --- a/dbms/src/Core/Block.cpp +++ b/dbms/src/Core/Block.cpp @@ -11,8 +11,7 @@ #include #include -#include -#include +#include #include diff --git a/dbms/src/IO/ReadHelpers.cpp b/dbms/src/IO/ReadHelpers.cpp index dac6048f6d7..06de94e0cee 100644 --- a/dbms/src/IO/ReadHelpers.cpp +++ b/dbms/src/IO/ReadHelpers.cpp @@ -55,7 +55,7 @@ void readString(String & s, ReadBuffer & buf) s.append(buf.position(), bytes); buf.position() += bytes; - if (buf.position() != buf.buffer().end()) + if (buf.hasPendingData()) return; } } @@ -121,7 +121,7 @@ void readEscapedString(DB::String & s, DB::ReadBuffer & buf) s.append(buf.position(), next_pos - buf.position()); buf.position() += next_pos - buf.position(); - if (buf.position() == buf.buffer().end()) + if (!buf.hasPendingData()) continue; if (*buf.position() == '\t' || *buf.position() == '\n') @@ -191,8 +191,8 @@ static void readAnyQuotedString(String & s, ReadBuffer & buf) s.append(buf.position(), next_pos - buf.position()); buf.position() += next_pos - buf.position(); - - if (buf.position() == buf.buffer().end()) + + if (!buf.hasPendingData()) continue; if (*buf.position() == quote) diff --git a/dbms/src/IO/tests/io_and_exceptions.cpp b/dbms/src/IO/tests/io_and_exceptions.cpp new file mode 
diff --git a/dbms/src/IO/tests/io_and_exceptions.cpp b/dbms/src/IO/tests/io_and_exceptions.cpp
new file mode 100644
index 00000000000..e832707e5a1
--- /dev/null
+++ b/dbms/src/IO/tests/io_and_exceptions.cpp
@@ -0,0 +1,158 @@
+/** Reproduces a bug in gcc 4.8.2.
+ * The bug: an exception is not caught.
+ *
+ * /usr/bin/c++ -std=c++11 -Wall -O3 ./io_and_exceptions.cpp && ./a.out
+ *
+ * Prints:
+ * terminate called after throwing an instance of 'int'
+ * Aborted
+ *
+ * While it should print nothing at all.
+ *
+ * With gcc 4.9 and clang 3.6 everything is fine.
+ */
+
+typedef unsigned long size_t;
+
+class BufferBase
+{
+public:
+    typedef char * Position;
+
+    struct Buffer
+    {
+        Buffer(Position begin_pos_, Position end_pos_) : begin_pos(begin_pos_), end_pos(end_pos_) {}
+
+        inline Position begin() const { return begin_pos; }
+        inline Position end() const { return end_pos; }
+        inline size_t size() const { return end_pos - begin_pos; }
+        inline void resize(size_t size) { end_pos = begin_pos + size; }
+
+    private:
+        Position begin_pos;
+        Position end_pos;
+    };
+
+    BufferBase(Position ptr, size_t size, size_t offset)
+        : internal_buffer(ptr, ptr + size), working_buffer(ptr, ptr + size), pos(ptr + offset), bytes(0) {}
+
+    void set(Position ptr, size_t size, size_t offset)
+    {
+        internal_buffer = Buffer(ptr, ptr + size);
+        working_buffer = Buffer(ptr, ptr + size);
+        pos = ptr + offset;
+    }
+
+    inline Buffer & buffer() { return working_buffer; }
+    inline Position & position() { return pos; };
+    inline size_t offset() const { return pos - working_buffer.begin(); }
+
+    size_t count() const
+    {
+        return bytes + offset();
+    }
+
+protected:
+    Buffer internal_buffer;
+    Buffer working_buffer;
+
+    Position pos;
+    size_t bytes;
+};
+
+
+class ReadBuffer : public BufferBase
+{
+public:
+    ReadBuffer(Position ptr, size_t size) : BufferBase(ptr, size, 0) { working_buffer.resize(0); }
+    ReadBuffer(Position ptr, size_t size, size_t offset) : BufferBase(ptr, size, offset) {}
+
+    inline bool next()
+    {
+        bytes += offset();
+        bool res = nextImpl();
+        if (!res)
+            working_buffer.resize(0);
+
+        pos = working_buffer.begin();
+        return res;
+    }
+
+    virtual ~ReadBuffer() {}
+
+    inline bool eof()
+    {
+        return pos == working_buffer.end() && !next();
+    }
+
+private:
+    virtual bool nextImpl() { return false; };
+};
+
+
+class CompressedReadBuffer : public ReadBuffer
+{
+private:
+    bool nextImpl()
+    {
+        throw 1;
+        return true;
+    }
+
+public:
+    CompressedReadBuffer() : ReadBuffer(nullptr, 0)
+    {
+    }
+};
+
+
+void readIntText(unsigned & x, ReadBuffer & buf)
+{
+    x = 0;
+    while (!buf.eof())
+    {
+        switch (*buf.position())
+        {
+            case '+':
+                break;
+            case '9':
+                x *= 10;
+                break;
+            default:
+                return;
+        }
+    }
+}
+
+
+unsigned parse(const char * data)
+{
+    unsigned res;
+    ReadBuffer buf(const_cast<char *>(data), 10, 0);
+    readIntText(res, buf);
+    return res;
+}
+
+
+int main()
+{
+    CompressedReadBuffer in;
+
+    try
+    {
+        while (!in.eof())
+            ;
+    }
+    catch (...)
+    {
+    }
+
+    return 0;
+}
+
+
+void f()
+{
+    parse("123");
+}
diff --git a/dbms/src/IO/tests/mempbrk.cpp b/dbms/src/IO/tests/mempbrk.cpp
index d2062c10f48..4c1461d05aa 100644
--- a/dbms/src/IO/tests/mempbrk.cpp
+++ b/dbms/src/IO/tests/mempbrk.cpp
@@ -63,7 +63,7 @@ namespace test
         return end;
     }
 
-
+
     void readEscapedString(DB::String & s, DB::ReadBuffer & buf)
     {
         s = "";
@@ -74,7 +74,7 @@ namespace test
             s.append(buf.position(), next_pos - buf.position());
             buf.position() += next_pos - buf.position();
 
-            if (buf.position() == buf.buffer().end())
+            if (!buf.hasPendingData())
                 continue;
 
             if (*buf.position() == '\t' || *buf.position() == '\n')
diff --git a/dbms/src/Interpreters/Cluster.cpp b/dbms/src/Interpreters/Cluster.cpp
index f18be042568..7da53e13f85 100644
--- a/dbms/src/Interpreters/Cluster.cpp
+++ b/dbms/src/Interpreters/Cluster.cpp
@@ -1,5 +1,6 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -255,19 +256,7 @@ bool Cluster::isLocal(const Address & address)
     /// - its port matches the port the server is listening on;
     /// - its host resolves to a set of addresses, one of which matches one of the addresses of the server's network interfaces,
     /// then this shard must always be accessed without inter-process communication
-    const UInt16 clickhouse_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0);
-    static auto interfaces = Poco::Net::NetworkInterface::list();
-
-    if (clickhouse_port == address.host_port.port() &&
-        interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(),
-            [&](const Poco::Net::NetworkInterface & interface) { return interface.address() == address.host_port.host(); }))
-    {
-        LOG_INFO(&Poco::Util::Application::instance().logger(),
-            "Replica with address " << address.host_port.toString() << " will be processed as local.");
-        return true;
-    }
-
-    return false;
+    return isLocalAddress(address.host_port);
 }
 
diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp
index ec39314ba58..3e123550a44 100644
--- a/dbms/src/Interpreters/Context.cpp
+++ b/dbms/src/Interpreters/Context.cpp
@@ -495,13 +495,24 @@ const Dictionaries & Context::getDictionaries() const
     Poco::ScopedLock lock(shared->mutex);
 
     if (!shared->dictionaries)
+        shared->dictionaries = new Dictionaries;
+
+    return *shared->dictionaries;
+}
+
+
+const ExternalDictionaries & Context::getExternalDictionaries() const
+{
+    Poco::ScopedLock lock(shared->mutex);
+
+    if (!shared->external_dictionaries)
     {
         if (!this->global_context)
             throw Exception("Logical error: there is no global context", ErrorCodes::LOGICAL_ERROR);
 
-        shared->dictionaries = new Dictionaries{ *this->global_context };
+        shared->external_dictionaries = new ExternalDictionaries{*this->global_context};
     }
 
-    return *shared->dictionaries;
+    return *shared->external_dictionaries;
 }
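Cluster::isLocal() above now delegates to an isLocalAddress() helper whose body is outside this excerpt. A sketch under the assumption that the helper keeps exactly the logic of the removed lines (the signature is a guess):

#include <algorithm>
#include <Poco/Net/NetworkInterface.h>
#include <Poco/Net/SocketAddress.h>
#include <Poco/Util/Application.h>

/// Assumed shape of isLocalAddress(), reconstructed from the code it replaces:
/// an address is local when its port equals the server's TCP port and its host
/// equals the address of one of the server's network interfaces.
bool isLocalAddress(const Poco::Net::SocketAddress & address)
{
    const int clickhouse_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0);
    static auto interfaces = Poco::Net::NetworkInterface::list();

    return clickhouse_port == address.port()
        && interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(),
            [&](const Poco::Net::NetworkInterface & iface) { return iface.address() == address.host(); });
}

Extracting it into a free function lets other call sites reuse the check without duplicating the interface enumeration.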
diff --git a/dbms/src/Interpreters/DictionaryFactory.cpp b/dbms/src/Interpreters/DictionaryFactory.cpp
index 2843a31d08d..d36926a0c29 100644
--- a/dbms/src/Interpreters/DictionaryFactory.cpp
+++ b/dbms/src/Interpreters/DictionaryFactory.cpp
@@ -2,9 +2,10 @@
 #include
 #include
 #include
+#include
 #include
+#include
 #include
-#include
 
 namespace DB
 {
@@ -12,39 +13,48 @@ namespace DB
 DictionaryPtr DictionaryFactory::create(const std::string & name, Poco::Util::AbstractConfiguration & config,
     const std::string & config_prefix, Context & context) const
 {
-    auto dict_struct = DictionaryStructure::fromConfig(config, config_prefix + "structure");
+    Poco::Util::AbstractConfiguration::Keys keys;
+    const auto & layout_prefix = config_prefix + ".layout";
+    config.keys(layout_prefix, keys);
+    if (keys.size() != 1)
+        throw Exception{
+            "Element dictionary.layout should have exactly one child element",
+            ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG
+        };
+
+    const DictionaryStructure dict_struct{config, config_prefix + ".structure"};
 
     auto source_ptr = DictionarySourceFactory::instance().create(
-        config, config_prefix + "source.", dict_struct, context);
+        config, config_prefix + ".source", dict_struct, context);
 
-    const auto dict_lifetime = DictionaryLifetime::fromConfig(config, config_prefix + "lifetime");
+    const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
 
-    const auto & layout_prefix = config_prefix + "layout.";
+    const auto & layout_type = keys.front();
 
-    if (config.has(layout_prefix + "flat"))
+    if ("flat" == layout_type)
     {
-        return ext::make_unique(name, dict_struct, std::move(source_ptr), dict_lifetime);
+        return std::make_unique(name, dict_struct, std::move(source_ptr), dict_lifetime);
     }
-    else if (config.has(layout_prefix + "hashed"))
+    else if ("hashed" == layout_type)
    {
-        return ext::make_unique(name, dict_struct, std::move(source_ptr), dict_lifetime);
+        return std::make_unique(name, dict_struct, std::move(source_ptr), dict_lifetime);
     }
-    else if (config.has(layout_prefix + "cache"))
+    else if ("cache" == layout_type)
     {
-        const auto size = config.getInt(layout_prefix + "cache.size", 0);
+        const auto size = config.getInt(layout_prefix + ".cache.size");
         if (size == 0)
             throw Exception{
                 "Dictionary of type 'cache' cannot have size of 0 bytes",
                 ErrorCodes::TOO_SMALL_BUFFER_SIZE
             };
 
-        throw Exception{
-            "Dictionary of type 'cache' is not yet implemented",
-            ErrorCodes::NOT_IMPLEMENTED
-        };
+        return std::make_unique(name, dict_struct, std::move(source_ptr), dict_lifetime, size);
     }
 
-    throw Exception{"No dictionary type specified", ErrorCodes::BAD_ARGUMENTS};
+    throw Exception{
+        "Unknown dictionary layout type: " + layout_type,
+        ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG
+    };
 };
 
 }
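The keys.size() != 1 check above encodes the expected configuration shape: the layout element must have exactly one child, whose tag name selects the dictionary implementation. A minimal standalone sketch of that lookup with Poco (the XML fragment and dictionary name are made up):

#include <iostream>
#include <sstream>
#include <Poco/AutoPtr.h>
#include <Poco/Util/XMLConfiguration.h>

int main()
{
    /// Hypothetical dictionary definition: exactly one child element under <layout>.
    std::istringstream xml(
        "<dictionary>"
        "<name>test_dict</name>"
        "<layout><flat/></layout>"
        "</dictionary>");

    Poco::AutoPtr<Poco::Util::XMLConfiguration> config(new Poco::Util::XMLConfiguration(xml));

    Poco::Util::AbstractConfiguration::Keys keys;
    config->keys("layout", keys);    /// child element names of <layout>

    if (keys.size() == 1)
        std::cout << "layout type: " << keys.front() << "\n";    /// prints "flat"
}

This is also why the error cases split in two: several children trigger EXCESSIVE_ELEMENT_IN_CONFIG, while an unrecognized tag name falls through to UNKNOWN_ELEMENT_IN_CONFIG.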
diff --git a/dbms/src/Interpreters/Dictionaries.cpp b/dbms/src/Interpreters/ExternalDictionaries.cpp
similarity index 74%
rename from dbms/src/Interpreters/Dictionaries.cpp
rename to dbms/src/Interpreters/ExternalDictionaries.cpp
index ee83289af47..3865b72feeb 100644
--- a/dbms/src/Interpreters/Dictionaries.cpp
+++ b/dbms/src/Interpreters/ExternalDictionaries.cpp
@@ -1,8 +1,9 @@
-#include
+#include
 #include
 #include
 #include
-#include
+#include
+#include
 
 namespace DB
 {
@@ -28,24 +29,27 @@
     }
 }
 
-void Dictionaries::reloadExternals()
+void ExternalDictionaries::reloadImpl()
 {
     const auto config_path = getDictionariesConfigPath(Poco::Util::Application::instance().config());
     const Poco::File config_file{config_path};
 
-    if (!config_file.exists())
+    if (config_path.empty() || !config_file.exists())
     {
         LOG_WARNING(log, "config file '" + config_path + "' does not exist");
     }
     else
     {
         const auto last_modified = config_file.getLastModified();
-        if (last_modified > dictionaries_last_modified)
+        if (last_modified > config_last_modified)
         {
             /// definitions of dictionaries may have changed, recreate all of them
-            dictionaries_last_modified = last_modified;
+            config_last_modified = last_modified;
 
-            const config_ptr_t config{new Poco::Util::XMLConfiguration{config_path}};
+            const auto config = new Poco::Util::XMLConfiguration{config_path};
+            SCOPE_EXIT(
+                config->release();
+            );
 
             /// get all dictionaries' definitions
             Poco::Util::AbstractConfiguration::Keys keys;
@@ -62,16 +66,14 @@ void ExternalDictionaries::reloadImpl()
                     continue;
                 }
 
-                const auto & prefix = key + '.';
-
-                const auto & name = config->getString(prefix + "name");
+                const auto name = config->getString(key + ".name");
                 if (name.empty())
                 {
                     LOG_WARNING(log, "dictionary name cannot be empty");
                     continue;
                 }
 
-                auto dict_ptr = DictionaryFactory::instance().create(name, *config, prefix, context);
+                auto dict_ptr = DictionaryFactory::instance().create(name, *config, key, context);
                 if (!dict_ptr->isCached())
                 {
                     const auto & lifetime = dict_ptr->getLifetime();
@@ -86,12 +88,12 @@ void ExternalDictionaries::reloadImpl()
                     }
                 }
 
-                auto it = external_dictionaries.find(name);
+                auto it = dictionaries.find(name);
                 /// add new dictionary or update an existing version
-                if (it == std::end(external_dictionaries))
+                if (it == std::end(dictionaries))
                 {
-                    const std::lock_guard lock{external_dictionaries_mutex};
-                    external_dictionaries.emplace(name, std::make_shared(dict_ptr.release()));
+                    const std::lock_guard lock{dictionaries_mutex};
+                    dictionaries.emplace(name, std::make_shared(dict_ptr.release()));
                 }
                 else
                     it->second->set(dict_ptr.release());
@@ -105,7 +107,7 @@ void ExternalDictionaries::reloadImpl()
     }
 
     /// periodic update
-    for (auto & dictionary : external_dictionaries)
+    for (auto & dictionary : dictionaries)
     {
         try
         {
@@ -125,6 +127,12 @@
             if (std::chrono::system_clock::now() < update_time)
                 continue;
 
+            SCOPE_EXIT(
+                /// calculate next update time
+                std::uniform_int_distribution distribution{lifetime.min_sec, lifetime.max_sec};
+                update_time = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
+            );
+
             /// check source modified
             if (current->getSource()->isModified())
             {
@@ -132,10 +140,6 @@
                 auto new_version = current->clone();
                 dictionary.second->set(new_version.release());
             }
-
-            /// calculate next update time
-            std::uniform_int_distribution distribution{lifetime.min_sec, lifetime.max_sec};
-            update_time = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
         }
         catch (...)
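The SCOPE_EXIT blocks introduced above matter because the loop body can leave early: config->release() must run on every path, and the next update time must be recomputed even when the body hits a continue or throws. The SCOPE_EXIT macro itself is defined elsewhere in the tree; a generic scope-guard sketch of the idiom it is assumed to expand to:

#include <utility>

/// Minimal scope-guard sketch (assumed equivalent of the SCOPE_EXIT macro):
/// the callback runs when the scope unwinds, whether by normal exit,
/// `continue`, or an exception.
template <typename F>
class ScopeGuard
{
public:
    explicit ScopeGuard(F fn_) : fn(std::move(fn_)) {}
    ScopeGuard(ScopeGuard && other) : fn(std::move(other.fn)), active(other.active) { other.active = false; }
    ~ScopeGuard() { if (active) fn(); }

    ScopeGuard(const ScopeGuard &) = delete;
    ScopeGuard & operator=(const ScopeGuard &) = delete;

private:
    F fn;
    bool active = true;
};

template <typename F>
ScopeGuard<F> makeScopeGuard(F fn) { return ScopeGuard<F>(std::move(fn)); }

/// Usage mirroring the hunk above:
///   auto guard = makeScopeGuard([&] { /* calculate next update time */ });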
diff --git a/dbms/src/Interpreters/InterpreterAlterQuery.cpp b/dbms/src/Interpreters/InterpreterAlterQuery.cpp
index 10d50729f75..0b27a35a221 100644
--- a/dbms/src/Interpreters/InterpreterAlterQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterAlterQuery.cpp
@@ -10,9 +10,6 @@
 #include
 #include
 #include
-#include
-#include
-#include
 
 #include
 
diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.cpp b/dbms/src/Interpreters/InterpreterCreateQuery.cpp
index 95bc1d2993b..a345ffe63bc 100644
--- a/dbms/src/Interpreters/InterpreterCreateQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterCreateQuery.cpp
@@ -13,6 +13,7 @@
 #include
 #include
+#include
 #include
 #include
@@ -194,7 +195,7 @@ StoragePtr InterpreterCreateQuery::execute(bool assume_metadata_exists)
         else
             throw Exception("Incorrect CREATE query: required ENGINE.", ErrorCodes::ENGINE_REQUIRED);
 
-        res = context.getStorageFactory().get(
+        res = StorageFactory::instance().get(
             storage_name, data_path, table_name, database_name, context,
             context.getGlobalContext(), query_ptr, columns,
             materialized_columns, alias_columns, column_defaults, create.attach);
diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp
index 90beac1638d..264d8aa4b40 100644
--- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp
@@ -25,7 +25,6 @@
 #include
 #include
-#include
 #include
 #include
@@ -105,12 +104,6 @@ void InterpreterSelectQuery::basicInit(BlockInputStreamPtr input_, const NamesAn
                 + " does not support parallel execution on several replicas",
                 ErrorCodes::STORAGE_DOESNT_SUPPORT_PARALLEL_REPLICAS);
 
-        if (StorageReplicatedMergeTree * storage_replicated_merge_tree = typeid_cast<StorageReplicatedMergeTree *>(&*storage))
-        {
-            if (settings.parallel_replica_offset > 0)
-                storage_replicated_merge_tree->skipUnreplicated();
-        }
-
         table_lock = storage->lockStructure(false);
 
         if (table_column_names.empty())
             context.setColumns(storage->getColumnsListNonMaterialized());
diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp
index 837830fa526..b4b1c3e0ee7 100644
--- a/dbms/src/Server/Server.cpp
+++ b/dbms/src/Server/Server.cpp
@@ -8,7 +8,7 @@
 #include
 #include
 #include
-#include
+#include
 
 #include
 #include
@@ -491,7 +491,7 @@ int Server::main(const std::vector & args)
     global_context->setMacros(Macros(config(), "macros"));
 
     std::string users_config_path = config().getString("users_config", config().getString("config-file", "config.xml"));
-    auto users_config_reloader = ext::make_unique(users_config_path, global_context.get());
+    auto users_config_reloader = std::make_unique(users_config_path, global_context.get());
 
     /// Maximum number of concurrently executed queries.
     global_context->getProcessList().setMaxSize(config().getInt("max_concurrent_queries", 0));
@@ -536,7 +536,7 @@ int Server::main(const std::vector & args)
     {
         const auto profile_events_transmitter = config().getBool("use_graphite", true)
-            ? ext::make_unique()
+            ? std::make_unique()
             : nullptr;
 
         const std::string listen_host = config().getString("listen_host", "::");
diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp
index 8bfd831fd1b..5064169eb37 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp
@@ -324,7 +324,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
     {
         MarkRanges ranges(1, MarkRange(0, parts[i]->size));
 
-        auto input = ext::make_unique(
+        auto input = std::make_unique(
             data.getFullPath() + parts[i]->name + '/', DEFAULT_MERGE_BLOCK_SIZE, union_column_names, data,
             parts[i], ranges, false, nullptr, "");
 
@@ -348,19 +348,19 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
     switch (data.mode)
     {
         case MergeTreeData::Ordinary:
-            merged_stream = ext::make_unique(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
+            merged_stream = std::make_unique(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
             break;
 
         case MergeTreeData::Collapsing:
-            merged_stream = ext::make_unique(src_streams, data.getSortDescription(), data.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
+            merged_stream = std::make_unique(src_streams, data.getSortDescription(), data.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
             break;
 
         case MergeTreeData::Summing:
-            merged_stream = ext::make_unique(src_streams, data.getSortDescription(), data.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
+            merged_stream = std::make_unique(src_streams, data.getSortDescription(), data.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
             break;
 
         case MergeTreeData::Aggregating:
-            merged_stream = ext::make_unique(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
+            merged_stream = std::make_unique(src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
             break;
 
         default:
diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
index 0ab86e6c196..3d3711ca19f 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
@@ -149,10 +149,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
             relative_sample_size = 0;
     }
 
-    UInt64 parallel_replicas_count = UInt64(settings.parallel_replicas_count);
-    UInt64 parallel_replica_offset = UInt64(settings.parallel_replica_offset);
-
-    if ((parallel_replicas_count > 1) && !data.sampling_expression.isNull() && (relative_sample_size == 0))
+    if ((settings.parallel_replicas_count > 1) && !data.sampling_expression.isNull() && (relative_sample_size == 0))
         relative_sample_size = 1;
 
     if (relative_sample_size != 0)
@@ -175,12 +172,11 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
         UInt64 sampling_column_value_upper_limit;
         UInt64 upper_limit = static_cast(relative_sample_size * sampling_column_max);
 
-        if (parallel_replicas_count > 1)
+        if (settings.parallel_replicas_count > 1)
         {
-            UInt64 step = upper_limit / parallel_replicas_count;
-            sampling_column_value_lower_limit = parallel_replica_offset * step;
-            if ((parallel_replica_offset + 1) < parallel_replicas_count)
-                sampling_column_value_upper_limit = (parallel_replica_offset + 1) * step;
+            sampling_column_value_lower_limit = (settings.parallel_replica_offset * upper_limit) / settings.parallel_replicas_count;
+            if ((settings.parallel_replica_offset + 1) < settings.parallel_replicas_count)
+                sampling_column_value_upper_limit = ((settings.parallel_replica_offset + 1) * upper_limit) / settings.parallel_replicas_count;
             else
                 sampling_column_value_upper_limit = upper_limit + 1;
         }
@@ -191,25 +187,15 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
         }
 
        /// Add a condition so that something more can be cut off during a repeated scan of the index.
-        if (!key_condition.addCondition(data.sampling_expression->getColumnName(),
-            Range::createLeftBounded(sampling_column_value_lower_limit, true)))
-            throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);
+        if (sampling_column_value_lower_limit > 0)
+            if (!key_condition.addCondition(data.sampling_expression->getColumnName(),
+                Range::createLeftBounded(sampling_column_value_lower_limit, true)))
+                throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);
 
         if (!key_condition.addCondition(data.sampling_expression->getColumnName(),
             Range::createRightBounded(sampling_column_value_upper_limit, false)))
             throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);
 
-        /// Filter expression: sampling_expression in [sampling_column_value_lower_limit, sampling_column_value_upper_limit)
-
-        ASTPtr lower_filter_args = new ASTExpressionList;
-        lower_filter_args->children.push_back(data.sampling_expression);
-        lower_filter_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_lower_limit));
-
-        ASTFunctionPtr lower_filter_function = new ASTFunction;
-        lower_filter_function->name = "greaterOrEquals";
-        lower_filter_function->arguments = lower_filter_args;
-        lower_filter_function->children.push_back(lower_filter_function->arguments);
-
         ASTPtr upper_filter_args = new ASTExpressionList;
         upper_filter_args->children.push_back(data.sampling_expression);
         upper_filter_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_upper_limit));
@@ -219,14 +205,33 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
         upper_filter_function->arguments = upper_filter_args;
         upper_filter_function->children.push_back(upper_filter_function->arguments);
 
-        ASTPtr filter_function_args = new ASTExpressionList;
-        filter_function_args->children.push_back(lower_filter_function);
-        filter_function_args->children.push_back(upper_filter_function);
+        if (sampling_column_value_lower_limit > 0)
+        {
+            /// Filter expression: sampling_expression in [sampling_column_value_lower_limit, sampling_column_value_upper_limit)
 
-        filter_function = new ASTFunction;
-        filter_function->name = "and";
-        filter_function->arguments = filter_function_args;
-        filter_function->children.push_back(filter_function->arguments);
+            ASTPtr lower_filter_args = new ASTExpressionList;
+            lower_filter_args->children.push_back(data.sampling_expression);
+            lower_filter_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_lower_limit));
+
+            ASTFunctionPtr lower_filter_function = new ASTFunction;
+            lower_filter_function->name = "greaterOrEquals";
+            lower_filter_function->arguments = lower_filter_args;
+            lower_filter_function->children.push_back(lower_filter_function->arguments);
+
+            ASTPtr filter_function_args = new ASTExpressionList;
+            filter_function_args->children.push_back(lower_filter_function);
+            filter_function_args->children.push_back(upper_filter_function);
+
+            filter_function = new ASTFunction;
+            filter_function->name = "and";
+            filter_function->arguments = filter_function_args;
+            filter_function->children.push_back(filter_function->arguments);
+        }
+        else
+        {
+            /// Filter expression: sampling_expression < sampling_column_value_upper_limit
+            filter_function = upper_filter_function;
+        }
 
         filter_expression = ExpressionAnalyzer(filter_function, data.context, data.getColumnsList()).getActions(false);
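The reworked arithmetic above splits [0, upper_limit] among replicas as (offset * upper_limit) / count rather than offset * (upper_limit / count), so the remainder of the integer division is spread across the replicas instead of all landing on the last one; the related guard only adds the lower-bound condition when sampling_column_value_lower_limit > 0, since replica 0 needs no lower bound at all. A standalone worked example with made-up numbers:

#include <cstdint>
#include <cstdio>

int main()
{
    const uint64_t upper_limit = 11;   /// made-up sampling key bound
    const uint64_t replicas = 3;       /// parallel_replicas_count

    for (uint64_t offset = 0; offset < replicas; ++offset)
    {
        const uint64_t lower = (offset * upper_limit) / replicas;
        const uint64_t upper = (offset + 1 < replicas)
            ? ((offset + 1) * upper_limit) / replicas
            : upper_limit + 1;
        std::printf("replica %u: [%u, %u)\n", unsigned(offset), unsigned(lower), unsigned(upper));
    }
    /// New formula: [0, 3), [3, 7), [7, 12) -- sizes 3, 4, 5.
    /// Old step-based formula (step = 11 / 3 = 3): [0, 3), [3, 6), [6, 12) -- sizes 3, 3, 6.
}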
diff --git a/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp b/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp
index 5f2cff11b7f..00746849888 100644
--- a/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp
@@ -120,7 +120,7 @@ struct Stream
         /// If the mark is supposed to be exactly on a block boundary, a mark pointing at the end of the previous block
         /// suits us just as well as one pointing at the beginning of the next one.
-        if (uncompressed_hashing_buf.position() == uncompressed_hashing_buf.buffer().end())
+        if (!uncompressed_hashing_buf.hasPendingData())
         {
             /// Take the mark pointing at the end of the previous block.
             has_alternative_mark = true;
diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp
index ca777c69da7..bcbae892eaa 100644
--- a/dbms/src/Storages/StorageDistributed.cpp
+++ b/dbms/src/Storages/StorageDistributed.cpp
@@ -12,7 +12,7 @@
 #include
 
-#include
+#include
 
 namespace DB
 {
@@ -149,7 +149,7 @@ BlockInputStreams StorageDistributed::read(
     Settings new_settings = settings;
     new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.limits.max_execution_time);
 
-    size_t result_size = cluster.pools.size() + cluster.getLocalNodesNum();
+    size_t result_size = (cluster.pools.size() * settings.max_parallel_replicas) + cluster.getLocalNodesNum();
 
     processed_stage = result_size == 1
         ? QueryProcessingStage::Complete
@@ -160,10 +160,15 @@ BlockInputStreams StorageDistributed::read(
         query, remote_database, remote_table);
     const auto & modified_query = queryToString(modified_query_ast);
 
+    /// Network traffic limiter, if needed.
+    ThrottlerPtr throttler;
+    if (settings.limits.max_network_bandwidth)
+        throttler.reset(new Throttler(settings.limits.max_network_bandwidth));
+
     /// Loop over the shards.
     for (auto & conn_pool : cluster.pools)
         res.emplace_back(new RemoteBlockInputStream{
-            conn_pool, modified_query, &new_settings,
+            conn_pool, modified_query, &new_settings, throttler,
            external_tables, processed_stage, context});
 
     /// Add queries to the local ClickHouse.
@@ -234,7 +239,7 @@ bool StorageDistributed::hasColumn(const String & column_name) const
 
 void StorageDistributed::createDirectoryMonitor(const std::string & name)
 {
-    directory_monitors.emplace(name, ext::make_unique(*this, name));
+    directory_monitors.emplace(name, std::make_unique(*this, name));
 }
 
 void StorageDistributed::createDirectoryMonitors()
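StorageDistributed::read() above creates a single Throttler per query and hands the same ThrottlerPtr to every per-shard RemoteBlockInputStream, so max_network_bandwidth caps the query's total traffic rather than each connection separately. The Throttler source is outside this excerpt; a generic sketch of the sleep-based technique it presumably uses:

#include <chrono>
#include <cstddef>
#include <mutex>
#include <thread>

/// Generic sleep-based limiter sketch (not the actual DB::Throttler): count
/// bytes since the start; if the observed rate exceeds max_speed, sleep just
/// long enough to fall back under the limit. One instance is shared by all
/// connections of a query.
class SimpleThrottler
{
public:
    explicit SimpleThrottler(size_t max_speed_) : max_speed(max_speed_), start(Clock::now()) {}

    void add(size_t bytes)
    {
        double sleep_seconds = 0;
        {
            std::lock_guard<std::mutex> lock(mutex);
            total_bytes += bytes;
            const double elapsed = std::chrono::duration<double>(Clock::now() - start).count();
            const double expected = double(total_bytes) / max_speed;  /// seconds this much data "should" take
            sleep_seconds = expected - elapsed;
        }
        if (sleep_seconds > 0)
            std::this_thread::sleep_for(std::chrono::duration<double>(sleep_seconds));
    }

private:
    using Clock = std::chrono::steady_clock;
    const size_t max_speed;      /// bytes per second
    size_t total_bytes = 0;
    const Clock::time_point start;
    std::mutex mutex;
};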
diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
index e14520cfc77..8a9ceb09d99 100644
--- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
@@ -2027,7 +2027,7 @@ BlockInputStreams StorageReplicatedMergeTree::read(
 
     size_t part_index = 0;
 
-    if (process_unreplicated && unreplicated_reader && values.count(0))
+    if ((settings.parallel_replica_offset == 0) && unreplicated_reader && values.count(0))
     {
         res = unreplicated_reader->read(real_column_names, query,
             context, settings, processed_stage,
@@ -2107,7 +2107,7 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
     auto zookeeper = getZooKeeper();
     const MergeTreeMergeBlocker merge_blocker{merger};
     const auto unreplicated_merge_blocker = unreplicated_merger ?
-        ext::make_unique(*unreplicated_merger) : nullptr;
+        std::make_unique(*unreplicated_merger) : nullptr;
 
     LOG_DEBUG(log, "Doing ALTER");
diff --git a/dbms/tests/queries/0_stateless/00124_distributed_with_many_replicas.reference b/dbms/tests/queries/0_stateless/00124_distributed_with_many_replicas.reference
index efa2d8b20ab..f2c230b213c 100644
--- a/dbms/tests/queries/0_stateless/00124_distributed_with_many_replicas.reference
+++ b/dbms/tests/queries/0_stateless/00124_distributed_with_many_replicas.reference
@@ -3,48 +3,3 @@
 3	2015-03-01	3	foo
 4	2015-04-01	4	bar
 5	2015-05-01	5	foo
-6	2015-06-01	6	bar
-7	2015-07-01	7	foo
-8	2015-08-01	8	bar
-9	2015-09-01	9	foo
-10	2015-10-01	10	bar
-11	2015-11-01	1	foo
-12	2015-12-01	2	bar
-13	2015-01-01	3	foo
-14	2015-02-01	4	bar
-15	2015-03-01	5	foo
-16	2015-04-01	6	bar
-17	2015-05-01	7	foo
-18	2015-06-01	8	bar
-19	2015-07-01	9	foo
-20	2015-08-01	10	bar
-21	2015-09-01	1	foo
-22	2015-10-01	2	bar
-23	2015-11-01	3	foo
-24	2015-12-01	4	bar
-25	2015-01-01	5	foo
-26	2015-02-01	6	bar
-27	2015-03-01	7	foo
-28	2015-04-01	8	bar
-29	2015-05-01	9	foo
-30	2015-06-01	10	bar
-31	2015-07-01	1	foo
-32	2015-08-01	2	bar
-33	2015-09-01	3	foo
-34	2015-10-01	4	bar
-35	2015-11-01	5	foo
-36	2015-12-01	6	bar
-37	2015-01-01	7	foo
-38	2015-02-01	8	bar
-39	2015-03-01	9	foo
-40	2015-04-01	10	bar
-41	2015-05-01	1	foo
-42	2015-06-01	2	bar
-43	2015-07-01	3	foo
-44	2015-08-01	4	bar
-45	2015-09-01	5	foo
-46	2015-10-01	6	bar
-47	2015-11-01	7	foo
-48	2015-12-01	8	bar
-49	2015-01-01	9	foo
-50	2015-02-01	10	bar
diff --git a/dbms/tests/queries/0_stateless/00124_distributed_with_many_replicas.sql b/dbms/tests/queries/0_stateless/00124_distributed_with_many_replicas.sql
index a97ce710c2e..489c46eb8f2 100644
--- a/dbms/tests/queries/0_stateless/00124_distributed_with_many_replicas.sql
+++ b/dbms/tests/queries/0_stateless/00124_distributed_with_many_replicas.sql
@@ -4,57 +4,7 @@ DROP TABLE IF EXISTS test.report;
 CREATE TABLE test.report(id UInt32, event_date Date, priority UInt32, description String) ENGINE = MergeTree(event_date, intHash32(id), (id, event_date, intHash32(id)), 8192);
 
-INSERT INTO test.report(id,event_date,priority,description) VALUES(1, '2015-01-01', 1, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(2, '2015-02-01', 2, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(3, '2015-03-01', 3, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(4, '2015-04-01', 4, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(5, '2015-05-01', 5, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(6, '2015-06-01', 6, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(7, '2015-07-01', 7, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(8, '2015-08-01', 8, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(9, '2015-09-01', 9, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(10, '2015-10-01', 10, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(11, '2015-11-01', 1, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(12, '2015-12-01', 2, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(13, '2015-01-01', 3, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(14, '2015-02-01', 4, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(15, '2015-03-01', 5, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(16, '2015-04-01', 6, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(17, '2015-05-01', 7, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(18, '2015-06-01', 8, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(19, '2015-07-01', 9, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(20, '2015-08-01', 10, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(21, '2015-09-01', 1, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(22, '2015-10-01', 2, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(23, '2015-11-01', 3, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(24, '2015-12-01', 4, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(25, '2015-01-01', 5, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(26, '2015-02-01', 6, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(27, '2015-03-01', 7, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(28, '2015-04-01', 8, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(29, '2015-05-01', 9, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(30, '2015-06-01', 10, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(31, '2015-07-01', 1, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(32, '2015-08-01', 2, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(33, '2015-09-01', 3, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(34, '2015-10-01', 4, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(35, '2015-11-01', 5, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(36, '2015-12-01', 6, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(37, '2015-01-01', 7, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(38, '2015-02-01', 8, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(39, '2015-03-01', 9, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(40, '2015-04-01', 10, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(41, '2015-05-01', 1, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(42, '2015-06-01', 2, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(43, '2015-07-01', 3, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(44, '2015-08-01', 4, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(45, '2015-09-01', 5, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(46, '2015-10-01', 6, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(47, '2015-11-01', 7, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(48, '2015-12-01', 8, 'bar');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(49, '2015-01-01', 9, 'foo');
-INSERT INTO test.report(id,event_date,priority,description) VALUES(50, '2015-02-01', 10, 'bar');
-
+INSERT INTO test.report(id,event_date,priority,description) VALUES (1, '2015-01-01', 1, 'foo')(2, '2015-02-01', 2, 'bar')(3, '2015-03-01', 3, 'foo')(4, '2015-04-01', 4, 'bar')(5, '2015-05-01', 5, 'foo');
 
 SELECT * FROM (SELECT id, event_date, priority, description FROM remote('127.0.0.{1|2}', test, report)) ORDER BY id ASC;
 
 DROP TABLE test.report;
diff --git a/tools/init.d/template b/tools/init.d/template
index b2ecbe66dd8..ca290798493 100755
--- a/tools/init.d/template
+++ b/tools/init.d/template
@@ -93,6 +93,11 @@ any_runs()
     if [[ $(running_processes) -gt 0 ]]; then return 0; else return 1; fi
 }
 
+all_runs()
+{
+    [[ $(running_processes) -eq $NUMBER_OF_PROCESSES ]]
+}
+
 wait4done()
 {
     while any_runs; do
@@ -108,7 +113,7 @@ start()
 
     echo -n "Start $PROGRAM service: "
 
-    if [[ $(running_processes) -eq $NUMBER_OF_PROCESSES ]]; then
+    if all_runs; then
         echo -n "already running "
         EXIT_STATUS=1
     else
@@ -236,7 +241,7 @@ main()
             restart
             ;;
         condstart)
-            any_runs || start
+            all_runs || start
             ;;
         condstop)
            any_runs && stop