diff --git a/dbms/include/DB/Columns/IColumn.h b/dbms/include/DB/Columns/IColumn.h index 8f7d736dc60..f6606868db5 100644 --- a/dbms/include/DB/Columns/IColumn.h +++ b/dbms/include/DB/Columns/IColumn.h @@ -48,6 +48,9 @@ public: */ virtual size_t sizeOfField() const { throw Exception("Cannot get sizeOfField() for column " + getName(), ErrorCodes::CANNOT_GET_SIZE_OF_FIELD); } + /** Создать столбец с такими же данными. */ + virtual SharedPtr clone() const { return cut(0, size()); } + /** Создать пустой столбец такого же типа */ virtual SharedPtr cloneEmpty() const { return cloneResized(0); } diff --git a/dbms/include/DB/Core/Block.h b/dbms/include/DB/Core/Block.h index a8eb9c085a3..d700e4c4f8d 100644 --- a/dbms/include/DB/Core/Block.h +++ b/dbms/include/DB/Core/Block.h @@ -111,6 +111,9 @@ public: /** Получить такой же блок, но пустой. */ Block cloneEmpty() const; + /** Получить блок со столбцами, переставленными в порядке их имён. */ + Block sortColumns() const; + /** Заменяет столбцы смещений внутри вложенных таблиц на один общий для таблицы. * Кидает исключение, если эти смещения вдруг оказались неодинаковы. */ diff --git a/dbms/include/DB/Core/NamesAndTypes.h b/dbms/include/DB/Core/NamesAndTypes.h index 96e2cf6b2da..ad3c60defa7 100644 --- a/dbms/include/DB/Core/NamesAndTypes.h +++ b/dbms/include/DB/Core/NamesAndTypes.h @@ -6,6 +6,7 @@ #include #include +#include #include #include @@ -44,121 +45,29 @@ class NamesAndTypesList : public std::list public: using std::list::list; - void readText(ReadBuffer & buf, const DataTypeFactory & data_type_factory) - { - DB::assertString("columns format version: 1\n", buf); - size_t count; - DB::readText(count, buf); - DB::assertString(" columns:\n", buf); - resize(count); - for (NameAndTypePair & it : *this) - { - DB::readBackQuotedString(it.name, buf); - DB::assertString(" ", buf); - String type_name; - DB::readString(type_name, buf); - it.type = data_type_factory.get(type_name); - DB::assertString("\n", buf); - } - } + void readText(ReadBuffer & buf, const DataTypeFactory & data_type_factory); + void writeText(WriteBuffer & buf) const; - void writeText(WriteBuffer & buf) const - { - DB::writeString("columns format version: 1\n", buf); - DB::writeText(size(), buf); - DB::writeString(" columns:\n", buf); - for (const auto & it : *this) - { - DB::writeBackQuotedString(it.name, buf); - DB::writeChar(' ', buf); - DB::writeString(it.type->getName(), buf); - DB::writeChar('\n', buf); - } - } - - String toString() const - { - String s; - { - WriteBufferFromString out(s); - writeText(out); - } - return s; - } - - static NamesAndTypesList parse(const String & s, const DataTypeFactory & data_type_factory) - { - ReadBufferFromString in(s); - NamesAndTypesList res; - res.readText(in, data_type_factory); - assertEOF(in); - return res; - } + String toString() const; + static NamesAndTypesList parse(const String & s, const DataTypeFactory & data_type_factory); /// Все элементы rhs должны быть различны. - bool isSubsetOf(const NamesAndTypesList & rhs) const - { - NamesAndTypes vector(rhs.begin(), rhs.end()); - vector.insert(vector.end(), begin(), end()); - std::sort(vector.begin(), vector.end()); - return std::unique(vector.begin(), vector.end()) == vector.begin() + rhs.size(); - } + bool isSubsetOf(const NamesAndTypesList & rhs) const; /// Расстояние Хемминга между множествами /// (иными словами, добавленные и удаленные столбцы считаются один раз; столбцы, изменившие тип, - дважды). 
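    /// (The implementation in NamesAndTypes.cpp below concatenates both lists, sorts them and counts the
    /// distinct (name, type) pairs U; the result is then 2 * U - size() - rhs.size(), i.e. the number of
    /// pairs that occur in exactly one of the two lists, so a changed type contributes two.)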
- size_t sizeOfDifference(const NamesAndTypesList & rhs) const - { - NamesAndTypes vector(rhs.begin(), rhs.end()); - vector.insert(vector.end(), begin(), end()); - std::sort(vector.begin(), vector.end()); - return (std::unique(vector.begin(), vector.end()) - vector.begin()) * 2 - size() - rhs.size(); - } + size_t sizeOfDifference(const NamesAndTypesList & rhs) const; - Names getNames() const - { - Names res; - res.reserve(size()); - for (const NameAndTypePair & column : *this) - { - res.push_back(column.name); - } - return res; - } + Names getNames() const; /// Оставить только столбцы, имена которых есть в names. В names могут быть лишние столбцы. - NamesAndTypesList filter(const NameSet & names) const - { - NamesAndTypesList res; - for (const NameAndTypePair & column : *this) - { - if (names.count(column.name)) - res.push_back(column); - } - return res; - } + NamesAndTypesList filter(const NameSet & names) const; /// Оставить только столбцы, имена которых есть в names. В names могут быть лишние столбцы. - NamesAndTypesList filter(const Names & names) const - { - return filter(NameSet(names.begin(), names.end())); - } + NamesAndTypesList filter(const Names & names) const; /// В отличие от filter, возвращает столбцы в том порядке, в котором они идут в names. - NamesAndTypesList addTypes(const Names & names) const - { - std::map types; - for (const NameAndTypePair & column : *this) - types[column.name] = column.type; - NamesAndTypesList res; - for (const String & name : names) - { - auto it = types.find(name); - if (it == types.end()) - throw Exception("No column " + name, ErrorCodes::THERE_IS_NO_COLUMN); - res.push_back(NameAndTypePair(name, it->second)); - } - return res; - } + NamesAndTypesList addTypes(const Names & names) const; }; typedef SharedPtr NamesAndTypesListPtr; diff --git a/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h b/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h index b8308083d0a..ac04ae593d8 100644 --- a/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h +++ b/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h @@ -65,7 +65,13 @@ public: * - проверяются ограничения и квоты, которые должны быть проверены не в рамках одного источника, * а над общим количеством потраченных ресурсов во всех источниках сразу (информация в ProcessList-е). */ - virtual void progress(const Progress & value) { progressImpl(value); } + virtual void progress(const Progress & value) + { + /// Данные для прогресса берутся из листовых источников. + if (children.empty()) + progressImpl(value); + } + void progressImpl(const Progress & value); @@ -77,6 +83,10 @@ public: */ void setProcessListElement(ProcessList::Element * elem); + /** Установить информацию о приблизительном общем количестве строк, которых нужно прочитать. + */ + void setTotalRowsApprox(size_t value) { total_rows_approx = value; } + /** Попросить прервать получение данных как можно скорее. * По-умолчанию - просто выставляет флаг is_cancelled и просит прерваться всех детей. @@ -161,6 +171,10 @@ protected: Block totals; /// Минимумы и максимумы. Первая строчка блока - минимумы, вторая - максимумы. Block extremes; + /// Приблизительное общее количество строк, которых нужно прочитать. Для прогресс-бара. + size_t total_rows_approx = 0; + /// Информация о приблизительном общем количестве строк собрана в родительском источнике. + bool collected_total_rows_approx = false; /// Ограничения и квоты. 
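As a minimal illustration of the two fields added above, the sketch below (simplified, hypothetical types — not the real IProfilingBlockInputStream interface) shows how a root source can walk its children exactly once, sum the per-leaf row estimates, and only then report them through the progress mechanism:

#include <cstddef>
#include <iostream>
#include <memory>
#include <vector>

/// Simplified stand-in for IProfilingBlockInputStream: leaves set total_rows_approx
/// (e.g. via setTotalRowsApprox()), and the root sums the estimates exactly once.
struct SourceSketch
{
    std::vector<std::shared_ptr<SourceSketch>> children;
    size_t total_rows_approx = 0;
    bool collected_total_rows_approx = false;

    void collectTotalRowsApprox()
    {
        if (collected_total_rows_approx)
            return;
        collected_total_rows_approx = true;

        for (auto & child : children)
        {
            child->collectTotalRowsApprox();
            total_rows_approx += child->total_rows_approx;
        }
    }
};

int main()
{
    auto leaf1 = std::make_shared<SourceSketch>();
    auto leaf2 = std::make_shared<SourceSketch>();
    leaf1->total_rows_approx = 20 * 8192;   /// e.g. set by a MergeTreeBlockInputStream
    leaf2->total_rows_approx = 5 * 8192;

    SourceSketch root;
    root.children = {leaf1, leaf2};
    root.collectTotalRowsApprox();

    std::cout << "total_rows_approx at the root: " << root.total_rows_approx << "\n";
}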
@@ -182,6 +196,14 @@ protected: */ bool checkLimits(); void checkQuota(Block & block); + + /// Собрать информацию о приблизительном общем числе строк по всем детям. + void collectTotalRowsApprox(); + + /** Передать информацию о приблизительном общем числе строк в колбэк прогресса. + * Сделано так, что отправка происходит лишь в верхнем источнике. + */ + void collectAndSendTotalRowsApprox(); }; } diff --git a/dbms/include/DB/DataStreams/JSONRowOutputStream.h b/dbms/include/DB/DataStreams/JSONRowOutputStream.h index d0139b50d08..7ef012b3a24 100644 --- a/dbms/include/DB/DataStreams/JSONRowOutputStream.h +++ b/dbms/include/DB/DataStreams/JSONRowOutputStream.h @@ -4,7 +4,6 @@ #include #include -#include #include #include @@ -26,7 +25,13 @@ public: void writePrefix() override; void writeSuffix() override; - void flush() override { ostr.next(); dst_ostr.next(); } + void flush() override + { + ostr->next(); + + if (validating_ostr) + dst_ostr.next(); + } void setRowsBeforeLimit(size_t rows_before_limit_) override { @@ -44,11 +49,13 @@ protected: virtual void writeExtremes(); WriteBuffer & dst_ostr; - WriteBufferValidUTF8 ostr; /// Валидирует и пишет в dst_ostr. - size_t field_number; - size_t row_count; - bool applied_limit; - size_t rows_before_limit; + std::unique_ptr validating_ostr; /// Валидирует UTF-8 последовательности. + WriteBuffer * ostr; + + size_t field_number = 0; + size_t row_count = 0; + bool applied_limit = false; + size_t rows_before_limit = 0; NamesAndTypes fields; Block totals; Block extremes; diff --git a/dbms/include/DB/DataTypes/DataTypeAggregateFunction.h b/dbms/include/DB/DataTypes/DataTypeAggregateFunction.h index 7cc1422d626..07ee9f432ac 100644 --- a/dbms/include/DB/DataTypes/DataTypeAggregateFunction.h +++ b/dbms/include/DB/DataTypes/DataTypeAggregateFunction.h @@ -19,7 +19,7 @@ private: AggregateFunctionPtr function; DataTypes argument_types; Array parameters; - + public: DataTypeAggregateFunction(const AggregateFunctionPtr & function_, const DataTypes & argument_types_, const Array & parameters_) : function(function_), argument_types(argument_types_), parameters(parameters_) @@ -27,7 +27,7 @@ public: } std::string getFunctionName() const { return function->getName(); } - + std::string getName() const { std::stringstream stream; @@ -47,20 +47,20 @@ public: for (DataTypes::const_iterator it = argument_types.begin(); it != argument_types.end(); ++it) stream << ", " << (*it)->getName(); - + stream << ")"; return stream.str(); } DataTypePtr getReturnType() const { return function->getReturnType(); }; DataTypes getArgumentsDataTypes() const { return argument_types; } - + DataTypePtr clone() const { return new DataTypeAggregateFunction(function, argument_types, parameters); } void serializeBinary(const Field & field, WriteBuffer & ostr) const; void deserializeBinary(Field & field, ReadBuffer & istr) const; void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const; - void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const; + void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const; void serializeText(const Field & field, WriteBuffer & ostr) const; void deserializeText(Field & field, ReadBuffer & istr) const; void serializeTextEscaped(const Field & field, WriteBuffer & ostr) const; diff --git a/dbms/include/DB/DataTypes/DataTypeArray.h b/dbms/include/DB/DataTypes/DataTypeArray.h index c8186339d03..de111073468 100644 --- 
a/dbms/include/DB/DataTypes/DataTypeArray.h +++ b/dbms/include/DB/DataTypes/DataTypeArray.h @@ -19,7 +19,7 @@ private: public: DataTypeArray(DataTypePtr nested_); - + std::string getName() const { return "Array(" + nested->getName() + ")"; @@ -41,7 +41,7 @@ public: void serializeTextQuoted(const Field & field, WriteBuffer & ostr) const; void deserializeTextQuoted(Field & field, ReadBuffer & istr) const; - + void serializeTextJSON(const Field & field, WriteBuffer & ostr) const; /** Потоковая сериализация массивов устроена по-особенному: @@ -57,7 +57,7 @@ public: /** Прочитать только значения, без размеров. * При этом, в column уже заранее должны быть считаны все размеры. */ - void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const; + void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const; /** Записать размеры. */ void serializeOffsets(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const; diff --git a/dbms/include/DB/DataTypes/DataTypeFixedString.h b/dbms/include/DB/DataTypes/DataTypeFixedString.h index 68913ab514c..faffb0353fa 100644 --- a/dbms/include/DB/DataTypes/DataTypeFixedString.h +++ b/dbms/include/DB/DataTypes/DataTypeFixedString.h @@ -17,24 +17,24 @@ class DataTypeFixedString : public IDataType { private: size_t n; - + public: DataTypeFixedString(size_t n_) : n(n_) { if (n == 0) throw Exception("FixedString size must be positive", ErrorCodes::ARGUMENT_OUT_OF_BOUND); } - + std::string getName() const { return "FixedString(" + toString(n) + ")"; } - + DataTypePtr clone() const { return new DataTypeFixedString(n); } - + size_t getN() const { return n; @@ -43,7 +43,7 @@ public: void serializeBinary(const Field & field, WriteBuffer & ostr) const; void deserializeBinary(Field & field, ReadBuffer & istr) const; void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const; - void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const; + void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const; void serializeText(const Field & field, WriteBuffer & ostr) const; void deserializeText(Field & field, ReadBuffer & istr) const; @@ -53,7 +53,7 @@ public: void serializeTextQuoted(const Field & field, WriteBuffer & ostr) const; void deserializeTextQuoted(Field & field, ReadBuffer & istr) const; - + void serializeTextJSON(const Field & field, WriteBuffer & ostr) const; ColumnPtr createColumn() const; diff --git a/dbms/include/DB/DataTypes/DataTypeNested.h b/dbms/include/DB/DataTypes/DataTypeNested.h index 5184926b469..d6092574762 100644 --- a/dbms/include/DB/DataTypes/DataTypeNested.h +++ b/dbms/include/DB/DataTypes/DataTypeNested.h @@ -19,9 +19,9 @@ private: public: DataTypeNested(NamesAndTypesListPtr nested_); - + std::string getName() const; - + static std::string concatenateNestedName(const std::string & nested_table_name, const std::string & nested_field_name); /// Возвращает префикс имени до первой точки '.'. Или имя без изменений, если точки нет. 
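    /// For example, per the comment above: for "visits.click_count" this would return "visits";
    /// a name without a dot, e.g. "duration", is returned unchanged.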
static std::string extractNestedTableName(const std::string & nested_name); @@ -44,7 +44,7 @@ public: void serializeTextQuoted(const Field & field, WriteBuffer & ostr) const; void deserializeTextQuoted(Field & field, ReadBuffer & istr) const; - + void serializeTextJSON(const Field & field, WriteBuffer & ostr) const; /** Потоковая сериализация массивов устроена по-особенному: @@ -60,7 +60,7 @@ public: /** Прочитать только значения, без размеров. * При этом, в column уже заранее должны быть считаны все размеры. */ - void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const; + void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const; /** Записать размеры. */ void serializeOffsets(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const; diff --git a/dbms/include/DB/DataTypes/DataTypeString.h b/dbms/include/DB/DataTypes/DataTypeString.h index e9769170ac4..6804236d7b7 100644 --- a/dbms/include/DB/DataTypes/DataTypeString.h +++ b/dbms/include/DB/DataTypes/DataTypeString.h @@ -22,7 +22,7 @@ public: { return "String"; } - + DataTypePtr clone() const { return new DataTypeString; @@ -31,7 +31,7 @@ public: void serializeBinary(const Field & field, WriteBuffer & ostr) const; void deserializeBinary(Field & field, ReadBuffer & istr) const; void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const; - void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const; + void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const; void serializeText(const Field & field, WriteBuffer & ostr) const; void deserializeText(Field & field, ReadBuffer & istr) const; diff --git a/dbms/include/DB/DataTypes/DataTypeTuple.h b/dbms/include/DB/DataTypes/DataTypeTuple.h index 00bb7b64bc6..3dce9351c78 100644 --- a/dbms/include/DB/DataTypes/DataTypeTuple.h +++ b/dbms/include/DB/DataTypes/DataTypeTuple.h @@ -19,7 +19,7 @@ private: DataTypes elems; public: DataTypeTuple(DataTypes elems_) : elems(elems_) {} - + std::string getName() const { std::stringstream s; @@ -28,7 +28,7 @@ public: for (DataTypes::const_iterator it = elems.begin(); it != elems.end(); ++it) s << (it == elems.begin() ? "" : ", ") << (*it)->getName(); s << ")"; - + return s.str(); } @@ -62,7 +62,7 @@ public: } writeChar(')', ostr); } - + void deserializeText(Field & field, ReadBuffer & istr) const { size_t size = elems.size(); @@ -82,7 +82,7 @@ public: { serializeText(field, ostr); } - + void deserializeTextEscaped(Field & field, ReadBuffer & istr) const { deserializeText(field, istr); @@ -92,7 +92,7 @@ public: { serializeText(field, ostr); } - + void deserializeTextQuoted(Field & field, ReadBuffer & istr) const { deserializeText(field, istr); @@ -129,11 +129,11 @@ public: * Именно из-за этого (невозможности читать меньший кусок записанных данных), Tuple не могут быть использованы для хранения данных в таблицах. * (Хотя могут быть использованы для передачи данных по сети в Native формате.) 
*/ - void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const + void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const { ColumnTuple & real_column = static_cast(column); for (size_t i = 0, size = elems.size(); i < size; ++i) - elems[i]->deserializeBinary(*real_column.getData().getByPosition(i).column, istr, limit); + elems[i]->deserializeBinary(*real_column.getData().getByPosition(i).column, istr, limit, avg_value_size_hint); } ColumnPtr createColumn() const @@ -147,7 +147,7 @@ public: } return new ColumnTuple(tuple_block); } - + ColumnPtr createConstColumn(size_t size, const Field & field) const { return new ColumnConstArray(size, get(field), new DataTypeTuple(elems)); diff --git a/dbms/include/DB/DataTypes/IDataType.h b/dbms/include/DB/DataTypes/IDataType.h index 98a536e5574..831048551cf 100644 --- a/dbms/include/DB/DataTypes/IDataType.h +++ b/dbms/include/DB/DataTypes/IDataType.h @@ -30,7 +30,7 @@ public: /// Если тип числовой, уместны ли с ним все арифметические операции и приведение типов. /// true для чисел, false для даты и даты-с-временем. virtual bool behavesAsNumber() const { return false; } - + /// Клонировать virtual SharedPtr clone() const = 0; @@ -49,9 +49,11 @@ public: * - в этом случае, столбец сериализуется до конца. */ virtual void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const = 0; - - /** Считать не более limit значений и дописать их в конец столбца. */ - virtual void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const = 0; + + /** Считать не более limit значений и дописать их в конец столбца. + * avg_value_size_hint - если не 0, то может использоваться, чтобы избежать реаллокаций при чтении строкового столбца. + */ + virtual void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const = 0; /** Текстовая сериализация - для вывода на экран / сохранения в текстовый файл и т. п. * Без эскейпинга и квотирования. @@ -68,7 +70,7 @@ public: */ virtual void serializeTextQuoted(const Field & field, WriteBuffer & ostr) const = 0; virtual void deserializeTextQuoted(Field & field, ReadBuffer & istr) const = 0; - + /** Текстовая сериализация в виде литерала для использования в формате JSON. 
*/ virtual void serializeTextJSON(const Field & field, WriteBuffer & ostr) const = 0; diff --git a/dbms/include/DB/DataTypes/IDataTypeDummy.h b/dbms/include/DB/DataTypes/IDataTypeDummy.h index 93c56afd565..511491f226a 100644 --- a/dbms/include/DB/DataTypes/IDataTypeDummy.h +++ b/dbms/include/DB/DataTypes/IDataTypeDummy.h @@ -25,8 +25,8 @@ public: void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const { throwNoSerialization(); } - - void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const { throwNoSerialization(); } + + void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const { throwNoSerialization(); } void serializeText(const Field & field, WriteBuffer & ostr) const { throwNoSerialization(); } void deserializeText(Field & field, ReadBuffer & istr) const { throwNoSerialization(); } @@ -36,7 +36,7 @@ public: void serializeTextQuoted(const Field & field, WriteBuffer & ostr) const { throwNoSerialization(); } void deserializeTextQuoted(Field & field, ReadBuffer & istr) const { throwNoSerialization(); } - + void serializeTextJSON(const Field & field, WriteBuffer & ostr) const { throwNoSerialization(); } SharedPtr createColumn() const diff --git a/dbms/include/DB/DataTypes/IDataTypeNumberFixed.h b/dbms/include/DB/DataTypes/IDataTypeNumberFixed.h index 1c978e24b19..b175a64ebcf 100644 --- a/dbms/include/DB/DataTypes/IDataTypeNumberFixed.h +++ b/dbms/include/DB/DataTypes/IDataTypeNumberFixed.h @@ -51,7 +51,7 @@ public: ostr.write(reinterpret_cast(&x[offset]), sizeof(typename ColumnType::value_type) * limit); } - void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const + void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const { typename ColumnType::Container_t & x = typeid_cast(column).getData(); size_t initial_size = x.size(); diff --git a/dbms/include/DB/Interpreters/Settings.h b/dbms/include/DB/Interpreters/Settings.h index 317f77f6832..d907b5f4258 100644 --- a/dbms/include/DB/Interpreters/Settings.h +++ b/dbms/include/DB/Interpreters/Settings.h @@ -92,6 +92,18 @@ struct Settings M(SettingUInt64, max_parallel_replicas, 1) \ M(SettingUInt64, parallel_replicas_count, 0) \ M(SettingUInt64, parallel_replica_offset, 0) \ + \ + /** Тонкие настройки для чтения из MergeTree */ \ + \ + /** Если из одного файла читается хотя бы столько строк, чтение можно распараллелить. */ \ + M(SettingUInt64, merge_tree_min_rows_for_concurrent_read, (20 * 8192)) \ + /** Можно пропускать чтение более чем стольки строк ценой одного seek по файлу. */ \ + M(SettingUInt64, merge_tree_min_rows_for_seek, (5 * 8192)) \ + /** Если отрезок индекса может содержать нужные ключи, делим его на столько частей и рекурсивно проверяем их. */ \ + M(SettingUInt64, merge_tree_coarse_index_granularity, 8) \ + /** Максимальное количество строк на запрос, для использования кэша разжатых данных. Если запрос большой - кэш не используется. \ + * (Чтобы большие запросы не вымывали кэш.) */ \ + M(SettingUInt64, merge_tree_max_rows_to_use_cache, (1024 * 1024)) \ /// Всевозможные ограничения на выполнение запроса. 
Limits limits; diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h index 2c5e36570af..8c3c1cc9738 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h @@ -25,7 +25,8 @@ public: all_mark_ranges(mark_ranges_), remaining_mark_ranges(mark_ranges_), use_uncompressed_cache(use_uncompressed_cache_), prewhere_actions(prewhere_actions_), prewhere_column(prewhere_column_), - log(&Logger::get("MergeTreeBlockInputStream")) + log(&Logger::get("MergeTreeBlockInputStream")), + ordered_names{column_names} { std::reverse(remaining_mark_ranges.begin(), remaining_mark_ranges.end()); @@ -69,6 +70,7 @@ public: } /// Оценим общее количество строк - для прогресс-бара. + size_t total_rows = 0; for (const auto & range : all_mark_ranges) total_rows += range.end - range.begin; total_rows *= storage.index_granularity; @@ -79,6 +81,8 @@ public: ? ", up to " + toString((all_mark_ranges.back().end - all_mark_ranges.front().begin) * storage.index_granularity) : "") << " rows starting from " << all_mark_ranges.front().begin * storage.index_granularity); + + setTotalRowsApprox(total_rows); } String getName() const override { return "MergeTreeBlockInputStream"; } @@ -157,10 +161,6 @@ protected: if (!reader) { - /// Отправим информацию о том, что собираемся читать примерно столько-то строк. - /// NOTE В конструкторе это делать не получилось бы, потому что тогда ещё не установлен progress_callback. - progressImpl(Progress(0, 0, total_rows)); - injectRequiredColumns(columns); injectRequiredColumns(pre_columns); @@ -192,7 +192,7 @@ protected: remaining_mark_ranges.pop_back(); } progressImpl(Progress(res.rows(), res.bytes())); - pre_reader->fillMissingColumns(res); + pre_reader->fillMissingColumns(res, ordered_names); /// Вычислим выражение в PREWHERE. prewhere_actions->execute(res); @@ -295,7 +295,7 @@ protected: else throw Exception("Illegal type " + column->getName() + " of column for filter. Must be ColumnUInt8 or ColumnConstUInt8.", ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER); - reader->fillMissingColumns(res); + reader->fillMissingColumns(res, ordered_names); } while (!remaining_mark_ranges.empty() && !res && !isCancelled()); } @@ -317,7 +317,7 @@ protected: progressImpl(Progress(res.rows(), res.bytes())); - reader->fillMissingColumns(res); + reader->fillMissingColumns(res, ordered_names); } if (remaining_mark_ranges.empty()) @@ -353,9 +353,11 @@ private: ExpressionActionsPtr prewhere_actions; String prewhere_column; bool remove_prewhere_column; - size_t total_rows = 0; /// Приблизительное общее количество строк - для прогресс-бара. Logger * log; + + /// requested column names in specific order as expected by other stages + const Names ordered_names; }; } diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h index cf969d4d3fd..d2fac675c8a 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h @@ -96,24 +96,9 @@ struct MergeTreeSettings /// Сколько заданий на слияние кусков разрешено одновременно иметь в очереди ReplicatedMergeTree. size_t max_replicated_merges_in_queue = 6; - /// Если из одного файла читается хотя бы столько строк, чтение можно распараллелить. - size_t min_rows_for_concurrent_read = 20 * 8192; - /// Через сколько секунд удалять ненужные куски. 
time_t old_parts_lifetime = 8 * 60; - /** Настройки чтения и работы с индексом. */ - - /// Можно пропускать чтение более чем стольки строк ценой одного seek по файлу. - size_t min_rows_for_seek = 5 * 8192; - - /// Если отрезок индекса может содержать нужные ключи, делим его на столько частей и рекурсивно проверяем их. - size_t coarse_index_granularity = 8; - - /// Максимальное количество строк на запрос, для использования кэша разжатых данных. Если запрос большой - кэш не используется. - /// (Чтобы большие запросы не вымывали кэш.) - size_t max_rows_to_use_cache = 1024 * 1024; - /** Настройки вставок. */ /// Если в таблице хотя бы столько активных кусков, искусственно замедлять вставки в таблицу. diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/dbms/include/DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h index b7f80cd37cd..3f1d4f37e82 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h @@ -48,10 +48,6 @@ private: typedef std::vector RangesInDataParts; - size_t min_marks_for_seek; - size_t min_marks_for_concurrent_read; - size_t max_marks_to_use_cache; - BlockInputStreams spreadMarkRangesAmongThreads( RangesInDataParts parts, size_t threads, @@ -60,7 +56,8 @@ private: bool use_uncompressed_cache, ExpressionActionsPtr prewhere_actions, const String & prewhere_column, - const Names & virt_columns); + const Names & virt_columns, + const Settings & settings); BlockInputStreams spreadMarkRangesAmongThreadsFinal( RangesInDataParts parts, @@ -70,12 +67,13 @@ private: bool use_uncompressed_cache, ExpressionActionsPtr prewhere_actions, const String & prewhere_column, - const Names & virt_columns); + const Names & virt_columns, + const Settings & settings); /// Создать выражение "Sign == 1". void createPositiveSignCondition(ExpressionActionsPtr & out_expression, String & out_column); - MarkRanges markRangesFromPkRange(const MergeTreeData::DataPart::Index & index, PKCondition & key_condition); + MarkRanges markRangesFromPkRange(const MergeTreeData::DataPart::Index & index, PKCondition & key_condition, const Settings & settings); }; } diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h b/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h index dde138d9926..b30aeef2fe7 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h @@ -173,7 +173,10 @@ public: } if (!minimum_size_column) - throw std::logic_error{"could not find a column of minimum size in MergeTree"}; + throw Exception{ + "could not find a column of minimum size in MergeTree", + ErrorCodes::LOGICAL_ERROR + }; addStream(minimum_size_column->name, *minimum_size_column->type, all_mark_ranges); columns.emplace(std::begin(columns), *minimum_size_column); @@ -182,7 +185,7 @@ public: } /// Заполняет столбцы, которых нет в блоке, значениями по умолчанию. 
- void fillMissingColumns(Block & res) + void fillMissingColumns(Block & res, const Names & ordered_names) { try { @@ -259,15 +262,23 @@ public: columns.erase(std::begin(columns)); } - /// sort columns to ensure consistent order among all block + /// sort columns to ensure consistent order among all blocks if (should_sort) { - Block sorted_block; + Block ordered_block; - for (const auto & name_and_type : columns) - sorted_block.insert(res.getByName(name_and_type.name)); + for (const auto & name : ordered_names) + if (res.has(name)) + ordered_block.insert(res.getByName(name)); - std::swap(res, sorted_block); + if (res.columns() != ordered_block.columns()) + throw Exception{ + "Ordered block has different columns than original one:\n" + + ordered_block.dumpNames() + "\nvs.\n" + res.dumpNames(), + ErrorCodes::LOGICAL_ERROR + }; + + std::swap(res, ordered_block); } else if (added_column) { @@ -294,6 +305,9 @@ private: std::string path_prefix; size_t max_mark_range; + /// Используется в качестве подсказки, чтобы уменьшить количество реаллокаций при создании столбца переменной длины. + double avg_value_size_hint = 0; + Stream(const String & path_prefix, UncompressedCache * uncompressed_cache, MarkCache * mark_cache, const MarkRanges & all_mark_ranges) : path_prefix(path_prefix) { @@ -494,7 +508,20 @@ private: { Stream & stream = *streams[name]; stream.seekToMark(from_mark); - type.deserializeBinary(column, *stream.data_buffer, max_rows_to_read); + type.deserializeBinary(column, *stream.data_buffer, max_rows_to_read, stream.avg_value_size_hint); + + /// Вычисление подсказки о среднем размере значения. + size_t column_size = column.size(); + if (column_size) + { + double current_avg_value_size = static_cast(column.byteSize()) / column_size; + + /// Эвристика, чтобы при изменениях, значение avg_value_size_hint быстро росло, но медленно уменьшалось. 
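+ /// That is: the hint jumps straight up to any larger observed average value size, but is only
+ /// nudged a quarter of the way down, and only once values become more than twice smaller than the hint.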
+ if (current_avg_value_size > stream.avg_value_size_hint) + stream.avg_value_size_hint = current_avg_value_size; + else if (current_avg_value_size * 2 < stream.avg_value_size_hint) + stream.avg_value_size_hint = (current_avg_value_size + stream.avg_value_size_hint * 3) / 4; + } } } }; diff --git a/dbms/src/Client/Benchmark.cpp b/dbms/src/Client/Benchmark.cpp index 46b5c62f680..bad0d707e1a 100644 --- a/dbms/src/Client/Benchmark.cpp +++ b/dbms/src/Client/Benchmark.cpp @@ -55,9 +55,10 @@ class Benchmark public: Benchmark(unsigned concurrency_, double delay_, const String & host_, UInt16 port_, const String & default_database_, - const String & user_, const String & password_) + const String & user_, const String & password_, const Settings & settings_) : concurrency(concurrency_), delay(delay_), queue(concurrency), pool(concurrency), - connections(concurrency, host_, port_, default_database_, user_, password_, data_type_factory) + connections(concurrency, host_, port_, default_database_, user_, password_, data_type_factory), + settings(settings_) { std::cerr << std::fixed << std::setprecision(3); @@ -81,6 +82,7 @@ private: DataTypeFactory data_type_factory; ConnectionPool connections; + Settings settings; struct Stats { @@ -237,7 +239,7 @@ private: void execute(ConnectionPool::Entry & connection, Query & query) { Stopwatch watch; - RemoteBlockInputStream stream(connection, query, nullptr); + RemoteBlockInputStream stream(connection, query, &settings); Progress progress; stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); }); @@ -304,6 +306,12 @@ int main(int argc, char ** argv) ("user", boost::program_options::value()->default_value("default"), "") ("password", boost::program_options::value()->default_value(""), "") ("database", boost::program_options::value()->default_value("default"), "") + #define DECLARE_SETTING(TYPE, NAME, DEFAULT) (#NAME, boost::program_options::value (), "Settings.h") + #define DECLARE_LIMIT(TYPE, NAME, DEFAULT) (#NAME, boost::program_options::value (), "Limits.h") + APPLY_FOR_SETTINGS(DECLARE_SETTING) + APPLY_FOR_LIMITS(DECLARE_LIMIT) + #undef DECLARE_SETTING + #undef DECLARE_LIMIT ; boost::program_options::variables_map options; @@ -316,6 +324,16 @@ int main(int argc, char ** argv) return 1; } + /// Извлекаем settings and limits из полученных options + Settings settings; + + #define EXTRACT_SETTING(TYPE, NAME, DEFAULT) \ + if (options.count(#NAME)) \ + settings.set(#NAME, options[#NAME].as()); + APPLY_FOR_SETTINGS(EXTRACT_SETTING) + APPLY_FOR_LIMITS(EXTRACT_SETTING) + #undef EXTRACT_SETTING + Benchmark benchmark( options["concurrency"].as(), options["delay"].as(), @@ -323,7 +341,8 @@ int main(int argc, char ** argv) options["port"].as(), options["database"].as(), options["user"].as(), - options["password"].as()); + options["password"].as(), + settings); } catch (const Exception & e) { diff --git a/dbms/src/Core/Block.cpp b/dbms/src/Core/Block.cpp index 2ceef5e7e52..9f5c0a93cb6 100644 --- a/dbms/src/Core/Block.cpp +++ b/dbms/src/Core/Block.cpp @@ -81,6 +81,7 @@ void Block::insert(const ColumnWithNameAndType & elem) index_by_position.push_back(it); } + void Block::insertDefault(const String & name, const DataTypePtr & type) { insert({ @@ -279,6 +280,17 @@ Block Block::cloneEmpty() const } +Block Block::sortColumns() const +{ + Block sorted_block; + + for (const auto & name : index_by_name) + sorted_block.insert(*name.second); + + return sorted_block; +} + + ColumnsWithNameAndType Block::getColumns() const { 
return ColumnsWithNameAndType(data.begin(), data.end()); diff --git a/dbms/src/Core/NamesAndTypes.cpp b/dbms/src/Core/NamesAndTypes.cpp new file mode 100644 index 00000000000..d70e6a9932a --- /dev/null +++ b/dbms/src/Core/NamesAndTypes.cpp @@ -0,0 +1,120 @@ +#include + +namespace DB +{ + +void NamesAndTypesList::readText(ReadBuffer & buf, const DataTypeFactory & data_type_factory) +{ + DB::assertString("columns format version: 1\n", buf); + size_t count; + DB::readText(count, buf); + DB::assertString(" columns:\n", buf); + resize(count); + for (NameAndTypePair & it : *this) + { + DB::readBackQuotedString(it.name, buf); + DB::assertString(" ", buf); + String type_name; + DB::readString(type_name, buf); + it.type = data_type_factory.get(type_name); + DB::assertString("\n", buf); + } +} + +void NamesAndTypesList::writeText(WriteBuffer & buf) const +{ + DB::writeString("columns format version: 1\n", buf); + DB::writeText(size(), buf); + DB::writeString(" columns:\n", buf); + for (const auto & it : *this) + { + DB::writeBackQuotedString(it.name, buf); + DB::writeChar(' ', buf); + DB::writeString(it.type->getName(), buf); + DB::writeChar('\n', buf); + } +} + +String NamesAndTypesList::toString() const +{ + String s; + { + WriteBufferFromString out(s); + writeText(out); + } + return s; +} + +NamesAndTypesList NamesAndTypesList::parse(const String & s, const DataTypeFactory & data_type_factory) +{ + ReadBufferFromString in(s); + NamesAndTypesList res; + res.readText(in, data_type_factory); + assertEOF(in); + return res; +} + +bool NamesAndTypesList::isSubsetOf(const NamesAndTypesList & rhs) const +{ + NamesAndTypes vector(rhs.begin(), rhs.end()); + vector.insert(vector.end(), begin(), end()); + std::sort(vector.begin(), vector.end()); + return std::unique(vector.begin(), vector.end()) == vector.begin() + rhs.size(); +} + +size_t NamesAndTypesList::sizeOfDifference(const NamesAndTypesList & rhs) const +{ + NamesAndTypes vector(rhs.begin(), rhs.end()); + vector.insert(vector.end(), begin(), end()); + std::sort(vector.begin(), vector.end()); + return (std::unique(vector.begin(), vector.end()) - vector.begin()) * 2 - size() - rhs.size(); +} + +Names NamesAndTypesList::getNames() const +{ + Names res; + res.reserve(size()); + for (const NameAndTypePair & column : *this) + { + res.push_back(column.name); + } + return res; +} + +NamesAndTypesList NamesAndTypesList::filter(const NameSet & names) const +{ + NamesAndTypesList res; + for (const NameAndTypePair & column : *this) + { + if (names.count(column.name)) + res.push_back(column); + } + return res; +} + +NamesAndTypesList NamesAndTypesList::filter(const Names & names) const +{ + return filter(NameSet(names.begin(), names.end())); +} + +NamesAndTypesList NamesAndTypesList::addTypes(const Names & names) const +{ + /// NOTE Лучше сделать map в IStorage, чем создавать его здесь каждый раз заново. 
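+ /// (That is: it would be cheaper to build this name-to-type map once in IStorage than to recreate it here on every call.)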
+ google::dense_hash_map types; + types.set_empty_key(StringRef()); + + for (const NameAndTypePair & column : *this) + types[column.name] = &column.type; + + NamesAndTypesList res; + for (const String & name : names) + { + auto it = types.find(name); + if (it == types.end()) + throw Exception("No column " + name, ErrorCodes::THERE_IS_NO_COLUMN); + res.push_back(NameAndTypePair(name, *it->second)); + } + return res; +} + +} diff --git a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp index cf5810b9f09..fde2a68ce0a 100644 --- a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp +++ b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp @@ -13,6 +13,8 @@ namespace DB Block IProfilingBlockInputStream::read() { + collectAndSendTotalRowsApprox(); + if (!info.started) { info.total_stopwatch.start(); @@ -211,66 +213,62 @@ void IProfilingBlockInputStream::checkQuota(Block & block) void IProfilingBlockInputStream::progressImpl(const Progress & value) { - /// Данные для прогресса берутся из листовых источников. - if (children.empty()) + if (progress_callback) + progress_callback(value); + + if (process_list_elem) { - if (progress_callback) - progress_callback(value); + if (!process_list_elem->update(value)) + cancel(); - if (process_list_elem) + /// Общее количество данных, обработанных или предполагаемых к обработке во всех листовых источниках, возможно, на удалённых серверах. + + size_t rows_processed = process_list_elem->progress.rows; + size_t bytes_processed = process_list_elem->progress.bytes; + + size_t total_rows_estimate = std::max(process_list_elem->progress.rows, process_list_elem->progress.total_rows); + + /** Проверяем ограничения на объём данных для чтения, скорость выполнения запроса, квоту на объём данных для чтения. + * NOTE: Может быть, имеет смысл сделать, чтобы они проверялись прямо в ProcessList? + */ + + if (limits.mode == LIMITS_TOTAL + && ((limits.max_rows_to_read && total_rows_estimate > limits.max_rows_to_read) + || (limits.max_bytes_to_read && bytes_processed > limits.max_bytes_to_read))) { - if (!process_list_elem->update(value)) - cancel(); - - /// Общее количество данных, обработанных или предполагаемых к обработке во всех листовых источниках, возможно, на удалённых серверах. - - size_t rows_processed = process_list_elem->progress.rows; - size_t bytes_processed = process_list_elem->progress.bytes; - - size_t total_rows_estimate = std::max(process_list_elem->progress.rows, process_list_elem->progress.total_rows); - - /** Проверяем ограничения на объём данных для чтения, скорость выполнения запроса, квоту на объём данных для чтения. - * NOTE: Может быть, имеет смысл сделать, чтобы они проверялись прямо в ProcessList? 
- */ - - if (limits.mode == LIMITS_TOTAL - && ((limits.max_rows_to_read && total_rows_estimate > limits.max_rows_to_read) - || (limits.max_bytes_to_read && bytes_processed > limits.max_bytes_to_read))) + if (limits.read_overflow_mode == OverflowMode::THROW) { - if (limits.read_overflow_mode == OverflowMode::THROW) - { - if (limits.max_rows_to_read && total_rows_estimate > limits.max_rows_to_read) - throw Exception("Limit for rows to read exceeded: " + toString(total_rows_estimate) - + " rows read (or to read), maximum: " + toString(limits.max_rows_to_read), - ErrorCodes::TOO_MUCH_ROWS); - else - throw Exception("Limit for (uncompressed) bytes to read exceeded: " + toString(bytes_processed) - + " bytes read, maximum: " + toString(limits.max_bytes_to_read), - ErrorCodes::TOO_MUCH_ROWS); - } - else if (limits.read_overflow_mode == OverflowMode::BREAK) - cancel(); + if (limits.max_rows_to_read && total_rows_estimate > limits.max_rows_to_read) + throw Exception("Limit for rows to read exceeded: " + toString(total_rows_estimate) + + " rows read (or to read), maximum: " + toString(limits.max_rows_to_read), + ErrorCodes::TOO_MUCH_ROWS); else - throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR); + throw Exception("Limit for (uncompressed) bytes to read exceeded: " + toString(bytes_processed) + + " bytes read, maximum: " + toString(limits.max_bytes_to_read), + ErrorCodes::TOO_MUCH_ROWS); } + else if (limits.read_overflow_mode == OverflowMode::BREAK) + cancel(); + else + throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR); + } - if (limits.min_execution_speed) + if (limits.min_execution_speed) + { + double total_elapsed = info.total_stopwatch.elapsedSeconds(); + + if (total_elapsed > limits.timeout_before_checking_execution_speed.totalMicroseconds() / 1000000.0 + && rows_processed / total_elapsed < limits.min_execution_speed) { - double total_elapsed = info.total_stopwatch.elapsedSeconds(); - - if (total_elapsed > limits.timeout_before_checking_execution_speed.totalMicroseconds() / 1000000.0 - && rows_processed / total_elapsed < limits.min_execution_speed) - { - throw Exception("Query is executing too slow: " + toString(rows_processed / total_elapsed) - + " rows/sec., minimum: " + toString(limits.min_execution_speed), - ErrorCodes::TOO_SLOW); - } + throw Exception("Query is executing too slow: " + toString(rows_processed / total_elapsed) + + " rows/sec., minimum: " + toString(limits.min_execution_speed), + ErrorCodes::TOO_SLOW); } + } - if (quota != nullptr && limits.mode == LIMITS_TOTAL) - { - quota->checkAndAddReadRowsBytes(time(0), value.rows, value.bytes); - } + if (quota != nullptr && limits.mode == LIMITS_TOTAL) + { + quota->checkAndAddReadRowsBytes(time(0), value.rows, value.bytes); } } } @@ -343,5 +341,31 @@ const Block & IProfilingBlockInputStream::getExtremes() const return extremes; } +void IProfilingBlockInputStream::collectTotalRowsApprox() +{ + if (collected_total_rows_approx) + return; + + collected_total_rows_approx = true; + + for (auto & child : children) + { + if (IProfilingBlockInputStream * p_child = dynamic_cast(&*child)) + { + p_child->collectTotalRowsApprox(); + total_rows_approx += p_child->total_rows_approx; + } + } +} + +void IProfilingBlockInputStream::collectAndSendTotalRowsApprox() +{ + if (collected_total_rows_approx) + return; + + collectTotalRowsApprox(); + progressImpl(Progress(0, 0, total_rows_approx)); +} + } diff --git a/dbms/src/DataStreams/JSONCompactRowOutputStream.cpp 
b/dbms/src/DataStreams/JSONCompactRowOutputStream.cpp index 0990d14a9d0..1555862226a 100644 --- a/dbms/src/DataStreams/JSONCompactRowOutputStream.cpp +++ b/dbms/src/DataStreams/JSONCompactRowOutputStream.cpp @@ -17,28 +17,28 @@ JSONCompactRowOutputStream::JSONCompactRowOutputStream(WriteBuffer & ostr_, cons void JSONCompactRowOutputStream::writeField(const Field & field) { - fields[field_number].type->serializeTextJSON(field, ostr); + fields[field_number].type->serializeTextJSON(field, *ostr); ++field_number; } void JSONCompactRowOutputStream::writeFieldDelimiter() { - writeCString(", ", ostr); + writeCString(", ", *ostr); } void JSONCompactRowOutputStream::writeRowStartDelimiter() { if (row_count > 0) - writeCString(",\n", ostr); - writeCString("\t\t[", ostr); + writeCString(",\n", *ostr); + writeCString("\t\t[", *ostr); } void JSONCompactRowOutputStream::writeRowEndDelimiter() { - writeChar(']', ostr); + writeChar(']', *ostr); field_number = 0; ++row_count; } @@ -48,21 +48,21 @@ void JSONCompactRowOutputStream::writeTotals() { if (totals) { - writeCString(",\n", ostr); - writeChar('\n', ostr); - writeCString("\t\"totals\": [", ostr); + writeCString(",\n", *ostr); + writeChar('\n', *ostr); + writeCString("\t\"totals\": [", *ostr); size_t totals_columns = totals.columns(); for (size_t i = 0; i < totals_columns; ++i) { if (i != 0) - writeChar(',', ostr); + writeChar(',', *ostr); const ColumnWithNameAndType & column = totals.getByPosition(i); - column.type->serializeTextJSON((*column.column)[0], ostr); + column.type->serializeTextJSON((*column.column)[0], *ostr); } - writeChar(']', ostr); + writeChar(']', *ostr); } } @@ -90,17 +90,17 @@ void JSONCompactRowOutputStream::writeExtremes() { if (extremes) { - writeCString(",\n", ostr); - writeChar('\n', ostr); - writeCString("\t\"extremes\":\n", ostr); - writeCString("\t{\n", ostr); + writeCString(",\n", *ostr); + writeChar('\n', *ostr); + writeCString("\t\"extremes\":\n", *ostr); + writeCString("\t{\n", *ostr); - writeExtremesElement("min", extremes, 0, ostr); - writeCString(",\n", ostr); - writeExtremesElement("max", extremes, 1, ostr); + writeExtremesElement("min", extremes, 0, *ostr); + writeCString(",\n", *ostr); + writeExtremesElement("max", extremes, 1, *ostr); - writeChar('\n', ostr); - writeCString("\t}", ostr); + writeChar('\n', *ostr); + writeCString("\t}", *ostr); } } diff --git a/dbms/src/DataStreams/JSONRowOutputStream.cpp b/dbms/src/DataStreams/JSONRowOutputStream.cpp index 5cdb6f82c8d..afcff6956f8 100644 --- a/dbms/src/DataStreams/JSONRowOutputStream.cpp +++ b/dbms/src/DataStreams/JSONRowOutputStream.cpp @@ -1,6 +1,6 @@ -#include - #include +#include +#include namespace DB @@ -10,71 +10,89 @@ using Poco::SharedPtr; JSONRowOutputStream::JSONRowOutputStream(WriteBuffer & ostr_, const Block & sample_) - : dst_ostr(ostr_), ostr(dst_ostr), field_number(0), row_count(0), applied_limit(false), rows_before_limit(0) + : dst_ostr(ostr_) { NamesAndTypesList columns(sample_.getColumnsList()); fields.assign(columns.begin(), columns.end()); + + bool have_non_numeric_columns = false; + for (size_t i = 0; i < sample_.columns(); ++i) + { + if (!sample_.unsafeGetByPosition(i).type->isNumeric()) + { + have_non_numeric_columns = true; + break; + } + } + + if (have_non_numeric_columns) + { + validating_ostr.reset(new WriteBufferValidUTF8(dst_ostr)); + ostr = validating_ostr.get(); + } + else + ostr = &dst_ostr; } void JSONRowOutputStream::writePrefix() { - writeCString("{\n", ostr); - writeCString("\t\"meta\":\n", ostr); - writeCString("\t[\n", ostr); 
- + writeCString("{\n", *ostr); + writeCString("\t\"meta\":\n", *ostr); + writeCString("\t[\n", *ostr); + for (size_t i = 0; i < fields.size(); ++i) { - writeCString("\t\t{\n", ostr); - - writeCString("\t\t\t\"name\": ", ostr); - writeDoubleQuotedString(fields[i].name, ostr); - writeCString(",\n", ostr); - writeCString("\t\t\t\"type\": ", ostr); - writeDoubleQuotedString(fields[i].type->getName(), ostr); - writeChar('\n', ostr); - - writeCString("\t\t}", ostr); + writeCString("\t\t{\n", *ostr); + + writeCString("\t\t\t\"name\": ", *ostr); + writeDoubleQuotedString(fields[i].name, *ostr); + writeCString(",\n", *ostr); + writeCString("\t\t\t\"type\": ", *ostr); + writeDoubleQuotedString(fields[i].type->getName(), *ostr); + writeChar('\n', *ostr); + + writeCString("\t\t}", *ostr); if (i + 1 < fields.size()) - writeChar(',', ostr); - writeChar('\n', ostr); + writeChar(',', *ostr); + writeChar('\n', *ostr); } - - writeCString("\t],\n", ostr); - writeChar('\n', ostr); - writeCString("\t\"data\":\n", ostr); - writeCString("\t[\n", ostr); + + writeCString("\t],\n", *ostr); + writeChar('\n', *ostr); + writeCString("\t\"data\":\n", *ostr); + writeCString("\t[\n", *ostr); } void JSONRowOutputStream::writeField(const Field & field) { - writeCString("\t\t\t", ostr); - writeDoubleQuotedString(fields[field_number].name, ostr); - writeCString(": ", ostr); - fields[field_number].type->serializeTextJSON(field, ostr); + writeCString("\t\t\t", *ostr); + writeDoubleQuotedString(fields[field_number].name, *ostr); + writeCString(": ", *ostr); + fields[field_number].type->serializeTextJSON(field, *ostr); ++field_number; } void JSONRowOutputStream::writeFieldDelimiter() { - writeCString(",\n", ostr); + writeCString(",\n", *ostr); } void JSONRowOutputStream::writeRowStartDelimiter() { if (row_count > 0) - writeCString(",\n", ostr); - writeCString("\t\t{\n", ostr); + writeCString(",\n", *ostr); + writeCString("\t\t{\n", *ostr); } void JSONRowOutputStream::writeRowEndDelimiter() { - writeChar('\n', ostr); - writeCString("\t\t}", ostr); + writeChar('\n', *ostr); + writeCString("\t\t}", *ostr); field_number = 0; ++row_count; } @@ -82,30 +100,30 @@ void JSONRowOutputStream::writeRowEndDelimiter() void JSONRowOutputStream::writeSuffix() { - writeChar('\n', ostr); - writeCString("\t]", ostr); + writeChar('\n', *ostr); + writeCString("\t]", *ostr); writeTotals(); writeExtremes(); - writeCString(",\n\n", ostr); - writeCString("\t\"rows\": ", ostr); - writeIntText(row_count, ostr); - + writeCString(",\n\n", *ostr); + writeCString("\t\"rows\": ", *ostr); + writeIntText(row_count, *ostr); + writeRowsBeforeLimitAtLeast(); - - writeChar('\n', ostr); - writeCString("}\n", ostr); - ostr.next(); + + writeChar('\n', *ostr); + writeCString("}\n", *ostr); + ostr->next(); } void JSONRowOutputStream::writeRowsBeforeLimitAtLeast() { if (applied_limit) { - writeCString(",\n\n", ostr); - writeCString("\t\"rows_before_limit_at_least\": ", ostr); - writeIntText(rows_before_limit, ostr); + writeCString(",\n\n", *ostr); + writeCString("\t\"rows_before_limit_at_least\": ", *ostr); + writeIntText(rows_before_limit, *ostr); } } @@ -113,10 +131,10 @@ void JSONRowOutputStream::writeTotals() { if (totals) { - writeCString(",\n", ostr); - writeChar('\n', ostr); - writeCString("\t\"totals\":\n", ostr); - writeCString("\t{\n", ostr); + writeCString(",\n", *ostr); + writeChar('\n', *ostr); + writeCString("\t\"totals\":\n", *ostr); + writeCString("\t{\n", *ostr); size_t totals_columns = totals.columns(); for (size_t i = 0; i < totals_columns; ++i) @@ 
-124,16 +142,16 @@ void JSONRowOutputStream::writeTotals() const ColumnWithNameAndType & column = totals.getByPosition(i); if (i != 0) - writeCString(",\n", ostr); + writeCString(",\n", *ostr); - writeCString("\t\t", ostr); - writeDoubleQuotedString(column.name, ostr); - writeCString(": ", ostr); - column.type->serializeTextJSON((*column.column)[0], ostr); + writeCString("\t\t", *ostr); + writeDoubleQuotedString(column.name, *ostr); + writeCString(": ", *ostr); + column.type->serializeTextJSON((*column.column)[0], *ostr); } - writeChar('\n', ostr); - writeCString("\t}", ostr); + writeChar('\n', *ostr); + writeCString("\t}", *ostr); } } @@ -167,17 +185,17 @@ void JSONRowOutputStream::writeExtremes() { if (extremes) { - writeCString(",\n", ostr); - writeChar('\n', ostr); - writeCString("\t\"extremes\":\n", ostr); - writeCString("\t{\n", ostr); + writeCString(",\n", *ostr); + writeChar('\n', *ostr); + writeCString("\t\"extremes\":\n", *ostr); + writeCString("\t{\n", *ostr); - writeExtremesElement("min", extremes, 0, ostr); - writeCString(",\n", ostr); - writeExtremesElement("max", extremes, 1, ostr); + writeExtremesElement("min", extremes, 0, *ostr); + writeCString(",\n", *ostr); + writeExtremesElement("max", extremes, 1, *ostr); - writeChar('\n', ostr); - writeCString("\t}", ostr); + writeChar('\n', *ostr); + writeCString("\t}", *ostr); } } diff --git a/dbms/src/DataStreams/NativeBlockInputStream.cpp b/dbms/src/DataStreams/NativeBlockInputStream.cpp index f0a2936fd34..ae48f05f257 100644 --- a/dbms/src/DataStreams/NativeBlockInputStream.cpp +++ b/dbms/src/DataStreams/NativeBlockInputStream.cpp @@ -23,7 +23,7 @@ static void readData(const IDataType & type, IColumn & column, ReadBuffer & istr if (const DataTypeArray * type_arr = typeid_cast(&type)) { IColumn & offsets_column = *typeid_cast(column).getOffsetsColumn(); - type_arr->getOffsetsType()->deserializeBinary(offsets_column, istr, rows); + type_arr->getOffsetsType()->deserializeBinary(offsets_column, istr, rows, 0); if (offsets_column.size() != rows) throw Exception("Cannot read all data in NativeBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA); @@ -39,7 +39,7 @@ static void readData(const IDataType & type, IColumn & column, ReadBuffer & istr { ColumnNested & column_nested = typeid_cast(column); IColumn & offsets_column = *column_nested.getOffsetsColumn(); - type_nested->getOffsetsType()->deserializeBinary(offsets_column, istr, rows); + type_nested->getOffsetsType()->deserializeBinary(offsets_column, istr, rows, 0); if (offsets_column.size() != rows) throw Exception("Cannot read all data in NativeBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA); @@ -58,7 +58,7 @@ static void readData(const IDataType & type, IColumn & column, ReadBuffer & istr } } else - type.deserializeBinary(column, istr, rows); + type.deserializeBinary(column, istr, rows, 0); /// TODO Использовать avg_value_size_hint. 
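+ /// Passing 0 here means "no size hint": per the new IDataType::deserializeBinary() contract above,
+ /// a non-zero hint is only an optimization to avoid reallocations when reading a string column.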
if (column.size() != rows) throw Exception("Cannot read all data in NativeBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA); diff --git a/dbms/src/DataTypes/DataTypeAggregateFunction.cpp b/dbms/src/DataTypes/DataTypeAggregateFunction.cpp index 6fbb46789a3..080baed8276 100644 --- a/dbms/src/DataTypes/DataTypeAggregateFunction.cpp +++ b/dbms/src/DataTypes/DataTypeAggregateFunction.cpp @@ -45,7 +45,7 @@ void DataTypeAggregateFunction::serializeBinary(const IColumn & column, WriteBuf function->serialize(*it, ostr); } -void DataTypeAggregateFunction::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const +void DataTypeAggregateFunction::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const { ColumnAggregateFunction & real_column = typeid_cast(column); ColumnAggregateFunction::Container_t & vec = real_column.getData(); diff --git a/dbms/src/DataTypes/DataTypeArray.cpp b/dbms/src/DataTypes/DataTypeArray.cpp index 826e830bad2..fc7fee20fe4 100644 --- a/dbms/src/DataTypes/DataTypeArray.cpp +++ b/dbms/src/DataTypes/DataTypeArray.cpp @@ -68,7 +68,7 @@ void DataTypeArray::serializeBinary(const IColumn & column, WriteBuffer & ostr, } -void DataTypeArray::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const +void DataTypeArray::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const { ColumnArray & column_array = typeid_cast(column); ColumnArray::Offsets_t & offsets = column_array.getOffsets(); @@ -79,7 +79,7 @@ void DataTypeArray::deserializeBinary(IColumn & column, ReadBuffer & istr, size_ if (last_offset < nested_column.size()) throw Exception("Nested column longer than last offset", ErrorCodes::LOGICAL_ERROR); size_t nested_limit = last_offset - nested_column.size(); - nested->deserializeBinary(nested_column, istr, nested_limit); + nested->deserializeBinary(nested_column, istr, nested_limit, 0); if (column_array.getData().size() != last_offset) throw Exception("Cannot read all array values", ErrorCodes::CANNOT_READ_ALL_DATA); diff --git a/dbms/src/DataTypes/DataTypeFixedString.cpp b/dbms/src/DataTypes/DataTypeFixedString.cpp index ca4c77e91bd..6e5a86463ae 100644 --- a/dbms/src/DataTypes/DataTypeFixedString.cpp +++ b/dbms/src/DataTypes/DataTypeFixedString.cpp @@ -49,7 +49,7 @@ void DataTypeFixedString::serializeBinary(const IColumn & column, WriteBuffer & } -void DataTypeFixedString::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const +void DataTypeFixedString::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const { ColumnFixedString::Chars_t & data = typeid_cast(column).getChars(); diff --git a/dbms/src/DataTypes/DataTypeNested.cpp b/dbms/src/DataTypes/DataTypeNested.cpp index 7bbb5fb3f6e..e6d022f0141 100644 --- a/dbms/src/DataTypes/DataTypeNested.cpp +++ b/dbms/src/DataTypes/DataTypeNested.cpp @@ -115,7 +115,7 @@ void DataTypeNested::serializeBinary(const IColumn & column, WriteBuffer & ostr, } -void DataTypeNested::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const +void DataTypeNested::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const { ColumnNested & column_nested = typeid_cast(column); ColumnNested::Offsets_t & offsets = column_nested.getOffsets(); @@ -129,7 +129,7 @@ void DataTypeNested::deserializeBinary(IColumn & column, ReadBuffer & istr, size NamesAndTypesList::const_iterator it = nested->begin(); for 
(size_t i = 0; i < nested->size(); ++i, ++it) { - it->type->deserializeBinary(*column_nested.getData()[i], istr, nested_limit); + it->type->deserializeBinary(*column_nested.getData()[i], istr, nested_limit, 0); if (column_nested.getData()[i]->size() != last_offset) throw Exception("Cannot read all nested column values", ErrorCodes::CANNOT_READ_ALL_DATA); } diff --git a/dbms/src/DataTypes/DataTypeString.cpp b/dbms/src/DataTypes/DataTypeString.cpp index 308f44d248e..5ae25eb0444 100644 --- a/dbms/src/DataTypes/DataTypeString.cpp +++ b/dbms/src/DataTypes/DataTypeString.cpp @@ -12,6 +12,8 @@ #include #include +#include + namespace DB { @@ -70,15 +72,9 @@ void DataTypeString::serializeBinary(const IColumn & column, WriteBuffer & ostr, } -void DataTypeString::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const +template +static NO_INLINE void deserializeBinarySSE2(ColumnString::Chars_t & data, ColumnString::Offsets_t & offsets, ReadBuffer & istr, size_t limit) { - ColumnString & column_string = typeid_cast(column); - ColumnString::Chars_t & data = column_string.getChars(); - ColumnString::Offsets_t & offsets = column_string.getOffsets(); - - data.reserve(data.size() + limit * DBMS_APPROX_STRING_SIZE); - offsets.reserve(offsets.size() + limit); - size_t offset = data.size(); for (size_t i = 0; i < limit; ++i) { @@ -93,12 +89,76 @@ void DataTypeString::deserializeBinary(IColumn & column, ReadBuffer & istr, size data.resize(offset); - istr.readStrict(reinterpret_cast(&data[offset - size - 1]), sizeof(ColumnUInt8::value_type) * size); + if (size) + { + /// Оптимистичная ветка, в которой возможно более эффективное копирование. + if (offset + 16 * UNROLL_TIMES <= data.capacity() && istr.position() + size + 16 * UNROLL_TIMES <= istr.buffer().end()) + { + const __m128i * sse_src_pos = reinterpret_cast(istr.position()); + const __m128i * sse_src_end = sse_src_pos + (size + (16 * UNROLL_TIMES - 1)) / 16 / UNROLL_TIMES * UNROLL_TIMES; + __m128i * sse_dst_pos = reinterpret_cast<__m128i *>(&data[offset - size - 1]); + + while (sse_src_pos < sse_src_end) + { + /// NOTE gcc 4.9.2 разворачивает цикл, но почему-то использует только один xmm регистр. + ///for (size_t j = 0; j < UNROLL_TIMES; ++j) + /// _mm_storeu_si128(sse_dst_pos + j, _mm_loadu_si128(sse_src_pos + j)); + + sse_src_pos += UNROLL_TIMES; + sse_dst_pos += UNROLL_TIMES; + + if (UNROLL_TIMES >= 4) __asm__("movdqu %0, %%xmm0" :: "m"(sse_src_pos[-4])); + if (UNROLL_TIMES >= 3) __asm__("movdqu %0, %%xmm1" :: "m"(sse_src_pos[-3])); + if (UNROLL_TIMES >= 2) __asm__("movdqu %0, %%xmm2" :: "m"(sse_src_pos[-2])); + if (UNROLL_TIMES >= 1) __asm__("movdqu %0, %%xmm3" :: "m"(sse_src_pos[-1])); + + if (UNROLL_TIMES >= 4) __asm__("movdqu %%xmm0, %0" : "=m"(sse_dst_pos[-4])); + if (UNROLL_TIMES >= 3) __asm__("movdqu %%xmm1, %0" : "=m"(sse_dst_pos[-3])); + if (UNROLL_TIMES >= 2) __asm__("movdqu %%xmm2, %0" : "=m"(sse_dst_pos[-2])); + if (UNROLL_TIMES >= 1) __asm__("movdqu %%xmm3, %0" : "=m"(sse_dst_pos[-1])); + } + + istr.position() += size; + } + else + { + istr.readStrict(reinterpret_cast(&data[offset - size - 1]), size); + } + } + data[offset - 1] = 0; } } +void DataTypeString::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const +{ + ColumnString & column_string = typeid_cast(column); + ColumnString::Chars_t & data = column_string.getChars(); + ColumnString::Offsets_t & offsets = column_string.getOffsets(); + + /// Выбрано наугад. 
+ constexpr auto avg_value_size_hint_reserve_multiplier = 1.2; + + double avg_chars_size = (avg_value_size_hint && avg_value_size_hint > sizeof(offsets[0]) + ? (avg_value_size_hint - sizeof(offsets[0])) * avg_value_size_hint_reserve_multiplier + : DBMS_APPROX_STRING_SIZE); + + data.reserve(data.size() + std::ceil(limit * avg_chars_size)); + + offsets.reserve(offsets.size() + limit); + + if (avg_chars_size >= 64) + deserializeBinarySSE2<4>(data, offsets, istr, limit); + else if (avg_chars_size >= 48) + deserializeBinarySSE2<3>(data, offsets, istr, limit); + else if (avg_chars_size >= 32) + deserializeBinarySSE2<2>(data, offsets, istr, limit); + else + deserializeBinarySSE2<1>(data, offsets, istr, limit); +} + + void DataTypeString::serializeText(const Field & field, WriteBuffer & ostr) const { writeString(get<const String &>(field), ostr); diff --git a/dbms/src/DataTypes/tests/data_type_nested.cpp b/dbms/src/DataTypes/tests/data_type_nested.cpp index 0be9102f84d..222af5b57d1 100644 --- a/dbms/src/DataTypes/tests/data_type_nested.cpp +++ b/dbms/src/DataTypes/tests/data_type_nested.cpp @@ -110,7 +110,7 @@ int main(int argc, char ** argv) { std::ifstream istr("test"); DB::ReadBufferFromIStream in_buf(istr); - data_type.deserializeBinary(*column, in_buf, n); + data_type.deserializeBinary(*column, in_buf, n, 0); } stopwatch.stop(); diff --git a/dbms/src/DataTypes/tests/data_type_string.cpp b/dbms/src/DataTypes/tests/data_type_string.cpp index bfd4da59046..251818d2295 100644 --- a/dbms/src/DataTypes/tests/data_type_string.cpp +++ b/dbms/src/DataTypes/tests/data_type_string.cpp @@ -26,7 +26,7 @@ int main(int argc, char ** argv) Poco::SharedPtr<DB::ColumnString> column = new DB::ColumnString(); DB::ColumnString::Chars_t & data = column->getChars(); DB::ColumnString::Offsets_t & offsets = column->getOffsets(); - + data.resize(n * size); offsets.resize(n); for (size_t i = 0; i < n; ++i) @@ -52,7 +52,7 @@ int main(int argc, char ** argv) DB::ReadBufferFromIStream in_buf(istr); stopwatch.restart(); - data_type.deserializeBinary(*column, in_buf, n); + data_type.deserializeBinary(*column, in_buf, n, 0); stopwatch.stop(); std::cout << "Reading, elapsed: " << static_cast<double>(stopwatch.elapsed()) / 1000000 << std::endl; diff --git a/dbms/src/Interpreters/Compiler.cpp b/dbms/src/Interpreters/Compiler.cpp index 6d2c68f5116..bc26fa1339a 100644 --- a/dbms/src/Interpreters/Compiler.cpp +++ b/dbms/src/Interpreters/Compiler.cpp @@ -206,7 +206,7 @@ void Compiler::compile( command << "LD_LIBRARY_PATH=/usr/share/clickhouse/bin/" " /usr/share/clickhouse/bin/clang" - " -x c++ -std=gnu++11 -O3 -g -Wall -Werror -Wnon-virtual-dtor -march=native -D NDEBUG" + " -x c++ -std=gnu++1y -O3 -g -Wall -Werror -Wnon-virtual-dtor -march=native -D NDEBUG" " -shared -fPIC -fvisibility=hidden -fno-implement-inlines" " -isystem /usr/share/clickhouse/headers/usr/local/include/" " -isystem /usr/share/clickhouse/headers/usr/include/" diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 3d3711ca19f..6ae056f33ef 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -7,6 +7,8 @@ #include #include #include +#include +#include #include #include @@ -14,13 +16,9 @@ namespace DB { -MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(MergeTreeData & data_) : data(data_), log(&Logger::get(data.getLogName() + " (SelectExecutor)")) +MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(MergeTreeData & data_)
+ : data(data_), log(&Logger::get(data.getLogName() + " (SelectExecutor)")) { - min_marks_for_seek = (data.settings.min_rows_for_seek + data.index_granularity - 1) / data.index_granularity; - min_marks_for_concurrent_read = (data.settings.min_rows_for_concurrent_read + data.index_granularity - 1) / data.index_granularity; - max_marks_to_use_cache = (data.settings.max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity; - - } /// Build a block consisting only of the possible values of the virtual columns @@ -132,7 +130,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( for (size_t i = 0; i < parts.size(); ++i) { MergeTreeData::DataPartPtr & part = parts[i]; - MarkRanges ranges = markRangesFromPkRange(part->index, key_condition); + MarkRanges ranges = markRangesFromPkRange(part->index, key_condition, settings); for (size_t j = 0; j < ranges.size(); ++j) total_count += ranges[j].end - ranges[j].begin; @@ -253,7 +251,14 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( ExpressionAnalyzer analyzer(select.prewhere_expression, data.context, data.getColumnsList()); prewhere_actions = analyzer.getActions(false); prewhere_column = select.prewhere_expression->getColumnName(); - /// TODO: To make subqueries in PREWHERE work, analyzer.getSetsWithSubqueries() could be saved here and executed afterwards. + SubqueriesForSets prewhere_subqueries = analyzer.getSubqueriesForSets(); + + /** Compute the subqueries right now. + * NOTE The drawback is that these computations do not fit into the query execution pipeline. + * They are done before the pipeline starts; they cannot be interrupted; and no progress packets are sent while they run. + */ + if (!prewhere_subqueries.empty()) + CreatingSetsBlockInputStream(new NullBlockInputStream, prewhere_subqueries, settings.limits).read(); } RangesInDataParts parts_with_ranges; @@ -264,7 +269,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( for (auto & part : parts) { RangesInDataPart ranges(part, (*part_index)++); - ranges.ranges = markRangesFromPkRange(part->index, key_condition); + ranges.ranges = markRangesFromPkRange(part->index, key_condition, settings); if (!ranges.ranges.empty()) { @@ -298,7 +303,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( settings.use_uncompressed_cache, prewhere_actions, prewhere_column, - virt_column_names); + virt_column_names, + settings); } else { @@ -310,7 +316,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( settings.use_uncompressed_cache, prewhere_actions, prewhere_column, - virt_column_names); + virt_column_names, + settings); } if (relative_sample_size != 0) @@ -320,6 +327,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read( return res; } + BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads( RangesInDataParts parts, size_t threads, @@ -328,8 +336,12 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads( bool use_uncompressed_cache, ExpressionActionsPtr prewhere_actions, const String & prewhere_column, - const Names & virt_columns) + const Names & virt_columns, + const Settings & settings) { + size_t min_marks_for_concurrent_read = (settings.merge_tree_min_rows_for_concurrent_read + data.index_granularity - 1) / data.index_granularity; + size_t max_marks_to_use_cache = (settings.merge_tree_max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity; + /// Shuffle the parts, just in case.
std::random_shuffle(parts.begin(), parts.end()); @@ -419,6 +431,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads( data.getFullPath() + part.data_part->name + '/', max_block_size, column_names, data, part.data_part, ranges_to_get_from_part, use_uncompressed_cache, prewhere_actions, prewhere_column)); + for (const String & virt_column : virt_columns) { if (virt_column == "_part") @@ -451,8 +464,11 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal bool use_uncompressed_cache, ExpressionActionsPtr prewhere_actions, const String & prewhere_column, - const Names & virt_columns) + const Names & virt_columns, + const Settings & settings) { + size_t max_marks_to_use_cache = (settings.merge_tree_max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity; + size_t sum_marks = 0; for (size_t i = 0; i < parts.size(); ++i) for (size_t j = 0; j < parts[i].ranges.size(); ++j) @@ -529,8 +545,11 @@ void MergeTreeDataSelectExecutor::createPositiveSignCondition(ExpressionActionsP } /// Returns the set of mark ranges outside of which keys from the given range cannot occur. -MarkRanges MergeTreeDataSelectExecutor::markRangesFromPkRange(const MergeTreeData::DataPart::Index & index, PKCondition & key_condition) +MarkRanges MergeTreeDataSelectExecutor::markRangesFromPkRange( + const MergeTreeData::DataPart::Index & index, PKCondition & key_condition, const Settings & settings) { + size_t min_marks_for_seek = (settings.merge_tree_min_rows_for_seek + data.index_granularity - 1) / data.index_granularity; + MarkRanges res; size_t key_size = data.getSortDescription().size(); @@ -575,7 +594,7 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPkRange(const MergeTreeDat else { /// Split the segment and push the result onto the stack from right to left. - size_t step = (range.end - range.begin - 1) / data.settings.coarse_index_granularity + 1; + size_t step = (range.end - range.begin - 1) / settings.merge_tree_coarse_index_granularity + 1; size_t end; for (end = range.end; end > range.begin + step; end -= step) diff --git a/dbms/src/Storages/StorageBuffer.cpp b/dbms/src/Storages/StorageBuffer.cpp index edaffabb06c..3535e67d38e 100644 --- a/dbms/src/Storages/StorageBuffer.cpp +++ b/dbms/src/Storages/StorageBuffer.cpp @@ -71,11 +71,10 @@ protected: if (!buffer.data) return res; - for (size_t i = 0, size = buffer.data.columns(); i < size; ++i) + for (const auto & name : column_names) { - auto & col = buffer.data.unsafeGetByPosition(i); - if (column_names.count(col.name)) - res.insert(col); + auto & col = buffer.data.getByName(name); + res.insert(ColumnWithNameAndType(col.column->clone(), col.type, name)); } return res; @@ -213,14 +212,17 @@ private: void insertIntoBuffer(const Block & block, StorageBuffer::Buffer & buffer, std::unique_lock<std::mutex> && lock) { + /// Sort the columns in the block. This makes it easier to concatenate blocks later. + Block sorted_block = block.sortColumns(); + if (!buffer.data) { buffer.first_write_time = time(0); - buffer.data = block.cloneEmpty(); + buffer.data = sorted_block.cloneEmpty(); } /// If the limits would be exceeded after inserting into the buffer, flush the buffer. - if (storage.checkThresholds(buffer, time(0), block.rowsInFirstColumn(), block.bytes())) + if (storage.checkThresholds(buffer, time(0), sorted_block.rowsInFirstColumn(), sorted_block.bytes())) { /// Take the block out of the buffer and replace the buffer with an empty one. After that the mutex can be unlocked.
Block block_to_write; @@ -230,13 +232,13 @@ private: if (!storage.no_destination) { - appendBlock(block, block_to_write); + appendBlock(sorted_block, block_to_write); storage.writeBlockToDestination(block_to_write, storage.context.tryGetTable(storage.destination_database, storage.destination_table)); } } else - appendBlock(block, buffer.data); + appendBlock(sorted_block, buffer.data); } }; diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp index c6fef123e4c..4e9fcf85e95 100644 --- a/dbms/src/Storages/StorageLog.cpp +++ b/dbms/src/Storages/StorageLog.cpp @@ -338,7 +338,7 @@ void LogBlockInputStream::readData(const String & name, const IDataType & type, } } else - type.deserializeBinary(column, streams[name]->compressed, max_rows_to_read); + type.deserializeBinary(column, streams[name]->compressed, max_rows_to_read, 0); /// TODO Use avg_value_size_hint. } diff --git a/dbms/src/Storages/StorageSet.cpp b/dbms/src/Storages/StorageSet.cpp index 1c675955d82..7ca0a603099 100644 --- a/dbms/src/Storages/StorageSet.cpp +++ b/dbms/src/Storages/StorageSet.cpp @@ -23,19 +23,7 @@ SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(StorageSetOrJoinBase & ta void SetOrJoinBlockOutputStream::write(const Block & block) { /// Sort the columns in the block. This is needed because Set and Join rely on the same column order in different blocks. - - size_t columns = block.columns(); - std::vector<String> names(columns); - - for (size_t i = 0; i < columns; ++i) - names[i] = block.unsafeGetByPosition(i).name; - - std::sort(names.begin(), names.end()); - - Block sorted_block; - - for (const auto & name : names) - sorted_block.insert(block.getByName(name)); /// NOTE Could be slightly more optimal. + Block sorted_block = block.sortColumns(); table.insertBlock(sorted_block); backup_stream.write(sorted_block); diff --git a/dbms/src/Storages/StorageTinyLog.cpp b/dbms/src/Storages/StorageTinyLog.cpp index ab05a03e9e5..1d49c24cde7 100644 --- a/dbms/src/Storages/StorageTinyLog.cpp +++ b/dbms/src/Storages/StorageTinyLog.cpp @@ -276,7 +276,7 @@ void TinyLogBlockInputStream::readData(const String & name, const IDataType & ty } } else - type.deserializeBinary(column, streams[name]->compressed, limit); + type.deserializeBinary(column, streams[name]->compressed, limit, 0); /// TODO Use avg_value_size_hint.
} diff --git a/dbms/tests/queries/0_stateless/00126_buffer.reference b/dbms/tests/queries/0_stateless/00126_buffer.reference new file mode 100644 index 00000000000..b20406d54a5 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00126_buffer.reference @@ -0,0 +1,90 @@ +1 2 [3] +2 [3] 1 +[3] 1 2 +1 [3] 2 +2 1 [3] +[3] 2 1 +1 2 +2 [3] +[3] 1 +1 [3] +2 1 +[3] 2 +1 +2 +[3] +1 2 [3] +9 8 [7] +2 [3] 1 +8 [7] 9 +[3] 1 2 +[7] 9 8 +1 [3] 2 +9 [7] 8 +2 1 [3] +8 9 [7] +[3] 2 1 +[7] 8 9 +1 2 +9 8 +2 [3] +8 [7] +[3] 1 +[7] 9 +1 [3] +9 [7] +2 1 +8 9 +[3] 2 +[7] 8 +1 +9 +2 +8 +[3] +[7] +1 2 [3] +9 8 [7] +11 [33] +2 [3] 1 +8 [7] 9 + [33] 11 +[3] 1 2 +[7] 9 8 +[33] 11 +1 [3] 2 +9 [7] 8 +11 [33] +2 1 [3] +8 9 [7] + 11 [33] +[3] 2 1 +[7] 8 9 +[33] 11 +1 2 +9 8 +11 +2 [3] +8 [7] + [33] +[3] 1 +[7] 9 +[33] 11 +1 [3] +9 [7] +11 [33] +2 1 +8 9 + 11 +[3] 2 +[7] 8 +[33] +1 +9 +11 +2 +8 + +[3] +[7] +[33] diff --git a/dbms/tests/queries/0_stateless/00126_buffer.sql b/dbms/tests/queries/0_stateless/00126_buffer.sql new file mode 100644 index 00000000000..4426933da0d --- /dev/null +++ b/dbms/tests/queries/0_stateless/00126_buffer.sql @@ -0,0 +1,62 @@ +DROP TABLE IF EXISTS test.buffer; +DROP TABLE IF EXISTS test.null; + +CREATE TABLE test.null (a UInt8, b String, c Array(UInt32)) ENGINE = Null; +CREATE TABLE test.buffer (a UInt8, b String, c Array(UInt32)) ENGINE = Buffer(test, null, 1, 1000, 1000, 1000, 1000, 1000000, 1000000); + +INSERT INTO test.buffer VALUES (1, '2', [3]); + +SELECT a, b, c FROM test.buffer ORDER BY a, b, c; +SELECT b, c, a FROM test.buffer ORDER BY a, b, c; +SELECT c, a, b FROM test.buffer ORDER BY a, b, c; +SELECT a, c, b FROM test.buffer ORDER BY a, b, c; +SELECT b, a, c FROM test.buffer ORDER BY a, b, c; +SELECT c, b, a FROM test.buffer ORDER BY a, b, c; +SELECT a, b FROM test.buffer ORDER BY a, b, c; +SELECT b, c FROM test.buffer ORDER BY a, b, c; +SELECT c, a FROM test.buffer ORDER BY a, b, c; +SELECT a, c FROM test.buffer ORDER BY a, b, c; +SELECT b, a FROM test.buffer ORDER BY a, b, c; +SELECT c, b FROM test.buffer ORDER BY a, b, c; +SELECT a FROM test.buffer ORDER BY a, b, c; +SELECT b FROM test.buffer ORDER BY a, b, c; +SELECT c FROM test.buffer ORDER BY a, b, c; + +INSERT INTO test.buffer (c, b, a) VALUES ([7], '8', 9); + +SELECT a, b, c FROM test.buffer ORDER BY a, b, c; +SELECT b, c, a FROM test.buffer ORDER BY a, b, c; +SELECT c, a, b FROM test.buffer ORDER BY a, b, c; +SELECT a, c, b FROM test.buffer ORDER BY a, b, c; +SELECT b, a, c FROM test.buffer ORDER BY a, b, c; +SELECT c, b, a FROM test.buffer ORDER BY a, b, c; +SELECT a, b FROM test.buffer ORDER BY a, b, c; +SELECT b, c FROM test.buffer ORDER BY a, b, c; +SELECT c, a FROM test.buffer ORDER BY a, b, c; +SELECT a, c FROM test.buffer ORDER BY a, b, c; +SELECT b, a FROM test.buffer ORDER BY a, b, c; +SELECT c, b FROM test.buffer ORDER BY a, b, c; +SELECT a FROM test.buffer ORDER BY a, b, c; +SELECT b FROM test.buffer ORDER BY a, b, c; +SELECT c FROM test.buffer ORDER BY a, b, c; + +INSERT INTO test.buffer (a, c) VALUES (11, [33]); + +SELECT a, b, c FROM test.buffer ORDER BY a, b, c; +SELECT b, c, a FROM test.buffer ORDER BY a, b, c; +SELECT c, a, b FROM test.buffer ORDER BY a, b, c; +SELECT a, c, b FROM test.buffer ORDER BY a, b, c; +SELECT b, a, c FROM test.buffer ORDER BY a, b, c; +SELECT c, b, a FROM test.buffer ORDER BY a, b, c; +SELECT a, b FROM test.buffer ORDER BY a, b, c; +SELECT b, c FROM test.buffer ORDER BY a, b, c; +SELECT c, a FROM test.buffer ORDER BY a, b, c; +SELECT a, c FROM test.buffer ORDER BY a, b, c; +SELECT 
b, a FROM test.buffer ORDER BY a, b, c; +SELECT c, b FROM test.buffer ORDER BY a, b, c; +SELECT a FROM test.buffer ORDER BY a, b, c; +SELECT b FROM test.buffer ORDER BY a, b, c; +SELECT c FROM test.buffer ORDER BY a, b, c; + +DROP TABLE test.buffer; +DROP TABLE test.null;
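
Editor's notes with illustrative sketches (not part of the patch).

The new avg_value_size_hint parameter of IDataType::deserializeBinary only pays off when a caller feeds it a running estimate of value sizes; the Log and TinyLog call sites still pass 0 with a TODO. The sketch below is a hypothetical caller-side helper (class name, smoothing factor, and update rule are assumptions, not taken from this patch) that keeps an exponentially smoothed per-column average to pass as the hint on the next read.

#include <cstddef>

/// Hypothetical helper: tracks an exponentially smoothed average value size
/// for one column, to be passed as avg_value_size_hint on the next read.
class ValueSizeEstimator
{
public:
    /// Call after reading a batch: total bytes appended and number of rows read.
    void update(size_t bytes_read, size_t rows_read)
    {
        if (!rows_read)
            return;
        double batch_avg = static_cast<double>(bytes_read) / rows_read;
        avg = avg ? alpha * batch_avg + (1 - alpha) * avg : batch_avg;
    }

    /// 0 means "no information yet"; deserializeBinary then falls back to its default.
    double hint() const { return avg; }

private:
    static constexpr double alpha = 0.3;  /// smoothing factor, chosen arbitrarily
    double avg = 0;
};

A reader would then call type.deserializeBinary(column, istr, limit, estimator.hint()) and afterwards estimator.update(...) with the bytes actually consumed.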
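To make the reservation heuristic in DataTypeString::deserializeBinary concrete: with sizeof(offsets[0]) == 8 on a 64-bit build, a hint of 50 bytes per value gives avg_chars_size = (50 - 8) * 1.2 = 50.4, so for limit = 1000 rows about 50400 bytes are reserved and the >= 48 branch selects deserializeBinarySSE2<3>. A standalone restatement of that arithmetic (the fallback constant below is a stand-in, not the real DBMS_APPROX_STRING_SIZE):

#include <cmath>
#include <cstddef>
#include <cstdio>

int main()
{
    const double multiplier = 1.2;    /// avg_value_size_hint_reserve_multiplier
    const double hint = 50;           /// avg_value_size_hint, bytes per value
    const double offset_size = 8;     /// sizeof(offsets[0]) on 64-bit builds
    const double fallback = 64;       /// stand-in for DBMS_APPROX_STRING_SIZE (defined elsewhere)
    const size_t limit = 1000;

    double avg_chars_size = hint > offset_size ? (hint - offset_size) * multiplier : fallback;
    size_t reserve_bytes = static_cast<size_t>(std::ceil(limit * avg_chars_size));

    int unroll = avg_chars_size >= 64 ? 4 : avg_chars_size >= 48 ? 3 : avg_chars_size >= 32 ? 2 : 1;
    std::printf("avg_chars_size=%.1f reserve=%zu unroll=%d\n", avg_chars_size, reserve_bytes, unroll);
}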
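The deserializeBinarySSE2 fast path copies each string in 16-byte SSE2 chunks, rounded up to a multiple of 16 * UNROLL_TIMES; that is only safe because both the destination (data.capacity()) and the source (istr.buffer().end()) are checked to have at least that much slack. Below is a minimal standalone sketch of the same copy pattern using the _mm_loadu_si128 / _mm_storeu_si128 intrinsics that the commented-out loop in the patch refers to; buffer names and the explicit padding are assumptions for the sake of a self-contained example, and the code requires an x86 target with SSE2.

#include <emmintrin.h>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

/// Copy `size` bytes from src to dst in 16-byte SSE2 chunks, rounded up to a
/// multiple of 16 * UNROLL_TIMES. Both buffers must have at least that much
/// extra capacity past the copied region (the caller guarantees the padding).
template <int UNROLL_TIMES>
void copy_sse2_padded(char * dst, const char * src, size_t size)
{
    const __m128i * src_pos = reinterpret_cast<const __m128i *>(src);
    const __m128i * src_end = src_pos + (size + (16 * UNROLL_TIMES - 1)) / 16 / UNROLL_TIMES * UNROLL_TIMES;
    __m128i * dst_pos = reinterpret_cast<__m128i *>(dst);

    while (src_pos < src_end)
    {
        for (int j = 0; j < UNROLL_TIMES; ++j)
            _mm_storeu_si128(dst_pos + j, _mm_loadu_si128(src_pos + j));

        src_pos += UNROLL_TIMES;
        dst_pos += UNROLL_TIMES;
    }
}

int main()
{
    constexpr int unroll = 2;
    const size_t size = 100;
    const size_t padding = 16 * unroll;  /// slack required by the rounded-up copy

    std::vector<char> src(size + padding, 'x');
    std::vector<char> dst(size + padding, 0);

    copy_sse2_padded<unroll>(dst.data(), src.data(), size);
    assert(std::memcmp(dst.data(), src.data(), size) == 0);
}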
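In MergeTreeDataSelectExecutor the three mark thresholds that used to be precomputed in the constructor are now derived per query from the merge_tree_*_rows_* settings, always with the same rounding-up division by data.index_granularity. The helper below just restates that arithmetic; the names are illustrative, not from the patch.

#include <cstddef>
#include <cstdio>

/// Rounding-up conversion from a row threshold to a number of index marks.
size_t rows_to_marks(size_t rows, size_t index_granularity)
{
    return (rows + index_granularity - 1) / index_granularity;
}

int main()
{
    /// e.g. with a granularity of 8192 rows per mark, a 20000-row threshold
    /// becomes 3 marks, and a threshold of 0 rows stays 0 marks.
    std::printf("%zu %zu\n", rows_to_marks(20000, 8192), rows_to_marks(0, 8192));
}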
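In markRangesFromPkRange, a range that may contain matching keys but is longer than min_marks_for_seek is split into at most merge_tree_coarse_index_granularity sub-ranges; step = (range.end - range.begin - 1) / settings.merge_tree_coarse_index_granularity + 1 is a ceiling division, so for example a range of 100 marks with a coarse granularity of 8 is cut into pieces of 13 marks each (the last piece shorter). A small check of that arithmetic, with the piece-counting loop written out for illustration only:

#include <cstddef>
#include <cstdio>

int main()
{
    const size_t begin = 0, end = 100;          /// range of 100 marks
    const size_t coarse_index_granularity = 8;  /// merge_tree_coarse_index_granularity

    /// Ceiling division: the piece length that yields at most coarse_index_granularity pieces.
    size_t step = (end - begin - 1) / coarse_index_granularity + 1;

    size_t pieces = 0;
    for (size_t e = end; e > begin; e -= (e - begin > step ? step : e - begin))
        ++pieces;

    std::printf("step=%zu pieces=%zu\n", step, pieces);  /// prints step=13 pieces=8
}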
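Both StorageBuffer and StorageSet now normalize column order with the new Block::sortColumns() before concatenating or hashing blocks, replacing the inline sort-by-name loop removed from SetOrJoinBlockOutputStream::write; the 00126_buffer test above exercises exactly this by inserting with different column orders. The patch itself does not show the implementation of sortColumns(), so the sketch below only mirrors the removed loop on a simplified stand-in structure, not the real DB::Block.

#include <algorithm>
#include <string>
#include <vector>

/// Simplified stand-in for a block: a list of named columns in insertion order.
struct NamedColumn { std::string name; std::vector<int> data; };
using MiniBlock = std::vector<NamedColumn>;

/// What Block::sortColumns() plausibly does: return a copy of the block with
/// the same columns rearranged into the order of their names.
MiniBlock sortColumns(const MiniBlock & block)
{
    MiniBlock sorted = block;
    std::sort(sorted.begin(), sorted.end(),
              [](const NamedColumn & a, const NamedColumn & b) { return a.name < b.name; });
    return sorted;
}

int main()
{
    MiniBlock block{{"b", {2}}, {"c", {3}}, {"a", {1}}};
    MiniBlock sorted = sortColumns(block);
    /// Columns now come in the order a, b, c, so two blocks written with
    /// different column orders can be concatenated column by column.
    return sorted.front().name == "a" ? 0 : 1;
}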