diff --git a/dbms/include/DB/Client/Connection.h b/dbms/include/DB/Client/Connection.h index 8bba6f362c4..b36fd2a9992 100644 --- a/dbms/include/DB/Client/Connection.h +++ b/dbms/include/DB/Client/Connection.h @@ -26,8 +26,6 @@ namespace DB using Poco::SharedPtr; -class ParallelReplicas; - /// Поток блоков читающих из таблицы и ее имя typedef std::pair ExternalTableData; /// Вектор пар, описывающих таблицы @@ -44,6 +42,7 @@ typedef std::vector ExternalTablesData; class Connection : private boost::noncopyable { friend class ParallelReplicas; + friend class MultiplexedConnections; public: Connection(const String & host_, UInt16 port_, const String & default_database_, diff --git a/dbms/include/DB/Client/ConnectionPool.h b/dbms/include/DB/Client/ConnectionPool.h index c752fb49719..cb55677e87e 100644 --- a/dbms/include/DB/Client/ConnectionPool.h +++ b/dbms/include/DB/Client/ConnectionPool.h @@ -57,7 +57,7 @@ protected: typedef SharedPtr ConnectionPoolPtr; typedef std::vector ConnectionPools; - +typedef SharedPtr ConnectionPoolsPtr; /** Обычный пул соединений, без отказоустойчивости. diff --git a/dbms/include/DB/Client/ParallelReplicas.h b/dbms/include/DB/Client/MultiplexedConnections.h similarity index 64% rename from dbms/include/DB/Client/ParallelReplicas.h rename to dbms/include/DB/Client/MultiplexedConnections.h index c1e99a50215..299e5deb7fd 100644 --- a/dbms/include/DB/Client/ParallelReplicas.h +++ b/dbms/include/DB/Client/MultiplexedConnections.h @@ -6,30 +6,38 @@ #include #include - namespace DB { -/** Для получения данных сразу из нескольких реплик (соединений) в рамках одного потока. - * В качестве вырожденного случая, может также работать с одним соединением. +/** Для получения данных сразу из нескольких реплик (соединений) из одного или нескольких шардов + * в рамках одного потока. В качестве вырожденного случая, может также работать с одним соединением. * Предполагается, что все функции кроме sendCancel всегда выполняются в одном потоке. * * Интерфейс почти совпадает с Connection. */ -class ParallelReplicas final : private boost::noncopyable +class MultiplexedConnections final : private boost::noncopyable { public: /// Принимает готовое соединение. - ParallelReplicas(Connection * connection_, const Settings * settings_, ThrottlerPtr throttler_); + MultiplexedConnections(Connection * connection_, const Settings * settings_, ThrottlerPtr throttler_); /** Принимает пул, из которого нужно будет достать одно или несколько соединений. * Если флаг append_extra_info установлен, к каждому полученному блоку прилагается * дополнительная информация. * Если флаг do_broadcast установлен, достаются все соединения. */ - ParallelReplicas(IConnectionPool * pool_, const Settings * settings_, ThrottlerPtr throttler_, - bool append_extra_info = false, bool get_all_replicas = false); + MultiplexedConnections(IConnectionPool * pool_, const Settings * settings_, ThrottlerPtr throttler_, + bool append_extra_info = false, bool do_broadcast = false); + + /** Принимает пулы, один для каждого шарда, из которых нужно будет достать одно или несколько + * соединений. + * Если флаг append_extra_info установлен, к каждому полученному блоку прилагается + * дополнительная информация. + * Если флаг do_broadcast установлен, достаются все соединения. + */ + MultiplexedConnections(ConnectionPools & pools_, const Settings * settings_, ThrottlerPtr throttler_, + bool append_extra_info = false, bool do_broadcast = false); /// Отправить на реплики всё содержимое внешних таблиц.
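/// (Editor's note from the implementation below: one ExternalTablesData element
/// per connection is expected; sendExternalTablesData in MultiplexedConnections.cpp
/// checks data.size() against active_connection_total_count.)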
void sendExternalTablesData(std::vector & data); @@ -65,15 +73,44 @@ public: /// Проверить, есть ли действительные реплики. /// Без блокировки, потому что sendCancel() не меняет состояние реплик. - bool hasActiveReplicas() const { return active_replica_count > 0; } + bool hasActiveConnections() const { return active_connection_total_count > 0; } private: - /// Реплики хэшированные по id сокета - using ReplicaMap = std::unordered_map; + /// Соединения 1-го шарда, затем соединения 2-го шарда, и т.д. + using Connections = std::vector; + + /// Состояние соединений одного шарда. + struct ShardState + { + /// Количество выделенных соединений, т.е. реплик, для этого шарда. + size_t allocated_connection_count; + /// Текущее количество действительных соединений к репликам этого шарда. + size_t active_connection_count; + }; + + /// Описание одной реплики. + struct ReplicaState + { + /// Индекс соединения. + size_t connection_index; + /// Владелец этой реплики. + ShardState * shard_state; + }; + + /// Реплики хэшированные по id сокета. + using ReplicaMap = std::unordered_map; + + /// Состояние каждого шарда. + using ShardStates = std::vector; private: - /// Зарегистрировать реплику. - void registerReplica(Connection * connection); + void initFromShard(IConnectionPool * pool); + + /// Зарегистрировать шарды. + void registerShards(); + + /// Зарегистрировать реплики одного шарда. + void registerReplicas(size_t index_begin, size_t index_end, ShardState & shard_state); /// Внутренняя версия функции receivePacket без блокировки. Connection::Packet receivePacketUnlocked(); @@ -94,13 +131,15 @@ private: private: const Settings * settings; + + Connections connections; ReplicaMap replica_map; + ShardStates shard_states; /// Если не nullptr, то используется, чтобы ограничить сетевой трафик. ThrottlerPtr throttler; std::vector pool_entries; - ConnectionPool::Entry pool_entry; /// Соединение, c которого был получен последний блок. Connection * current_connection; @@ -108,7 +147,7 @@ private: std::unique_ptr block_extra_info; /// Текущее количество действительных соединений к репликам. - size_t active_replica_count; + size_t active_connection_total_count = 0; /// Запрос выполняется параллельно на нескольких репликах. bool supports_parallel_execution; /// Отправили запрос @@ -116,6 +155,8 @@ private: /// Отменили запрос bool cancelled = false; + bool do_broadcast = false; + /// Мьютекс для того, чтобы функция sendCancel могла выполняться безопасно /// в отдельном потоке. mutable Poco::FastMutex cancel_mutex; diff --git a/dbms/include/DB/Columns/ColumnAggregateFunction.h b/dbms/include/DB/Columns/ColumnAggregateFunction.h index a2f9cbb45fa..6677c190f6d 100644 --- a/dbms/include/DB/Columns/ColumnAggregateFunction.h +++ b/dbms/include/DB/Columns/ColumnAggregateFunction.h @@ -107,6 +107,18 @@ public: { const IAggregateFunction * function = holder->func; ColumnPtr res = function->getReturnType()->createColumn(); + + /** Если агрегатная функция возвращает нефинализированное состояние, + * то надо просто скопировать указатели на него а также разделяемое владение аренами. 
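+ *
+ * Illustrative case (an editor's example, not stated in this diff): for a query
+ * using the -State combinator, e.g. SELECT uniqState(x) FROM t GROUP BY y, the
+ * return type of the aggregate function is itself an aggregate-function state,
+ * so the column created above is a ColumnAggregateFunction and this path is taken.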
+ */ + if (typeid_cast(res.get())) + { + ColumnAggregateFunction * res_ = new ColumnAggregateFunction(*this); + res = res_; + res_->getData().assign(getData().begin(), getData().end()); + return res; + } + IColumn & column = *res; res->reserve(getData().size()); diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index 8e7d270456e..89866341d15 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -3,67 +3,63 @@ #include #include -#include -#include #include #include - #include -#include - +#include namespace DB { -/** Позволяет выполнить запрос (SELECT) на удалённых репликах одного шарда и получить результат. +/** Позволяет выполнить запрос на удалённых репликах одного шарда и получить результат. */ class RemoteBlockInputStream : public IProfilingBlockInputStream { -private: - void init(const Settings * settings_) - { - if (settings_) - { - send_settings = true; - settings = *settings_; - } - else - send_settings = false; - } - public: /// Принимает готовое соединение. - RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_, ThrottlerPtr throttler_ = nullptr, - const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, - const Context & context = getDefaultContext()) - : connection(&connection_), query(query_), throttler(throttler_), external_tables(external_tables_), stage(stage_), context(context) - { - init(settings_); - } + RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_, + ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(), + QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, + const Context & context_ = getDefaultContext()); /// Принимает готовое соединение. Захватывает владение соединением из пула. - RemoteBlockInputStream(ConnectionPool::Entry & pool_entry_, const String & query_, const Settings * settings_, ThrottlerPtr throttler_ = nullptr, - const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, - const Context & context = getDefaultContext()) - : pool_entry(pool_entry_), connection(&*pool_entry_), query(query_), throttler(throttler_), - external_tables(external_tables_), stage(stage_), context(context) - { - init(settings_); - } + RemoteBlockInputStream(ConnectionPool::Entry & pool_entry_, const String & query_, const Settings * settings_, + ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(), + QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, + const Context & context_ = getDefaultContext()); /// Принимает пул, из которого нужно будет достать одно или несколько соединений. 
- RemoteBlockInputStream(IConnectionPool * pool_, const String & query_, const Settings * settings_, ThrottlerPtr throttler_ = nullptr, - const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, - const Context & context = getDefaultContext()) - : pool(pool_), query(query_), throttler(throttler_), external_tables(external_tables_), stage(stage_), context(context) - { - init(settings_); - } + RemoteBlockInputStream(IConnectionPool * pool_, const String & query_, const Settings * settings_, + ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(), + QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, + const Context & context_ = getDefaultContext()); + /// Принимает пулы - один для каждого шарда -, из которых нужно будет достать одно или несколько соединений. + RemoteBlockInputStream(ConnectionPoolsPtr & pools_, const String & query_, const Settings * settings_, + ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(), + QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, + const Context & context_ = getDefaultContext()); + + ~RemoteBlockInputStream() override; + + /// Отправить запрос на все существующие реплики. + void doBroadcast(); + + /// Кроме блоков, получить информацию о блоках. + void appendExtraInfo(); + + /// Отправляет запрос (инициирует вычисления) раньше, чем read. + void readPrefix() override; + + /** Отменяем умолчальное уведомление о прогрессе, + * так как колбэк прогресса вызывается самостоятельно. + */ + void progress(const Progress & value) override {} + + void cancel() override; String getName() const override { return "Remote"; } - String getID() const override { std::stringstream res; @@ -71,249 +67,35 @@ public: return res.str(); } - - /** Отменяем умолчальное уведомление о прогрессе, - * так как колбэк прогресса вызывается самостоятельно. - */ - void progress(const Progress & value) override {} - - - void cancel() override - { - bool old_val = false; - if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed)) - return; - - { - std::lock_guard lock(external_tables_mutex); - - /// Останавливаем отправку внешних данных. - for (auto & vec : external_tables_data) - for (auto & elem : vec) - if (IProfilingBlockInputStream * stream = dynamic_cast(elem.first.get())) - stream->cancel(); - } - - if (!isQueryPending() || hasThrownException()) - return; - - tryCancel("Cancelling query"); - } - - - ~RemoteBlockInputStream() override - { - /** Если прервались в середине цикла общения с репликами, то прервываем - * все соединения, затем читаем и пропускаем оставшиеся пакеты чтобы - * эти соединения не остались висеть в рассихронизированном состоянии. - */ - if (established || isQueryPending()) - parallel_replicas->disconnect(); - } - - - /// Отправляет запрос (инициирует вычисления) раньше, чем read. - void readPrefix() override - { - if (!sent_query) - sendQuery(); - } - - /// Отправить запрос на все существующие реплики. - void reachAllReplicas() - { - reach_all_replicas = true; - } - - /// Кроме блоков, получить информацию о блоках. - void appendExtraInfo() - { - append_extra_info = true; - } - BlockExtraInfo getBlockExtraInfo() const override { - return parallel_replicas->getBlockExtraInfo(); + return multiplexed_connections->getBlockExtraInfo(); } protected: /// Отправить на удаленные серверы все временные таблицы. 
- void sendExternalTables() - { - size_t count = parallel_replicas->size(); + void sendExternalTables(); - { - std::lock_guard lock(external_tables_mutex); + Block readImpl() override; - external_tables_data.reserve(count); - - for (size_t i = 0; i < count; ++i) - { - ExternalTablesData res; - for (const auto & table : external_tables) - { - StoragePtr cur = table.second; - QueryProcessingStage::Enum stage = QueryProcessingStage::Complete; - DB::BlockInputStreams input = cur->read(cur->getColumnNamesList(), ASTPtr(), context, settings, - stage, DEFAULT_BLOCK_SIZE, 1); - if (input.size() == 0) - res.push_back(std::make_pair(new OneBlockInputStream(cur->getSampleBlock()), table.first)); - else - res.push_back(std::make_pair(input[0], table.first)); - } - external_tables_data.push_back(std::move(res)); - } - } - - parallel_replicas->sendExternalTablesData(external_tables_data); - } - - - Block readImpl() override - { - if (!sent_query) - { - sendQuery(); - - if (settings.skip_unavailable_shards && 0 == parallel_replicas->size()) - return {}; - } - - while (true) - { - if (isCancelled()) - return Block(); - - Connection::Packet packet = parallel_replicas->receivePacket(); - - switch (packet.type) - { - case Protocol::Server::Data: - /// Если блок не пуст и не является заголовочным блоком - if (packet.block && packet.block.rows() > 0) - return packet.block; - break; /// Если блок пустой - получим другие пакеты до EndOfStream. - - case Protocol::Server::Exception: - got_exception_from_replica = true; - packet.exception->rethrow(); - break; - - case Protocol::Server::EndOfStream: - if (!parallel_replicas->hasActiveReplicas()) - { - finished = true; - return Block(); - } - break; - - case Protocol::Server::Progress: - /** Используем прогресс с удалённого сервера. - * В том числе, запишем его в ProcessList, - * и будем использовать его для проверки - * ограничений (например, минимальная скорость выполнения запроса) - * и квот (например, на количество строчек для чтения). - */ - progressImpl(packet.progress); - break; - - case Protocol::Server::ProfileInfo: - info = packet.profile_info; - break; - - case Protocol::Server::Totals: - totals = packet.block; - break; - - case Protocol::Server::Extremes: - extremes = packet.block; - break; - - default: - got_unknown_packet_from_replica = true; - throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); - } - } - } - - void readSuffixImpl() override - { - /** Если одно из: - * - ничего не начинали делать; - * - получили все пакеты до EndOfStream; - * - получили с одной реплики эксепшен; - * - получили с одной реплики неизвестный пакет; - * - то больше читать ничего не нужно. - */ - if (!isQueryPending() || hasThrownException()) - return; - - /** Если ещё прочитали не все данные, но они больше не нужны. - * Это может быть из-за того, что данных достаточно (например, при использовании LIMIT). - */ - - /// Отправим просьбу прервать выполнение запроса, если ещё не отправляли. - tryCancel("Cancelling query because enough data has been read"); - - /// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединениях с репликами. 
- Connection::Packet packet = parallel_replicas->drain(); - switch (packet.type) - { - case Protocol::Server::EndOfStream: - finished = true; - break; - - case Protocol::Server::Exception: - got_exception_from_replica = true; - packet.exception->rethrow(); - break; - - default: - got_unknown_packet_from_replica = true; - throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); - } - } + void readSuffixImpl() override; /// Создать объект для общения с репликами одного шарда, на которых должен выполниться запрос. - void createParallelReplicas() - { - Settings * parallel_replicas_settings = send_settings ? &settings : nullptr; - if (connection != nullptr) - parallel_replicas = std::make_unique(connection, parallel_replicas_settings, throttler); - else - parallel_replicas = std::make_unique(pool, parallel_replicas_settings, throttler, - append_extra_info, reach_all_replicas); - } + void createMultiplexedConnections(); /// Возвращает true, если запрос отправлен. - bool isQueryPending() const - { - return sent_query && !finished; - } + bool isQueryPending() const; /// Возвращает true, если исключение было выкинуто. - bool hasThrownException() const - { - return got_exception_from_replica || got_unknown_packet_from_replica; - } + bool hasThrownException() const; private: - void sendQuery() - { - createParallelReplicas(); + void init(const Settings * settings_); - if (settings.skip_unavailable_shards && 0 == parallel_replicas->size()) - return; + void sendQuery(); - established = true; - - parallel_replicas->sendQuery(query, "", stage, true); - - established = false; - sent_query = true; - - sendExternalTables(); - } + /// Отправить запрос на отмену всех соединений к репликам, если такой запрос ещё не был отправлен. + void tryCancel(const char * reason); /// ITable::read requires a Context, therefore we should create one if the user can't supply it static Context & getDefaultContext() @@ -322,23 +104,18 @@ private: return instance; } - /// Отправить запрос на отмену всех соединений к репликам, если такой запрос ещё не был отправлен. - void tryCancel(const char * reason) - { - bool old_val = false; - if (!was_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed)) - return; - - LOG_TRACE(log, "(" << parallel_replicas->dumpAddresses() << ") " << reason); - parallel_replicas->sendCancel(); - } - private: - IConnectionPool * pool = nullptr; - + /// Готовое соединение. ConnectionPool::Entry pool_entry; Connection * connection = nullptr; - std::unique_ptr parallel_replicas; + + /// Пул соединений одного шарда. + IConnectionPool * pool = nullptr; + + /// Пулы соединений одного или нескольких шардов. 
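+ /// Hypothetical construction sketch (names invented for illustration):
+ ///
+ ///     ConnectionPoolsPtr pools = new ConnectionPools{pool_shard_1, pool_shard_2};
+ ///     RemoteBlockInputStream in(pools, query, &settings);
+ ///
+ /// One pool per shard; each pool may yield several replica connections.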
+ ConnectionPoolsPtr pools; + + std::unique_ptr multiplexed_connections; const String query; bool send_settings; @@ -384,7 +161,7 @@ private: std::atomic got_unknown_packet_from_replica { false }; bool append_extra_info = false; - bool reach_all_replicas = false; + bool do_broadcast = false; Logger * log = &Logger::get("RemoteBlockInputStream"); }; diff --git a/dbms/include/DB/Dictionaries/FlatDictionary.h b/dbms/include/DB/Dictionaries/FlatDictionary.h index 56a736c7c7d..aba5fded457 100644 --- a/dbms/include/DB/Dictionaries/FlatDictionary.h +++ b/dbms/include/DB/Dictionaries/FlatDictionary.h @@ -322,7 +322,7 @@ private: { const auto & null_value_ref = std::get(attr.null_values) = null_value.get(); std::get>>(attr.arrays) = - std::make_unique>(initial_array_size, null_value_ref); + std::make_unique>(initial_array_size, StringRef{null_value_ref}); attr.string_arena = std::make_unique(); break; } @@ -393,7 +393,7 @@ private: { auto & array = *std::get>>(attribute.arrays); if (id >= array.size()) - array.resize_fill(id + 1, std::get(attribute.null_values)); + array.resize_fill(id + 1, StringRef{std::get(attribute.null_values)}); const auto & string = value.get(); const auto string_in_arena = attribute.string_arena->insert(string.data(), string.size()); array[id] = StringRef{string_in_arena, string.size()}; diff --git a/dbms/include/DB/Dictionaries/RangeHashedDictionary.h b/dbms/include/DB/Dictionaries/RangeHashedDictionary.h index 061b2f75773..ce1c549cc4d 100644 --- a/dbms/include/DB/Dictionaries/RangeHashedDictionary.h +++ b/dbms/include/DB/Dictionaries/RangeHashedDictionary.h @@ -117,7 +117,7 @@ public: const auto val_it = std::find_if(std::begin(ranges_and_values), std::end(ranges_and_values), [date] (const value_t & v) { return v.range.contains(date); }); - const auto string_ref = val_it != std::end(ranges_and_values) ? val_it->value : null_value; + const auto string_ref = val_it != std::end(ranges_and_values) ? 
val_it->value : StringRef{null_value}; out->insertData(string_ref.data, string_ref.size); } else diff --git a/dbms/include/DB/Functions/FunctionsDictionaries.h b/dbms/include/DB/Functions/FunctionsDictionaries.h index 884dcfd77d5..68249f9c32c 100644 --- a/dbms/include/DB/Functions/FunctionsDictionaries.h +++ b/dbms/include/DB/Functions/FunctionsDictionaries.h @@ -1106,15 +1106,22 @@ private: if (const auto default_col = typeid_cast(default_col_untyped)) { + /// vector ids, vector defaults const auto out = new ColumnString; block.getByPosition(result).column = out; dictionary->getString(attr_name, id_col->getData(), default_col, out); } else if (const auto default_col = typeid_cast *>(default_col_untyped)) { + /// vector ids, const defaults const auto out = new ColumnString; block.getByPosition(result).column = out; - dictionary->getString(attr_name, id_col->getData(), out); + + /// @todo avoid materialization + const auto default_col_materialized = default_col->convertToFullColumn(); + + dictionary->getString(attr_name, id_col->getData(), + static_cast(default_col_materialized.get()), out); } else throw Exception{ @@ -1132,19 +1139,25 @@ private: if (const auto default_col = typeid_cast(default_col_untyped)) { - const PODArray ids(1, id_col->getData()); - auto out = std::make_unique(); - dictionary->getString(attr_name, ids, default_col, out.get()); + /// const ids, vector defaults + /// @todo avoid materialization + const PODArray ids(id_col->size(), id_col->getData()); + const auto out = new ColumnString; + block.getByPosition(result).column = out; - block.getByPosition(result).column = new ColumnConst{ - id_col->size(), out->getDataAt(0).toString() - }; + dictionary->getString(attr_name, ids, default_col, out); } else if (const auto default_col = typeid_cast *>(default_col_untyped)) { + /// const ids, const defaults const PODArray ids(1, id_col->getData()); auto out = std::make_unique(); - dictionary->getString(attr_name, ids, out.get()); + + /// create ColumnString with default + const auto defs = std::make_unique(); + defs->insert(Field{default_col->getData()}); + + dictionary->getString(attr_name, ids, defs.get(), out.get()); block.getByPosition(result).column = new ColumnConst{ id_col->size(), out->getDataAt(0).toString() @@ -1179,6 +1192,13 @@ template <> struct DictGetTraits\ {\ dict->get##TYPE(name, ids, dates, out);\ }\ + template \ + static void getOrDefault(\ + const DictionaryType * const dict, const std::string & name, const PODArray & ids,\ + const PODArray & def, PODArray & out)\ + {\ + dict->get##TYPE(name, ids, def, out);\ + }\ }; DECLARE_DICT_GET_TRAITS(UInt8, DataTypeUInt8) DECLARE_DICT_GET_TRAITS(UInt16, DataTypeUInt16) @@ -1473,6 +1493,230 @@ using FunctionDictGetDate = FunctionDictGet; using FunctionDictGetDateTime = FunctionDictGet; +template +class FunctionDictGetOrDefault final : public IFunction +{ + using Type = typename DataType::FieldType; + +public: + static const std::string name; + + static IFunction * create(const Context & context) + { + return new FunctionDictGetOrDefault{context.getExternalDictionaries()}; + } + + FunctionDictGetOrDefault(const ExternalDictionaries & dictionaries) : dictionaries(dictionaries) {} + + String getName() const override { return name; } + +private: + DataTypePtr getReturnType(const DataTypes & arguments) const override + { + if (arguments.size() != 4) + throw Exception{ + "Number of arguments for function " + getName() + " doesn't match: passed " + + toString(arguments.size()) + ", should be 4.", + 
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH + }; + + if (!typeid_cast(arguments[0].get())) + { + throw Exception{ + "Illegal type " + arguments[0]->getName() + " of first argument of function " + getName() + + ", expected a string.", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT + }; + } + + if (!typeid_cast(arguments[1].get())) + { + throw Exception{ + "Illegal type " + arguments[1]->getName() + " of second argument of function " + getName() + + ", expected a string.", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT + }; + } + + if (!typeid_cast(arguments[2].get())) + { + throw Exception{ + "Illegal type " + arguments[2]->getName() + " of third argument of function " + getName() + + ", must be UInt64.", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT + }; + } + + if (!typeid_cast(arguments[3].get())) + { + throw Exception{ + "Illegal type " + arguments[3]->getName() + " of fourth argument of function " + getName() + + ", must be " + DataType{}.getName() + ".", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT + }; + } + + return new DataType; + } + + void execute(Block & block, const ColumnNumbers & arguments, const size_t result) override + { + const auto dict_name_col = typeid_cast *>(block.getByPosition(arguments[0]).column.get()); + if (!dict_name_col) + throw Exception{ + "First argument of function " + getName() + " must be a constant string", + ErrorCodes::ILLEGAL_COLUMN + }; + + auto dict = dictionaries.getDictionary(dict_name_col->getData()); + const auto dict_ptr = dict.get(); + + if (!executeDispatch(block, arguments, result, dict_ptr) && + !executeDispatch(block, arguments, result, dict_ptr) && + !executeDispatch(block, arguments, result, dict_ptr)) + throw Exception{ + "Unsupported dictionary type " + dict_ptr->getTypeName(), + ErrorCodes::UNKNOWN_TYPE + }; + } + + template + bool executeDispatch(Block & block, const ColumnNumbers & arguments, const size_t result, + const IDictionaryBase * const dictionary) + { + const auto dict = typeid_cast(dictionary); + if (!dict) + return false; + + if (arguments.size() != 4) + throw Exception{ + "Function " + getName() + " for dictionary of type " + dict->getTypeName() + + " requires exactly 4 arguments.", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH + }; + + const auto attr_name_col = typeid_cast *>(block.getByPosition(arguments[1]).column.get()); + if (!attr_name_col) + throw Exception{ + "Second argument of function " + getName() + " must be a constant string", + ErrorCodes::ILLEGAL_COLUMN + }; + + const auto & attr_name = attr_name_col->getData(); + + const auto id_col_untyped = block.getByPosition(arguments[2]).column.get(); + if (const auto id_col = typeid_cast *>(id_col_untyped)) + executeDispatch(block, arguments, result, dict, attr_name, id_col); + else if (const auto id_col = typeid_cast *>(id_col_untyped)) + executeDispatch(block, arguments, result, dict, attr_name, id_col); + else + throw Exception{ + "Third argument of function " + getName() + " must be UInt64", + ErrorCodes::ILLEGAL_COLUMN + }; + + return true; + } + + template + void executeDispatch( + Block & block, const ColumnNumbers & arguments, const size_t result, const DictionaryType * const dictionary, + const std::string & attr_name, const ColumnVector * const id_col) + { + const auto default_col_untyped = block.getByPosition(arguments[3]).column.get(); + + if (const auto default_col = typeid_cast *>(default_col_untyped)) + { + /// vector ids, vector defaults + const auto out = new ColumnVector(id_col->size()); + block.getByPosition(result).column = out; + + const auto & ids = id_col->getData(); 
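+
+ /// getOrDefault semantics: for each i, out[i] becomes the attribute value for
+ /// ids[i] if the key exists in the dictionary, otherwise defs[i].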
+ auto & data = out->getData(); + const auto & defs = default_col->getData(); + + DictGetTraits::getOrDefault(dictionary, attr_name, ids, defs, data); + } + else if (const auto default_col = typeid_cast *>(default_col_untyped)) + { + /// vector ids, const defaults + const auto out = new ColumnVector(id_col->size()); + block.getByPosition(result).column = out; + + const auto & ids = id_col->getData(); + auto & data = out->getData(); + + /// @todo avoid materialization + const PODArray defs(id_col->size(), default_col->getData()); + + DictGetTraits::getOrDefault(dictionary, attr_name, ids, defs, data); + } + else + throw Exception{ + "Fourth argument of function " + getName() + " must be " + DataType{}.getName(), + ErrorCodes::ILLEGAL_COLUMN + }; + } + + template + void executeDispatch( + Block & block, const ColumnNumbers & arguments, const size_t result, const DictionaryType * const dictionary, + const std::string & attr_name, const ColumnConst * const id_col) + { + const auto default_col_untyped = block.getByPosition(arguments[3]).column.get(); + + if (const auto default_col = typeid_cast *>(default_col_untyped)) + { + /// const ids, vector defaults + /// @todo avoid materialization + const PODArray ids(id_col->size(), id_col->getData()); + + const auto out = new ColumnVector(id_col->size()); + block.getByPosition(result).column = out; + + auto & data = out->getData(); + const auto & defs = default_col->getData(); + + DictGetTraits::getOrDefault(dictionary, attr_name, ids, defs, data); + } + else if (const auto default_col = typeid_cast *>(default_col_untyped)) + { + /// const ids, const defaults + const PODArray ids(1, id_col->getData()); + PODArray data(1); + const PODArray defs(1, default_col->getData()); + DictGetTraits::getOrDefault(dictionary, attr_name, ids, defs, data); + + block.getByPosition(result).column = new ColumnConst{id_col->size(), data.front()}; + } + else + throw Exception{ + "Fourth argument of function " + getName() + " must be " + DataType{}.getName(), + ErrorCodes::ILLEGAL_COLUMN + }; + } + + const ExternalDictionaries & dictionaries; +}; + +template +const std::string FunctionDictGetOrDefault::name = "dictGet" + DataType{}.getName() + "OrDefault"; + + +using FunctionDictGetUInt8OrDefault = FunctionDictGetOrDefault; +using FunctionDictGetUInt16OrDefault = FunctionDictGetOrDefault; +using FunctionDictGetUInt32OrDefault = FunctionDictGetOrDefault; +using FunctionDictGetUInt64OrDefault = FunctionDictGetOrDefault; +using FunctionDictGetInt8OrDefault = FunctionDictGetOrDefault; +using FunctionDictGetInt16OrDefault = FunctionDictGetOrDefault; +using FunctionDictGetInt32OrDefault = FunctionDictGetOrDefault; +using FunctionDictGetInt64OrDefault = FunctionDictGetOrDefault; +using FunctionDictGetFloat32OrDefault = FunctionDictGetOrDefault; +using FunctionDictGetFloat64OrDefault = FunctionDictGetOrDefault; +using FunctionDictGetDateOrDefault = FunctionDictGetOrDefault; +using FunctionDictGetDateTimeOrDefault = FunctionDictGetOrDefault; + + class FunctionDictGetHierarchy final : public IFunction { public: diff --git a/dbms/include/DB/Interpreters/Aggregator.h b/dbms/include/DB/Interpreters/Aggregator.h index 785af306698..ee45c92151c 100644 --- a/dbms/include/DB/Interpreters/Aggregator.h +++ b/dbms/include/DB/Interpreters/Aggregator.h @@ -743,6 +743,18 @@ struct AggregatedDataVariants : private boost::noncopyable typedef SharedPtr AggregatedDataVariantsPtr; typedef std::vector ManyAggregatedDataVariants; +/** Как считаются "тотальные" значения при наличии WITH TOTALS? 
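+ * (Example for orientation: in SELECT k, count() FROM t GROUP BY k WITH TOTALS
+ * HAVING count() > 10, TOTALS is obtained by merging the count() states of only
+ * those keys that passed HAVING.)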
+ * (Более подробно смотрите в TotalsHavingBlockInputStream.) + * + * В случае отсутствия group_by_overflow_mode = 'any', данные агрегируются как обычно, но состояния агрегатных функций не финализируются. + * Позже, состояния агрегатных функций для всех строк (прошедших через HAVING) мерджатся в одну - это и будет TOTALS. + * + * В случае наличия group_by_overflow_mode = 'any', данные агрегируются как обычно, кроме ключей, не поместившихся в max_rows_to_group_by. + * Для этих ключей, данные агрегируются в одну дополнительную строку - далее см. под названиями overflow_row, overflows... + * Позже, состояния агрегатных функций для всех строк (прошедших через HAVING) мерджатся в одну, + * а также к ним прибавляется или не прибавляется (в зависимости от настройки totals_mode) также overflow_row - это и будет TOTALS. + */ + /** Агрегирует источник блоков. */ @@ -1032,7 +1044,7 @@ protected: size_t rows, Filler && filler) const; - BlocksList prepareBlocksAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final) const; + BlocksList prepareBlocksAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const; BlocksList prepareBlocksAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const; BlocksList prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, boost::threadpool::pool * thread_pool) const; diff --git a/dbms/include/DB/Interpreters/InterpreterAlterQuery.h b/dbms/include/DB/Interpreters/InterpreterAlterQuery.h index 0b22933e7d0..15adb928fea 100644 --- a/dbms/include/DB/Interpreters/InterpreterAlterQuery.h +++ b/dbms/include/DB/Interpreters/InterpreterAlterQuery.h @@ -4,7 +4,6 @@ #include #include #include -#include namespace DB { diff --git a/dbms/include/DB/Interpreters/InterpreterCheckQuery.h b/dbms/include/DB/Interpreters/InterpreterCheckQuery.h index 65f344fc78a..884c78a6978 100644 --- a/dbms/include/DB/Interpreters/InterpreterCheckQuery.h +++ b/dbms/include/DB/Interpreters/InterpreterCheckQuery.h @@ -2,7 +2,6 @@ #include #include -#include namespace DB { diff --git a/dbms/include/DB/Interpreters/InterpreterDescribeQuery.h b/dbms/include/DB/Interpreters/InterpreterDescribeQuery.h index a24bc0a099b..42e861b7b1e 100644 --- a/dbms/include/DB/Interpreters/InterpreterDescribeQuery.h +++ b/dbms/include/DB/Interpreters/InterpreterDescribeQuery.h @@ -2,7 +2,6 @@ #include #include -#include #include #include #include diff --git a/dbms/include/DB/Interpreters/InterpreterExistsQuery.h b/dbms/include/DB/Interpreters/InterpreterExistsQuery.h index b4f7636232c..ca2b8f3c5b5 100644 --- a/dbms/include/DB/Interpreters/InterpreterExistsQuery.h +++ b/dbms/include/DB/Interpreters/InterpreterExistsQuery.h @@ -2,7 +2,6 @@ #include #include -#include #include #include #include diff --git a/dbms/include/DB/Interpreters/InterpreterShowCreateQuery.h b/dbms/include/DB/Interpreters/InterpreterShowCreateQuery.h index 43c4eeb0ad4..929f2597d54 100644 --- a/dbms/include/DB/Interpreters/InterpreterShowCreateQuery.h +++ b/dbms/include/DB/Interpreters/InterpreterShowCreateQuery.h @@ -2,7 +2,6 @@ #include #include -#include #include #include #include diff --git a/dbms/include/DB/Interpreters/Settings.h b/dbms/include/DB/Interpreters/Settings.h index 3e30345907a..c17e287534b 100644 --- a/dbms/include/DB/Interpreters/Settings.h +++ b/dbms/include/DB/Interpreters/Settings.h @@ -168,6 +168,8 @@ struct Settings M(SettingUInt64, select_sequential_consistency, 0) \ /** Максимальное количество различных шардов и максимальное количество реплик 
одного шарда в функции remote. */ \ M(SettingUInt64, table_function_remote_max_addresses, 1000) \ + /** Максимальное количество потоков при распределённой обработке одного запроса */ \ + M(SettingUInt64, max_distributed_processing_threads, 8) \ /// Всевозможные ограничения на выполнение запроса. Limits limits; diff --git a/dbms/include/DB/Parsers/ASTIdentifier.h b/dbms/include/DB/Parsers/ASTIdentifier.h index 9056e7dec58..5032387e5b0 100644 --- a/dbms/include/DB/Parsers/ASTIdentifier.h +++ b/dbms/include/DB/Parsers/ASTIdentifier.h @@ -8,7 +8,7 @@ namespace DB { -/** Идентификатор (столбца или алиас, или именованый элемент кортежа) +/** Идентификатор (столбца или алиас) */ class ASTIdentifier : public ASTWithAlias { @@ -21,7 +21,7 @@ public: Format, }; - /// имя + /// имя. У составного идентификатора здесь будет конкатенированное имя (вида a.b.c), а отдельные составляющие будут доступны внутри children. String name; /// чего идентифицирует этот идентификатор @@ -44,16 +44,7 @@ public: } protected: - void formatImplWithoutAlias(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override - { - settings.ostr << (settings.hilite ? hilite_identifier : ""); - - WriteBufferFromOStream wb(settings.ostr, 32); - writeProbablyBackQuotedString(name, wb); - wb.next(); - - settings.ostr << (settings.hilite ? hilite_none : ""); - } + void formatImplWithoutAlias(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override; }; } diff --git a/dbms/include/DB/Parsers/ASTJoin.h b/dbms/include/DB/Parsers/ASTJoin.h index 34eaeecd5e5..bd6b5b42f04 100644 --- a/dbms/include/DB/Parsers/ASTJoin.h +++ b/dbms/include/DB/Parsers/ASTJoin.h @@ -42,6 +42,7 @@ public: ASTPtr table; /// "Правая" таблица для соединения - подзапрос или имя таблицы. ASTPtr using_expr_list; /// По каким столбцам выполнять соединение. + ASTPtr on_expr; /// Условие соединения. Поддерживается либо USING, либо ON, но не одновременно. ASTJoin() = default; ASTJoin(const StringRange range_) : IAST(range_) {} @@ -108,11 +109,16 @@ protected: table->formatImpl(settings, state, frame); - if (kind != ASTJoin::Cross) + if (using_expr_list) { settings.ostr << (settings.hilite ? hilite_keyword : "") << " USING " << (settings.hilite ? hilite_none : ""); using_expr_list->formatImpl(settings, state, frame); } + else if (on_expr) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << " ON " << (settings.hilite ? hilite_none : ""); + on_expr->formatImpl(settings, state, frame); + } } }; diff --git a/dbms/include/DB/Parsers/ExpressionElementParsers.h b/dbms/include/DB/Parsers/ExpressionElementParsers.h index e060a47fc17..f86d5cce61f 100644 --- a/dbms/include/DB/Parsers/ExpressionElementParsers.h +++ b/dbms/include/DB/Parsers/ExpressionElementParsers.h @@ -89,6 +89,15 @@ protected: bool parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected); }; +/** Беззнаковое целое число, используется в качестве правой части оператора взятия элемента кортежа (x.1). + */ +class ParserUnsignedInteger : public IParserBase +{ +protected: + const char * getName() const { return "unsigned integer"; } + bool parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected); +}; + /** Строка в одинарных кавычках.
*/ @@ -128,7 +137,13 @@ protected: */ class ParserAlias : public IParserBase { +public: + ParserAlias(bool allow_alias_without_as_keyword_) + : allow_alias_without_as_keyword(allow_alias_without_as_keyword_) {} protected: + bool allow_alias_without_as_keyword; + static const char * restricted_keywords[]; + const char * getName() const { return "alias"; } bool parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected); }; @@ -149,9 +164,11 @@ protected: class ParserWithOptionalAlias : public IParserBase { public: - ParserWithOptionalAlias(ParserPtr && elem_parser_) : elem_parser(std::move(elem_parser_)) {} + ParserWithOptionalAlias(ParserPtr && elem_parser_, bool allow_alias_without_as_keyword_) + : elem_parser(std::move(elem_parser_)), allow_alias_without_as_keyword(allow_alias_without_as_keyword_) {} protected: ParserPtr elem_parser; + bool allow_alias_without_as_keyword; const char * getName() const { return "element of expression with optional alias"; } bool parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected); diff --git a/dbms/include/DB/Parsers/ExpressionListParsers.h b/dbms/include/DB/Parsers/ExpressionListParsers.h index 76692d16f55..67f4e3b4576 100644 --- a/dbms/include/DB/Parsers/ExpressionListParsers.h +++ b/dbms/include/DB/Parsers/ExpressionListParsers.h @@ -111,13 +111,25 @@ protected: }; -class ParserAccessExpression : public IParserBase +class ParserTupleElementExpression : public IParserBase { private: static const char * operators[]; protected: - const char * getName() const { return "access expression"; } + const char * getName() const { return "tuple element expression"; } + + bool parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected); +}; + + +class ParserArrayElementExpression : public IParserBase +{ +private: + static const char * operators[]; + +protected: + const char * getName() const { return "array element expression"; } bool parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected); }; @@ -127,7 +139,7 @@ class ParserUnaryMinusExpression : public IParserBase { private: static const char * operators[]; - ParserPrefixUnaryOperatorExpression operator_parser {operators, ParserPtr(new ParserAccessExpression)}; + ParserPrefixUnaryOperatorExpression operator_parser {operators, ParserPtr(new ParserTupleElementExpression)}; protected: const char * getName() const { return "unary minus expression"; } @@ -270,7 +282,7 @@ protected: class ParserExpressionWithOptionalAlias : public IParserBase { public: - ParserExpressionWithOptionalAlias(); + ParserExpressionWithOptionalAlias(bool allow_alias_without_as_keyword); protected: ParserPtr impl; @@ -286,7 +298,13 @@ protected: /** Список выражений, разделённых запятыми, возможно пустой. 
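 * (With allow_alias_without_as_keyword, an element such as `x id` is accepted as
 * `x AS id`; ParserAlias::restricted_keywords exists to keep words like FROM from
 * being swallowed as aliases. This reading of the flag is the editor's, the diff
 * does not spell it out.)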
*/ class ParserExpressionList : public IParserBase { +public: + ParserExpressionList(bool allow_alias_without_as_keyword_) + : allow_alias_without_as_keyword(allow_alias_without_as_keyword_) {} + protected: + bool allow_alias_without_as_keyword; + const char * getName() const { return "list of expressions"; } bool parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected); }; @@ -294,6 +312,9 @@ protected: class ParserNotEmptyExpressionList : public IParserBase { +public: + ParserNotEmptyExpressionList(bool allow_alias_without_as_keyword) + : nested_parser(allow_alias_without_as_keyword) {} private: ParserExpressionList nested_parser; protected: diff --git a/dbms/include/DB/Parsers/ParserShowProcesslistQuery.h b/dbms/include/DB/Parsers/ParserShowProcesslistQuery.h index d40f9bc4079..5db0b8496a5 100644 --- a/dbms/include/DB/Parsers/ParserShowProcesslistQuery.h +++ b/dbms/include/DB/Parsers/ParserShowProcesslistQuery.h @@ -4,7 +4,6 @@ #include #include #include -#include #include diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h b/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h index abc56eafb3d..86afed06a81 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h @@ -52,11 +52,12 @@ public: static size_t estimateDiskSpaceForMerge(const MergeTreeData::DataPartsVector & parts); /** Отменяет все мерджи. Все выполняющиеся сейчас вызовы mergeParts скоро бросят исключение. - * Все новые вызовы будут бросать исключения, пока не будет вызван uncancelAll(). + * Все новые вызовы будут бросать исключения, пока не будет вызван uncancel(). + * Считает количество таких вызовов для поддержки нескольких наложенных друг на друга отмен. */ - bool cancelAll() { return cancelled.exchange(true, std::memory_order_relaxed); } - bool uncancelAll() { return cancelled.exchange(false, std::memory_order_relaxed); } - bool isCancelled() const { return cancelled.load(std::memory_order_relaxed); } + void cancel() { cancelled.fetch_add(1); } + void uncancel() { cancelled.fetch_sub(1); } + bool isCancelled() const { return cancelled.load() > 0; } private: MergeTreeData & data; @@ -66,24 +67,28 @@ private: /// Когда в последний раз писали в лог, что место на диске кончилось (чтобы не писать об этом слишком часто). time_t disk_space_warning_time = 0; - std::atomic cancelled{false}; + std::atomic cancelled {0}; }; + +/** Временно приостанавливает мерджи. 
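+ *
+ * Usage sketch: thanks to the counter in MergeTreeDataMerger, blockers now nest
+ * safely:
+ *
+ *     {
+ *         MergeTreeMergeBlocker outer(merger);      /// cancelled == 1
+ *         {
+ *             MergeTreeMergeBlocker inner(merger);  /// cancelled == 2
+ *         }                                         /// cancelled == 1, merges still blocked
+ *     }                                             /// cancelled == 0, merges may resume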
+ */ class MergeTreeMergeBlocker { public: MergeTreeMergeBlocker(MergeTreeDataMerger & merger) - : merger(merger), was_cancelled{!merger.cancelAll()} {} + : merger(merger) + { + merger.cancel(); + } ~MergeTreeMergeBlocker() { - if (was_cancelled) - merger.uncancelAll(); + merger.uncancel(); } private: MergeTreeDataMerger & merger; - const bool was_cancelled; }; } diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h index 79afb5c0efa..6b2c5993e8d 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h @@ -159,14 +159,6 @@ public: */ if (quorum) { - static std::once_flag once_flag; - std::call_once(once_flag, [&] - { - zookeeper->createIfNotExists(storage.zookeeper_path + "/quorum", ""); - zookeeper->createIfNotExists(storage.zookeeper_path + "/quorum/last_part", ""); - zookeeper->createIfNotExists(storage.zookeeper_path + "/quorum/failed_parts", ""); - }); - ReplicatedMergeTreeQuorumEntry quorum_entry; quorum_entry.part_name = part_name; quorum_entry.required_number_of_replicas = quorum; diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index 1adfa256469..0c7ed307d2c 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -344,6 +344,10 @@ private: */ void createReplica(); + /** Создать узлы в ZK, которые должны быть всегда, но которые могли не существовать при работе старых версий сервера. + */ + void createNewZooKeeperNodes(); + /** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata). * Если нет - бросить исключение. 
*/ diff --git a/dbms/src/Client/MultiplexedConnections.cpp b/dbms/src/Client/MultiplexedConnections.cpp new file mode 100644 index 00000000000..10048cc8701 --- /dev/null +++ b/dbms/src/Client/MultiplexedConnections.cpp @@ -0,0 +1,430 @@ +#include + +namespace DB +{ + +MultiplexedConnections::MultiplexedConnections(Connection * connection_, const Settings * settings_, ThrottlerPtr throttler_) + : settings(settings_), throttler(throttler_), supports_parallel_execution(false) +{ + if (connection_ == nullptr) + throw Exception("Invalid connection specified", ErrorCodes::LOGICAL_ERROR); + + active_connection_total_count = 1; + + ShardState shard_state; + shard_state.allocated_connection_count = active_connection_total_count; + shard_state.active_connection_count = active_connection_total_count; + + shard_states.push_back(shard_state); + + ReplicaState replica_state; + replica_state.connection_index = 0; + replica_state.shard_state = &shard_states[0]; + + connection_->setThrottler(throttler); + connections.push_back(connection_); + + auto res = replica_map.emplace(connections[0]->socket.impl()->sockfd(), replica_state); + if (!res.second) + throw Exception("Invalid set of connections", ErrorCodes::LOGICAL_ERROR); +} + +MultiplexedConnections::MultiplexedConnections(IConnectionPool * pool_, const Settings * settings_, ThrottlerPtr throttler_, + bool append_extra_info, bool do_broadcast_) + : settings(settings_), throttler(throttler_), do_broadcast(do_broadcast_) +{ + if (pool_ == nullptr) + throw Exception("Invalid pool specified", ErrorCodes::LOGICAL_ERROR); + + initFromShard(pool_); + registerShards(); + + supports_parallel_execution = active_connection_total_count > 1; + + if (append_extra_info) + block_extra_info.reset(new BlockExtraInfo); +} + +MultiplexedConnections::MultiplexedConnections(ConnectionPools & pools_, const Settings * settings_, ThrottlerPtr throttler_, + bool append_extra_info, bool do_broadcast_) + : settings(settings_), throttler(throttler_), do_broadcast(do_broadcast_) +{ + if (pools_.empty()) + throw Exception("Pools are not specified", ErrorCodes::LOGICAL_ERROR); + + for (auto & pool : pools_) + { + if (pool.isNull()) + throw Exception("Invalid pool specified", ErrorCodes::LOGICAL_ERROR); + initFromShard(pool.get()); + } + + registerShards(); + + supports_parallel_execution = active_connection_total_count > 1; + + if (append_extra_info) + block_extra_info.reset(new BlockExtraInfo); +} + +void MultiplexedConnections::sendExternalTablesData(std::vector & data) +{ + Poco::ScopedLock lock(cancel_mutex); + + if (!sent_query) + throw Exception("Cannot send external tables data: query not yet sent.", ErrorCodes::LOGICAL_ERROR); + + if (data.size() < active_connection_total_count) + throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES); + + auto it = data.begin(); + for (auto & e : replica_map) + { + ReplicaState & state = e.second; + Connection * connection = connections[state.connection_index]; + if (connection != nullptr) + connection->sendExternalTablesData(*it); + ++it; + } +} + +void MultiplexedConnections::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data) +{ + Poco::ScopedLock lock(cancel_mutex); + + if (sent_query) + throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR); + + if (supports_parallel_execution) + { + if (settings == nullptr) + { + /// Каждый шард имеет один адрес. 
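+ /// (settings == nullptr here, so parallel_replicas_count and
+ /// parallel_replica_offset cannot be passed; this branch relies on
+ /// exactly one connection having been allocated per shard.)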
+ auto it = connections.begin(); + for (size_t i = 0; i < shard_states.size(); ++i) + { + Connection * connection = *it; + if (connection == nullptr) + throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR); + + connection->sendQuery(query, query_id, stage, nullptr, with_pending_data); + ++it; + } + } + else + { + /// Каждый шард имеет одну или несколько реплик. + auto it = connections.begin(); + for (const auto & shard_state : shard_states) + { + Settings query_settings = *settings; + query_settings.parallel_replicas_count = shard_state.active_connection_count; + + UInt64 offset = 0; + + for (size_t i = 0; i < shard_state.allocated_connection_count; ++i) + { + Connection * connection = *it; + if (connection == nullptr) + throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR); + + query_settings.parallel_replica_offset = offset; + connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data); + ++offset; + ++it; + } + } + } + } + else + { + Connection * connection = connections[0]; + if (connection == nullptr) + throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR); + + connection->sendQuery(query, query_id, stage, settings, with_pending_data); + } + + sent_query = true; +} + +Connection::Packet MultiplexedConnections::receivePacket() +{ + Poco::ScopedLock lock(cancel_mutex); + const auto & packet = receivePacketUnlocked(); + if (block_extra_info) + { + if (packet.type == Protocol::Server::Data) + current_connection->fillBlockExtraInfo(*block_extra_info); + else + block_extra_info->is_valid = false; + } + return packet; +} + +BlockExtraInfo MultiplexedConnections::getBlockExtraInfo() const +{ + if (!block_extra_info) + throw Exception("MultiplexedConnections object not configured for block extra info support", + ErrorCodes::LOGICAL_ERROR); + return *block_extra_info; +} + +void MultiplexedConnections::disconnect() +{ + Poco::ScopedLock lock(cancel_mutex); + + for (auto it = replica_map.begin(); it != replica_map.end(); ++it) + { + ReplicaState & state = it->second; + Connection * connection = connections[state.connection_index]; + if (connection != nullptr) + { + connection->disconnect(); + invalidateReplica(it); + } + } +} + +void MultiplexedConnections::sendCancel() +{ + Poco::ScopedLock lock(cancel_mutex); + + if (!sent_query || cancelled) + throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR); + + for (const auto & e : replica_map) + { + const ReplicaState & state = e.second; + Connection * connection = connections[state.connection_index]; + if (connection != nullptr) + connection->sendCancel(); + } + + cancelled = true; +} + +Connection::Packet MultiplexedConnections::drain() +{ + Poco::ScopedLock lock(cancel_mutex); + + if (!cancelled) + throw Exception("Cannot drain connections: cancel first.", ErrorCodes::LOGICAL_ERROR); + + Connection::Packet res; + res.type = Protocol::Server::EndOfStream; + + while (hasActiveConnections()) + { + Connection::Packet packet = receivePacketUnlocked(); + + switch (packet.type) + { + case Protocol::Server::Data: + case Protocol::Server::Progress: + case Protocol::Server::ProfileInfo: + case Protocol::Server::Totals: + case Protocol::Server::Extremes: + case Protocol::Server::EndOfStream: + break; + + case Protocol::Server::Exception: + default: + /// Если мы получили исключение или неизвестный пакет, сохраняем его. 
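+ /// (The last such packet wins; the caller, e.g.
+ /// RemoteBlockInputStream::readSuffixImpl, rethrows the exception.)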
+ res = packet; + break; + } + } + + return res; +} + +std::string MultiplexedConnections::dumpAddresses() const +{ + Poco::ScopedLock lock(cancel_mutex); + return dumpAddressesUnlocked(); +} + +std::string MultiplexedConnections::dumpAddressesUnlocked() const +{ + bool is_first = true; + std::ostringstream os; + for (const auto & e : replica_map) + { + const ReplicaState & state = e.second; + const Connection * connection = connections[state.connection_index]; + if (connection != nullptr) + { + os << (is_first ? "" : "; ") << connection->getDescription(); + is_first = false; + } + } + + return os.str(); +} + +void MultiplexedConnections::initFromShard(IConnectionPool * pool) +{ + auto entries = pool->getMany(settings, do_broadcast); + + /// Если getMany() не выделил соединений и не кинул исключения, это значит, что была + /// установлена настройка skip_unavailable_shards. Тогда просто возвращаемся. + if (entries.empty()) + return; + + ShardState shard_state; + shard_state.allocated_connection_count = entries.size(); + shard_state.active_connection_count = entries.size(); + active_connection_total_count += shard_state.active_connection_count; + + shard_states.push_back(shard_state); + + pool_entries.insert(pool_entries.end(), entries.begin(), entries.end()); +} + +void MultiplexedConnections::registerShards() +{ + replica_map.reserve(pool_entries.size()); + connections.reserve(pool_entries.size()); + + size_t offset = 0; + for (auto & shard_state : shard_states) + { + size_t index_begin = offset; + size_t index_end = offset + shard_state.allocated_connection_count; + registerReplicas(index_begin, index_end, shard_state); + offset = index_end; + } +} + +void MultiplexedConnections::registerReplicas(size_t index_begin, size_t index_end, ShardState & shard_state) +{ + for (size_t i = index_begin; i < index_end; ++i) + { + ReplicaState replica_state; + replica_state.connection_index = i; + replica_state.shard_state = &shard_state; + + Connection * connection = &*(pool_entries[i]); + if (connection == nullptr) + throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR); + + connection->setThrottler(throttler); + connections.push_back(connection); + + auto res = replica_map.emplace(connection->socket.impl()->sockfd(), replica_state); + if (!res.second) + throw Exception("Invalid set of connections", ErrorCodes::LOGICAL_ERROR); + } +} + +Connection::Packet MultiplexedConnections::receivePacketUnlocked() +{ + if (!sent_query) + throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR); + if (!hasActiveConnections()) + throw Exception("No more packets are available.", ErrorCodes::LOGICAL_ERROR); + + auto it = getReplicaForReading(); + if (it == replica_map.end()) + throw Exception("Logical error: no available replica", ErrorCodes::NO_AVAILABLE_REPLICA); + + ReplicaState & state = it->second; + current_connection = connections[state.connection_index]; + if (current_connection == nullptr) + throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR); + + Connection::Packet packet = current_connection->receivePacket(); + + switch (packet.type) + { + case Protocol::Server::Data: + case Protocol::Server::Progress: + case Protocol::Server::ProfileInfo: + case Protocol::Server::Totals: + case Protocol::Server::Extremes: + break; + + case Protocol::Server::EndOfStream: + invalidateReplica(it); + break; + + case Protocol::Server::Exception: + default: + current_connection->disconnect(); + invalidateReplica(it); + break; + } + + 
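+
+ /// On EndOfStream or Exception the replica was invalidated above, decrementing
+ /// the shard and total counters, so hasActiveConnections() eventually turns
+ /// false and drain() terminates.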
return packet; +} + +MultiplexedConnections::ReplicaMap::iterator MultiplexedConnections::getReplicaForReading() +{ + ReplicaMap::iterator it; + + if (supports_parallel_execution) + it = waitForReadEvent(); + else + { + it = replica_map.begin(); + const ReplicaState & state = it->second; + Connection * connection = connections[state.connection_index]; + if (connection == nullptr) + it = replica_map.end(); + } + + return it; +} + +MultiplexedConnections::ReplicaMap::iterator MultiplexedConnections::waitForReadEvent() +{ + Poco::Net::Socket::SocketList read_list; + read_list.reserve(active_connection_total_count); + + /// Сначала проверяем, есть ли данные, которые уже лежат в буфере + /// хоть одного соединения. + for (const auto & e : replica_map) + { + const ReplicaState & state = e.second; + Connection * connection = connections[state.connection_index]; + if ((connection != nullptr) && connection->hasReadBufferPendingData()) + read_list.push_back(connection->socket); + } + + /// Если не было найдено никаких данных, то проверяем, есть ли соединения + /// готовые для чтения. + if (read_list.empty()) + { + Poco::Net::Socket::SocketList write_list; + Poco::Net::Socket::SocketList except_list; + + for (const auto & e : replica_map) + { + const ReplicaState & state = e.second; + Connection * connection = connections[state.connection_index]; + if (connection != nullptr) + read_list.push_back(connection->socket); + } + + int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings->receive_timeout); + + if (n == 0) + throw Exception("Timeout exceeded while reading from " + dumpAddressesUnlocked(), ErrorCodes::TIMEOUT_EXCEEDED); + } + + auto & socket = read_list[rand() % read_list.size()]; + return replica_map.find(socket.impl()->sockfd()); +} + +void MultiplexedConnections::invalidateReplica(MultiplexedConnections::ReplicaMap::iterator it) +{ + ReplicaState & state = it->second; + ShardState * shard_state = state.shard_state; + + connections[state.connection_index] = nullptr; + --shard_state->active_connection_count; + --active_connection_total_count; +} + +} diff --git a/dbms/src/Client/ParallelReplicas.cpp b/dbms/src/Client/ParallelReplicas.cpp deleted file mode 100644 index f9d0be0c5c4..00000000000 --- a/dbms/src/Client/ParallelReplicas.cpp +++ /dev/null @@ -1,328 +0,0 @@ -#include - -namespace DB -{ - -ParallelReplicas::ParallelReplicas(Connection * connection_, const Settings * settings_, ThrottlerPtr throttler_) - : settings(settings_), throttler(throttler_), - active_replica_count(1), - supports_parallel_execution(false) -{ - registerReplica(connection_); -} - -ParallelReplicas::ParallelReplicas(IConnectionPool * pool_, const Settings * settings_, ThrottlerPtr throttler_, - bool append_extra_info, bool get_all_replicas) - : settings(settings_), throttler(throttler_) -{ - if (pool_ == nullptr) - throw Exception("Null pool specified", ErrorCodes::LOGICAL_ERROR); - - bool has_many_replicas = get_all_replicas || ((settings != nullptr) && (settings->max_parallel_replicas > 1)); - if (has_many_replicas) - { - pool_entries = pool_->getMany(settings, get_all_replicas); - active_replica_count = pool_entries.size(); - supports_parallel_execution = (active_replica_count > 1); - - if (active_replica_count == 0) - throw Exception("No connection available", ErrorCodes::LOGICAL_ERROR); - - replica_map.reserve(active_replica_count); - for (auto & entry : pool_entries) - registerReplica(&*entry); - } - else - { - active_replica_count = 1; - supports_parallel_execution = false; - - 
pool_entry = pool_->get(settings); - if (!pool_entry.isNull()) - registerReplica(&*pool_entry); - } - - if (append_extra_info) - block_extra_info.reset(new BlockExtraInfo); -} - -void ParallelReplicas::sendExternalTablesData(std::vector & data) -{ - Poco::ScopedLock lock(cancel_mutex); - - if (!sent_query) - throw Exception("Cannot send external tables data: query not yet sent.", ErrorCodes::LOGICAL_ERROR); - - if (data.size() < active_replica_count) - throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES); - - auto it = data.begin(); - for (auto & e : replica_map) - { - Connection * connection = e.second; - if (connection != nullptr) - connection->sendExternalTablesData(*it); - ++it; - } -} - -void ParallelReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data) -{ - Poco::ScopedLock lock(cancel_mutex); - - if (sent_query) - throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR); - - if (supports_parallel_execution) - { - Settings query_settings = *settings; - query_settings.parallel_replicas_count = active_replica_count; - UInt64 offset = 0; - - for (auto & e : replica_map) - { - Connection * connection = e.second; - if (connection != nullptr) - { - query_settings.parallel_replica_offset = offset; - connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data); - ++offset; - } - } - - if (offset > 0) - sent_query = true; - } - else - { - auto it = replica_map.begin(); - Connection * connection = it->second; - if (connection != nullptr) - { - connection->sendQuery(query, query_id, stage, settings, with_pending_data); - sent_query = true; - } - } -} - -Connection::Packet ParallelReplicas::receivePacket() -{ - Poco::ScopedLock lock(cancel_mutex); - const auto & packet = receivePacketUnlocked(); - if (block_extra_info) - { - if (packet.type == Protocol::Server::Data) - current_connection->fillBlockExtraInfo(*block_extra_info); - else - block_extra_info->is_valid = false; - } - return packet; -} - -BlockExtraInfo ParallelReplicas::getBlockExtraInfo() const -{ - if (!block_extra_info) - throw Exception("ParallelReplicas object not configured for block extra info support", - ErrorCodes::LOGICAL_ERROR); - return *block_extra_info; -} - -void ParallelReplicas::disconnect() -{ - Poco::ScopedLock lock(cancel_mutex); - - for (auto it = replica_map.begin(); it != replica_map.end(); ++it) - { - Connection * connection = it->second; - if (connection != nullptr) - { - connection->disconnect(); - invalidateReplica(it); - } - } -} - -void ParallelReplicas::sendCancel() -{ - Poco::ScopedLock lock(cancel_mutex); - - if (!sent_query || cancelled) - throw Exception("Cannot cancel. 
Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR); - - for (auto & e : replica_map) - { - Connection * connection = e.second; - if (connection != nullptr) - connection->sendCancel(); - } - - cancelled = true; -} - -Connection::Packet ParallelReplicas::drain() -{ - Poco::ScopedLock lock(cancel_mutex); - - if (!cancelled) - throw Exception("Cannot drain connections: cancel first.", ErrorCodes::LOGICAL_ERROR); - - Connection::Packet res; - res.type = Protocol::Server::EndOfStream; - - while (hasActiveReplicas()) - { - Connection::Packet packet = receivePacketUnlocked(); - - switch (packet.type) - { - case Protocol::Server::Data: - case Protocol::Server::Progress: - case Protocol::Server::ProfileInfo: - case Protocol::Server::Totals: - case Protocol::Server::Extremes: - case Protocol::Server::EndOfStream: - break; - - case Protocol::Server::Exception: - default: - /// Если мы получили исключение или неизвестный пакет, сохраняем его. - res = packet; - break; - } - } - - return res; -} - -std::string ParallelReplicas::dumpAddresses() const -{ - Poco::ScopedLock lock(cancel_mutex); - return dumpAddressesUnlocked(); -} - -std::string ParallelReplicas::dumpAddressesUnlocked() const -{ - bool is_first = true; - std::ostringstream os; - for (auto & e : replica_map) - { - const Connection * connection = e.second; - if (connection != nullptr) - { - os << (is_first ? "" : "; ") << connection->getDescription(); - is_first = false; - } - } - - return os.str(); -} - -void ParallelReplicas::registerReplica(Connection * connection) -{ - if (connection == nullptr) - throw Exception("Invalid connection specified in parameter.", ErrorCodes::LOGICAL_ERROR); - auto res = replica_map.insert(std::make_pair(connection->socket.impl()->sockfd(), connection)); - if (!res.second) - throw Exception("Invalid set of connections.", ErrorCodes::LOGICAL_ERROR); - - connection->setThrottler(throttler); -} - -Connection::Packet ParallelReplicas::receivePacketUnlocked() -{ - if (!sent_query) - throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR); - if (!hasActiveReplicas()) - throw Exception("No more packets are available.", ErrorCodes::LOGICAL_ERROR); - - auto it = getReplicaForReading(); - if (it == replica_map.end()) - throw Exception("Logical error: no available replica", ErrorCodes::NO_AVAILABLE_REPLICA); - - current_connection = it->second; - Connection::Packet packet = current_connection->receivePacket(); - - switch (packet.type) - { - case Protocol::Server::Data: - case Protocol::Server::Progress: - case Protocol::Server::ProfileInfo: - case Protocol::Server::Totals: - case Protocol::Server::Extremes: - break; - - case Protocol::Server::EndOfStream: - invalidateReplica(it); - break; - - case Protocol::Server::Exception: - default: - current_connection->disconnect(); - invalidateReplica(it); - break; - } - - return packet; -} - -ParallelReplicas::ReplicaMap::iterator ParallelReplicas::getReplicaForReading() -{ - ReplicaMap::iterator it; - - if (supports_parallel_execution) - it = waitForReadEvent(); - else - { - it = replica_map.begin(); - if (it->second == nullptr) - it = replica_map.end(); - } - - return it; -} - -ParallelReplicas::ReplicaMap::iterator ParallelReplicas::waitForReadEvent() -{ - Poco::Net::Socket::SocketList read_list; - read_list.reserve(active_replica_count); - - /// Сначала проверяем, есть ли данные, которые уже лежат в буфере - /// хоть одного соединения. 
- for (auto & e : replica_map)
- {
-     Connection * connection = e.second;
-     if ((connection != nullptr) && connection->hasReadBufferPendingData())
-         read_list.push_back(connection->socket);
- }
-
- /// Если не было найдено никаких данных, то проверяем, есть ли соединения
- /// готовые для чтения.
- if (read_list.empty())
- {
-     Poco::Net::Socket::SocketList write_list;
-     Poco::Net::Socket::SocketList except_list;
-
-     for (auto & e : replica_map)
-     {
-         Connection * connection = e.second;
-         if (connection != nullptr)
-             read_list.push_back(connection->socket);
-     }
-
-     int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings->receive_timeout);
-
-     if (n == 0)
-         throw Exception("Timeout exceeded while reading from " + dumpAddressesUnlocked(), ErrorCodes::TIMEOUT_EXCEEDED);
- }
-
- auto & socket = read_list[rand() % read_list.size()];
- return replica_map.find(socket.impl()->sockfd());
-}
-
-void ParallelReplicas::invalidateReplica(ParallelReplicas::ReplicaMap::iterator it)
-{
-     it->second = nullptr;
-     --active_replica_count;
-}
-
-}
diff --git a/dbms/src/DataStreams/RemoteBlockInputStream.cpp b/dbms/src/DataStreams/RemoteBlockInputStream.cpp
new file mode 100644
index 00000000000..0ae282c5cca
--- /dev/null
+++ b/dbms/src/DataStreams/RemoteBlockInputStream.cpp
@@ -0,0 +1,290 @@
+#include
+#include
+#include
+
+namespace DB
+{
+
+RemoteBlockInputStream::RemoteBlockInputStream(Connection & connection_, const String & query_,
+    const Settings * settings_, ThrottlerPtr throttler_, const Tables & external_tables_,
+    QueryProcessingStage::Enum stage_, const Context & context_)
+    : connection(&connection_), query(query_), throttler(throttler_), external_tables(external_tables_),
+    stage(stage_), context(context_)
+{
+    init(settings_);
+}
+
+RemoteBlockInputStream::RemoteBlockInputStream(ConnectionPool::Entry & pool_entry_, const String & query_,
+    const Settings * settings_, ThrottlerPtr throttler_, const Tables & external_tables_,
+    QueryProcessingStage::Enum stage_, const Context & context_)
+    : pool_entry(pool_entry_), connection(&*pool_entry_), query(query_), throttler(throttler_),
+    external_tables(external_tables_), stage(stage_), context(context_)
+{
+    init(settings_);
+}
+
+RemoteBlockInputStream::RemoteBlockInputStream(IConnectionPool * pool_, const String & query_,
+    const Settings * settings_, ThrottlerPtr throttler_, const Tables & external_tables_,
+    QueryProcessingStage::Enum stage_, const Context & context_)
+    : pool(pool_), query(query_), throttler(throttler_), external_tables(external_tables_),
+    stage(stage_), context(context_)
+{
+    init(settings_);
+}
+
+RemoteBlockInputStream::RemoteBlockInputStream(ConnectionPoolsPtr & pools_, const String & query_,
+    const Settings * settings_, ThrottlerPtr throttler_, const Tables & external_tables_,
+    QueryProcessingStage::Enum stage_, const Context & context_)
+    : pools(pools_), query(query_), throttler(throttler_), external_tables(external_tables_),
+    stage(stage_), context(context_)
+{
+    init(settings_);
+}
+
+RemoteBlockInputStream::~RemoteBlockInputStream()
+{
+    /** If we were interrupted in the middle of the communication loop with the replicas,
+     * disconnect all connections, then read and skip the remaining packets so that
+     * these connections are not left hanging in a desynchronized state.
+     */
+    if (established || isQueryPending())
+        multiplexed_connections->disconnect();
+}
+
+void RemoteBlockInputStream::doBroadcast()
+{
+    do_broadcast = true;
+}
+
+void RemoteBlockInputStream::appendExtraInfo()
+{
+    append_extra_info = true;
+}
+
+void RemoteBlockInputStream::readPrefix()
+{
+    if (!sent_query)
+        sendQuery();
+}
+
+void RemoteBlockInputStream::cancel()
+{
+    bool old_val = false;
+    if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
+        return;
+
+    {
+        std::lock_guard<std::mutex> lock(external_tables_mutex);
+
+        /// Stop sending external data.
+        for (auto & vec : external_tables_data)
+            for (auto & elem : vec)
+                if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(elem.first.get()))
+                    stream->cancel();
+    }
+
+    if (!isQueryPending() || hasThrownException())
+        return;
+
+    tryCancel("Cancelling query");
+}
+
+void RemoteBlockInputStream::sendExternalTables()
+{
+    size_t count = multiplexed_connections->size();
+
+    {
+        std::lock_guard<std::mutex> lock(external_tables_mutex);
+
+        external_tables_data.reserve(count);
+
+        for (size_t i = 0; i < count; ++i)
+        {
+            ExternalTablesData res;
+            for (const auto & table : external_tables)
+            {
+                StoragePtr cur = table.second;
+                QueryProcessingStage::Enum stage = QueryProcessingStage::Complete;
+                DB::BlockInputStreams input = cur->read(cur->getColumnNamesList(), ASTPtr(), context, settings,
+                    stage, DEFAULT_BLOCK_SIZE, 1);
+                if (input.size() == 0)
+                    res.push_back(std::make_pair(new OneBlockInputStream(cur->getSampleBlock()), table.first));
+                else
+                    res.push_back(std::make_pair(input[0], table.first));
+            }
+            external_tables_data.push_back(std::move(res));
+        }
+    }
+
+    multiplexed_connections->sendExternalTablesData(external_tables_data);
+}
+
+Block RemoteBlockInputStream::readImpl()
+{
+    if (!sent_query)
+    {
+        sendQuery();
+
+        if (settings.skip_unavailable_shards && (0 == multiplexed_connections->size()))
+            return {};
+    }
+
+    while (true)
+    {
+        if (isCancelled())
+            return Block();
+
+        Connection::Packet packet = multiplexed_connections->receivePacket();
+
+        switch (packet.type)
+        {
+            case Protocol::Server::Data:
+                /// If the block is not empty and is not a header block
+                if (packet.block && (packet.block.rows() > 0))
+                    return packet.block;
+                break;  /// If the block is empty, keep receiving packets until EndOfStream.
+
+            case Protocol::Server::Exception:
+                got_exception_from_replica = true;
+                packet.exception->rethrow();
+                break;
+
+            case Protocol::Server::EndOfStream:
+                if (!multiplexed_connections->hasActiveConnections())
+                {
+                    finished = true;
+                    return Block();
+                }
+                break;
+
+            case Protocol::Server::Progress:
+                /** Use the progress reported by the remote server.
+                 * Among other things, write it to the ProcessList
+                 * and use it to check limits (e.g. the minimum query execution speed)
+                 * and quotas (e.g. on the number of rows to read).
+                 */
+                progressImpl(packet.progress);
+                break;
+
+            case Protocol::Server::ProfileInfo:
+                info = packet.profile_info;
+                break;
+
+            case Protocol::Server::Totals:
+                totals = packet.block;
+                break;
+
+            case Protocol::Server::Extremes:
+                extremes = packet.block;
+                break;
+
+            default:
+                got_unknown_packet_from_replica = true;
+                throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
+        }
+    }
+}
+
+void RemoteBlockInputStream::readSuffixImpl()
+{
+    /** If any of the following holds:
+     * - we have not started doing anything yet;
+     * - we have received all packets up to EndOfStream;
+     * - we have received an exception from one replica;
+     * - we have received an unknown packet from one replica;
+     * then nothing more needs to be read.
+     */
+    if (!isQueryPending() || hasThrownException())
+        return;
+
+    /** If not all the data has been read yet, but it is no longer needed.
+     * This can happen because the data already read is sufficient (e.g. when using LIMIT).
+     */
+
+    /// Send a request to cancel query execution, if we have not sent one already.
+    tryCancel("Cancelling query because enough data has been read");
+
+    /// Receive the remaining packets so that the connections to the replicas do not get out of sync.
+    Connection::Packet packet = multiplexed_connections->drain();
+    switch (packet.type)
+    {
+        case Protocol::Server::EndOfStream:
+            finished = true;
+            break;
+
+        case Protocol::Server::Exception:
+            got_exception_from_replica = true;
+            packet.exception->rethrow();
+            break;
+
+        default:
+            got_unknown_packet_from_replica = true;
+            throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
+    }
+}
+
+void RemoteBlockInputStream::createMultiplexedConnections()
+{
+    Settings * multiplexed_connections_settings = send_settings ? &settings : nullptr;
+    if (connection != nullptr)
+        multiplexed_connections = std::make_unique<MultiplexedConnections>(connection, multiplexed_connections_settings, throttler);
+    else if (pool != nullptr)
+        multiplexed_connections = std::make_unique<MultiplexedConnections>(pool, multiplexed_connections_settings, throttler,
+            append_extra_info, do_broadcast);
+    else if (!pools.isNull())
+        multiplexed_connections = std::make_unique<MultiplexedConnections>(*pools, multiplexed_connections_settings, throttler,
+            append_extra_info, do_broadcast);
+    else
+        throw Exception("Internal error", ErrorCodes::LOGICAL_ERROR);
+}
+
+void RemoteBlockInputStream::init(const Settings * settings_)
+{
+    if (settings_)
+    {
+        send_settings = true;
+        settings = *settings_;
+    }
+    else
+        send_settings = false;
+}
+
+void RemoteBlockInputStream::sendQuery()
+{
+    createMultiplexedConnections();
+
+    if (settings.skip_unavailable_shards && 0 == multiplexed_connections->size())
+        return;
+
+    established = true;
+
+    multiplexed_connections->sendQuery(query, "", stage, true);
+
+    established = false;
+    sent_query = true;
+
+    sendExternalTables();
+}
+
+void RemoteBlockInputStream::tryCancel(const char * reason)
+{
+    bool old_val = false;
+    if (!was_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
+        return;
+
+    LOG_TRACE(log, "(" << multiplexed_connections->dumpAddresses() << ") " << reason);
+    multiplexed_connections->sendCancel();
+}
+
+bool RemoteBlockInputStream::isQueryPending() const
+{
+    return sent_query && !finished;
+}
+
+bool RemoteBlockInputStream::hasThrownException() const
+{
+    return got_exception_from_replica || got_unknown_packet_from_replica;
+}
+
+}
diff --git a/dbms/src/DataTypes/DataTypeFactory.cpp b/dbms/src/DataTypes/DataTypeFactory.cpp
index afb7c289317..47ddfedbbf4 100644
--- a/dbms/src/DataTypes/DataTypeFactory.cpp
+++ 
b/dbms/src/DataTypes/DataTypeFactory.cpp @@ -69,7 +69,7 @@ DataTypePtr DataTypeFactory::get(const String & name) const DataTypes argument_types; Array params_row; - ParserExpressionList args_parser; + ParserExpressionList args_parser(false); ASTPtr args_ast = parseQuery(args_parser, parameters.data(), parameters.data() + parameters.size(), "parameters for data type " + name); ASTExpressionList & args_list = typeid_cast(*args_ast); @@ -136,7 +136,7 @@ DataTypePtr DataTypeFactory::get(const String & name) const if (base_name == "Tuple") { - ParserExpressionList columns_p; + ParserExpressionList columns_p(false); ASTPtr columns_ast = parseQuery(columns_p, parameters.data(), parameters.data() + parameters.size(), "parameters for data type " + name); DataTypes elems; diff --git a/dbms/src/Functions/FunctionsDictionaries.cpp b/dbms/src/Functions/FunctionsDictionaries.cpp index bfcf6b44ff2..a18a1e65b01 100644 --- a/dbms/src/Functions/FunctionsDictionaries.cpp +++ b/dbms/src/Functions/FunctionsDictionaries.cpp @@ -36,6 +36,18 @@ void registerFunctionsDictionaries(FunctionFactory & factory) factory.registerFunction(); factory.registerFunction(); factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); factory.registerFunction(); } diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 096f1c92151..30343df15a8 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -929,7 +929,7 @@ Block Aggregator::prepareBlockAndFill( } -BlocksList Aggregator::prepareBlocksAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final) const +BlocksList Aggregator::prepareBlocksAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const { size_t rows = 1; @@ -957,7 +957,8 @@ BlocksList Aggregator::prepareBlocksAndFillWithoutKey(AggregatedDataVariants & d }; Block block = prepareBlockAndFill(data_variants, final, rows, filler); - if (overflow_row) + + if (is_overflows) block.info.is_overflows = true; BlocksList blocks; @@ -1143,7 +1144,8 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b return BlocksList(); if (data_variants.type == AggregatedDataVariants::Type::without_key || overflow_row) - blocks.splice(blocks.end(), prepareBlocksAndFillWithoutKey(data_variants, final)); + blocks.splice(blocks.end(), prepareBlocksAndFillWithoutKey( + data_variants, final, data_variants.type != AggregatedDataVariants::Type::without_key)); if (isCancelled()) return BlocksList(); diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index fcbc3e4cb67..468cbbd605b 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -125,11 +125,6 @@ void InterpreterSelectQuery::basicInit(BlockInputStreamPtr input_) storage = context.getTable(database_name, table_name); } - if (!storage->supportsParallelReplicas() && (settings.parallel_replicas_count > 0)) - throw Exception("Storage engine " + storage->getName() - + " does not support parallel execution on several replicas", - ErrorCodes::STORAGE_DOESNT_SUPPORT_PARALLEL_REPLICAS); - table_lock = 
storage->lockStructure(false); if (table_column_names.empty()) table_column_names = storage->getColumnsListNonMaterialized(); diff --git a/dbms/src/Interpreters/tests/expression_analyzer.cpp b/dbms/src/Interpreters/tests/expression_analyzer.cpp index 1cfe359328d..b9520cb5b57 100644 --- a/dbms/src/Interpreters/tests/expression_analyzer.cpp +++ b/dbms/src/Interpreters/tests/expression_analyzer.cpp @@ -31,7 +31,7 @@ int main(int argc, char ** argv) } ASTPtr root; - ParserPtr parsers[] = {ParserPtr(new ParserSelectQuery), ParserPtr(new ParserExpressionList)}; + ParserPtr parsers[] = {ParserPtr(new ParserSelectQuery), ParserPtr(new ParserExpressionList(false))}; for (size_t i = 0; i < sizeof(parsers)/sizeof(parsers[0]); ++i) { IParser & parser = *parsers[i]; diff --git a/dbms/src/Interpreters/tests/select_query.cpp b/dbms/src/Interpreters/tests/select_query.cpp index 10c67867bd3..90af32517b0 100644 --- a/dbms/src/Interpreters/tests/select_query.cpp +++ b/dbms/src/Interpreters/tests/select_query.cpp @@ -46,7 +46,7 @@ int main(int argc, char ** argv) DB::WriteBufferFromOStream out(std::cout); DB::BlockInputStreamPtr query_plan; - DB::executeQuery(in, out, context, query_plan); + DB::executeQuery(in, out, context, query_plan, {}); if (query_plan) { diff --git a/dbms/src/Parsers/ASTIdentifier.cpp b/dbms/src/Parsers/ASTIdentifier.cpp new file mode 100644 index 00000000000..6d5eb27503d --- /dev/null +++ b/dbms/src/Parsers/ASTIdentifier.cpp @@ -0,0 +1,38 @@ +#include + + +namespace DB +{ + +void ASTIdentifier::formatImplWithoutAlias(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const +{ + auto format_element = [&](const String & name) + { + settings.ostr << (settings.hilite ? hilite_identifier : ""); + + WriteBufferFromOStream wb(settings.ostr, 32); + writeProbablyBackQuotedString(name, wb); + wb.next(); + + settings.ostr << (settings.hilite ? hilite_none : ""); + }; + + /// Простой или составной идентификатор? 
+
+    if (children.size() > 1)
+    {
+        for (size_t i = 0, size = children.size(); i < size; ++i)
+        {
+            if (i != 0)
+                settings.ostr << '.';
+
+            format_element(static_cast<const ASTIdentifier &>(*children[i].get()).name);
+        }
+    }
+    else
+    {
+        format_element(name);
+    }
+}
+
+}
diff --git a/dbms/src/Parsers/ExpressionElementParsers.cpp b/dbms/src/Parsers/ExpressionElementParsers.cpp
index e58849423e9..8b76d394804 100644
--- a/dbms/src/Parsers/ExpressionElementParsers.cpp
+++ b/dbms/src/Parsers/ExpressionElementParsers.cpp
@@ -18,6 +18,7 @@
 #include
 #include
+#include

 namespace DB
@@ -29,7 +30,7 @@ bool ParserArray::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_
 Pos begin = pos;
 ASTPtr contents_node;
 ParserString open("["), close("]");
- ParserExpressionList contents;
+ ParserExpressionList contents(false);
 ParserWhiteSpaceOrComments ws;

 if (!open.ignore(pos, end, max_parsed_pos, expected))
@@ -58,7 +59,7 @@ bool ParserParenthesisExpression::parseImpl(Pos & pos, Pos end, ASTPtr & node, P
 Pos begin = pos;
 ASTPtr contents_node;
 ParserString open("("), close(")");
- ParserExpressionList contents;
+ ParserExpressionList contents(false);
 ParserWhiteSpaceOrComments ws;

 if (!open.ignore(pos, end, max_parsed_pos, expected))
@@ -165,44 +166,27 @@ bool ParserCompoundIdentifier::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos
 {
 Pos begin = pos;

- /// Идентификатор в обратных кавычках
- if (pos != end && *pos == '`')
- {
-     ReadBuffer buf(const_cast<char *>(pos), end - pos, 0);
-     String s;
-     readBackQuotedString(s, buf);
-     pos += buf.count();
-     node = new ASTIdentifier(StringRange(begin, pos), s);
-     return true;
- }
- else
- {
-     while (pos != end)
-     {
-         while (pos != end
-             && ((*pos >= 'a' && *pos <= 'z')
-                 || (*pos >= 'A' && *pos <= 'Z')
-                 || (*pos == '_')
-                 || (pos != begin && *pos >= '0' && *pos <= '9')))
-             ++pos;
+ ASTPtr id_list;
+ if (!ParserList(ParserPtr(new ParserIdentifier), ParserPtr(new ParserString(".")), false)
+     .parse(pos, end, id_list, max_parsed_pos, expected))
+     return false;

-         /// Если следующий символ - точка '.' и за ней следует, не цифра,
-         /// то продолжаем парсинг имени идентификатора
-         if (pos != begin && pos + 1 < end && *pos == '.' &&
-             !(*(pos + 1) >= '0' && *(pos + 1) <= '9'))
-             ++pos;
-         else
-             break;
-     }
-
-     if (pos != begin)
-     {
-         node = new ASTIdentifier(StringRange(begin, pos), String(begin, pos - begin));
-         return true;
-     }
-     else
-         return false;
+ String name;
+ const ASTExpressionList & list = static_cast<const ASTExpressionList &>(*id_list.get());
+ for (const auto & child : list.children)
+ {
+     if (!name.empty())
+         name += '.';
+     name += static_cast<const ASTIdentifier &>(*child.get()).name;
 }
+
+ node = new ASTIdentifier(StringRange(begin, pos), name);
+
+ /// В children запомним идентификаторы-составляющие, если их больше одного.
+ if (list.children.size() > 1) + node->children.insert(node->children.end(), list.children.begin(), list.children.end()); + + return true; } @@ -212,7 +196,7 @@ bool ParserFunction::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_pars ParserIdentifier id_parser; ParserString open("("), close(")"); - ParserExpressionList contents; + ParserExpressionList contents(false); ParserWhiteSpaceOrComments ws; ASTPtr identifier; @@ -357,6 +341,29 @@ bool ParserNumber::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed } +bool ParserUnsignedInteger::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected) +{ + Field res; + + Pos begin = pos; + if (pos == end) + return false; + + UInt64 x = 0; + ReadBuffer in(const_cast(pos), end - pos, 0); + if (!tryReadIntText(x, in) || in.offset() == 0) + { + expected = "unsigned integer"; + return false; + } + + res = x; + pos += in.offset(); + node = new ASTLiteral(StringRange(begin, pos), res); + return true; +} + + bool ParserStringLiteral::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected) { Pos begin = pos; @@ -473,18 +480,48 @@ bool ParserLiteral::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parse if (str_p.parse(pos, end, node, max_parsed_pos, expected)) return true; - expected = "literal: one of nullptr, number, single quoted string"; + expected = "literal: one of NULL, number, single quoted string"; return false; } +const char * ParserAlias::restricted_keywords[] = +{ + "FROM", + "FINAL", + "SAMPLE", + "ARRAY", + "LEFT", + "RIGHT", + "INNER", + "FULL", + "CROSS", + "JOIN", + "ANY", + "ALL", + "ON", + "USING", + "PREWHERE", + "WHERE", + "GROUP", + "WITH", + "HAVING", + "ORDER", + "LIMIT", + "SETTINGS", + "FORMAT", + "UNION", + nullptr +}; + bool ParserAlias::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected) { ParserWhiteSpaceOrComments ws; ParserString s_as("AS", true, true); ParserIdentifier id_p; - if (!s_as.parse(pos, end, node, max_parsed_pos, expected)) + bool has_as_word = s_as.parse(pos, end, node, max_parsed_pos, expected); + if (!allow_alias_without_as_keyword && !has_as_word) return false; ws.ignore(pos, end); @@ -492,6 +529,20 @@ bool ParserAlias::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_ if (!id_p.parse(pos, end, node, max_parsed_pos, expected)) return false; + if (!has_as_word) + { + /** В этом случае алиас не может совпадать с ключевым словом - для того, + * чтобы в запросе "SELECT x FROM t", слово FROM не считалось алиасом, + * а в запросе "SELECT x FRO FROM t", слово FRO считалось алиасом. 
+ */ + + const String & name = static_cast(*node.get()).name; + + for (const char ** keyword = restricted_keywords; *keyword != nullptr; ++keyword) + if (0 == strcasecmp(name.data(), *keyword)) + return false; + } + return true; } @@ -544,7 +595,7 @@ bool ParserExpressionElement::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & bool ParserWithOptionalAlias::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected) { ParserWhiteSpaceOrComments ws; - ParserAlias alias_p; + ParserAlias alias_p(allow_alias_without_as_keyword); if (!elem_parser->parse(pos, end, node, max_parsed_pos, expected)) return false; @@ -574,7 +625,7 @@ bool ParserOrderByElement::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & ma Pos begin = pos; ParserWhiteSpaceOrComments ws; - ParserExpressionWithOptionalAlias elem_p; + ParserExpressionWithOptionalAlias elem_p(false); ParserString ascending("ASCENDING", true, true); ParserString descending("DESCENDING", true, true); ParserString asc("ASC", true, true); diff --git a/dbms/src/Parsers/ExpressionListParsers.cpp b/dbms/src/Parsers/ExpressionListParsers.cpp index 0bbcd94b3f0..35143e8697a 100644 --- a/dbms/src/Parsers/ExpressionListParsers.cpp +++ b/dbms/src/Parsers/ExpressionListParsers.cpp @@ -58,10 +58,15 @@ const char * ParserLogicalNotExpression::operators[] = nullptr }; -const char * ParserAccessExpression::operators[] = +const char * ParserArrayElementExpression::operators[] = +{ + "[", "arrayElement", + nullptr +}; + +const char * ParserTupleElementExpression::operators[] = { ".", "tupleElement", - "[", "arrayElement", nullptr }; @@ -460,25 +465,35 @@ bool ParserUnaryMinusExpression::parseImpl(Pos & pos, Pos end, ASTPtr & node, Po } -bool ParserAccessExpression::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected &expected) +bool ParserArrayElementExpression::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected &expected) { return ParserLeftAssociativeBinaryOperatorList{ operators, ParserPtr(new ParserExpressionElement), - ParserPtr(new ParserExpressionWithOptionalAlias) + ParserPtr(new ParserExpressionWithOptionalAlias(false)) }.parse(pos, end, node, max_parsed_pos, expected); } -ParserExpressionWithOptionalAlias::ParserExpressionWithOptionalAlias() - : impl(new ParserWithOptionalAlias(ParserPtr(new ParserLambdaExpression))) +bool ParserTupleElementExpression::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected &expected) +{ + return ParserLeftAssociativeBinaryOperatorList{ + operators, + ParserPtr(new ParserArrayElementExpression), + ParserPtr(new ParserUnsignedInteger) + }.parse(pos, end, node, max_parsed_pos, expected); +} + + +ParserExpressionWithOptionalAlias::ParserExpressionWithOptionalAlias(bool allow_alias_without_as_keyword) + : impl(new ParserWithOptionalAlias(ParserPtr(new ParserLambdaExpression), allow_alias_without_as_keyword)) { } bool ParserExpressionList::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_pos, Expected & expected) { - return ParserList(ParserPtr(new ParserExpressionWithOptionalAlias), ParserPtr(new ParserString(","))).parse(pos, end, node, max_parsed_pos, expected); + return ParserList(ParserPtr(new ParserExpressionWithOptionalAlias(allow_alias_without_as_keyword)), ParserPtr(new ParserString(","))).parse(pos, end, node, max_parsed_pos, expected); } diff --git a/dbms/src/Parsers/ParserJoin.cpp b/dbms/src/Parsers/ParserJoin.cpp index 3c55cf82f0b..ecf2df3a8ee 100644 --- a/dbms/src/Parsers/ParserJoin.cpp +++ 
b/dbms/src/Parsers/ParserJoin.cpp @@ -28,9 +28,11 @@ bool ParserJoin::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_p ParserString s_outer("OUTER", true, true); ParserString s_join("JOIN", true, true); ParserString s_using("USING", true, true); + ParserString s_on("ON", true, true); - ParserNotEmptyExpressionList exp_list; - ParserWithOptionalAlias subquery(ParserPtr(new ParserSubquery)); + ParserNotEmptyExpressionList exp_list(false); + ParserLogicalOrExpression exp_elem; + ParserWithOptionalAlias subquery(ParserPtr(new ParserSubquery), true); ParserIdentifier identifier; ws.ignore(pos, end); @@ -93,15 +95,41 @@ bool ParserJoin::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_parsed_p if (join->kind != ASTJoin::Cross) { - if (!s_using.ignore(pos, end, max_parsed_pos, expected)) + if (s_using.ignore(pos, end, max_parsed_pos, expected)) + { + ws.ignore(pos, end); + + /// Выражение для USING можно указать как в скобках, так и без них. + bool in_parens = ParserString("(").ignore(pos, end); + if (in_parens) + ws.ignore(pos, end); + + if (!exp_list.parse(pos, end, join->using_expr_list, max_parsed_pos, expected)) + return false; + + if (in_parens) + { + ws.ignore(pos, end); + if (!ParserString(")").ignore(pos, end)) + return false; + } + + ws.ignore(pos, end); + } + else if (s_on.ignore(pos, end, max_parsed_pos, expected)) + { + ws.ignore(pos, end); + + if (!exp_elem.parse(pos, end, join->on_expr, max_parsed_pos, expected)) + return false; + + ws.ignore(pos, end); + } + else + { + expected = "USING or ON"; return false; - - ws.ignore(pos, end); - - if (!exp_list.parse(pos, end, join->using_expr_list, max_parsed_pos, expected)) - return false; - - ws.ignore(pos, end); + } } join->children.push_back(join->table); diff --git a/dbms/src/Parsers/ParserSelectQuery.cpp b/dbms/src/Parsers/ParserSelectQuery.cpp index 393687db555..57349298d08 100644 --- a/dbms/src/Parsers/ParserSelectQuery.cpp +++ b/dbms/src/Parsers/ParserSelectQuery.cpp @@ -41,8 +41,9 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p ParserString s_union("UNION", true, true); ParserString s_all("ALL", true, true); - ParserNotEmptyExpressionList exp_list; - ParserExpressionWithOptionalAlias exp_elem; + ParserNotEmptyExpressionList exp_list(false); + ParserNotEmptyExpressionList exp_list_for_select_clause(true); /// Разрешает алиасы без слова AS. + ParserExpressionWithOptionalAlias exp_elem(false); ParserJoin join; ParserOrderByExpressionList order_list; @@ -61,7 +62,7 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p ws.ignore(pos, end); } - if (!exp_list.parse(pos, end, select_query->select_expression_list, max_parsed_pos, expected)) + if (!exp_list_for_select_clause.parse(pos, end, select_query->select_expression_list, max_parsed_pos, expected)) return false; ws.ignore(pos, end); @@ -112,6 +113,9 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p if (s_dot.ignore(pos, end, max_parsed_pos, expected)) { select_query->database = select_query->table; + + ws.ignore(pos, end); + if (!ident.parse(pos, end, select_query->table, max_parsed_pos, expected)) return false; @@ -127,7 +131,7 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p return false; /// Может быть указан алиас. На данный момент, он ничего не значит и не используется. 
- ParserAlias().ignore(pos, end); + ParserAlias(true).ignore(pos, end); ws.ignore(pos, end); } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index b91cff233da..ef810c73f49 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -104,6 +104,7 @@ void ReplicatedMergeTreeRestartingThread::run() time_t new_absolute_delay = 0; time_t new_relative_delay = 0; + /// TODO Ловить здесь исключение. checkReplicationDelays(new_absolute_delay, new_relative_delay); absolute_delay.store(new_absolute_delay, std::memory_order_relaxed); @@ -169,10 +170,6 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup() storage.shutdown_called = false; storage.shutdown_event.reset(); - storage.merger.uncancelAll(); - if (storage.unreplicated_merger) - storage.unreplicated_merger->uncancelAll(); - storage.queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, &storage); storage.cleanup_thread.reset(new ReplicatedMergeTreeCleanupThread(storage)); storage.alter_thread = std::thread(&StorageReplicatedMergeTree::alterThread, &storage); @@ -181,6 +178,10 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup() std::bind(&StorageReplicatedMergeTree::queueTask, &storage, std::placeholders::_1)); storage.queue_task_handle->wake(); + storage.merger.uncancel(); + if (storage.unreplicated_merger) + storage.unreplicated_merger->uncancel(); + return true; } catch (...) @@ -328,9 +329,9 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown() storage.parts_to_check_event.set(); storage.replica_is_active_node = nullptr; - storage.merger.cancelAll(); + storage.merger.cancel(); if (storage.unreplicated_merger) - storage.unreplicated_merger->cancelAll(); + storage.unreplicated_merger->cancel(); LOG_TRACE(log, "Waiting for threads to finish"); if (storage.is_leader_node) @@ -366,140 +367,14 @@ void ReplicatedMergeTreeRestartingThread::goReadOnlyPermanently() } -static time_t extractTimeOfLogEntryIfGetPart(zkutil::ZooKeeperPtr & zookeeper, const String & name, const String & path) -{ - String content; - zkutil::Stat stat; - if (!zookeeper->tryGet(path + "/" + name, content, &stat)) - return 0; /// Узел уже успел удалиться. - - ReplicatedMergeTreeLogEntry entry; - entry.parse(content, stat); - - if (entry.type == ReplicatedMergeTreeLogEntry::GET_PART) - return entry.create_time; - - return 0; -} - -/// В массиве имён узлов - элементов очереди/лога находит первый, имеющий тип GET_PART и возвращает его время; либо ноль, если не нашёл. -static time_t findFirstGetPartEntry(zkutil::ZooKeeperPtr & zookeeper, const Strings & nodes, const String & path) -{ - for (const auto & name : nodes) - { - time_t res = extractTimeOfLogEntryIfGetPart(zookeeper, name, path); - if (res) - return res; - } - - return 0; -} - - void ReplicatedMergeTreeRestartingThread::checkReplicationDelays(time_t & out_absolute_delay, time_t & out_relative_delay) { - /** Нужно получить следующую информацию: - * 1. Время последней записи типа GET в логе. - * 2. Время первой записи типа GET в очереди каждой активной реплики - * (или в логе, после log_pointer реплики - то есть, среди записей, ещё не попавших в очередь реплики). - * - * Разница между этими величинами называется (абсолютным) отставанием реплик. - * Кроме абсолютного отставания также будем рассматривать относительное - от реплики с минимальным отставанием. 
- * - * Если относительное отставание текущей реплики больше некоторого порога, - * и текущая реплика является лидером, то текущая реплика должна уступить лидерство. - * - * Также в случае превышения абсолютного либо относительного отставания некоторого порога, необходимо: - * - не отвечать Ok на некоторую ручку проверки реплик для балансировщика; - * - не принимать соединения для обработки запросов. - * Это делается в других местах. - * - * NOTE Реализация громоздкая, так как нужные значения вынимаются путём обхода списка узлов. - * Могут быть проблемы в случае разрастания лога до большого размера. - */ - out_absolute_delay = 0; out_relative_delay = 0; auto zookeeper = storage.getZooKeeper(); - /// Последняя запись GET в логе. - String log_path = storage.zookeeper_path + "/log"; - Strings log_entries_desc = zookeeper->getChildren(log_path); - std::sort(log_entries_desc.begin(), log_entries_desc.end(), std::greater()); - time_t last_entry_to_get_part = findFirstGetPartEntry(zookeeper, log_entries_desc, log_path); - - /** Возможно, что в логе нет записей типа GET. Тогда считаем, что никто не отстаёт. - * В очередях у реплик могут быть не выполненные старые записи типа GET, - * которые туда добавлены не из лога, а для восстановления битых кусков. - * Не будем считать это отставанием. - */ - - if (!last_entry_to_get_part) - return; - - /// Для каждой активной реплики время первой невыполненной записи типа GET, либо ноль, если таких записей нет. - std::map replicas_first_entry_to_get_part; - - Strings active_replicas = zookeeper->getChildren(storage.zookeeper_path + "/leader_election"); - for (const auto & node : active_replicas) - { - String replica; - if (!zookeeper->tryGet(storage.zookeeper_path + "/leader_election/" + node, replica)) - continue; /// Реплика только что перестала быть активной. - - String queue_path = storage.zookeeper_path + "/replicas/" + replica + "/queue"; - Strings queue_entries = zookeeper->getChildren(queue_path); - std::sort(queue_entries.begin(), queue_entries.end()); - time_t & first_time = replicas_first_entry_to_get_part[replica]; - first_time = findFirstGetPartEntry(zookeeper, queue_entries, queue_path); - - if (!first_time) - { - /// Ищем среди записей лога после log_pointer для реплики. - String log_pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer"); - String log_min_entry = "log-" + storage.padIndex(parse(log_pointer)); - - for (const auto & name : log_entries_desc) - { - if (name < log_min_entry) - break; - - first_time = extractTimeOfLogEntryIfGetPart(zookeeper, name, log_path); - if (first_time) - break; - } - } - } - - if (active_replicas.empty()) - { - /// Нет активных реплик. Очень необычная ситуация - как же тогда у нас была сессия с ZK, чтобы это выяснить? - /// Предполагаем, что всё же может быть потенциальный race condition при установке эфемерной ноды для leader election, а значит, это нормально. - LOG_ERROR(log, "No active replicas when checking replication delays: very strange."); - return; - } - - time_t first_entry_of_most_recent_replica = -1; - for (const auto & replica_time : replicas_first_entry_to_get_part) - { - if (0 == replica_time.second) - { - /// Есть реплика, которая совсем не отстаёт. 
- first_entry_of_most_recent_replica = 0; - break; - } - - if (replica_time.second > first_entry_of_most_recent_replica) - first_entry_of_most_recent_replica = replica_time.second; - } - - time_t our_first_entry_to_get_part = replicas_first_entry_to_get_part[storage.replica_name]; - if (0 == our_first_entry_to_get_part) - return; /// Если мы совсем не отстаём. - - out_absolute_delay = last_entry_to_get_part - our_first_entry_to_get_part; - out_relative_delay = first_entry_of_most_recent_replica - our_first_entry_to_get_part; + // TODO LOG_TRACE(log, "Absolute delay: " << out_absolute_delay << ". Relative delay: " << out_relative_delay << "."); } diff --git a/dbms/src/Storages/StorageBuffer.cpp b/dbms/src/Storages/StorageBuffer.cpp index 6081b50cf09..36224ce0b9a 100644 --- a/dbms/src/Storages/StorageBuffer.cpp +++ b/dbms/src/Storages/StorageBuffer.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp index 25033bc6cab..6b9bf83659c 100644 --- a/dbms/src/Storages/StorageDistributed.cpp +++ b/dbms/src/Storages/StorageDistributed.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -192,7 +193,33 @@ BlockInputStreams StorageDistributed::read( if (settings.global_subqueries_method == GlobalSubqueriesMethod::PUSH) external_tables = context.getExternalTables(); + /// Распределить шарды равномерно по потокам. + + size_t remote_count = cluster.getRemoteShardCount(); + + /// Отключаем мультиплексирование шардов, если есть ORDER BY без GROUP BY. + const ASTSelectQuery & ast = *(static_cast(modified_query_ast.get())); + bool enable_shard_multiplexing = !(ast.order_expression_list && !ast.group_expression_list); + + size_t thread_count; + + if (!enable_shard_multiplexing) + thread_count = remote_count; + else if (remote_count == 0) + thread_count = 0; + else if (settings.max_distributed_processing_threads == 0) + thread_count = 1; + else + thread_count = std::min(remote_count, static_cast(settings.max_distributed_processing_threads)); + + size_t pools_per_thread = (thread_count > 0) ? (remote_count / thread_count) : 0; + size_t remainder = (thread_count > 0) ? (remote_count % thread_count) : 0; + + ConnectionPoolsPtr pools; + bool do_init = true; + /// Цикл по шардам. + size_t current_thread = 0; for (const auto & shard_info : cluster.getShardsInfo()) { if (shard_info.isLocal()) @@ -207,17 +234,42 @@ BlockInputStreams StorageDistributed::read( InterpreterSelectQuery interpreter(modified_query_ast, new_context, processed_stage); /** Материализация нужна, так как с удалённых серверов константы приходят материализованными. - * Если этого не делать, то в разных потоках будут получаться разные типы (Const и не-Const) столбцов, - * а это не разрешено, так как весь код исходит из допущения, что в потоке блоков все типы одинаковые. - */ + * Если этого не делать, то в разных потоках будут получаться разные типы (Const и не-Const) столбцов, + * а это не разрешено, так как весь код исходит из допущения, что в потоке блоков все типы одинаковые. + */ res.emplace_back(new MaterializingBlockInputStream(interpreter.execute().in)); } } else { - res.emplace_back(new RemoteBlockInputStream{ - shard_info.pool, modified_query, &new_settings, throttler, - external_tables, processed_stage, context}); + size_t excess = (current_thread < remainder) ? 
1 : 0; + size_t actual_pools_per_thread = pools_per_thread + excess; + + if (actual_pools_per_thread == 1) + { + res.emplace_back(new RemoteBlockInputStream{ + shard_info.pool, modified_query, &new_settings, throttler, + external_tables, processed_stage, context}); + ++current_thread; + } + else + { + if (do_init) + { + pools = new ConnectionPools; + do_init = false; + } + + pools->push_back(shard_info.pool); + if (pools->size() == actual_pools_per_thread) + { + res.emplace_back(new RemoteBlockInputStream{ + pools, modified_query, &new_settings, throttler, + external_tables, processed_stage, context}); + do_init = true; + ++current_thread; + } + } } } @@ -278,7 +330,32 @@ BlockInputStreams StorageDistributed::describe(const Context & context, const Se BlockInputStreams res; + /// Распределить шарды равномерно по потокам. + + size_t remote_count = 0; + for (const auto & shard_info : cluster.getShardsInfo()) + { + if (shard_info.hasRemoteConnections()) + ++remote_count; + } + + size_t thread_count; + + if (remote_count == 0) + thread_count = 0; + else if (settings.max_distributed_processing_threads == 0) + thread_count = 1; + else + thread_count = std::min(remote_count, static_cast(settings.max_distributed_processing_threads)); + + size_t pools_per_thread = (thread_count > 0) ? (remote_count / thread_count) : 0; + size_t remainder = (thread_count > 0) ? (remote_count % thread_count) : 0; + + ConnectionPoolsPtr pools; + bool do_init = true; + /// Цикл по шардам. + size_t current_thread = 0; for (const auto & shard_info : cluster.getShardsInfo()) { if (shard_info.isLocal()) @@ -299,10 +376,37 @@ BlockInputStreams StorageDistributed::describe(const Context & context, const Se if (shard_info.hasRemoteConnections()) { - auto stream = new RemoteBlockInputStream{shard_info.pool, query, &new_settings, throttler}; - stream->reachAllReplicas(); - stream->appendExtraInfo(); - res.emplace_back(stream); + size_t excess = (current_thread < remainder) ? 
1 : 0; + size_t actual_pools_per_thread = pools_per_thread + excess; + + if (actual_pools_per_thread == 1) + { + auto stream = new RemoteBlockInputStream{shard_info.pool, query, &new_settings, throttler}; + stream->doBroadcast(); + stream->appendExtraInfo(); + res.emplace_back(stream); + ++current_thread; + } + else + { + if (do_init) + { + pools = new ConnectionPools; + do_init = false; + } + + pools->push_back(shard_info.pool); + if (pools->size() == actual_pools_per_thread) + { + auto stream = new RemoteBlockInputStream{pools, query, &new_settings, throttler}; + stream->doBroadcast(); + stream->appendExtraInfo(); + res.emplace_back(stream); + + do_init = true; + ++current_thread; + } + } } } diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index ea6fd49bed8..28a45d6d2d0 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -100,7 +100,7 @@ void StorageMergeTree::shutdown() if (shutdown_called) return; shutdown_called = true; - merger.cancelAll(); + merger.cancel(); background_pool.removeTask(merge_task_handle); } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 0591603c6f1..502de59c948 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -162,6 +162,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( checkParts(skip_sanity_checks); } + createNewZooKeeperNodes(); + initVirtualParts(); String unreplicated_path = full_path + "unreplicated/"; @@ -194,6 +196,20 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( } +void StorageReplicatedMergeTree::createNewZooKeeperNodes() +{ + auto zookeeper = getZooKeeper(); + + /// Работа с кворумом. + zookeeper->createIfNotExists(zookeeper_path + "/quorum", ""); + zookeeper->createIfNotExists(zookeeper_path + "/quorum/last_part", ""); + zookeeper->createIfNotExists(zookeeper_path + "/quorum/failed_parts", ""); + + /// Отслеживание отставания реплик. + zookeeper->createIfNotExists(replica_path + "/min_unprocessed_insert_time", ""); +} + + StoragePtr StorageReplicatedMergeTree::create( const String & zookeeper_path_, const String & replica_name_, @@ -1801,139 +1817,153 @@ void StorageReplicatedMergeTree::alterThread() bool changed_version = (stat.version != columns_version); - MergeTreeData::DataParts parts; - - /// Если описание столбцов изменилось, обновим структуру таблицы локально. - if (changed_version) { - LOG_INFO(log, "Changed version of 'columns' node in ZooKeeper. Waiting for structure write lock."); + /// Если потребуется блокировать структуру таблицы, то приостановим мерджи. + std::unique_ptr merge_blocker; + std::unique_ptr unreplicated_merge_blocker; - auto table_lock = lockStructureForAlter(); - - const auto columns_changed = columns != data.getColumnsListNonMaterialized(); - const auto materialized_columns_changed = materialized_columns != data.materialized_columns; - const auto alias_columns_changed = alias_columns != data.alias_columns; - const auto column_defaults_changed = column_defaults != data.column_defaults; - - if (columns_changed || materialized_columns_changed || alias_columns_changed || - column_defaults_changed) + if (changed_version || force_recheck_parts) { - LOG_INFO(log, "Columns list changed in ZooKeeper. 
Applying changes locally."); - - InterpreterAlterQuery::updateMetadata(database_name, table_name, columns, - materialized_columns, alias_columns, column_defaults, context); - - if (columns_changed) - { - data.setColumnsList(columns); - - if (unreplicated_data) - unreplicated_data->setColumnsList(columns); - } - - if (materialized_columns_changed) - { - this->materialized_columns = materialized_columns; - data.materialized_columns = std::move(materialized_columns); - } - - if (alias_columns_changed) - { - this->alias_columns = alias_columns; - data.alias_columns = std::move(alias_columns); - } - - if (column_defaults_changed) - { - this->column_defaults = column_defaults; - data.column_defaults = std::move(column_defaults); - } - - LOG_INFO(log, "Applied changes to table."); - } - else - { - LOG_INFO(log, "Columns version changed in ZooKeeper, but data wasn't changed. It's like cyclic ALTERs."); + merge_blocker = std::make_unique(merger); + if (unreplicated_merger) + unreplicated_merge_blocker = std::make_unique(*unreplicated_merger); } - /// Нужно получить список кусков под блокировкой таблицы, чтобы избежать race condition с мерджем. - parts = data.getDataParts(); - - columns_version = stat.version; - } - - /// Обновим куски. - if (changed_version || force_recheck_parts) - { - auto table_lock = lockStructure(false); + MergeTreeData::DataParts parts; + /// Если описание столбцов изменилось, обновим структуру таблицы локально. if (changed_version) - LOG_INFO(log, "ALTER-ing parts"); + { + LOG_INFO(log, "Changed version of 'columns' node in ZooKeeper. Waiting for structure write lock."); - int changed_parts = 0; + auto table_lock = lockStructureForAlter(); - if (!changed_version) + const auto columns_changed = columns != data.getColumnsListNonMaterialized(); + const auto materialized_columns_changed = materialized_columns != data.materialized_columns; + const auto alias_columns_changed = alias_columns != data.alias_columns; + const auto column_defaults_changed = column_defaults != data.column_defaults; + + if (columns_changed || materialized_columns_changed || alias_columns_changed || + column_defaults_changed) + { + LOG_INFO(log, "Columns list changed in ZooKeeper. Applying changes locally."); + + InterpreterAlterQuery::updateMetadata(database_name, table_name, columns, + materialized_columns, alias_columns, column_defaults, context); + + if (columns_changed) + { + data.setColumnsList(columns); + + if (unreplicated_data) + unreplicated_data->setColumnsList(columns); + } + + if (materialized_columns_changed) + { + this->materialized_columns = materialized_columns; + data.materialized_columns = std::move(materialized_columns); + } + + if (alias_columns_changed) + { + this->alias_columns = alias_columns; + data.alias_columns = std::move(alias_columns); + } + + if (column_defaults_changed) + { + this->column_defaults = column_defaults; + data.column_defaults = std::move(column_defaults); + } + + LOG_INFO(log, "Applied changes to table."); + } + else + { + LOG_INFO(log, "Columns version changed in ZooKeeper, but data wasn't changed. It's like cyclic ALTERs."); + } + + /// Нужно получить список кусков под блокировкой таблицы, чтобы избежать race condition с мерджем. parts = data.getDataParts(); - const auto columns_plus_materialized = data.getColumnsList(); - - for (const MergeTreeData::DataPartPtr & part : parts) - { - /// Обновим кусок и запишем результат во временные файлы. 
- /// TODO: Можно пропускать проверку на слишком большие изменения, если в ZooKeeper есть, например, - /// нода /flags/force_alter. - auto transaction = data.alterDataPart(part, columns_plus_materialized); - - if (!transaction) - continue; - - ++changed_parts; - - /// Обновим метаданные куска в ZooKeeper. - zkutil::Ops ops; - ops.push_back(new zkutil::Op::SetData( - replica_path + "/parts/" + part->name + "/columns", transaction->getNewColumns().toString(), -1)); - ops.push_back(new zkutil::Op::SetData( - replica_path + "/parts/" + part->name + "/checksums", transaction->getNewChecksums().toString(), -1)); - zookeeper->multi(ops); - - /// Применим изменения файлов. - transaction->commit(); + columns_version = stat.version; } - /// То же самое для нереплицируемых данных. - if (unreplicated_data) + /// Обновим куски. + if (changed_version || force_recheck_parts) { - parts = unreplicated_data->getDataParts(); + auto table_lock = lockStructure(false); + + if (changed_version) + LOG_INFO(log, "ALTER-ing parts"); + + int changed_parts = 0; + + if (!changed_version) + parts = data.getDataParts(); + + const auto columns_plus_materialized = data.getColumnsList(); for (const MergeTreeData::DataPartPtr & part : parts) { - auto transaction = unreplicated_data->alterDataPart(part, columns_plus_materialized); + /// Обновим кусок и запишем результат во временные файлы. + /// TODO: Можно пропускать проверку на слишком большие изменения, если в ZooKeeper есть, например, + /// нода /flags/force_alter. + auto transaction = data.alterDataPart(part, columns_plus_materialized); if (!transaction) continue; ++changed_parts; + /// Обновим метаданные куска в ZooKeeper. + zkutil::Ops ops; + ops.push_back(new zkutil::Op::SetData( + replica_path + "/parts/" + part->name + "/columns", transaction->getNewColumns().toString(), -1)); + ops.push_back(new zkutil::Op::SetData( + replica_path + "/parts/" + part->name + "/checksums", transaction->getNewChecksums().toString(), -1)); + zookeeper->multi(ops); + + /// Применим изменения файлов. transaction->commit(); } + + /// То же самое для нереплицируемых данных. + if (unreplicated_data) + { + parts = unreplicated_data->getDataParts(); + + for (const MergeTreeData::DataPartPtr & part : parts) + { + auto transaction = unreplicated_data->alterDataPart(part, columns_plus_materialized); + + if (!transaction) + continue; + + ++changed_parts; + + transaction->commit(); + } + } + + /// Список столбцов для конкретной реплики. + zookeeper->set(replica_path + "/columns", columns_str); + + if (changed_version) + { + if (changed_parts != 0) + LOG_INFO(log, "ALTER-ed " << changed_parts << " parts"); + else + LOG_INFO(log, "No parts ALTER-ed"); + } + + force_recheck_parts = false; } - /// Список столбцов для конкретной реплики. - zookeeper->set(replica_path + "/columns", columns_str); - - if (changed_version) - { - if (changed_parts != 0) - LOG_INFO(log, "ALTER-ed " << changed_parts << " parts"); - else - LOG_INFO(log, "No parts ALTER-ed"); - } - - force_recheck_parts = false; + /// Важно, что уничтожается parts и merge_blocker перед wait-ом. } - parts.clear(); alter_thread_event->wait(); } catch (...) 
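The alterThread() rework above pauses merges for the duration of the ALTER via scoped blocker objects: merger.cancel() is called when the blocker is created and merger.uncancel() when it is destroyed, and the hunk notes that the blocker must be destroyed before the final wait. A minimal standalone sketch of that RAII pairing follows; the Merger stub and the ScopedMergeBlocker name are illustrative assumptions, not from this patch (the concrete blocker type behind std::make_unique(merger) is elided in the diff text).

#include <atomic>
#include <memory>

struct Merger
{
    /// Stand-in for the merger used above; assumed to expose thread-safe cancel()/uncancel().
    std::atomic<bool> cancelled{false};
    void cancel()   { cancelled.store(true, std::memory_order_relaxed); }   /// stop scheduling new merges
    void uncancel() { cancelled.store(false, std::memory_order_relaxed); }  /// allow merges again
};

/// RAII guard (hypothetical name): merges stay blocked while an instance is alive.
class ScopedMergeBlocker
{
public:
    explicit ScopedMergeBlocker(Merger & merger_) : merger(merger_) { merger.cancel(); }
    ~ScopedMergeBlocker() { merger.uncancel(); }

    ScopedMergeBlocker(const ScopedMergeBlocker &) = delete;
    ScopedMergeBlocker & operator=(const ScopedMergeBlocker &) = delete;

private:
    Merger & merger;
};

int main()
{
    Merger merger;
    {
        /// Mirrors alterThread(): the blocker is created only if the table structure may change.
        std::unique_ptr<ScopedMergeBlocker> merge_blocker = std::make_unique<ScopedMergeBlocker>(merger);
        /// ... ALTER parts while merges are blocked ...
    }   /// merge_blocker destroyed here; merges resume even if the ALTER threw.
}

Tying uncancel() to the destructor guarantees merges resume on every exit path, which is why the hunk stresses destroying parts and merge_blocker before the wait.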
diff --git a/dbms/src/Storages/tests/merge_tree.cpp b/dbms/src/Storages/tests/merge_tree.cpp index 52347dd8670..63db9007dd1 100644 --- a/dbms/src/Storages/tests/merge_tree.cpp +++ b/dbms/src/Storages/tests/merge_tree.cpp @@ -39,7 +39,7 @@ int main(int argc, char ** argv) const char * begin = primary_expr_str.data(); const char * end = begin + primary_expr_str.size(); const char * max_parsed_pos = begin; - ParserExpressionList parser; + ParserExpressionList parser(false); if (!parser.parse(begin, end, primary_expr, max_parsed_pos, expected)) throw Poco::Exception("Cannot parse " + primary_expr_str); diff --git a/dbms/tests/queries/0_stateless/00267_tuple_array_access_operators_priority.reference b/dbms/tests/queries/0_stateless/00267_tuple_array_access_operators_priority.reference new file mode 100644 index 00000000000..7326d960397 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00267_tuple_array_access_operators_priority.reference @@ -0,0 +1 @@ +Ok diff --git a/dbms/tests/queries/0_stateless/00267_tuple_array_access_operators_priority.sql b/dbms/tests/queries/0_stateless/00267_tuple_array_access_operators_priority.sql new file mode 100644 index 00000000000..54294d4f2b7 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00267_tuple_array_access_operators_priority.sql @@ -0,0 +1 @@ +SELECT 1+-a[1].2*2 = -245 ? 'Ok' : 'Fail' AS res FROM (SELECT [('Hello', 123)] AS a); diff --git a/dbms/tests/queries/0_stateless/00268_aliases_without_as_keyword.reference b/dbms/tests/queries/0_stateless/00268_aliases_without_as_keyword.reference new file mode 100644 index 00000000000..2b2f2e1b926 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00268_aliases_without_as_keyword.reference @@ -0,0 +1,2 @@ +1 +3 diff --git a/dbms/tests/queries/0_stateless/00268_aliases_without_as_keyword.sql b/dbms/tests/queries/0_stateless/00268_aliases_without_as_keyword.sql new file mode 100644 index 00000000000..e13e1e0c10d --- /dev/null +++ b/dbms/tests/queries/0_stateless/00268_aliases_without_as_keyword.sql @@ -0,0 +1,2 @@ +SELECT 1 x FROM system.one; +SELECT 1 + (2 AS x) y FROM system.one; diff --git a/dbms/tests/queries/0_stateless/00269_database_table_whitespace.reference b/dbms/tests/queries/0_stateless/00269_database_table_whitespace.reference new file mode 100644 index 00000000000..aa47d0d46d4 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00269_database_table_whitespace.reference @@ -0,0 +1,2 @@ +0 +0 diff --git a/dbms/tests/queries/0_stateless/00269_database_table_whitespace.sql b/dbms/tests/queries/0_stateless/00269_database_table_whitespace.sql new file mode 100644 index 00000000000..8e69d2713bc --- /dev/null +++ b/dbms/tests/queries/0_stateless/00269_database_table_whitespace.sql @@ -0,0 +1,2 @@ +SELECT * FROM system . one; +SELECT * FROM system /* Hello */. `one`;
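The StorageDistributed::read() and describe() hunks distribute remote shard pools evenly across processing threads: thread_count is capped by the max_distributed_processing_threads setting, each thread handles pools_per_thread pools, and the first `remainder` threads take one extra pool (the `excess` term); in read(), multiplexing is additionally disabled when the query has ORDER BY without GROUP BY. A self-contained sketch of that arithmetic, with the shard count and setting value chosen arbitrarily for illustration:

#include <algorithm>
#include <cstddef>
#include <iostream>

int main()
{
    const size_t remote_count = 7;                        /// remote shards (example value)
    const size_t max_distributed_processing_threads = 3;  /// setting (example value)

    size_t thread_count;
    if (remote_count == 0)
        thread_count = 0;
    else if (max_distributed_processing_threads == 0)
        thread_count = 1;
    else
        thread_count = std::min(remote_count, max_distributed_processing_threads);

    const size_t pools_per_thread = thread_count ? remote_count / thread_count : 0;
    const size_t remainder = thread_count ? remote_count % thread_count : 0;

    /// The first `remainder` threads take one extra shard, mirroring
    /// `size_t excess = (current_thread < remainder) ? 1 : 0;` in the diff.
    size_t shard = 0;
    for (size_t thread = 0; thread < thread_count; ++thread)
    {
        const size_t count = pools_per_thread + ((thread < remainder) ? 1 : 0);
        std::cout << "thread " << thread << " reads shards [" << shard
                  << ", " << (shard + count) << ")\n";
        shard += count;
    }
}

Each group of pools then feeds a single RemoteBlockInputStream (via the ConnectionPoolsPtr constructor), so at most thread_count streams are created instead of one per shard.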