diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionsArgMinMax.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionsArgMinMax.h index 67265e90ebc..1a7969f3448 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionsArgMinMax.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionsArgMinMax.h @@ -3,41 +3,31 @@ #include #include -#include +#include namespace DB { -struct AggregateFunctionArgMinTraits -{ - static bool better(const Field & lhs, const Field & rhs) { return lhs < rhs; } - static String name() { return "argMin"; } -}; - -struct AggregateFunctionArgMaxTraits -{ - static bool better(const Field & lhs, const Field & rhs) { return lhs > rhs; } - static String name() { return "argMax"; } -}; - +/// Возможные значения параметров шаблонов см. в AggregateFunctionsMinMaxAny.h +template struct AggregateFunctionsArgMinMaxData { - Field result; // аргумент, при котором достигается минимальное/максимальное значение value. - Field value; // значение, для которого считается минимум/максимум. + ResultData result; // аргумент, при котором достигается минимальное/максимальное значение value. + ValueData value; // значение, для которого считается минимум/максимум. }; /// Возвращает первое попавшееся значение arg для минимального/максимального value. Пример: argMax(arg, value). -template -class AggregateFunctionsArgMinMax final : public IAggregateFunctionHelper +template +class AggregateFunctionsArgMinMax final : public IAggregateFunctionHelper { private: DataTypePtr type_res; DataTypePtr type_val; public: - String getName() const { return Traits::name(); } + String getName() const { return (0 == strcmp(decltype(Data::value)::name(), "min")) ? "argMin" : "argMax"; } DataTypePtr getReturnType() const { @@ -55,105 +45,37 @@ public: void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const { - Field result; - Field value; - columns[0]->get(row_num, result); - columns[1]->get(row_num, value); - Data & d = data(place); - - if (!d.value.isNull()) - { - if (Traits::better(value, d.value)) - { - d.result = result; - d.value = value; - } - } - else - { - d.result = result; - d.value = value; - } + if (this->data(place).value.changeIfBetter(*columns[1], row_num)) + this->data(place).result.change(*columns[0], row_num); } void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const { - Data & d = data(place); - const Data & d_rhs = data(rhs); - - if (!d.value.isNull()) - { - if (Traits::better(d_rhs.value, d.value)) - { - d.result = d_rhs.result; - d.value = d_rhs.value; - } - } - else - { - d.result = d_rhs.result; - d.value = d_rhs.value; - } + if (this->data(place).value.changeIfBetter(this->data(rhs).value)) + this->data(place).result.change(this->data(rhs).result); } void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const { - const Data & d = data(place); - - if (unlikely(d.result.isNull())) - { - writeBinary(false, buf); - } - else - { - writeBinary(true, buf); - type_res->serializeBinary(d.result, buf); - type_val->serializeBinary(d.value, buf); - } + this->data(place).result.write(buf, *type_res.get()); + this->data(place).value.write(buf, *type_val.get()); } void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const { - Data & d = data(place); + Data rhs; /// Для строчек не очень оптимально, так как может делаться одна лишняя аллокация. - bool is_not_null = false; - readBinary(is_not_null, buf); + rhs.result.read(buf, *type_res.get()); + rhs.value.read(buf, *type_val.get()); - if (is_not_null) - { - if (!d.value.isNull()) - { - Field result_; - Field value_; - - type_res->deserializeBinary(result_, buf); - type_val->deserializeBinary(value_, buf); - - if (Traits::better(value_, d.value)) - { - d.result = result_; - d.value = value_; - } - } - else - { - type_res->deserializeBinary(d.result, buf); - type_val->deserializeBinary(d.value, buf); - } - } + if (this->data(place).value.changeIfBetter(rhs.value)) + this->data(place).result.change(rhs.result); } void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const { - if (unlikely(data(place).value.isNull())) - to.insertDefault(); - else - to.insert(data(place).result); + this->data(place).result.insertResultInto(to); } }; -typedef AggregateFunctionsArgMinMax AggregateFunctionArgMin; -typedef AggregateFunctionsArgMinMax AggregateFunctionArgMax; - - } diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionsMinMaxAny.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionsMinMaxAny.h index 9f7311fce5a..96bdd1791a8 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionsMinMaxAny.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionsMinMaxAny.h @@ -67,40 +67,70 @@ struct SingleValueDataFixed value = to.value; } - void changeFirstTime(const IColumn & column, size_t row_num) + bool changeFirstTime(const IColumn & column, size_t row_num) { if (!has()) + { change(column, row_num); + return true; + } + else + return false; } - void changeFirstTime(const Self & to) + bool changeFirstTime(const Self & to) { if (!has()) + { change(to); + return true; + } + else + return false; } - void changeIfLess(const IColumn & column, size_t row_num) + bool changeIfLess(const IColumn & column, size_t row_num) { if (!has() || static_cast &>(column).getData()[row_num] < value) + { change(column, row_num); + return true; + } + else + return false; } - void changeIfLess(const Self & to) + bool changeIfLess(const Self & to) { if (to.has() && (!has() || to.value < value)) + { change(to); + return true; + } + else + return false; } - void changeIfGreater(const IColumn & column, size_t row_num) + bool changeIfGreater(const IColumn & column, size_t row_num) { if (!has() || static_cast &>(column).getData()[row_num] > value) + { change(column, row_num); + return true; + } + else + return false; } - void changeIfGreater(const Self & to) + bool changeIfGreater(const Self & to) { if (to.has() && (!has() || to.value > value)) + { change(to); + return true; + } + else + return false; } }; @@ -238,40 +268,70 @@ struct __attribute__((__packed__)) SingleValueDataString changeImpl(to.getStringRef()); } - void changeFirstTime(const IColumn & column, size_t row_num) + bool changeFirstTime(const IColumn & column, size_t row_num) { if (!has()) + { change(column, row_num); + return true; + } + else + return false; } - void changeFirstTime(const Self & to) + bool changeFirstTime(const Self & to) { if (!has()) + { change(to); + return true; + } + else + return false; } - void changeIfLess(const IColumn & column, size_t row_num) + bool changeIfLess(const IColumn & column, size_t row_num) { if (!has() || static_cast(column).getDataAtWithTerminatingZero(row_num) < getStringRef()) + { change(column, row_num); + return true; + } + else + return false; } - void changeIfLess(const Self & to) + bool changeIfLess(const Self & to) { if (to.has() && (!has() || to.getStringRef() < getStringRef())) + { change(to); + return true; + } + else + return false; } - void changeIfGreater(const IColumn & column, size_t row_num) + bool changeIfGreater(const IColumn & column, size_t row_num) { if (!has() || static_cast(column).getDataAtWithTerminatingZero(row_num) > getStringRef()) + { change(column, row_num); + return true; + } + else + return false; } - void changeIfGreater(const Self & to) + bool changeIfGreater(const Self & to) { if (to.has() && (!has() || to.getStringRef() > getStringRef())) + { change(to); + return true; + } + else + return false; } }; @@ -326,54 +386,90 @@ struct SingleValueDataGeneric value = to.value; } - void changeFirstTime(const IColumn & column, size_t row_num) + bool changeFirstTime(const IColumn & column, size_t row_num) { if (!has()) + { change(column, row_num); + return true; + } + else + return false; } - void changeFirstTime(const Self & to) + bool changeFirstTime(const Self & to) { if (!has()) + { change(to); + return true; + } + else + return false; } - void changeIfLess(const IColumn & column, size_t row_num) + bool changeIfLess(const IColumn & column, size_t row_num) { if (!has()) + { change(column, row_num); + return true; + } else { Field new_value; column.get(row_num, new_value); if (new_value < value) + { value = new_value; + return true; + } + else + return false; } } - void changeIfLess(const Self & to) + bool changeIfLess(const Self & to) { if (to.has() && (!has() || to.value < value)) + { change(to); + return true; + } + else + return false; } - void changeIfGreater(const IColumn & column, size_t row_num) + bool changeIfGreater(const IColumn & column, size_t row_num) { if (!has()) + { change(column, row_num); + return true; + } else { Field new_value; column.get(row_num, new_value); if (new_value > value) + { value = new_value; + return true; + } + else + return false; } } - void changeIfGreater(const Self & to) + bool changeIfGreater(const Self & to) { if (to.has() && (!has() || to.value > value)) + { change(to); + return true; + } + else + return false; } }; @@ -388,8 +484,8 @@ struct AggregateFunctionMinData : Data { typedef AggregateFunctionMinData Self; - void changeIfBetter(const IColumn & column, size_t row_num) { this->changeIfLess(column, row_num); } - void changeIfBetter(const Self & to) { this->changeIfLess(to); } + bool changeIfBetter(const IColumn & column, size_t row_num) { return this->changeIfLess(column, row_num); } + bool changeIfBetter(const Self & to) { return this->changeIfLess(to); } static const char * name() { return "min"; } }; @@ -399,8 +495,8 @@ struct AggregateFunctionMaxData : Data { typedef AggregateFunctionMaxData Self; - void changeIfBetter(const IColumn & column, size_t row_num) { this->changeIfGreater(column, row_num); } - void changeIfBetter(const Self & to) { this->changeIfGreater(to); } + bool changeIfBetter(const IColumn & column, size_t row_num) { return this->changeIfGreater(column, row_num); } + bool changeIfBetter(const Self & to) { return this->changeIfGreater(to); } static const char * name() { return "max"; } }; @@ -410,8 +506,8 @@ struct AggregateFunctionAnyData : Data { typedef AggregateFunctionAnyData Self; - void changeIfBetter(const IColumn & column, size_t row_num) { this->changeFirstTime(column, row_num); } - void changeIfBetter(const Self & to) { this->changeFirstTime(to); } + bool changeIfBetter(const IColumn & column, size_t row_num) { return this->changeFirstTime(column, row_num); } + bool changeIfBetter(const Self & to) { return this->changeFirstTime(to); } static const char * name() { return "any"; } }; @@ -421,8 +517,8 @@ struct AggregateFunctionAnyLastData : Data { typedef AggregateFunctionAnyLastData Self; - void changeIfBetter(const IColumn & column, size_t row_num) { this->change(column, row_num); } - void changeIfBetter(const Self & to) { this->change(to); } + bool changeIfBetter(const IColumn & column, size_t row_num) { this->change(column, row_num); return true; } + bool changeIfBetter(const Self & to) { this->change(to); return true; } static const char * name() { return "anyLast"; } }; diff --git a/dbms/include/DB/Client/ParallelReplicas.h b/dbms/include/DB/Client/ParallelReplicas.h index 235401597ae..27e1b7782c1 100644 --- a/dbms/include/DB/Client/ParallelReplicas.h +++ b/dbms/include/DB/Client/ParallelReplicas.h @@ -39,7 +39,7 @@ public: /// Разорвать все действующие соединения. void disconnect(); - /// Отменить запросы к репликам + /// Отправить на реплики просьбу отменить выполнение запроса void sendCancel(); /** На каждой реплике читать и пропускать все пакеты до EndOfStream или Exception. diff --git a/dbms/include/DB/Columns/ColumnConst.h b/dbms/include/DB/Columns/ColumnConst.h index 4d599448b63..3b30d53f0f1 100644 --- a/dbms/include/DB/Columns/ColumnConst.h +++ b/dbms/include/DB/Columns/ColumnConst.h @@ -156,8 +156,9 @@ typedef ColumnConst ColumnConstArray; template ColumnPtr ColumnConst::convertToFullColumn() const { - ColumnVector * res = new ColumnVector; - res->getData().assign(s, data); + ColumnVector * res_ = new ColumnVector; + ColumnPtr res = res_; + res_->getData().assign(s, data); return res; } diff --git a/dbms/include/DB/Columns/IColumnDummy.h b/dbms/include/DB/Columns/IColumnDummy.h index ab12d3e11d0..509c56b14a8 100644 --- a/dbms/include/DB/Columns/IColumnDummy.h +++ b/dbms/include/DB/Columns/IColumnDummy.h @@ -1,7 +1,6 @@ #pragma once #include -#include namespace DB diff --git a/dbms/include/DB/Common/HashTable/ClearableHashMap.h b/dbms/include/DB/Common/HashTable/ClearableHashMap.h index fb0340aecf4..666b39dd601 100644 --- a/dbms/include/DB/Common/HashTable/ClearableHashMap.h +++ b/dbms/include/DB/Common/HashTable/ClearableHashMap.h @@ -71,7 +71,7 @@ public: this->emplace(x, it, inserted); if (inserted) - new(&it->second) mapped_type(); /// В отличие от HashMap, всегда инициализируем значение. + new(&it->second) mapped_type(); return it->second; } diff --git a/dbms/include/DB/Common/HashTable/HashMap.h b/dbms/include/DB/Common/HashTable/HashMap.h index f9257d68557..649d663f514 100644 --- a/dbms/include/DB/Common/HashTable/HashMap.h +++ b/dbms/include/DB/Common/HashTable/HashMap.h @@ -137,8 +137,21 @@ public: bool inserted; this->emplace(x, it, inserted); - /// Если тривиальный конструктор, то инициализация нулями (через вызов конструктора для POD-ов) не нужна, так как таблица и так заполнена нулями. - if (!__has_trivial_constructor(mapped_type) && inserted) + /** Может показаться, что инициализация не обязательна для POD-типов (или __has_trivial_constructor), + * так как кусок памяти для хэш-таблицы изначально инициализирован нулями. + * Но, на самом деле, пустая ячейка может быть не инициализирована нулями в следующих случаях: + * - ZeroValueStorage (в нём зануляется только ключ); + * - после ресайза и переноса части ячеек в новую половину хэш-таблицы, у старых ячеек, тоже зануляется только ключ. + * + * По производительности, разницы почти всегда нет, за счёт того, что it->second как правило присваивается сразу + * после вызова operator[], и так как operator[] инлайнится, компилятор убирает лишнюю инициализацию. + * + * Иногда из-за инициализации, производительность даже растёт. Это происходит в коде вида ++map[key]. + * Когда мы делаем инициализацию, то для новых ячеек, достаточно сразу сделать store 1. + * А если бы мы не делали инициализацию, то не смотря на то, что в ячейке был ноль, + * компилятор не может об этом догадаться, и генерирует код load, increment, store. + */ + if (inserted) new(&it->second) mapped_type(); return it->second; diff --git a/dbms/include/DB/Common/HashTable/SmallTable.h b/dbms/include/DB/Common/HashTable/SmallTable.h index 809ef3b112e..10ec8479b93 100644 --- a/dbms/include/DB/Common/HashTable/SmallTable.h +++ b/dbms/include/DB/Common/HashTable/SmallTable.h @@ -185,6 +185,24 @@ public: } + /// То же самое, но вернуть false, если переполнено. + bool ALWAYS_INLINE tryEmplace(Key x, iterator & it, bool & inserted) + { + Cell * res = findCell(x); + it = iteratorTo(res); + inserted = res == buf + m_size; + if (inserted) + { + if (res == buf + capacity) + return false; + + new(res) Cell(x, *this); + ++m_size; + } + return true; + } + + /// Скопировать ячейку из другой хэш-таблицы. Предполагается, что такого ключа в таблице ещё не было. void ALWAYS_INLINE insertUnique(const Cell * cell) { @@ -192,6 +210,12 @@ public: ++m_size; } + void ALWAYS_INLINE insertUnique(Key x) + { + new(&buf[m_size]) Cell(x, *this); + ++m_size; + } + iterator ALWAYS_INLINE find(Key x) { return iteratorTo(findCell(x)); } const_iterator ALWAYS_INLINE find(Key x) const { return iteratorTo(findCell(x)); } diff --git a/dbms/include/DB/Common/HashTable/TwoLevelHashMap.h b/dbms/include/DB/Common/HashTable/TwoLevelHashMap.h index 2c22de7eb58..464dcb58469 100644 --- a/dbms/include/DB/Common/HashTable/TwoLevelHashMap.h +++ b/dbms/include/DB/Common/HashTable/TwoLevelHashMap.h @@ -27,7 +27,7 @@ public: bool inserted; this->emplace(x, it, inserted); - if (!__has_trivial_constructor(mapped_type) && inserted) + if (inserted) new(&it->second) mapped_type(); return it->second; diff --git a/dbms/include/DB/DataStreams/BlockIO.h b/dbms/include/DB/DataStreams/BlockIO.h index 461e475e026..6c3c83a63bd 100644 --- a/dbms/include/DB/DataStreams/BlockIO.h +++ b/dbms/include/DB/DataStreams/BlockIO.h @@ -10,13 +10,34 @@ namespace DB struct BlockIO { + /** process_list_entry должен уничтожаться позже, чем in и out, + * так как внутри in и out есть ссылка на объект внутри process_list_entry + * (MemoryTracker * current_memory_tracker), + * которая может использоваться до уничтожения in и out. + */ + ProcessList::EntryPtr process_list_entry; + BlockInputStreamPtr in; BlockOutputStreamPtr out; Block in_sample; /// Пример блока, который будет прочитан из in. Block out_sample; /// Пример блока, которого нужно писать в out. - ProcessList::EntryPtr process_list_entry; + BlockIO & operator= (const BlockIO & rhs) + { + /// Обеспечиваем правильный порядок уничтожения. + out = nullptr; + in = nullptr; + process_list_entry = nullptr; + + process_list_entry = rhs.process_list_entry; + in = rhs.in; + out = rhs.out; + in_sample = rhs.in_sample; + out_sample = rhs.out_sample; + + return *this; + } }; } diff --git a/dbms/include/DB/DataStreams/CreatingSetsBlockInputStream.h b/dbms/include/DB/DataStreams/CreatingSetsBlockInputStream.h index 44140244a6b..c35ccec1915 100644 --- a/dbms/include/DB/DataStreams/CreatingSetsBlockInputStream.h +++ b/dbms/include/DB/DataStreams/CreatingSetsBlockInputStream.h @@ -2,8 +2,6 @@ #include #include -#include -#include namespace DB diff --git a/dbms/include/DB/DataStreams/ParallelInputsProcessor.h b/dbms/include/DB/DataStreams/ParallelInputsProcessor.h index e186d888693..8725fbca677 100644 --- a/dbms/include/DB/DataStreams/ParallelInputsProcessor.h +++ b/dbms/include/DB/DataStreams/ParallelInputsProcessor.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include #include #include #include @@ -49,7 +49,7 @@ public: : inputs(inputs_), max_threads(std::min(inputs_.size(), max_threads_)), handler(handler_) { for (size_t i = 0; i < inputs_.size(); ++i) - input_queue.emplace(inputs_[i], i); + input_stack.emplace(inputs_[i], i); } ~ParallelInputsProcessor() @@ -162,16 +162,16 @@ private: /// Выбираем следующий источник. { - std::lock_guard lock(input_queue_mutex); + std::lock_guard lock(input_stack_mutex); /// Если свободных источников нет, то этот поток больше не нужен. (Но другие потоки могут работать со своими источниками.) - if (input_queue.empty()) + if (input_stack.empty()) break; - input = input_queue.front(); + input = input_stack.top(); /// Убираем источник из очереди доступных источников. - input_queue.pop(); + input_stack.pop(); } /// Основная работа. @@ -183,15 +183,15 @@ private: /// Если этот источник ещё не иссяк, то положим полученный блок в очередь готовых. { - std::lock_guard lock(input_queue_mutex); + std::lock_guard lock(input_stack_mutex); if (block) { - input_queue.push(input); + input_stack.push(input); } else { - if (input_queue.empty()) + if (input_stack.empty()) break; } } @@ -214,12 +214,15 @@ private: typedef std::vector ThreadsData; ThreadsData threads; - /// Очередь доступных источников, которые не заняты каким-либо потоком в данный момент. - typedef std::queue InputQueue; - InputQueue input_queue; + /** Стек доступных источников, которые не заняты каким-либо потоком в данный момент. + * Стек вместо очереди - чтобы выполнять работу по чтению одного источника более последовательно. + * То есть, продолжать обработку источника, который недавно обрабатывался. + */ + typedef std::stack InputStack; + InputStack input_stack; - /// Для операций с input_queue. - std::mutex input_queue_mutex; + /// Для операций с input_stack. + std::mutex input_stack_mutex; /// Сколько источников иссякло. std::atomic active_threads { 0 }; diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index b508de2e35b..d00e1fb4c39 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -22,13 +22,6 @@ class RemoteBlockInputStream : public IProfilingBlockInputStream private: void init(const Settings * settings_) { - established.store(false, std::memory_order_seq_cst); - sent_query.store(false, std::memory_order_seq_cst); - finished.store(false, std::memory_order_seq_cst); - got_exception_from_replica.store(false, std::memory_order_seq_cst); - got_unknown_packet_from_replica.store(false, std::memory_order_seq_cst); - was_cancelled.store(false, std::memory_order_seq_cst); - if (settings_) { send_settings = true; @@ -93,11 +86,7 @@ public: if (hasNoQueryInProgress() || hasThrownException()) return; - if (tryCancel()) - { - std::string addresses = parallel_replicas->dumpAddresses(); - LOG_TRACE(log, "(" + addresses + ") Cancelling query"); - } + tryCancel("Cancelling query"); } @@ -107,7 +96,7 @@ public: * все соединения, затем читаем и пропускаем оставшиеся пакеты чтобы * эти соединения не остались висеть в рассихронизированном состоянии. */ - if (established.load(std::memory_order_seq_cst) || isQueryInProgress()) + if (established || isQueryInProgress()) parallel_replicas->disconnect(); } @@ -142,16 +131,16 @@ protected: Block readImpl() override { - if (!sent_query.load(std::memory_order_seq_cst)) + if (!sent_query) { createParallelReplicas(); - established.store(true, std::memory_order_seq_cst); + established = true; parallel_replicas->sendQuery(query, "", stage, true); - established.store(false, std::memory_order_seq_cst); - sent_query.store(true, std::memory_order_seq_cst); + established = false; + sent_query = true; sendExternalTables(); } @@ -169,14 +158,14 @@ protected: break; /// Если блок пустой - получим другие пакеты до EndOfStream. case Protocol::Server::Exception: - got_exception_from_replica.store(true, std::memory_order_seq_cst); + got_exception_from_replica = true; packet.exception->rethrow(); break; case Protocol::Server::EndOfStream: if (!parallel_replicas->hasActiveReplicas()) { - finished.store(true, std::memory_order_seq_cst); + finished = true; return Block(); } break; @@ -208,7 +197,7 @@ protected: break; default: - got_unknown_packet_from_replica.store(true, std::memory_order_seq_cst); + got_unknown_packet_from_replica = true; throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); } } @@ -231,27 +220,23 @@ protected: */ /// Отправим просьбу прервать выполнение запроса, если ещё не отправляли. - if (tryCancel()) - { - std::string addresses = parallel_replicas->dumpAddresses(); - LOG_TRACE(log, "(" + addresses + ") Cancelling query because enough data has been read"); - } + tryCancel("Cancelling query because enough data has been read"); /// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединениях с репликами. Connection::Packet packet = parallel_replicas->drain(); switch (packet.type) { case Protocol::Server::EndOfStream: - finished.store(true, std::memory_order_seq_cst); + finished = true; break; case Protocol::Server::Exception: - got_exception_from_replica.store(true, std::memory_order_seq_cst); + got_exception_from_replica = true; packet.exception->rethrow(); break; default: - got_unknown_packet_from_replica.store(true, std::memory_order_seq_cst); + got_unknown_packet_from_replica = true; throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); } } @@ -269,19 +254,19 @@ protected: /// Возвращает true, если запрос отправлен, а ещё не выполнен. bool isQueryInProgress() const { - return sent_query.load(std::memory_order_seq_cst) && !finished.load(std::memory_order_seq_cst) && !was_cancelled.load(std::memory_order_seq_cst); + return sent_query && !finished && !was_cancelled; } /// Возвращает true, если никакой запрос не отправлен или один запрос уже выполнен. bool hasNoQueryInProgress() const { - return !sent_query.load(std::memory_order_seq_cst) || finished.load(std::memory_order_seq_cst); + return !sent_query || finished; } /// Возвращает true, если исключение было выкинуто. bool hasThrownException() const { - return got_exception_from_replica.load(std::memory_order_seq_cst) || got_unknown_packet_from_replica.load(std::memory_order_seq_cst); + return got_exception_from_replica || got_unknown_packet_from_replica; } private: @@ -293,17 +278,14 @@ private: } /// Отправить запрос на отмену всех соединений к репликам, если такой запрос ещё не был отправлен. - bool tryCancel() + void tryCancel(const char * reason) { bool old_val = false; - bool new_val = true; - if (was_cancelled.compare_exchange_strong(old_val, new_val, std::memory_order_seq_cst, std::memory_order_relaxed)) - { - parallel_replicas->sendCancel(); - return true; - } - else - return false; + if (!was_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_seq_cst)) + return; + + LOG_TRACE(log, "(" << parallel_replicas->dumpAddresses() << ") " << reason); + parallel_replicas->sendCancel(); } private: @@ -324,33 +306,33 @@ private: Context context; /// Установили соединения с репликами, но ещё не отправили запрос. - std::atomic established; + std::atomic established { false }; /// Отправили запрос (это делается перед получением первого блока). - std::atomic sent_query; + std::atomic sent_query { false }; /** Получили все данные от всех реплик, до пакета EndOfStream. * Если при уничтожении объекта, ещё не все данные считаны, * то для того, чтобы не было рассинхронизации, на реплики отправляются просьбы прервать выполнение запроса, * и после этого считываются все пакеты до EndOfStream. */ - std::atomic finished; + std::atomic finished { false }; /** На каждую реплику была отправлена просьба прервать выполнение запроса, так как данные больше не нужны. * Это может быть из-за того, что данных достаточно (например, при использовании LIMIT), * или если на стороне клиента произошло исключение. */ - std::atomic was_cancelled; + std::atomic was_cancelled { false }; /** С одной репилки было получено исключение. В этом случае получать больше пакетов или * просить прервать запрос на этой реплике не нужно. */ - std::atomic got_exception_from_replica; + std::atomic got_exception_from_replica { false }; /** С одной реплики был получен неизвестный пакет. В этом случае получать больше пакетов или * просить прервать запрос на этой реплике не нужно. */ - std::atomic got_unknown_packet_from_replica; + std::atomic got_unknown_packet_from_replica { false }; Logger * log = &Logger::get("RemoteBlockInputStream"); }; diff --git a/dbms/include/DB/Dictionaries/CacheDictionary.h b/dbms/include/DB/Dictionaries/CacheDictionary.h index ff79e3d1580..dd0b55e856d 100644 --- a/dbms/include/DB/Dictionaries/CacheDictionary.h +++ b/dbms/include/DB/Dictionaries/CacheDictionary.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -413,6 +414,15 @@ private: auto stream = source_ptr->loadIds(ids); stream->readPrefix(); + HashMap remaining_ids{ids.size()}; + for (const auto id : ids) + remaining_ids.insert({ id, 0 }); + + std::uniform_int_distribution distribution{ + dict_lifetime.min_sec, + dict_lifetime.max_sec + }; + const Poco::ScopedWriteRWLock write_lock{rw_lock}; while (const auto block = stream->read()) @@ -434,7 +444,7 @@ private: for (const auto i : ext::range(0, ids.size())) { const auto id = ids[i]; - const auto & cell_idx = getCellIdx(id); + const auto cell_idx = getCellIdx(id); auto & cell = cells[cell_idx]; for (const auto attribute_idx : ext::range(0, attributes.size())) @@ -445,19 +455,33 @@ private: setAttributeValue(attribute, cell_idx, attribute_column[i]); } - std::uniform_int_distribution distribution{ - dict_lifetime.min_sec, - dict_lifetime.max_sec - }; - cell.id = id; cell.expires_at = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}; on_cell_updated(id, cell_idx); + remaining_ids[id] = 1; } } stream->readSuffix(); + + for (const auto id_found_pair : remaining_ids) + { + if (id_found_pair.second) + continue; + + const auto id = id_found_pair.first; + const auto cell_idx = getCellIdx(id); + auto & cell = cells[cell_idx]; + + for (auto & attribute : attributes) + setDefaultAttributeValue(attribute, cell_idx); + + cell.id = id; + cell.expires_at = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}; + + on_cell_updated(id, cell_idx); + } } std::uint64_t getCellIdx(const id_t id) const @@ -467,6 +491,36 @@ private: return idx; } + void setDefaultAttributeValue(attribute_t & attribute, const id_t idx) const + { + switch (attribute.type) + { + case AttributeType::uint8: std::get>(attribute.arrays)[idx] = std::get(attribute.null_values); break; + case AttributeType::uint16: std::get>(attribute.arrays)[idx] = std::get(attribute.null_values); break; + case AttributeType::uint32: std::get>(attribute.arrays)[idx] = std::get(attribute.null_values); break; + case AttributeType::uint64: std::get>(attribute.arrays)[idx] = std::get(attribute.null_values); break; + case AttributeType::int8: std::get>(attribute.arrays)[idx] = std::get(attribute.null_values); break; + case AttributeType::int16: std::get>(attribute.arrays)[idx] = std::get(attribute.null_values); break; + case AttributeType::int32: std::get>(attribute.arrays)[idx] = std::get(attribute.null_values); break; + case AttributeType::int64: std::get>(attribute.arrays)[idx] = std::get(attribute.null_values); break; + case AttributeType::float32: std::get>(attribute.arrays)[idx] = std::get(attribute.null_values); break; + case AttributeType::float64: std::get>(attribute.arrays)[idx] = std::get(attribute.null_values); break; + case AttributeType::string: + { + const auto & null_value_ref = std::get(attribute.null_values); + auto & string_ref = std::get>(attribute.arrays)[idx]; + if (string_ref.data == null_value_ref.data()) + return; + + delete[] string_ref.data; + + string_ref = StringRef{null_value_ref}; + + break; + } + } + } + void setAttributeValue(attribute_t & attribute, const id_t idx, const Field & value) const { switch (attribute.type) @@ -485,7 +539,8 @@ private: { const auto & string = value.get(); auto & string_ref = std::get>(attribute.arrays)[idx]; - if (string_ref.data) + const auto & null_value_ref = std::get(attribute.null_values); + if (string_ref.data != null_value_ref.data()) delete[] string_ref.data; const auto size = string.size(); diff --git a/dbms/include/DB/Dictionaries/ClickHouseDictionarySource.h b/dbms/include/DB/Dictionaries/ClickHouseDictionarySource.h index df6f42cc888..e268769bcc9 100644 --- a/dbms/include/DB/Dictionaries/ClickHouseDictionarySource.h +++ b/dbms/include/DB/Dictionaries/ClickHouseDictionarySource.h @@ -11,7 +11,7 @@ namespace DB { -const auto max_connections = 1; +const auto max_connections = 16; /** Allows loading dictionaries from local or remote ClickHouse instance * @todo use ConnectionPoolWithFailover @@ -75,56 +75,70 @@ public: DictionarySourcePtr clone() const override { return std::make_unique(*this); } private: - /// @todo escape table and column names static std::string composeLoadAllQuery(const Block & block, const std::string & table) { - std::string query{"SELECT "}; + std::string query; - auto first = true; - for (const auto idx : ext::range(0, block.columns())) { - if (!first) - query += ", "; + WriteBufferFromString out{query}; + writeString("SELECT ", out); - query += block.getByPosition(idx).name; - first = false; + auto first = true; + for (const auto idx : ext::range(0, block.columns())) + { + if (!first) + writeString(", ", out); + + writeString(block.getByPosition(idx).name, out); + first = false; + } + + writeString(" FROM ", out); + writeProbablyBackQuotedString(table, out); + writeChar(';', out); } - query += " FROM " + table + ';'; - return query; } std::string composeLoadIdsQuery(const std::vector ids) { - std::string query{"SELECT "}; + std::string query; - auto first = true; - for (const auto idx : ext::range(0, sample_block.columns())) { - if (!first) - query += ", "; + WriteBufferFromString out{query}; + writeString("SELECT ", out); - first = false; - query += sample_block.getByPosition(idx).name; + auto first = true; + for (const auto idx : ext::range(0, sample_block.columns())) + { + if (!first) + writeString(", ", out); + + writeString(sample_block.getByPosition(idx).name, out); + first = false; + } + + const auto & id_column_name = sample_block.getByPosition(0).name; + writeString(" FROM ", out); + writeProbablyBackQuotedString(table, out); + writeString(" WHERE ", out); + writeProbablyBackQuotedString(id_column_name, out); + writeString(" IN (", out); + + first = true; + for (const auto id : ids) + { + if (!first) + writeString(", ", out); + + first = false; + writeString(toString(id), out); + } + + writeString(");", out); } - const auto & id_column_name = sample_block.getByPosition(0).name; - - query += " FROM " + table + " WHERE " + id_column_name + " IN ("; - - first = true; - for (const auto id : ids) - { - if (!first) - query += ','; - - first = false; - query += toString(id); - } - - query += ");"; - return query; } diff --git a/dbms/include/DB/Dictionaries/FlatDictionary.h b/dbms/include/DB/Dictionaries/FlatDictionary.h index 0fb1a1ce740..72b55217569 100644 --- a/dbms/include/DB/Dictionaries/FlatDictionary.h +++ b/dbms/include/DB/Dictionaries/FlatDictionary.h @@ -165,6 +165,7 @@ private: { const auto size = dict_struct.attributes.size(); attributes.reserve(size); + for (const auto & attribute : dict_struct.attributes) { attribute_index_by_name.emplace(attribute.name, attributes.size()); @@ -275,56 +276,16 @@ private: switch (attribute.type) { - case AttributeType::uint8: - { - setAttributeValueImpl(attribute, id, value.get()); - break; - } - case AttributeType::uint16: - { - setAttributeValueImpl(attribute, id, value.get()); - break; - } - case AttributeType::uint32: - { - setAttributeValueImpl(attribute, id, value.get()); - break; - } - case AttributeType::uint64: - { - setAttributeValueImpl(attribute, id, value.get()); - break; - } - case AttributeType::int8: - { - setAttributeValueImpl(attribute, id, value.get()); - break; - } - case AttributeType::int16: - { - setAttributeValueImpl(attribute, id, value.get()); - break; - } - case AttributeType::int32: - { - setAttributeValueImpl(attribute, id, value.get()); - break; - } - case AttributeType::int64: - { - setAttributeValueImpl(attribute, id, value.get()); - break; - } - case AttributeType::float32: - { - setAttributeValueImpl(attribute, id, value.get()); - break; - } - case AttributeType::float64: - { - setAttributeValueImpl(attribute, id, value.get()); - break; - } + case AttributeType::uint8: setAttributeValueImpl(attribute, id, value.get()); break; + case AttributeType::uint16: setAttributeValueImpl(attribute, id, value.get()); break; + case AttributeType::uint32: setAttributeValueImpl(attribute, id, value.get()); break; + case AttributeType::uint64: setAttributeValueImpl(attribute, id, value.get()); break; + case AttributeType::int8: setAttributeValueImpl(attribute, id, value.get()); break; + case AttributeType::int16: setAttributeValueImpl(attribute, id, value.get()); break; + case AttributeType::int32: setAttributeValueImpl(attribute, id, value.get()); break; + case AttributeType::int64: setAttributeValueImpl(attribute, id, value.get()); break; + case AttributeType::float32: setAttributeValueImpl(attribute, id, value.get()); break; + case AttributeType::float64: setAttributeValueImpl(attribute, id, value.get()); break; case AttributeType::string: { auto & array = *std::get>>(attribute.arrays); diff --git a/dbms/include/DB/Dictionaries/HashedDictionary.h b/dbms/include/DB/Dictionaries/HashedDictionary.h index 0e27d281f2d..4a942a59fd2 100644 --- a/dbms/include/DB/Dictionaries/HashedDictionary.h +++ b/dbms/include/DB/Dictionaries/HashedDictionary.h @@ -7,6 +7,7 @@ #include #include #include +#include namespace DB { @@ -44,21 +45,15 @@ public: id_t toParent(const id_t id) const override { const auto attr = hierarchical_attribute; + const auto & map = *std::get>>(attr->maps); + const auto it = map.find(id); - const auto it = attr->uint64_map->find(id); - return it != attr->uint64_map->end() ? it->second : attr->uint64_null_value; + return it != map.end() ? it->second : std::get(attr->null_values); } void toParent(const PODArray & ids, PODArray & out) const override { - const auto & attr = *hierarchical_attribute->uint64_map; - const auto null_value = hierarchical_attribute->uint64_null_value; - - for (const auto i : ext::range(0, ids.size())) - { - const auto it = attr.find(ids[i]); - out[i] = it != attr.end() ? it->second : null_value; - } + getItems(*hierarchical_attribute, ids, out); } #define DECLARE_INDIVIDUAL_GETTER(TYPE, LC_TYPE) \ @@ -71,11 +66,10 @@ public: ErrorCodes::TYPE_MISMATCH\ };\ \ - const auto it = attribute.LC_TYPE##_map->find(id);\ - if (it != attribute.LC_TYPE##_map->end())\ - return TYPE{it->second};\ + const auto & map = *std::get>>(attribute.maps);\ + const auto it = map.find(id);\ \ - return attribute.LC_TYPE##_null_value;\ + return it != map.end() ? TYPE{it->second} : std::get(attribute.null_values);\ } DECLARE_INDIVIDUAL_GETTER(UInt8, uint8) DECLARE_INDIVIDUAL_GETTER(UInt16, uint16) @@ -87,8 +81,21 @@ public: DECLARE_INDIVIDUAL_GETTER(Int64, int64) DECLARE_INDIVIDUAL_GETTER(Float32, float32) DECLARE_INDIVIDUAL_GETTER(Float64, float64) - DECLARE_INDIVIDUAL_GETTER(String, string) #undef DECLARE_INDIVIDUAL_GETTER + String getString(const std::string & attribute_name, const id_t id) const override + { + const auto & attribute = getAttribute(attribute_name); + if (attribute.type != AttributeType::string) + throw Exception{ + "Type mismatch: attribute " + attribute_name + " has type " + toString(attribute.type), + ErrorCodes::TYPE_MISMATCH + }; + + const auto & map = *std::get>>(attribute.maps); + const auto it = map.find(id); + + return it != map.end() ? String{it->second} : std::get(attribute.null_values); + } #define DECLARE_MULTIPLE_GETTER(TYPE, LC_TYPE)\ void get##TYPE(const std::string & attribute_name, const PODArray & ids, PODArray & out) const override\ @@ -100,14 +107,7 @@ public: ErrorCodes::TYPE_MISMATCH\ };\ \ - const auto & attr = *attribute.LC_TYPE##_map;\ - const auto null_value = attribute.LC_TYPE##_null_value;\ - \ - for (const auto i : ext::range(0, ids.size()))\ - {\ - const auto it = attr.find(ids[i]);\ - out[i] = it != attr.end() ? it->second : null_value;\ - }\ + getItems(attribute, ids, out);\ } DECLARE_MULTIPLE_GETTER(UInt8, uint8) DECLARE_MULTIPLE_GETTER(UInt16, uint16) @@ -129,8 +129,8 @@ public: ErrorCodes::TYPE_MISMATCH }; - const auto & attr = *attribute.string_map; - const auto null_value = attribute.string_null_value; + const auto & attr = *std::get>>(attribute.maps); + const auto & null_value = std::get(attribute.null_values); for (const auto i : ext::range(0, ids.size())) { @@ -141,38 +141,32 @@ public: } private: - struct attribute_t + struct attribute_t final { AttributeType type; - UInt8 uint8_null_value; - UInt16 uint16_null_value; - UInt32 uint32_null_value; - UInt64 uint64_null_value; - Int8 int8_null_value; - Int16 int16_null_value; - Int32 int32_null_value; - Int64 int64_null_value; - Float32 float32_null_value; - Float64 float64_null_value; - String string_null_value; - std::unique_ptr> uint8_map; - std::unique_ptr> uint16_map; - std::unique_ptr> uint32_map; - std::unique_ptr> uint64_map; - std::unique_ptr> int8_map; - std::unique_ptr> int16_map; - std::unique_ptr> int32_map; - std::unique_ptr> int64_map; - std::unique_ptr> float32_map; - std::unique_ptr> float64_map; + std::tuple null_values; + std::tuple>, + std::unique_ptr>, + std::unique_ptr>, + std::unique_ptr>, + std::unique_ptr>, + std::unique_ptr>, + std::unique_ptr>, + std::unique_ptr>, + std::unique_ptr>, + std::unique_ptr>, + std::unique_ptr>> maps; std::unique_ptr string_arena; - std::unique_ptr> string_map; }; void createAttributes() { const auto size = dict_struct.attributes.size(); attributes.reserve(size); + for (const auto & attribute : dict_struct.attributes) { attribute_index_by_name.emplace(attribute.name, attributes.size()); @@ -214,124 +208,85 @@ private: stream->readSuffix(); } + template + void createAttributeImpl(attribute_t & attribute, const std::string & null_value) + { + std::get(attribute.null_values) = DB::parse(null_value); + std::get>>(attribute.maps) = std::make_unique>(); + } + attribute_t createAttributeWithType(const AttributeType type, const std::string & null_value) { attribute_t attr{type}; switch (type) { - case AttributeType::uint8: - attr.uint8_null_value = DB::parse(null_value); - attr.uint8_map.reset(new HashMap); - break; - case AttributeType::uint16: - attr.uint16_null_value = DB::parse(null_value); - attr.uint16_map.reset(new HashMap); - break; - case AttributeType::uint32: - attr.uint32_null_value = DB::parse(null_value); - attr.uint32_map.reset(new HashMap); - break; - case AttributeType::uint64: - attr.uint64_null_value = DB::parse(null_value); - attr.uint64_map.reset(new HashMap); - break; - case AttributeType::int8: - attr.int8_null_value = DB::parse(null_value); - attr.int8_map.reset(new HashMap); - break; - case AttributeType::int16: - attr.int16_null_value = DB::parse(null_value); - attr.int16_map.reset(new HashMap); - break; - case AttributeType::int32: - attr.int32_null_value = DB::parse(null_value); - attr.int32_map.reset(new HashMap); - break; - case AttributeType::int64: - attr.int64_null_value = DB::parse(null_value); - attr.int64_map.reset(new HashMap); - break; - case AttributeType::float32: - attr.float32_null_value = DB::parse(null_value); - attr.float32_map.reset(new HashMap); - break; - case AttributeType::float64: - attr.float64_null_value = DB::parse(null_value); - attr.float64_map.reset(new HashMap); - break; + case AttributeType::uint8: createAttributeImpl(attr, null_value); break; + case AttributeType::uint16: createAttributeImpl(attr, null_value); break; + case AttributeType::uint32: createAttributeImpl(attr, null_value); break; + case AttributeType::uint64: createAttributeImpl(attr, null_value); break; + case AttributeType::int8: createAttributeImpl(attr, null_value); break; + case AttributeType::int16: createAttributeImpl(attr, null_value); break; + case AttributeType::int32: createAttributeImpl(attr, null_value); break; + case AttributeType::int64: createAttributeImpl(attr, null_value); break; + case AttributeType::float32: createAttributeImpl(attr, null_value); break; + case AttributeType::float64: createAttributeImpl(attr, null_value); break; case AttributeType::string: - attr.string_null_value = null_value; - attr.string_arena.reset(new Arena); - attr.string_map.reset(new HashMap); + { + const auto & null_value_ref = std::get(attr.null_values) = DB::parse(null_value); + std::get>>(attr.maps) = + std::make_unique>(); + attr.string_arena = std::make_unique(); break; + } } return attr; } + template + void getItems(const attribute_t & attribute, const PODArray & ids, PODArray & out) const + { + const auto & attr = *std::get>>(attribute.maps); + const auto null_value = std::get(attribute.null_values); + + for (const auto i : ext::range(0, ids.size())) + { + const auto it = attr.find(ids[i]); + out[i] = it != attr.end() ? it->second : null_value; + } + } + + template + void setAttributeValueImpl(attribute_t & attribute, const id_t id, const T value) + { + auto & map = *std::get>>(attribute.maps); + map.insert({ id, value }); + } + void setAttributeValue(attribute_t & attribute, const id_t id, const Field & value) { switch (attribute.type) { - case AttributeType::uint8: - { - attribute.uint8_map->insert({ id, value.get() }); - break; - } - case AttributeType::uint16: - { - attribute.uint16_map->insert({ id, value.get() }); - break; - } - case AttributeType::uint32: - { - attribute.uint32_map->insert({ id, value.get() }); - break; - } - case AttributeType::uint64: - { - attribute.uint64_map->insert({ id, value.get() }); - break; - } - case AttributeType::int8: - { - attribute.int8_map->insert({ id, value.get() }); - break; - } - case AttributeType::int16: - { - attribute.int16_map->insert({ id, value.get() }); - break; - } - case AttributeType::int32: - { - attribute.int32_map->insert({ id, value.get() }); - break; - } - case AttributeType::int64: - { - attribute.int64_map->insert({ id, value.get() }); - break; - } - case AttributeType::float32: - { - attribute.float32_map->insert({ id, value.get() }); - break; - } - case AttributeType::float64: - { - attribute.float64_map->insert({ id, value.get() }); - break; - } + case AttributeType::uint8: setAttributeValueImpl(attribute, id, value.get()); break; + case AttributeType::uint16: setAttributeValueImpl(attribute, id, value.get()); break; + case AttributeType::uint32: setAttributeValueImpl(attribute, id, value.get()); break; + case AttributeType::uint64: setAttributeValueImpl(attribute, id, value.get()); break; + case AttributeType::int8: setAttributeValueImpl(attribute, id, value.get()); break; + case AttributeType::int16: setAttributeValueImpl(attribute, id, value.get()); break; + case AttributeType::int32: setAttributeValueImpl(attribute, id, value.get()); break; + case AttributeType::int64: setAttributeValueImpl(attribute, id, value.get()); break; + case AttributeType::float32: setAttributeValueImpl(attribute, id, value.get()); break; + case AttributeType::float64: setAttributeValueImpl(attribute, id, value.get()); break; case AttributeType::string: { + auto & map = *std::get>>(attribute.maps); const auto & string = value.get(); const auto string_in_arena = attribute.string_arena->insert(string.data(), string.size()); - attribute.string_map->insert({ id, StringRef{string_in_arena, string.size()} }); + map.insert({ id, StringRef{string_in_arena, string.size()} }); break; } - }; + } } const attribute_t & getAttribute(const std::string & attribute_name) const diff --git a/dbms/include/DB/Dictionaries/MySQLDictionarySource.h b/dbms/include/DB/Dictionaries/MySQLDictionarySource.h index 6c7b16ed3f8..952a4bad9fa 100644 --- a/dbms/include/DB/Dictionaries/MySQLDictionarySource.h +++ b/dbms/include/DB/Dictionaries/MySQLDictionarySource.h @@ -41,10 +41,10 @@ public: BlockInputStreamPtr loadIds(const std::vector ids) override { - throw Exception{ - "Method unsupported", - ErrorCodes::NOT_IMPLEMENTED - }; + last_modification = getLastModification(); + const auto query = composeLoadIdsQuery(ids); + + return new MySQLBlockInputStream{pool.Get()->query(query), sample_block, max_block_size}; } bool isModified() const override { return getLastModification() > last_modification; } @@ -76,22 +76,69 @@ private: return mysqlxx::DateTime{std::time(nullptr)}; } - /// @todo escape table and column names static std::string composeLoadAllQuery(const Block & block, const std::string & table) { - std::string query{"SELECT "}; + std::string query; - auto first = true; - for (const auto idx : ext::range(0, block.columns())) { - if (!first) - query += ", "; + WriteBufferFromString out{query}; + writeString("SELECT ", out); - query += block.getByPosition(idx).name; - first = false; + auto first = true; + for (const auto idx : ext::range(0, block.columns())) + { + if (!first) + writeString(", ", out); + + writeString(block.getByPosition(idx).name, out); + first = false; + } + + writeString(" FROM ", out); + writeProbablyBackQuotedString(table, out); + writeChar(';', out); } - query += " FROM " + table + ';'; + return query; + } + + std::string composeLoadIdsQuery(const std::vector ids) + { + std::string query; + + { + WriteBufferFromString out{query}; + writeString("SELECT ", out); + + auto first = true; + for (const auto idx : ext::range(0, sample_block.columns())) + { + if (!first) + writeString(", ", out); + + writeString(sample_block.getByPosition(idx).name, out); + first = false; + } + + const auto & id_column_name = sample_block.getByPosition(0).name; + writeString(" FROM ", out); + writeProbablyBackQuotedString(table, out); + writeString(" WHERE ", out); + writeProbablyBackQuotedString(id_column_name, out); + writeString(" IN (", out); + + first = true; + for (const auto id : ids) + { + if (!first) + writeString(", ", out); + + first = false; + writeString(toString(id), out); + } + + writeString(");", out); + } return query; } diff --git a/dbms/include/DB/Functions/FunctionsCoding.h b/dbms/include/DB/Functions/FunctionsCoding.h index 403dd0eaaff..13cd8131e84 100644 --- a/dbms/include/DB/Functions/FunctionsCoding.h +++ b/dbms/include/DB/Functions/FunctionsCoding.h @@ -1324,7 +1324,6 @@ public: if (col_fstr_in) { ColumnString * col_str = new ColumnString; - col_res = col_str; ColumnString::Chars_t & out_vec = col_str->getChars(); diff --git a/dbms/include/DB/Functions/FunctionsDateTime.h b/dbms/include/DB/Functions/FunctionsDateTime.h index 71f4390294b..e35c1051f7d 100644 --- a/dbms/include/DB/Functions/FunctionsDateTime.h +++ b/dbms/include/DB/Functions/FunctionsDateTime.h @@ -408,6 +408,7 @@ public: if (const ColumnUInt32 * times = typeid_cast(&*block.getByPosition(arguments[0]).column)) { ColumnUInt32 * res = new ColumnUInt32; + ColumnPtr res_holder = res; ColumnUInt32::Container_t & res_vec = res->getData(); const ColumnUInt32::Container_t & vec = times->getData(); @@ -417,7 +418,7 @@ public: for (size_t i = 0; i < size; ++i) res_vec[i] = vec[i] / TIME_SLOT_SIZE * TIME_SLOT_SIZE; - block.getByPosition(result).column = res; + block.getByPosition(result).column = res_holder; } else if (const ColumnConstUInt32 * const_times = typeid_cast(&*block.getByPosition(arguments[0]).column)) { @@ -551,22 +552,23 @@ public: const ColumnConstUInt32 * const_durations = typeid_cast(&*block.getByPosition(arguments[1]).column); ColumnArray * res = new ColumnArray(new ColumnUInt32); + ColumnPtr res_holder = res; ColumnUInt32::Container_t & res_values = typeid_cast(res->getData()).getData(); if (starts && durations) { TimeSlotsImpl::vector_vector(starts->getData(), durations->getData(), res_values, res->getOffsets()); - block.getByPosition(result).column = res; + block.getByPosition(result).column = res_holder; } else if (starts && const_durations) { TimeSlotsImpl::vector_constant(starts->getData(), const_durations->getData(), res_values, res->getOffsets()); - block.getByPosition(result).column = res; + block.getByPosition(result).column = res_holder; } else if (const_starts && durations) { TimeSlotsImpl::constant_vector(const_starts->getData(), durations->getData(), res_values, res->getOffsets()); - block.getByPosition(result).column = res; + block.getByPosition(result).column = res_holder; } else if (const_starts && const_durations) { diff --git a/dbms/include/DB/Functions/FunctionsDictionaries.h b/dbms/include/DB/Functions/FunctionsDictionaries.h index e9125da5e51..26383f44d3f 100644 --- a/dbms/include/DB/Functions/FunctionsDictionaries.h +++ b/dbms/include/DB/Functions/FunctionsDictionaries.h @@ -1107,26 +1107,61 @@ private: const auto id_col_untyped = block.getByPosition(arguments[1]).column.get(); if (const auto id_col = typeid_cast *>(id_col_untyped)) { + const auto & in = id_col->getData(); + const auto size = in.size(); + + /// copy of `in` array + auto in_array = std::make_unique>(std::begin(in), std::end(in)); + /// used for storing and handling result of ::toParent call + auto out_array = std::make_unique>(size); + /// resulting hierarchies + std::vector> hierarchies(size); + + /// total number of non-zero elements, used for allocating all the required memory upfront + std::size_t total_count = 0; + + while (true) + { + auto all_zeroes = true; + + /// erase zeroed identifiers, store non-zeroed ones + for (const auto i : ext::range(0, size)) + { + const auto id = (*in_array)[i]; + if (0 == id) + continue; + + all_zeroes = false; + /// place id at it's corresponding place + hierarchies[i].push_back(id); + + ++total_count; + } + + if (all_zeroes) + break; + + /// translate all non-zero identifiers at once + dictionary->toParent(*in_array, *out_array); + + /// we're going to use the `in_array` from this iteration as `out_array` on the next one + std::swap(in_array, out_array); + } + const auto backend = new ColumnVector; const auto array = new ColumnArray{backend}; block.getByPosition(result).column = array; - const auto & in = id_col->getData(); - const auto size = in.size(); auto & out = backend->getData(); auto & offsets = array->getOffsets(); + out.reserve(total_count); offsets.resize(size); - out.reserve(size * 4); - for (const auto idx : ext::range(0, size)) + for (const auto i : ext::range(0, size)) { - IDictionary::id_t cur = in[idx]; - while (cur) - { - out.push_back(cur); - cur = dictionary->toParent(cur); - } - offsets[idx] = out.size(); + const auto & ids = hierarchies[i]; + out.insert_assume_reserved(std::begin(ids), std::end(ids)); + offsets[i] = out.size(); } } else if (const auto id_col = typeid_cast *>(id_col_untyped)) diff --git a/dbms/include/DB/Functions/FunctionsString.h b/dbms/include/DB/Functions/FunctionsString.h index f576cb3a95b..14ea47f5478 100644 --- a/dbms/include/DB/Functions/FunctionsString.h +++ b/dbms/include/DB/Functions/FunctionsString.h @@ -1408,23 +1408,6 @@ private: ErrorCodes::ILLEGAL_COLUMN }; } - - static void vector(const ColumnString::Chars_t & data, const ColumnString::Offsets_t & offsets, - ColumnString::Chars_t & res_data, ColumnString::Offsets_t & res_offsets) - { - res_data.resize(data.size()); - res_offsets.assign(offsets); - size_t size = offsets.size(); - - ColumnString::Offset_t prev_offset = 0; - for (size_t i = 0; i < size; ++i) - { - for (size_t j = prev_offset; j < offsets[i] - 1; ++j) - res_data[j] = data[offsets[i] + prev_offset - 2 - j]; - res_data[offsets[i] - 1] = 0; - prev_offset = offsets[i]; - } - } }; diff --git a/dbms/include/DB/Functions/FunctionsStringArray.h b/dbms/include/DB/Functions/FunctionsStringArray.h index 12f8f994b94..30c1b532f65 100644 --- a/dbms/include/DB/Functions/FunctionsStringArray.h +++ b/dbms/include/DB/Functions/FunctionsStringArray.h @@ -338,6 +338,7 @@ public: typeid_cast(&*block.getByPosition(arrayArgumentPosition).column); ColumnArray * col_res = new ColumnArray(new ColumnString); + ColumnPtr col_res_holder = col_res; ColumnString & res_strings = typeid_cast(col_res->getData()); ColumnArray::Offsets_t & res_offsets = col_res->getOffsets(); ColumnString::Chars_t & res_strings_chars = res_strings.getChars(); @@ -385,7 +386,7 @@ public: res_offsets.push_back(current_dst_offset); } - block.getByPosition(result).column = col_res; + block.getByPosition(result).column = col_res_holder; } else if (col_const_str) { diff --git a/dbms/include/DB/IO/BufferWithOwnMemory.h b/dbms/include/DB/IO/BufferWithOwnMemory.h index 144acdfd853..09034322bcc 100644 --- a/dbms/include/DB/IO/BufferWithOwnMemory.h +++ b/dbms/include/DB/IO/BufferWithOwnMemory.h @@ -113,6 +113,7 @@ private: return; free(reinterpret_cast(m_data)); + m_data = nullptr; /// Чтобы избежать double free, если последующий вызов alloc кинет исключение. if (current_memory_tracker) current_memory_tracker->free(m_capacity); diff --git a/dbms/include/DB/Interpreters/ExternalDictionaries.h b/dbms/include/DB/Interpreters/ExternalDictionaries.h index 171d1a39242..c067bdc78c7 100644 --- a/dbms/include/DB/Interpreters/ExternalDictionaries.h +++ b/dbms/include/DB/Interpreters/ExternalDictionaries.h @@ -40,6 +40,8 @@ private: mutable std::mutex dictionaries_mutex; std::unordered_map>> dictionaries; + /// exception pointers for notifying user about failures on dictionary creation + std::unordered_map stored_exceptions; std::unordered_map update_times; std::mt19937_64 rnd_engine{getSeed()}; @@ -52,24 +54,6 @@ private: Poco::Timestamp config_last_modified{0}; - void handleException() const - { - try - { - throw; - } - catch (const Poco::Exception & e) - { - LOG_ERROR(log, "Cannot load exter dictionary! You must resolve this manually. " << e.displayText()); - return; - } - catch (...) - { - LOG_ERROR(log, "Cannot load dictionary! You must resolve this manually."); - return; - } - } - void reloadImpl(); void reloadPeriodically() @@ -110,10 +94,16 @@ public: const std::lock_guard lock{dictionaries_mutex}; const auto it = dictionaries.find(name); if (it == std::end(dictionaries)) - throw Exception{ - "No such dictionary: " + name, - ErrorCodes::BAD_ARGUMENTS - }; + { + const auto exception_it = stored_exceptions.find(name); + if (exception_it != std::end(stored_exceptions)) + std::rethrow_exception(exception_it->second); + else + throw Exception{ + "No such dictionary: " + name, + ErrorCodes::BAD_ARGUMENTS + }; + } return it->second->get(); } diff --git a/dbms/include/DB/Interpreters/Join.h b/dbms/include/DB/Interpreters/Join.h index c40d333599d..ff232ad4cef 100644 --- a/dbms/include/DB/Interpreters/Join.h +++ b/dbms/include/DB/Interpreters/Join.h @@ -5,7 +5,6 @@ #include #include -#include #include #include @@ -67,7 +66,7 @@ public: { } - bool empty() { return type == Set::EMPTY; } + bool empty() { return type == Type::EMPTY; } /** Добавить в отображение для соединения блок "правой" таблицы. * Возвращает false, если превышено какое-нибудь ограничение, и больше не нужно вставлять. @@ -155,7 +154,17 @@ private: /// Дополнительные данные - строки, а также продолжения односвязных списков строк. Arena pool; - Set::Type type = Set::EMPTY; + enum class Type + { + EMPTY, + KEY_64, + KEY_STRING, + HASHED, + }; + + Type type = Type::EMPTY; + + static Type chooseMethod(const ConstColumnPlainPtrs & key_columns, bool & keys_fit_128_bits, Sizes & key_sizes); bool keys_fit_128_bits; Sizes key_sizes; @@ -174,7 +183,7 @@ private: */ mutable Poco::RWLock rwlock; - void init(Set::Type type_); + void init(Type type_); template void insertFromBlockImpl(Maps & maps, size_t rows, const ConstColumnPlainPtrs & key_columns, size_t keys_size, Block * stored_block); diff --git a/dbms/include/DB/Interpreters/Set.h b/dbms/include/DB/Interpreters/Set.h index 809d2963010..4b45f73ae96 100644 --- a/dbms/include/DB/Interpreters/Set.h +++ b/dbms/include/DB/Interpreters/Set.h @@ -24,6 +24,245 @@ namespace DB { +/** Методы для разных вариантов реализации множеств. + * Используются в качестве параметра шаблона. + */ + + +/// Для случая, когда есть один числовой ключ. +template /// UInt8/16/32/64 для любых типов соответствующей битности. +struct SetMethodOneNumber +{ + typedef TData Data; + typedef typename Data::key_type Key; + + Data data; + + /// Для использования одного Method в разных потоках, используйте разные State. + struct State + { + const FieldType * vec; + + /** Вызывается в начале обработки каждого блока. + * Устанавливает переменные, необходимые для остальных методов, вызываемых во внутренних циклах. + */ + void init(const ConstColumnPlainPtrs & key_columns) + { + vec = &static_cast *>(key_columns[0])->getData()[0]; + } + + /// Достать из ключевых столбцов ключ для вставки в хэш-таблицу. + Key getKey( + const ConstColumnPlainPtrs & key_columns, /// Ключевые столбцы. + size_t keys_size, /// Количество ключевых столбцов. + size_t i, /// Из какой строки блока достать ключ. + const Sizes & key_sizes) const /// Если ключи фиксированной длины - их длины. Не используется в методах по ключам переменной длины. + { + return unionCastToUInt64(vec[i]); + } + }; + + /** Разместить дополнительные данные, если это необходимо, в случае, когда в хэш-таблицу был вставлен новый ключ. + */ + static void onNewKey(typename Data::value_type & value, size_t keys_size, size_t i, Arena & pool) {} +}; + +/// Для случая, когда есть один строковый ключ. +template +struct SetMethodString +{ + typedef TData Data; + typedef typename Data::key_type Key; + + Data data; + + struct State + { + const ColumnString::Offsets_t * offsets; + const ColumnString::Chars_t * chars; + + void init(const ConstColumnPlainPtrs & key_columns) + { + const IColumn & column = *key_columns[0]; + const ColumnString & column_string = static_cast(column); + offsets = &column_string.getOffsets(); + chars = &column_string.getChars(); + } + + Key getKey( + const ConstColumnPlainPtrs & key_columns, + size_t keys_size, + size_t i, + const Sizes & key_sizes) const + { + return StringRef( + &(*chars)[i == 0 ? 0 : (*offsets)[i - 1]], + (i == 0 ? (*offsets)[i] : ((*offsets)[i] - (*offsets)[i - 1])) - 1); + } + }; + + static void onNewKey(typename Data::value_type & value, size_t keys_size, size_t i, Arena & pool) + { + value.data = pool.insert(value.data, value.size); + } +}; + +/// Для случая, когда есть один строковый ключ фиксированной длины. +template +struct SetMethodFixedString +{ + typedef TData Data; + typedef typename Data::key_type Key; + + Data data; + + struct State + { + size_t n; + const ColumnFixedString::Chars_t * chars; + + void init(const ConstColumnPlainPtrs & key_columns) + { + const IColumn & column = *key_columns[0]; + const ColumnFixedString & column_string = static_cast(column); + n = column_string.getN(); + chars = &column_string.getChars(); + } + + Key getKey( + const ConstColumnPlainPtrs & key_columns, + size_t keys_size, + size_t i, + const Sizes & key_sizes) const + { + return StringRef(&(*chars)[i * n], n); + } + }; + + static void onNewKey(typename Data::value_type & value, size_t keys_size, size_t i, Arena & pool) + { + value.data = pool.insert(value.data, value.size); + } +}; + +/// Для случая, когда все ключи фиксированной длины, и они помещаются в N (например, 128) бит. +template +struct SetMethodKeysFixed +{ + typedef TData Data; + typedef typename Data::key_type Key; + + Data data; + + struct State + { + void init(const ConstColumnPlainPtrs & key_columns) + { + } + + Key getKey( + const ConstColumnPlainPtrs & key_columns, + size_t keys_size, + size_t i, + const Sizes & key_sizes) const + { + return packFixed(i, keys_size, key_columns, key_sizes); + } + }; + + static void onNewKey(typename Data::value_type & value, size_t keys_size, size_t i, Arena & pool) {} +}; + +/// Для остальных случаев. По 128 битному хэшу от ключа. (При этом, строки, содержащие нули посередине, могут склеиться.) +template +struct SetMethodHashed +{ + typedef TData Data; + typedef typename Data::key_type Key; + + Data data; + + struct State + { + void init(const ConstColumnPlainPtrs & key_columns) + { + } + + Key getKey( + const ConstColumnPlainPtrs & key_columns, + size_t keys_size, + size_t i, + const Sizes & key_sizes) const + { + return hash128(i, keys_size, key_columns); + } + }; + + static void onNewKey(typename Data::value_type & value, size_t keys_size, size_t i, Arena & pool) {} +}; + + +/** Разные варианты реализации множества. + */ +struct SetVariants +{ + /// TODO Использовать для этих двух вариантов bit- или byte- set. + std::unique_ptr>>> key8; + std::unique_ptr>>> key16; + + /** Также для эксперимента проверялась возможность использовать SmallSet, + * пока количество элементов в множестве небольшое (и, при необходимости, конвертировать в полноценный HashSet). + * Но этот эксперимент показал, что преимущество есть только в редких случаях. + */ + std::unique_ptr>>> key32; + std::unique_ptr>>> key64; + std::unique_ptr>> key_string; + std::unique_ptr>> key_fixed_string; + std::unique_ptr>> keys128; + std::unique_ptr>> keys256; + std::unique_ptr>> hashed; + + /** В отличие от Aggregator, здесь не используется метод concat. + * Это сделано потому что метод hashed, хоть и медленнее, но в данном случае, использует меньше оперативки. + * так как при его использовании, сами значения ключей не сохраняются. + */ + + Arena string_pool; + + #define APPLY_FOR_SET_VARIANTS(M) \ + M(key8) \ + M(key16) \ + M(key32) \ + M(key64) \ + M(key_string) \ + M(key_fixed_string) \ + M(keys128) \ + M(keys256) \ + M(hashed) + + enum class Type + { + EMPTY, + + #define M(NAME) NAME, + APPLY_FOR_SET_VARIANTS(M) + #undef M + }; + + Type type = Type::EMPTY; + + bool empty() const { return type == Type::EMPTY; } + + static Type chooseMethod(const ConstColumnPlainPtrs & key_columns, Sizes & key_sizes); + + void init(Type type_); + + size_t getTotalRowCount() const; + /// Считает размер в байтах буфера Set и размер string_pool'а + size_t getTotalByteCount() const; +}; + + /** Структура данных для реализации выражения IN. */ class Set @@ -37,7 +276,7 @@ public: { } - bool empty() { return type == EMPTY; } + bool empty() const { return data.empty(); } /** Создать множество по выражению (для перечисления в самом запросе). * types - типы того, что стоит слева от IN. @@ -49,11 +288,6 @@ public: // Возвращает false, если превышено какое-нибудь ограничение, и больше не нужно вставлять. bool insertFromBlock(const Block & block, bool create_ordered_set = false); - /// Считает суммарное число ключей во всех Set'ах - size_t getTotalRowCount() const; - /// Считает суммарный размер в байтах буфферов всех Set'ов + размер string_pool'а - size_t getTotalByteCount() const; - /** Для указанных столбцов блока проверить принадлежность их значений множеству. * Записать результат в столбец в позиции result. */ @@ -80,43 +314,14 @@ public: /// проверяет есть ли в Set элементы для заданного диапазона индекса BoolMask mayBeTrueInRange(const Range & range); - enum Type - { - EMPTY = 0, - KEY_64 = 1, - KEY_STRING = 2, - HASHED = 3, - }; - - static Type chooseMethod(const ConstColumnPlainPtrs & key_columns, bool & keys_fit_128_bits, Sizes & key_sizes); + size_t getTotalRowCount() const { return data.getTotalRowCount(); } + size_t getTotalByteCount() const { return data.getTotalByteCount(); } private: - /** Разные структуры данных, которые могут использоваться для проверки принадлежности - * одного или нескольких столбцов значений множеству. - */ - typedef HashSet> SetUInt64; - typedef HashSetWithSavedHash SetString; - typedef HashSet SetHashed; - - /// Специализация для случая, когда есть один числовой ключ. - std::unique_ptr key64; - - /// Специализация для случая, когда есть один строковый ключ. - std::unique_ptr key_string; - Arena string_pool; - - /** Сравнивает 128 битные хэши. - * Если все ключи фиксированной длины, влезающие целиком в 128 бит, то укладывает их без изменений в 128 бит. - * Иначе - вычисляет SipHash от набора из всех ключей. - * (При этом, строки, содержащие нули посередине, могут склеиться.) - */ - std::unique_ptr hashed; - - Type type = EMPTY; - - bool keys_fit_128_bits; Sizes key_sizes; + SetVariants data; + /** Типы данных, из которых было создано множество. * При проверке на принадлежность множеству, типы проверяемых столбцов должны с ними совпадать. */ @@ -129,24 +334,7 @@ private: size_t max_bytes; OverflowMode overflow_mode; - void init(Type type_) - { - type = type_; - - switch (type) - { - case EMPTY: break; - case KEY_64: key64 .reset(new SetUInt64); break; - case KEY_STRING: key_string .reset(new SetString); break; - case HASHED: hashed .reset(new SetHashed); break; - - default: - throw Exception("Unknown Set variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); - } - } - /// Если в левой части IN стоит массив. Проверяем, что хоть один элемент массива лежит в множестве. - void executeConstArray(const ColumnConstArray * key_column, ColumnUInt8::Container_t & vec_res, bool negative) const; void executeArray(const ColumnArray * key_column, ColumnUInt8::Container_t & vec_res, bool negative) const; /// Если в левой части набор столбцов тех же типов, что элементы множества. @@ -155,8 +343,8 @@ private: /// Проверить не превышены ли допустимые размеры множества ключей bool checkSetSizeLimits() const; - /// вектор упорядоченных элементов Set - /// нужен для работы индекса по первичному ключу в секции In + /// Вектор упорядоченных элементов Set. + /// Нужен для работы индекса по первичному ключу в операторе IN. typedef std::vector OrderedSetElements; typedef std::unique_ptr OrderedSetElementsPtr; OrderedSetElementsPtr ordered_set_elements; @@ -167,6 +355,31 @@ private: * Поэтому остальные функции по работе с множеством, не защинены. */ mutable Poco::RWLock rwlock; + + + template + void insertFromBlockImpl( + Method & method, + const ConstColumnPlainPtrs & key_columns, + size_t rows, + SetVariants & variants); + + template + void executeImpl( + Method & method, + const ConstColumnPlainPtrs & key_columns, + ColumnUInt8::Container_t & vec_res, + bool negative, + size_t rows) const; + + template + void executeArrayImpl( + Method & method, + const ConstColumnPlainPtrs & key_columns, + const ColumnArray::Offsets_t & offsets, + ColumnUInt8::Container_t & vec_res, + bool negative, + size_t rows) const; }; typedef Poco::SharedPtr SetPtr; diff --git a/dbms/include/DB/Interpreters/Settings.h b/dbms/include/DB/Interpreters/Settings.h index 119628bfc8e..3f886c921f8 100644 --- a/dbms/include/DB/Interpreters/Settings.h +++ b/dbms/include/DB/Interpreters/Settings.h @@ -108,7 +108,7 @@ struct Settings M(SettingUInt64, merge_tree_max_rows_to_use_cache, (1024 * 1024)) \ \ /** Минимальная длина выражения expr = x1 OR ... expr = xN для оптимизации */ \ - M(SettingUInt64, optimize_min_equality_disjunction_chain_length, 4) \ + M(SettingUInt64, optimize_min_equality_disjunction_chain_length, 3) \ /// Всевозможные ограничения на выполнение запроса. Limits limits; diff --git a/dbms/include/DB/Interpreters/SpecializedAggregator.h b/dbms/include/DB/Interpreters/SpecializedAggregator.h index a0fde02b5e0..7830f5f8da2 100644 --- a/dbms/include/DB/Interpreters/SpecializedAggregator.h +++ b/dbms/include/DB/Interpreters/SpecializedAggregator.h @@ -140,7 +140,6 @@ void AggregateFunctionsCreator::operator()() for (size_t rollback_j = 0; rollback_j < column_num; ++rollback_j) func->destroy(aggregate_data + offsets_of_aggregate_states[rollback_j]); - aggregate_data = nullptr; throw; } } @@ -239,10 +238,14 @@ void NO_INLINE Aggregator::executeSpecializedCase( method.onNewKey(*it, keys_size, i, keys, *aggregates_pool); AggregateDataPtr & aggregate_data = Method::getAggregateData(it->second); - aggregate_data = aggregates_pool->alloc(total_size_of_aggregate_states); + + aggregate_data = nullptr; + AggregateDataPtr place = aggregates_pool->alloc(total_size_of_aggregate_states); AggregateFunctionsList::forEach(AggregateFunctionsCreator( - aggregate_functions, offsets_of_aggregate_states, aggregate_columns, aggregate_data)); + aggregate_functions, offsets_of_aggregate_states, aggregate_columns, place)); + + aggregate_data = place; } else method.onExistingKey(key, keys, *aggregates_pool); diff --git a/dbms/include/DB/Parsers/formatAST.h b/dbms/include/DB/Parsers/formatAST.h index f9d781e7969..811d946f044 100644 --- a/dbms/include/DB/Parsers/formatAST.h +++ b/dbms/include/DB/Parsers/formatAST.h @@ -2,32 +2,8 @@ #include +#include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -//#include namespace DB @@ -38,39 +14,6 @@ namespace DB */ void formatAST(const IAST & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTSelectQuery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTCreateQuery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTDropQuery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTInsertQuery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTRenameQuery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTShowTablesQuery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTUseQuery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTSetQuery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTOptimizeQuery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTExistsQuery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTDescribeQuery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTShowCreateQuery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTExpressionList & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTFunction & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTIdentifier & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTLiteral & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTNameTypePair & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTColumnDeclaration & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTAsterisk & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTOrderByElement & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTSubquery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTAlterQuery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTSet & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTJoin & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -void formatAST(const ASTCheckQuery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); -//void formatAST(const ASTMultiQuery & ast, std::ostream & s, size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); - -void formatAST(const ASTQueryWithTableAndOutput & ast, std::string name, std::ostream & s, - size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); - -void formatAST(const ASTShowProcesslistQuery & ast, std::ostream & s, - size_t indent = 0, bool hilite = true, bool one_line = false, bool need_parens = false); - String formatColumnsForCreateQuery(NamesAndTypesList & columns); String backQuoteIfNeed(const String & x); diff --git a/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp b/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp index 1ce1c1f2083..153d6600581 100644 --- a/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp +++ b/dbms/src/AggregateFunctions/AggregateFunctionFactory.cpp @@ -154,6 +154,80 @@ static IAggregateFunction * createAggregateFunctionSingleValue(const String & na } +/// argMin, argMax +template