Evgeniy Gatov 2015-02-17 18:27:54 +03:00
commit c99a3e9d79
41 changed files with 770 additions and 386 deletions

View File

@ -48,6 +48,9 @@ public:
*/
virtual size_t sizeOfField() const { throw Exception("Cannot get sizeOfField() for column " + getName(), ErrorCodes::CANNOT_GET_SIZE_OF_FIELD); }
/** Create a column with the same data. */
virtual SharedPtr<IColumn> clone() const { return cut(0, size()); }
/** Create an empty column of the same type */
virtual SharedPtr<IColumn> cloneEmpty() const { return cloneResized(0); }

View File

@ -111,6 +111,9 @@ public:
/** Get the same block, but empty. */
Block cloneEmpty() const;
/** Get a block with the columns rearranged in the order of their names. */
Block sortColumns() const;
/** Replaces the offset columns inside nested tables with a single one shared by the whole table.
* Throws an exception if these offsets turn out to be different.
*/

View File

@ -6,6 +6,7 @@
#include <set>
#include <Poco/SharedPtr.h>
#include <sparsehash/dense_hash_map>
#include <DB/DataTypes/IDataType.h>
#include <DB/DataTypes/DataTypeFactory.h>
@ -44,121 +45,29 @@ class NamesAndTypesList : public std::list<NameAndTypePair>
public:
using std::list<NameAndTypePair>::list;
void readText(ReadBuffer & buf, const DataTypeFactory & data_type_factory)
{
DB::assertString("columns format version: 1\n", buf);
size_t count;
DB::readText(count, buf);
DB::assertString(" columns:\n", buf);
resize(count);
for (NameAndTypePair & it : *this)
{
DB::readBackQuotedString(it.name, buf);
DB::assertString(" ", buf);
String type_name;
DB::readString(type_name, buf);
it.type = data_type_factory.get(type_name);
DB::assertString("\n", buf);
}
}
void readText(ReadBuffer & buf, const DataTypeFactory & data_type_factory);
void writeText(WriteBuffer & buf) const;
void writeText(WriteBuffer & buf) const
{
DB::writeString("columns format version: 1\n", buf);
DB::writeText(size(), buf);
DB::writeString(" columns:\n", buf);
for (const auto & it : *this)
{
DB::writeBackQuotedString(it.name, buf);
DB::writeChar(' ', buf);
DB::writeString(it.type->getName(), buf);
DB::writeChar('\n', buf);
}
}
String toString() const
{
String s;
{
WriteBufferFromString out(s);
writeText(out);
}
return s;
}
static NamesAndTypesList parse(const String & s, const DataTypeFactory & data_type_factory)
{
ReadBufferFromString in(s);
NamesAndTypesList res;
res.readText(in, data_type_factory);
assertEOF(in);
return res;
}
String toString() const;
static NamesAndTypesList parse(const String & s, const DataTypeFactory & data_type_factory);
/// All elements of rhs must be distinct.
bool isSubsetOf(const NamesAndTypesList & rhs) const
{
NamesAndTypes vector(rhs.begin(), rhs.end());
vector.insert(vector.end(), begin(), end());
std::sort(vector.begin(), vector.end());
return std::unique(vector.begin(), vector.end()) == vector.begin() + rhs.size();
}
bool isSubsetOf(const NamesAndTypesList & rhs) const;
/// Hamming distance between the sets
/// (in other words, added and removed columns are counted once; columns that changed their type are counted twice).
size_t sizeOfDifference(const NamesAndTypesList & rhs) const
{
NamesAndTypes vector(rhs.begin(), rhs.end());
vector.insert(vector.end(), begin(), end());
std::sort(vector.begin(), vector.end());
return (std::unique(vector.begin(), vector.end()) - vector.begin()) * 2 - size() - rhs.size();
}
size_t sizeOfDifference(const NamesAndTypesList & rhs) const;
Names getNames() const
{
Names res;
res.reserve(size());
for (const NameAndTypePair & column : *this)
{
res.push_back(column.name);
}
return res;
}
Names getNames() const;
/// Keep only the columns whose names are in names. names may contain extra columns.
NamesAndTypesList filter(const NameSet & names) const
{
NamesAndTypesList res;
for (const NameAndTypePair & column : *this)
{
if (names.count(column.name))
res.push_back(column);
}
return res;
}
NamesAndTypesList filter(const NameSet & names) const;
/// Keep only the columns whose names are in names. names may contain extra columns.
NamesAndTypesList filter(const Names & names) const
{
return filter(NameSet(names.begin(), names.end()));
}
NamesAndTypesList filter(const Names & names) const;
/// Unlike filter, returns the columns in the order in which they appear in names.
NamesAndTypesList addTypes(const Names & names) const
{
std::map<String, DataTypePtr> types;
for (const NameAndTypePair & column : *this)
types[column.name] = column.type;
NamesAndTypesList res;
for (const String & name : names)
{
auto it = types.find(name);
if (it == types.end())
throw Exception("No column " + name, ErrorCodes::THERE_IS_NO_COLUMN);
res.push_back(NameAndTypePair(name, it->second));
}
return res;
}
NamesAndTypesList addTypes(const Names & names) const;
};
typedef SharedPtr<NamesAndTypesList> NamesAndTypesListPtr;

View File

@ -65,7 +65,13 @@ public:
* - limits and quotas are checked that must be checked not within a single source,
* but against the total amount of resources spent across all sources at once (the information in the ProcessList).
*/
virtual void progress(const Progress & value) { progressImpl(value); }
virtual void progress(const Progress & value)
{
/// Progress data is taken from leaf sources.
if (children.empty())
progressImpl(value);
}
void progressImpl(const Progress & value);
@ -77,6 +83,10 @@ public:
*/
void setProcessListElement(ProcessList::Element * elem);
/** Set information about the approximate total number of rows that need to be read.
*/
void setTotalRowsApprox(size_t value) { total_rows_approx = value; }
/** Ask to interrupt data retrieval as soon as possible.
* By default, simply sets the is_cancelled flag and asks all children to stop.
@ -161,6 +171,10 @@ protected:
Block totals;
/// Minimums and maximums. The first row of the block contains the minimums, the second the maximums.
Block extremes;
/// Approximate total number of rows that need to be read. For the progress bar.
size_t total_rows_approx = 0;
/// Information about the approximate total number of rows is collected in the parent source.
bool collected_total_rows_approx = false;
/// Limits and quotas.
@ -182,6 +196,14 @@ protected:
*/
bool checkLimits();
void checkQuota(Block & block);
/// Collect information about the approximate total number of rows across all children.
void collectTotalRowsApprox();
/** Pass information about the approximate total number of rows to the progress callback.
* Arranged so that it is only sent from the topmost source.
*/
void collectAndSendTotalRowsApprox();
};
}

View File

@ -4,7 +4,6 @@
#include <DB/Core/Block.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/WriteBufferValidUTF8.h>
#include <DB/DataStreams/IRowOutputStream.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
@ -26,7 +25,13 @@ public:
void writePrefix() override;
void writeSuffix() override;
void flush() override { ostr.next(); dst_ostr.next(); }
void flush() override
{
ostr->next();
if (validating_ostr)
dst_ostr.next();
}
void setRowsBeforeLimit(size_t rows_before_limit_) override
{
@ -44,11 +49,13 @@ protected:
virtual void writeExtremes();
WriteBuffer & dst_ostr;
WriteBufferValidUTF8 ostr; /// Validates and writes to dst_ostr.
size_t field_number;
size_t row_count;
bool applied_limit;
size_t rows_before_limit;
std::unique_ptr<WriteBuffer> validating_ostr; /// Validates UTF-8 sequences.
WriteBuffer * ostr;
size_t field_number = 0;
size_t row_count = 0;
bool applied_limit = false;
size_t rows_before_limit = 0;
NamesAndTypes fields;
Block totals;
Block extremes;

View File

@ -19,7 +19,7 @@ private:
AggregateFunctionPtr function;
DataTypes argument_types;
Array parameters;
public:
DataTypeAggregateFunction(const AggregateFunctionPtr & function_, const DataTypes & argument_types_, const Array & parameters_)
: function(function_), argument_types(argument_types_), parameters(parameters_)
@ -27,7 +27,7 @@ public:
}
std::string getFunctionName() const { return function->getName(); }
std::string getName() const
{
std::stringstream stream;
@ -47,20 +47,20 @@ public:
for (DataTypes::const_iterator it = argument_types.begin(); it != argument_types.end(); ++it)
stream << ", " << (*it)->getName();
stream << ")";
return stream.str();
}
DataTypePtr getReturnType() const { return function->getReturnType(); };
DataTypes getArgumentsDataTypes() const { return argument_types; }
DataTypePtr clone() const { return new DataTypeAggregateFunction(function, argument_types, parameters); }
void serializeBinary(const Field & field, WriteBuffer & ostr) const;
void deserializeBinary(Field & field, ReadBuffer & istr) const;
void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const;
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const;
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const;
void serializeText(const Field & field, WriteBuffer & ostr) const;
void deserializeText(Field & field, ReadBuffer & istr) const;
void serializeTextEscaped(const Field & field, WriteBuffer & ostr) const;

View File

@ -19,7 +19,7 @@ private:
public:
DataTypeArray(DataTypePtr nested_);
std::string getName() const
{
return "Array(" + nested->getName() + ")";
@ -41,7 +41,7 @@ public:
void serializeTextQuoted(const Field & field, WriteBuffer & ostr) const;
void deserializeTextQuoted(Field & field, ReadBuffer & istr) const;
void serializeTextJSON(const Field & field, WriteBuffer & ostr) const;
/** Streaming serialization of arrays is arranged in a special way:
@ -57,7 +57,7 @@ public:
/** Read only the values, without the sizes.
* All the sizes must already have been read into column beforehand.
*/
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const;
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const;
/** Write the sizes. */
void serializeOffsets(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const;

View File

@ -17,24 +17,24 @@ class DataTypeFixedString : public IDataType
{
private:
size_t n;
public:
DataTypeFixedString(size_t n_) : n(n_)
{
if (n == 0)
throw Exception("FixedString size must be positive", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
std::string getName() const
{
return "FixedString(" + toString(n) + ")";
}
DataTypePtr clone() const
{
return new DataTypeFixedString(n);
}
size_t getN() const
{
return n;
@ -43,7 +43,7 @@ public:
void serializeBinary(const Field & field, WriteBuffer & ostr) const;
void deserializeBinary(Field & field, ReadBuffer & istr) const;
void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const;
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const;
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const;
void serializeText(const Field & field, WriteBuffer & ostr) const;
void deserializeText(Field & field, ReadBuffer & istr) const;
@ -53,7 +53,7 @@ public:
void serializeTextQuoted(const Field & field, WriteBuffer & ostr) const;
void deserializeTextQuoted(Field & field, ReadBuffer & istr) const;
void serializeTextJSON(const Field & field, WriteBuffer & ostr) const;
ColumnPtr createColumn() const;

View File

@ -19,9 +19,9 @@ private:
public:
DataTypeNested(NamesAndTypesListPtr nested_);
std::string getName() const;
static std::string concatenateNestedName(const std::string & nested_table_name, const std::string & nested_field_name);
/// Returns the prefix of the name up to the first dot '.', or the name unchanged if there is no dot.
static std::string extractNestedTableName(const std::string & nested_name);
@ -44,7 +44,7 @@ public:
void serializeTextQuoted(const Field & field, WriteBuffer & ostr) const;
void deserializeTextQuoted(Field & field, ReadBuffer & istr) const;
void serializeTextJSON(const Field & field, WriteBuffer & ostr) const;
/** Streaming serialization of arrays is arranged in a special way:
@ -60,7 +60,7 @@ public:
/** Read only the values, without the sizes.
* All the sizes must already have been read into column beforehand.
*/
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const;
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const;
/** Write the sizes. */
void serializeOffsets(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const;

View File

@ -22,7 +22,7 @@ public:
{
return "String";
}
DataTypePtr clone() const
{
return new DataTypeString;
@ -31,7 +31,7 @@ public:
void serializeBinary(const Field & field, WriteBuffer & ostr) const;
void deserializeBinary(Field & field, ReadBuffer & istr) const;
void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const;
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const;
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const;
void serializeText(const Field & field, WriteBuffer & ostr) const;
void deserializeText(Field & field, ReadBuffer & istr) const;

View File

@ -19,7 +19,7 @@ private:
DataTypes elems;
public:
DataTypeTuple(DataTypes elems_) : elems(elems_) {}
std::string getName() const
{
std::stringstream s;
@ -28,7 +28,7 @@ public:
for (DataTypes::const_iterator it = elems.begin(); it != elems.end(); ++it)
s << (it == elems.begin() ? "" : ", ") << (*it)->getName();
s << ")";
return s.str();
}
@ -62,7 +62,7 @@ public:
}
writeChar(')', ostr);
}
void deserializeText(Field & field, ReadBuffer & istr) const
{
size_t size = elems.size();
@ -82,7 +82,7 @@ public:
{
serializeText(field, ostr);
}
void deserializeTextEscaped(Field & field, ReadBuffer & istr) const
{
deserializeText(field, istr);
@ -92,7 +92,7 @@ public:
{
serializeText(field, ostr);
}
void deserializeTextQuoted(Field & field, ReadBuffer & istr) const
{
deserializeText(field, istr);
@ -129,11 +129,11 @@ public:
* It is precisely because of this (the inability to read a smaller piece of the written data) that Tuples cannot be used to store data in tables.
* (Although they can be used to pass data over the network in the Native format.)
*/
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const
{
ColumnTuple & real_column = static_cast<ColumnTuple &>(column);
for (size_t i = 0, size = elems.size(); i < size; ++i)
elems[i]->deserializeBinary(*real_column.getData().getByPosition(i).column, istr, limit);
elems[i]->deserializeBinary(*real_column.getData().getByPosition(i).column, istr, limit, avg_value_size_hint);
}
ColumnPtr createColumn() const
@ -147,7 +147,7 @@ public:
}
return new ColumnTuple(tuple_block);
}
ColumnPtr createConstColumn(size_t size, const Field & field) const
{
return new ColumnConstArray(size, get<const Array &>(field), new DataTypeTuple(elems));

View File

@ -30,7 +30,7 @@ public:
/// If the type is numeric, whether all arithmetic operations and type casts are appropriate for it.
/// true for numbers, false for Date and DateTime.
virtual bool behavesAsNumber() const { return false; }
/// Clone
virtual SharedPtr<IDataType> clone() const = 0;
@ -49,9 +49,11 @@ public:
* - in this case, the column is serialized to the end.
*/
virtual void serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset = 0, size_t limit = 0) const = 0;
/** Read no more than limit values and append them to the end of the column. */
virtual void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const = 0;
/** Read no more than limit values and append them to the end of the column.
* avg_value_size_hint - if not 0, may be used to avoid reallocations while reading a string column.
*/
virtual void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const = 0;
/** Text serialization - for output to the screen / saving to a text file, etc.
* Without escaping or quoting.
@ -68,7 +70,7 @@ public:
*/
virtual void serializeTextQuoted(const Field & field, WriteBuffer & ostr) const = 0;
virtual void deserializeTextQuoted(Field & field, ReadBuffer & istr) const = 0;
/** Text serialization as a literal for use in the JSON format.
*/
virtual void serializeTextJSON(const Field & field, WriteBuffer & ostr) const = 0;

View File

@ -25,8 +25,8 @@ public:
void serializeBinary(const IColumn & column, WriteBuffer & ostr,
size_t offset = 0, size_t limit = 0) const { throwNoSerialization(); }
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const { throwNoSerialization(); }
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const { throwNoSerialization(); }
void serializeText(const Field & field, WriteBuffer & ostr) const { throwNoSerialization(); }
void deserializeText(Field & field, ReadBuffer & istr) const { throwNoSerialization(); }
@ -36,7 +36,7 @@ public:
void serializeTextQuoted(const Field & field, WriteBuffer & ostr) const { throwNoSerialization(); }
void deserializeTextQuoted(Field & field, ReadBuffer & istr) const { throwNoSerialization(); }
void serializeTextJSON(const Field & field, WriteBuffer & ostr) const { throwNoSerialization(); }
SharedPtr<IColumn> createColumn() const

View File

@ -51,7 +51,7 @@ public:
ostr.write(reinterpret_cast<const char *>(&x[offset]), sizeof(typename ColumnType::value_type) * limit);
}
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const
{
typename ColumnType::Container_t & x = typeid_cast<ColumnType &>(column).getData();
size_t initial_size = x.size();

View File

@ -92,6 +92,18 @@ struct Settings
M(SettingUInt64, max_parallel_replicas, 1) \
M(SettingUInt64, parallel_replicas_count, 0) \
M(SettingUInt64, parallel_replica_offset, 0) \
\
/** Fine-grained settings for reading from MergeTree */ \
\
/** If at least this many rows are read from a single file, the reading can be parallelized. */ \
M(SettingUInt64, merge_tree_min_rows_for_concurrent_read, (20 * 8192)) \
/** Reading more than this many rows can be skipped at the cost of one seek in the file. */ \
M(SettingUInt64, merge_tree_min_rows_for_seek, (5 * 8192)) \
/** If an index segment can contain the required keys, divide it into this many parts and check them recursively. */ \
M(SettingUInt64, merge_tree_coarse_index_granularity, 8) \
/** Maximum number of rows per query for which the cache of decompressed data is used. If the query is large, the cache is not used. \
* (So that large queries do not flush the cache.) */ \
M(SettingUInt64, merge_tree_max_rows_to_use_cache, (1024 * 1024)) \
/// All sorts of limits on query execution.
Limits limits;
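
These MergeTree read settings are expressed in rows; elsewhere in this commit (MergeTreeDataSelectExecutor.cpp) they are converted into mark counts with a rounding-up division by the index granularity. A minimal standalone sketch of that conversion; the value 8192 for index_granularity is assumed here only for illustration:

#include <cstddef>
#include <iostream>

int main()
{
    const std::size_t index_granularity = 8192;                 /// assumed typical MergeTree index granularity
    const std::size_t merge_tree_min_rows_for_seek = 5 * 8192;  /// default from the macro above

    /// Round up: how many whole marks are needed to cover the given number of rows.
    const std::size_t min_marks_for_seek =
        (merge_tree_min_rows_for_seek + index_granularity - 1) / index_granularity;

    std::cout << min_marks_for_seek << "\n";                    /// prints 5
}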

View File

@ -25,7 +25,8 @@ public:
all_mark_ranges(mark_ranges_), remaining_mark_ranges(mark_ranges_),
use_uncompressed_cache(use_uncompressed_cache_),
prewhere_actions(prewhere_actions_), prewhere_column(prewhere_column_),
log(&Logger::get("MergeTreeBlockInputStream"))
log(&Logger::get("MergeTreeBlockInputStream")),
ordered_names{column_names}
{
std::reverse(remaining_mark_ranges.begin(), remaining_mark_ranges.end());
@ -69,6 +70,7 @@ public:
}
/// Estimate the total number of rows - for the progress bar.
size_t total_rows = 0;
for (const auto & range : all_mark_ranges)
total_rows += range.end - range.begin;
total_rows *= storage.index_granularity;
@ -79,6 +81,8 @@ public:
? ", up to " + toString((all_mark_ranges.back().end - all_mark_ranges.front().begin) * storage.index_granularity)
: "")
<< " rows starting from " << all_mark_ranges.front().begin * storage.index_granularity);
setTotalRowsApprox(total_rows);
}
String getName() const override { return "MergeTreeBlockInputStream"; }
@ -157,10 +161,6 @@ protected:
if (!reader)
{
/// Send information that we are going to read approximately this many rows.
/// NOTE This could not be done in the constructor, because at that point progress_callback is not yet set.
progressImpl(Progress(0, 0, total_rows));
injectRequiredColumns(columns);
injectRequiredColumns(pre_columns);
@ -192,7 +192,7 @@ protected:
remaining_mark_ranges.pop_back();
}
progressImpl(Progress(res.rows(), res.bytes()));
pre_reader->fillMissingColumns(res);
pre_reader->fillMissingColumns(res, ordered_names);
/// Evaluate the expression in PREWHERE.
prewhere_actions->execute(res);
@ -295,7 +295,7 @@ protected:
else
throw Exception("Illegal type " + column->getName() + " of column for filter. Must be ColumnUInt8 or ColumnConstUInt8.", ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
reader->fillMissingColumns(res);
reader->fillMissingColumns(res, ordered_names);
}
while (!remaining_mark_ranges.empty() && !res && !isCancelled());
}
@ -317,7 +317,7 @@ protected:
progressImpl(Progress(res.rows(), res.bytes()));
reader->fillMissingColumns(res);
reader->fillMissingColumns(res, ordered_names);
}
if (remaining_mark_ranges.empty())
@ -353,9 +353,11 @@ private:
ExpressionActionsPtr prewhere_actions;
String prewhere_column;
bool remove_prewhere_column;
size_t total_rows = 0; /// Approximate total number of rows - for the progress bar.
Logger * log;
/// requested column names in specific order as expected by other stages
const Names ordered_names;
};
}

View File

@ -96,24 +96,9 @@ struct MergeTreeSettings
/// How many merge tasks are allowed to be in the ReplicatedMergeTree queue at the same time.
size_t max_replicated_merges_in_queue = 6;
/// If at least this many rows are read from a single file, the reading can be parallelized.
size_t min_rows_for_concurrent_read = 20 * 8192;
/// After how many seconds to delete unneeded parts.
time_t old_parts_lifetime = 8 * 60;
/** Settings for reading and working with the index. */
/// Reading more than this many rows can be skipped at the cost of one seek in the file.
size_t min_rows_for_seek = 5 * 8192;
/// If an index segment can contain the required keys, divide it into this many parts and check them recursively.
size_t coarse_index_granularity = 8;
/// Maximum number of rows per query for which the cache of decompressed data is used. If the query is large, the cache is not used.
/// (So that large queries do not flush the cache.)
size_t max_rows_to_use_cache = 1024 * 1024;
/** Insert settings. */
/// If the table has at least this many active parts, artificially slow down inserts into the table.

View File

@ -48,10 +48,6 @@ private:
typedef std::vector<RangesInDataPart> RangesInDataParts;
size_t min_marks_for_seek;
size_t min_marks_for_concurrent_read;
size_t max_marks_to_use_cache;
BlockInputStreams spreadMarkRangesAmongThreads(
RangesInDataParts parts,
size_t threads,
@ -60,7 +56,8 @@ private:
bool use_uncompressed_cache,
ExpressionActionsPtr prewhere_actions,
const String & prewhere_column,
const Names & virt_columns);
const Names & virt_columns,
const Settings & settings);
BlockInputStreams spreadMarkRangesAmongThreadsFinal(
RangesInDataParts parts,
@ -70,12 +67,13 @@ private:
bool use_uncompressed_cache,
ExpressionActionsPtr prewhere_actions,
const String & prewhere_column,
const Names & virt_columns);
const Names & virt_columns,
const Settings & settings);
/// Create the expression "Sign == 1".
void createPositiveSignCondition(ExpressionActionsPtr & out_expression, String & out_column);
MarkRanges markRangesFromPkRange(const MergeTreeData::DataPart::Index & index, PKCondition & key_condition);
MarkRanges markRangesFromPkRange(const MergeTreeData::DataPart::Index & index, PKCondition & key_condition, const Settings & settings);
};
}

View File

@ -173,7 +173,10 @@ public:
}
if (!minimum_size_column)
throw std::logic_error{"could not find a column of minimum size in MergeTree"};
throw Exception{
"could not find a column of minimum size in MergeTree",
ErrorCodes::LOGICAL_ERROR
};
addStream(minimum_size_column->name, *minimum_size_column->type, all_mark_ranges);
columns.emplace(std::begin(columns), *minimum_size_column);
@ -182,7 +185,7 @@ public:
}
/// Fills the columns missing from the block with default values.
void fillMissingColumns(Block & res)
void fillMissingColumns(Block & res, const Names & ordered_names)
{
try
{
@ -259,15 +262,23 @@ public:
columns.erase(std::begin(columns));
}
/// sort columns to ensure consistent order among all block
/// sort columns to ensure consistent order among all blocks
if (should_sort)
{
Block sorted_block;
Block ordered_block;
for (const auto & name_and_type : columns)
sorted_block.insert(res.getByName(name_and_type.name));
for (const auto & name : ordered_names)
if (res.has(name))
ordered_block.insert(res.getByName(name));
std::swap(res, sorted_block);
if (res.columns() != ordered_block.columns())
throw Exception{
"Ordered block has different columns than original one:\n" +
ordered_block.dumpNames() + "\nvs.\n" + res.dumpNames(),
ErrorCodes::LOGICAL_ERROR
};
std::swap(res, ordered_block);
}
else if (added_column)
{
@ -294,6 +305,9 @@ private:
std::string path_prefix;
size_t max_mark_range;
/// Used as a hint to reduce the number of reallocations when creating a variable-length column.
double avg_value_size_hint = 0;
Stream(const String & path_prefix, UncompressedCache * uncompressed_cache, MarkCache * mark_cache, const MarkRanges & all_mark_ranges)
: path_prefix(path_prefix)
{
@ -494,7 +508,20 @@ private:
{
Stream & stream = *streams[name];
stream.seekToMark(from_mark);
type.deserializeBinary(column, *stream.data_buffer, max_rows_to_read);
type.deserializeBinary(column, *stream.data_buffer, max_rows_to_read, stream.avg_value_size_hint);
/// Compute the hint about the average value size.
size_t column_size = column.size();
if (column_size)
{
double current_avg_value_size = static_cast<double>(column.byteSize()) / column_size;
/// A heuristic so that, when values change, avg_value_size_hint grows quickly but shrinks slowly.
if (current_avg_value_size > stream.avg_value_size_hint)
stream.avg_value_size_hint = current_avg_value_size;
else if (current_avg_value_size * 2 < stream.avg_value_size_hint)
stream.avg_value_size_hint = (current_avg_value_size + stream.avg_value_size_hint * 3) / 4;
}
}
}
};
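
The avg_value_size_hint update above is asymmetric: the hint jumps straight up to any larger observed average, but moves only a quarter of the way down toward a smaller one, and only when the new average has dropped below half the hint. A self-contained sketch of the same rule with made-up sample averages:

#include <initializer_list>
#include <iostream>

/// Same update rule as in Stream above: grow quickly, shrink slowly.
static void updateHint(double & hint, double current_avg)
{
    if (current_avg > hint)
        hint = current_avg;
    else if (current_avg * 2 < hint)
        hint = (current_avg + hint * 3) / 4;
}

int main()
{
    double hint = 0;
    for (double avg : {40.0, 120.0, 30.0, 30.0})
    {
        updateHint(hint, avg);
        std::cout << hint << "\n";   /// 40, 120, 97.5, 80.625
    }
}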

View File

@ -55,9 +55,10 @@ class Benchmark
public:
Benchmark(unsigned concurrency_, double delay_,
const String & host_, UInt16 port_, const String & default_database_,
const String & user_, const String & password_)
const String & user_, const String & password_, const Settings & settings_)
: concurrency(concurrency_), delay(delay_), queue(concurrency), pool(concurrency),
connections(concurrency, host_, port_, default_database_, user_, password_, data_type_factory)
connections(concurrency, host_, port_, default_database_, user_, password_, data_type_factory),
settings(settings_)
{
std::cerr << std::fixed << std::setprecision(3);
@ -81,6 +82,7 @@ private:
DataTypeFactory data_type_factory;
ConnectionPool connections;
Settings settings;
struct Stats
{
@ -237,7 +239,7 @@ private:
void execute(ConnectionPool::Entry & connection, Query & query)
{
Stopwatch watch;
RemoteBlockInputStream stream(connection, query, nullptr);
RemoteBlockInputStream stream(connection, query, &settings);
Progress progress;
stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });
@ -304,6 +306,12 @@ int main(int argc, char ** argv)
("user", boost::program_options::value<std::string>()->default_value("default"), "")
("password", boost::program_options::value<std::string>()->default_value(""), "")
("database", boost::program_options::value<std::string>()->default_value("default"), "")
#define DECLARE_SETTING(TYPE, NAME, DEFAULT) (#NAME, boost::program_options::value<std::string> (), "Settings.h")
#define DECLARE_LIMIT(TYPE, NAME, DEFAULT) (#NAME, boost::program_options::value<std::string> (), "Limits.h")
APPLY_FOR_SETTINGS(DECLARE_SETTING)
APPLY_FOR_LIMITS(DECLARE_LIMIT)
#undef DECLARE_SETTING
#undef DECLARE_LIMIT
;
boost::program_options::variables_map options;
@ -316,6 +324,16 @@ int main(int argc, char ** argv)
return 1;
}
/// Extract the settings and limits from the options we received
Settings settings;
#define EXTRACT_SETTING(TYPE, NAME, DEFAULT) \
if (options.count(#NAME)) \
settings.set(#NAME, options[#NAME].as<std::string>());
APPLY_FOR_SETTINGS(EXTRACT_SETTING)
APPLY_FOR_LIMITS(EXTRACT_SETTING)
#undef EXTRACT_SETTING
Benchmark benchmark(
options["concurrency"].as<unsigned>(),
options["delay"].as<double>(),
@ -323,7 +341,8 @@ int main(int argc, char ** argv)
options["port"].as<UInt16>(),
options["database"].as<std::string>(),
options["user"].as<std::string>(),
options["password"].as<std::string>());
options["password"].as<std::string>(),
settings);
}
catch (const Exception & e)
{
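
The DECLARE_SETTING / EXTRACT_SETTING pairs above use the X-macro pattern: APPLY_FOR_SETTINGS expands a caller-supplied macro once per setting, so the option list and the extraction code stay in sync automatically. A toy standalone version of the pattern; the setting list below is invented for illustration and is not the real Settings.h / Limits.h list:

#include <iostream>
#include <map>
#include <string>

/// Invented setting list; the real lists live in Settings.h and Limits.h.
#define APPLY_FOR_MY_SETTINGS(M) \
    M(int, max_threads, 8)       \
    M(int, max_block_size, 65536)

int main()
{
    std::map<std::string, int> values;

    /// "Declare": one statement is generated per setting.
#define DECLARE(TYPE, NAME, DEFAULT) values[#NAME] = DEFAULT;
    APPLY_FOR_MY_SETTINGS(DECLARE)
#undef DECLARE

    for (const auto & kv : values)
        std::cout << kv.first << " = " << kv.second << "\n";
}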

View File

@ -81,6 +81,7 @@ void Block::insert(const ColumnWithNameAndType & elem)
index_by_position.push_back(it);
}
void Block::insertDefault(const String & name, const DataTypePtr & type)
{
insert({
@ -279,6 +280,17 @@ Block Block::cloneEmpty() const
}
Block Block::sortColumns() const
{
Block sorted_block;
for (const auto & name : index_by_name)
sorted_block.insert(*name.second);
return sorted_block;
}
ColumnsWithNameAndType Block::getColumns() const
{
return ColumnsWithNameAndType(data.begin(), data.end());

View File

@ -0,0 +1,120 @@
#include <DB/Core/NamesAndTypes.h>
namespace DB
{
void NamesAndTypesList::readText(ReadBuffer & buf, const DataTypeFactory & data_type_factory)
{
DB::assertString("columns format version: 1\n", buf);
size_t count;
DB::readText(count, buf);
DB::assertString(" columns:\n", buf);
resize(count);
for (NameAndTypePair & it : *this)
{
DB::readBackQuotedString(it.name, buf);
DB::assertString(" ", buf);
String type_name;
DB::readString(type_name, buf);
it.type = data_type_factory.get(type_name);
DB::assertString("\n", buf);
}
}
void NamesAndTypesList::writeText(WriteBuffer & buf) const
{
DB::writeString("columns format version: 1\n", buf);
DB::writeText(size(), buf);
DB::writeString(" columns:\n", buf);
for (const auto & it : *this)
{
DB::writeBackQuotedString(it.name, buf);
DB::writeChar(' ', buf);
DB::writeString(it.type->getName(), buf);
DB::writeChar('\n', buf);
}
}
String NamesAndTypesList::toString() const
{
String s;
{
WriteBufferFromString out(s);
writeText(out);
}
return s;
}
NamesAndTypesList NamesAndTypesList::parse(const String & s, const DataTypeFactory & data_type_factory)
{
ReadBufferFromString in(s);
NamesAndTypesList res;
res.readText(in, data_type_factory);
assertEOF(in);
return res;
}
bool NamesAndTypesList::isSubsetOf(const NamesAndTypesList & rhs) const
{
NamesAndTypes vector(rhs.begin(), rhs.end());
vector.insert(vector.end(), begin(), end());
std::sort(vector.begin(), vector.end());
return std::unique(vector.begin(), vector.end()) == vector.begin() + rhs.size();
}
size_t NamesAndTypesList::sizeOfDifference(const NamesAndTypesList & rhs) const
{
NamesAndTypes vector(rhs.begin(), rhs.end());
vector.insert(vector.end(), begin(), end());
std::sort(vector.begin(), vector.end());
return (std::unique(vector.begin(), vector.end()) - vector.begin()) * 2 - size() - rhs.size();
}
Names NamesAndTypesList::getNames() const
{
Names res;
res.reserve(size());
for (const NameAndTypePair & column : *this)
{
res.push_back(column.name);
}
return res;
}
NamesAndTypesList NamesAndTypesList::filter(const NameSet & names) const
{
NamesAndTypesList res;
for (const NameAndTypePair & column : *this)
{
if (names.count(column.name))
res.push_back(column);
}
return res;
}
NamesAndTypesList NamesAndTypesList::filter(const Names & names) const
{
return filter(NameSet(names.begin(), names.end()));
}
NamesAndTypesList NamesAndTypesList::addTypes(const Names & names) const
{
/// NOTE It would be better to keep the map in IStorage than to build it here anew every time.
google::dense_hash_map<StringRef, const DataTypePtr *, StringRefHash> types;
types.set_empty_key(StringRef());
for (const NameAndTypePair & column : *this)
types[column.name] = &column.type;
NamesAndTypesList res;
for (const String & name : names)
{
auto it = types.find(name);
if (it == types.end())
throw Exception("No column " + name, ErrorCodes::THERE_IS_NO_COLUMN);
res.push_back(NameAndTypePair(name, *it->second));
}
return res;
}
}
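
isSubsetOf and sizeOfDifference above share one trick: concatenate both lists, sort, and let std::unique collapse the pairs present on both sides. A standalone illustration of the sizeOfDifference arithmetic, with invented column names and types:

#include <algorithm>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

using NameAndType = std::pair<std::string, std::string>;   /// stand-in for NameAndTypePair

int main()
{
    std::vector<NameAndType> lhs = {{"a", "Int32"}, {"b", "String"}};
    std::vector<NameAndType> rhs = {{"a", "Int32"}, {"b", "Int64"}};

    /// Concatenate, sort, deduplicate - pairs present in both lists collapse into one entry.
    std::vector<NameAndType> v(rhs.begin(), rhs.end());
    v.insert(v.end(), lhs.begin(), lhs.end());
    std::sort(v.begin(), v.end());
    size_t unique_count = std::unique(v.begin(), v.end()) - v.begin();

    /// unique_count = 3, so the difference is 3 * 2 - 2 - 2 = 2:
    /// column b changed its type and is therefore counted twice.
    std::cout << unique_count * 2 - lhs.size() - rhs.size() << "\n";
}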

View File

@ -13,6 +13,8 @@ namespace DB
Block IProfilingBlockInputStream::read()
{
collectAndSendTotalRowsApprox();
if (!info.started)
{
info.total_stopwatch.start();
@ -211,66 +213,62 @@ void IProfilingBlockInputStream::checkQuota(Block & block)
void IProfilingBlockInputStream::progressImpl(const Progress & value)
{
/// Progress data is taken from leaf sources.
if (children.empty())
if (progress_callback)
progress_callback(value);
if (process_list_elem)
{
if (progress_callback)
progress_callback(value);
if (!process_list_elem->update(value))
cancel();
if (process_list_elem)
/// The total amount of data processed or expected to be processed in all leaf sources, possibly on remote servers.
size_t rows_processed = process_list_elem->progress.rows;
size_t bytes_processed = process_list_elem->progress.bytes;
size_t total_rows_estimate = std::max(process_list_elem->progress.rows, process_list_elem->progress.total_rows);
/** Check the limits on the amount of data to read, the query execution speed, and the quota on the amount of data to read.
* NOTE: Maybe it makes sense to have them checked directly in ProcessList?
*/
if (limits.mode == LIMITS_TOTAL
&& ((limits.max_rows_to_read && total_rows_estimate > limits.max_rows_to_read)
|| (limits.max_bytes_to_read && bytes_processed > limits.max_bytes_to_read)))
{
if (!process_list_elem->update(value))
cancel();
/// The total amount of data processed or expected to be processed in all leaf sources, possibly on remote servers.
size_t rows_processed = process_list_elem->progress.rows;
size_t bytes_processed = process_list_elem->progress.bytes;
size_t total_rows_estimate = std::max(process_list_elem->progress.rows, process_list_elem->progress.total_rows);
/** Check the limits on the amount of data to read, the query execution speed, and the quota on the amount of data to read.
* NOTE: Maybe it makes sense to have them checked directly in ProcessList?
*/
if (limits.mode == LIMITS_TOTAL
&& ((limits.max_rows_to_read && total_rows_estimate > limits.max_rows_to_read)
|| (limits.max_bytes_to_read && bytes_processed > limits.max_bytes_to_read)))
if (limits.read_overflow_mode == OverflowMode::THROW)
{
if (limits.read_overflow_mode == OverflowMode::THROW)
{
if (limits.max_rows_to_read && total_rows_estimate > limits.max_rows_to_read)
throw Exception("Limit for rows to read exceeded: " + toString(total_rows_estimate)
+ " rows read (or to read), maximum: " + toString(limits.max_rows_to_read),
ErrorCodes::TOO_MUCH_ROWS);
else
throw Exception("Limit for (uncompressed) bytes to read exceeded: " + toString(bytes_processed)
+ " bytes read, maximum: " + toString(limits.max_bytes_to_read),
ErrorCodes::TOO_MUCH_ROWS);
}
else if (limits.read_overflow_mode == OverflowMode::BREAK)
cancel();
if (limits.max_rows_to_read && total_rows_estimate > limits.max_rows_to_read)
throw Exception("Limit for rows to read exceeded: " + toString(total_rows_estimate)
+ " rows read (or to read), maximum: " + toString(limits.max_rows_to_read),
ErrorCodes::TOO_MUCH_ROWS);
else
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
throw Exception("Limit for (uncompressed) bytes to read exceeded: " + toString(bytes_processed)
+ " bytes read, maximum: " + toString(limits.max_bytes_to_read),
ErrorCodes::TOO_MUCH_ROWS);
}
else if (limits.read_overflow_mode == OverflowMode::BREAK)
cancel();
else
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
if (limits.min_execution_speed)
if (limits.min_execution_speed)
{
double total_elapsed = info.total_stopwatch.elapsedSeconds();
if (total_elapsed > limits.timeout_before_checking_execution_speed.totalMicroseconds() / 1000000.0
&& rows_processed / total_elapsed < limits.min_execution_speed)
{
double total_elapsed = info.total_stopwatch.elapsedSeconds();
if (total_elapsed > limits.timeout_before_checking_execution_speed.totalMicroseconds() / 1000000.0
&& rows_processed / total_elapsed < limits.min_execution_speed)
{
throw Exception("Query is executing too slow: " + toString(rows_processed / total_elapsed)
+ " rows/sec., minimum: " + toString(limits.min_execution_speed),
ErrorCodes::TOO_SLOW);
}
throw Exception("Query is executing too slow: " + toString(rows_processed / total_elapsed)
+ " rows/sec., minimum: " + toString(limits.min_execution_speed),
ErrorCodes::TOO_SLOW);
}
}
if (quota != nullptr && limits.mode == LIMITS_TOTAL)
{
quota->checkAndAddReadRowsBytes(time(0), value.rows, value.bytes);
}
if (quota != nullptr && limits.mode == LIMITS_TOTAL)
{
quota->checkAndAddReadRowsBytes(time(0), value.rows, value.bytes);
}
}
}
@ -343,5 +341,31 @@ const Block & IProfilingBlockInputStream::getExtremes() const
return extremes;
}
void IProfilingBlockInputStream::collectTotalRowsApprox()
{
if (collected_total_rows_approx)
return;
collected_total_rows_approx = true;
for (auto & child : children)
{
if (IProfilingBlockInputStream * p_child = dynamic_cast<IProfilingBlockInputStream *>(&*child))
{
p_child->collectTotalRowsApprox();
total_rows_approx += p_child->total_rows_approx;
}
}
}
void IProfilingBlockInputStream::collectAndSendTotalRowsApprox()
{
if (collected_total_rows_approx)
return;
collectTotalRowsApprox();
progressImpl(Progress(0, 0, total_rows_approx));
}
}
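
collectTotalRowsApprox walks the tree of child sources once, summing the leaf estimates upwards, and the collected_total_rows_approx flag makes the walk idempotent so that read() can call it on every invocation. A minimal standalone model of that traversal; Source and the tree shape below are invented for illustration:

#include <cstddef>
#include <iostream>
#include <memory>
#include <vector>

struct Source
{
    std::vector<std::shared_ptr<Source>> children;
    std::size_t total_rows_approx = 0;      /// leaf sources set this themselves
    bool collected = false;

    void collectTotalRowsApprox()
    {
        if (collected)
            return;
        collected = true;

        for (auto & child : children)
        {
            child->collectTotalRowsApprox();
            total_rows_approx += child->total_rows_approx;
        }
    }
};

int main()
{
    auto leaf1 = std::make_shared<Source>();
    auto leaf2 = std::make_shared<Source>();
    leaf1->total_rows_approx = 100;
    leaf2->total_rows_approx = 250;

    Source root;
    root.children = {leaf1, leaf2};
    root.collectTotalRowsApprox();
    root.collectTotalRowsApprox();          /// second call is a no-op

    std::cout << root.total_rows_approx << "\n";   /// 350
}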

View File

@ -17,28 +17,28 @@ JSONCompactRowOutputStream::JSONCompactRowOutputStream(WriteBuffer & ostr_, cons
void JSONCompactRowOutputStream::writeField(const Field & field)
{
fields[field_number].type->serializeTextJSON(field, ostr);
fields[field_number].type->serializeTextJSON(field, *ostr);
++field_number;
}
void JSONCompactRowOutputStream::writeFieldDelimiter()
{
writeCString(", ", ostr);
writeCString(", ", *ostr);
}
void JSONCompactRowOutputStream::writeRowStartDelimiter()
{
if (row_count > 0)
writeCString(",\n", ostr);
writeCString("\t\t[", ostr);
writeCString(",\n", *ostr);
writeCString("\t\t[", *ostr);
}
void JSONCompactRowOutputStream::writeRowEndDelimiter()
{
writeChar(']', ostr);
writeChar(']', *ostr);
field_number = 0;
++row_count;
}
@ -48,21 +48,21 @@ void JSONCompactRowOutputStream::writeTotals()
{
if (totals)
{
writeCString(",\n", ostr);
writeChar('\n', ostr);
writeCString("\t\"totals\": [", ostr);
writeCString(",\n", *ostr);
writeChar('\n', *ostr);
writeCString("\t\"totals\": [", *ostr);
size_t totals_columns = totals.columns();
for (size_t i = 0; i < totals_columns; ++i)
{
if (i != 0)
writeChar(',', ostr);
writeChar(',', *ostr);
const ColumnWithNameAndType & column = totals.getByPosition(i);
column.type->serializeTextJSON((*column.column)[0], ostr);
column.type->serializeTextJSON((*column.column)[0], *ostr);
}
writeChar(']', ostr);
writeChar(']', *ostr);
}
}
@ -90,17 +90,17 @@ void JSONCompactRowOutputStream::writeExtremes()
{
if (extremes)
{
writeCString(",\n", ostr);
writeChar('\n', ostr);
writeCString("\t\"extremes\":\n", ostr);
writeCString("\t{\n", ostr);
writeCString(",\n", *ostr);
writeChar('\n', *ostr);
writeCString("\t\"extremes\":\n", *ostr);
writeCString("\t{\n", *ostr);
writeExtremesElement("min", extremes, 0, ostr);
writeCString(",\n", ostr);
writeExtremesElement("max", extremes, 1, ostr);
writeExtremesElement("min", extremes, 0, *ostr);
writeCString(",\n", *ostr);
writeExtremesElement("max", extremes, 1, *ostr);
writeChar('\n', ostr);
writeCString("\t}", ostr);
writeChar('\n', *ostr);
writeCString("\t}", *ostr);
}
}

View File

@ -1,6 +1,6 @@
#include <DB/DataStreams/JSONRowOutputStream.h>
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/WriteBufferValidUTF8.h>
#include <DB/DataStreams/JSONRowOutputStream.h>
namespace DB
@ -10,71 +10,89 @@ using Poco::SharedPtr;
JSONRowOutputStream::JSONRowOutputStream(WriteBuffer & ostr_, const Block & sample_)
: dst_ostr(ostr_), ostr(dst_ostr), field_number(0), row_count(0), applied_limit(false), rows_before_limit(0)
: dst_ostr(ostr_)
{
NamesAndTypesList columns(sample_.getColumnsList());
fields.assign(columns.begin(), columns.end());
bool have_non_numeric_columns = false;
for (size_t i = 0; i < sample_.columns(); ++i)
{
if (!sample_.unsafeGetByPosition(i).type->isNumeric())
{
have_non_numeric_columns = true;
break;
}
}
if (have_non_numeric_columns)
{
validating_ostr.reset(new WriteBufferValidUTF8(dst_ostr));
ostr = validating_ostr.get();
}
else
ostr = &dst_ostr;
}
void JSONRowOutputStream::writePrefix()
{
writeCString("{\n", ostr);
writeCString("\t\"meta\":\n", ostr);
writeCString("\t[\n", ostr);
writeCString("{\n", *ostr);
writeCString("\t\"meta\":\n", *ostr);
writeCString("\t[\n", *ostr);
for (size_t i = 0; i < fields.size(); ++i)
{
writeCString("\t\t{\n", ostr);
writeCString("\t\t\t\"name\": ", ostr);
writeDoubleQuotedString(fields[i].name, ostr);
writeCString(",\n", ostr);
writeCString("\t\t\t\"type\": ", ostr);
writeDoubleQuotedString(fields[i].type->getName(), ostr);
writeChar('\n', ostr);
writeCString("\t\t}", ostr);
writeCString("\t\t{\n", *ostr);
writeCString("\t\t\t\"name\": ", *ostr);
writeDoubleQuotedString(fields[i].name, *ostr);
writeCString(",\n", *ostr);
writeCString("\t\t\t\"type\": ", *ostr);
writeDoubleQuotedString(fields[i].type->getName(), *ostr);
writeChar('\n', *ostr);
writeCString("\t\t}", *ostr);
if (i + 1 < fields.size())
writeChar(',', ostr);
writeChar('\n', ostr);
writeChar(',', *ostr);
writeChar('\n', *ostr);
}
writeCString("\t],\n", ostr);
writeChar('\n', ostr);
writeCString("\t\"data\":\n", ostr);
writeCString("\t[\n", ostr);
writeCString("\t],\n", *ostr);
writeChar('\n', *ostr);
writeCString("\t\"data\":\n", *ostr);
writeCString("\t[\n", *ostr);
}
void JSONRowOutputStream::writeField(const Field & field)
{
writeCString("\t\t\t", ostr);
writeDoubleQuotedString(fields[field_number].name, ostr);
writeCString(": ", ostr);
fields[field_number].type->serializeTextJSON(field, ostr);
writeCString("\t\t\t", *ostr);
writeDoubleQuotedString(fields[field_number].name, *ostr);
writeCString(": ", *ostr);
fields[field_number].type->serializeTextJSON(field, *ostr);
++field_number;
}
void JSONRowOutputStream::writeFieldDelimiter()
{
writeCString(",\n", ostr);
writeCString(",\n", *ostr);
}
void JSONRowOutputStream::writeRowStartDelimiter()
{
if (row_count > 0)
writeCString(",\n", ostr);
writeCString("\t\t{\n", ostr);
writeCString(",\n", *ostr);
writeCString("\t\t{\n", *ostr);
}
void JSONRowOutputStream::writeRowEndDelimiter()
{
writeChar('\n', ostr);
writeCString("\t\t}", ostr);
writeChar('\n', *ostr);
writeCString("\t\t}", *ostr);
field_number = 0;
++row_count;
}
@ -82,30 +100,30 @@ void JSONRowOutputStream::writeRowEndDelimiter()
void JSONRowOutputStream::writeSuffix()
{
writeChar('\n', ostr);
writeCString("\t]", ostr);
writeChar('\n', *ostr);
writeCString("\t]", *ostr);
writeTotals();
writeExtremes();
writeCString(",\n\n", ostr);
writeCString("\t\"rows\": ", ostr);
writeIntText(row_count, ostr);
writeCString(",\n\n", *ostr);
writeCString("\t\"rows\": ", *ostr);
writeIntText(row_count, *ostr);
writeRowsBeforeLimitAtLeast();
writeChar('\n', ostr);
writeCString("}\n", ostr);
ostr.next();
writeChar('\n', *ostr);
writeCString("}\n", *ostr);
ostr->next();
}
void JSONRowOutputStream::writeRowsBeforeLimitAtLeast()
{
if (applied_limit)
{
writeCString(",\n\n", ostr);
writeCString("\t\"rows_before_limit_at_least\": ", ostr);
writeIntText(rows_before_limit, ostr);
writeCString(",\n\n", *ostr);
writeCString("\t\"rows_before_limit_at_least\": ", *ostr);
writeIntText(rows_before_limit, *ostr);
}
}
@ -113,10 +131,10 @@ void JSONRowOutputStream::writeTotals()
{
if (totals)
{
writeCString(",\n", ostr);
writeChar('\n', ostr);
writeCString("\t\"totals\":\n", ostr);
writeCString("\t{\n", ostr);
writeCString(",\n", *ostr);
writeChar('\n', *ostr);
writeCString("\t\"totals\":\n", *ostr);
writeCString("\t{\n", *ostr);
size_t totals_columns = totals.columns();
for (size_t i = 0; i < totals_columns; ++i)
@ -124,16 +142,16 @@ void JSONRowOutputStream::writeTotals()
const ColumnWithNameAndType & column = totals.getByPosition(i);
if (i != 0)
writeCString(",\n", ostr);
writeCString(",\n", *ostr);
writeCString("\t\t", ostr);
writeDoubleQuotedString(column.name, ostr);
writeCString(": ", ostr);
column.type->serializeTextJSON((*column.column)[0], ostr);
writeCString("\t\t", *ostr);
writeDoubleQuotedString(column.name, *ostr);
writeCString(": ", *ostr);
column.type->serializeTextJSON((*column.column)[0], *ostr);
}
writeChar('\n', ostr);
writeCString("\t}", ostr);
writeChar('\n', *ostr);
writeCString("\t}", *ostr);
}
}
@ -167,17 +185,17 @@ void JSONRowOutputStream::writeExtremes()
{
if (extremes)
{
writeCString(",\n", ostr);
writeChar('\n', ostr);
writeCString("\t\"extremes\":\n", ostr);
writeCString("\t{\n", ostr);
writeCString(",\n", *ostr);
writeChar('\n', *ostr);
writeCString("\t\"extremes\":\n", *ostr);
writeCString("\t{\n", *ostr);
writeExtremesElement("min", extremes, 0, ostr);
writeCString(",\n", ostr);
writeExtremesElement("max", extremes, 1, ostr);
writeExtremesElement("min", extremes, 0, *ostr);
writeCString(",\n", *ostr);
writeExtremesElement("max", extremes, 1, *ostr);
writeChar('\n', ostr);
writeCString("\t}", ostr);
writeChar('\n', *ostr);
writeCString("\t}", *ostr);
}
}

View File

@ -23,7 +23,7 @@ static void readData(const IDataType & type, IColumn & column, ReadBuffer & istr
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
IColumn & offsets_column = *typeid_cast<ColumnArray &>(column).getOffsetsColumn();
type_arr->getOffsetsType()->deserializeBinary(offsets_column, istr, rows);
type_arr->getOffsetsType()->deserializeBinary(offsets_column, istr, rows, 0);
if (offsets_column.size() != rows)
throw Exception("Cannot read all data in NativeBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA);
@ -39,7 +39,7 @@ static void readData(const IDataType & type, IColumn & column, ReadBuffer & istr
{
ColumnNested & column_nested = typeid_cast<ColumnNested &>(column);
IColumn & offsets_column = *column_nested.getOffsetsColumn();
type_nested->getOffsetsType()->deserializeBinary(offsets_column, istr, rows);
type_nested->getOffsetsType()->deserializeBinary(offsets_column, istr, rows, 0);
if (offsets_column.size() != rows)
throw Exception("Cannot read all data in NativeBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA);
@ -58,7 +58,7 @@ static void readData(const IDataType & type, IColumn & column, ReadBuffer & istr
}
}
else
type.deserializeBinary(column, istr, rows);
type.deserializeBinary(column, istr, rows, 0); /// TODO Use avg_value_size_hint.
if (column.size() != rows)
throw Exception("Cannot read all data in NativeBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA);

View File

@ -45,7 +45,7 @@ void DataTypeAggregateFunction::serializeBinary(const IColumn & column, WriteBuf
function->serialize(*it, ostr);
}
void DataTypeAggregateFunction::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const
void DataTypeAggregateFunction::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const
{
ColumnAggregateFunction & real_column = typeid_cast<ColumnAggregateFunction &>(column);
ColumnAggregateFunction::Container_t & vec = real_column.getData();

View File

@ -68,7 +68,7 @@ void DataTypeArray::serializeBinary(const IColumn & column, WriteBuffer & ostr,
}
void DataTypeArray::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const
void DataTypeArray::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const
{
ColumnArray & column_array = typeid_cast<ColumnArray &>(column);
ColumnArray::Offsets_t & offsets = column_array.getOffsets();
@ -79,7 +79,7 @@ void DataTypeArray::deserializeBinary(IColumn & column, ReadBuffer & istr, size_
if (last_offset < nested_column.size())
throw Exception("Nested column longer than last offset", ErrorCodes::LOGICAL_ERROR);
size_t nested_limit = last_offset - nested_column.size();
nested->deserializeBinary(nested_column, istr, nested_limit);
nested->deserializeBinary(nested_column, istr, nested_limit, 0);
if (column_array.getData().size() != last_offset)
throw Exception("Cannot read all array values", ErrorCodes::CANNOT_READ_ALL_DATA);

View File

@ -49,7 +49,7 @@ void DataTypeFixedString::serializeBinary(const IColumn & column, WriteBuffer &
}
void DataTypeFixedString::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const
void DataTypeFixedString::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const
{
ColumnFixedString::Chars_t & data = typeid_cast<ColumnFixedString &>(column).getChars();

View File

@ -115,7 +115,7 @@ void DataTypeNested::serializeBinary(const IColumn & column, WriteBuffer & ostr,
}
void DataTypeNested::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const
void DataTypeNested::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const
{
ColumnNested & column_nested = typeid_cast<ColumnNested &>(column);
ColumnNested::Offsets_t & offsets = column_nested.getOffsets();
@ -129,7 +129,7 @@ void DataTypeNested::deserializeBinary(IColumn & column, ReadBuffer & istr, size
NamesAndTypesList::const_iterator it = nested->begin();
for (size_t i = 0; i < nested->size(); ++i, ++it)
{
it->type->deserializeBinary(*column_nested.getData()[i], istr, nested_limit);
it->type->deserializeBinary(*column_nested.getData()[i], istr, nested_limit, 0);
if (column_nested.getData()[i]->size() != last_offset)
throw Exception("Cannot read all nested column values", ErrorCodes::CANNOT_READ_ALL_DATA);
}

View File

@ -12,6 +12,8 @@
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/VarInt.h>
#include <emmintrin.h>
namespace DB
{
@ -70,15 +72,9 @@ void DataTypeString::serializeBinary(const IColumn & column, WriteBuffer & ostr,
}
void DataTypeString::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const
template <int UNROLL_TIMES>
static NO_INLINE void deserializeBinarySSE2(ColumnString::Chars_t & data, ColumnString::Offsets_t & offsets, ReadBuffer & istr, size_t limit)
{
ColumnString & column_string = typeid_cast<ColumnString &>(column);
ColumnString::Chars_t & data = column_string.getChars();
ColumnString::Offsets_t & offsets = column_string.getOffsets();
data.reserve(data.size() + limit * DBMS_APPROX_STRING_SIZE);
offsets.reserve(offsets.size() + limit);
size_t offset = data.size();
for (size_t i = 0; i < limit; ++i)
{
@ -93,12 +89,76 @@ void DataTypeString::deserializeBinary(IColumn & column, ReadBuffer & istr, size
data.resize(offset);
istr.readStrict(reinterpret_cast<char*>(&data[offset - size - 1]), sizeof(ColumnUInt8::value_type) * size);
if (size)
{
/// Optimistic branch in which more efficient copying is possible.
if (offset + 16 * UNROLL_TIMES <= data.capacity() && istr.position() + size + 16 * UNROLL_TIMES <= istr.buffer().end())
{
const __m128i * sse_src_pos = reinterpret_cast<const __m128i *>(istr.position());
const __m128i * sse_src_end = sse_src_pos + (size + (16 * UNROLL_TIMES - 1)) / 16 / UNROLL_TIMES * UNROLL_TIMES;
__m128i * sse_dst_pos = reinterpret_cast<__m128i *>(&data[offset - size - 1]);
while (sse_src_pos < sse_src_end)
{
/// NOTE gcc 4.9.2 unrolls the loop, but for some reason uses only one xmm register.
///for (size_t j = 0; j < UNROLL_TIMES; ++j)
/// _mm_storeu_si128(sse_dst_pos + j, _mm_loadu_si128(sse_src_pos + j));
sse_src_pos += UNROLL_TIMES;
sse_dst_pos += UNROLL_TIMES;
if (UNROLL_TIMES >= 4) __asm__("movdqu %0, %%xmm0" :: "m"(sse_src_pos[-4]));
if (UNROLL_TIMES >= 3) __asm__("movdqu %0, %%xmm1" :: "m"(sse_src_pos[-3]));
if (UNROLL_TIMES >= 2) __asm__("movdqu %0, %%xmm2" :: "m"(sse_src_pos[-2]));
if (UNROLL_TIMES >= 1) __asm__("movdqu %0, %%xmm3" :: "m"(sse_src_pos[-1]));
if (UNROLL_TIMES >= 4) __asm__("movdqu %%xmm0, %0" : "=m"(sse_dst_pos[-4]));
if (UNROLL_TIMES >= 3) __asm__("movdqu %%xmm1, %0" : "=m"(sse_dst_pos[-3]));
if (UNROLL_TIMES >= 2) __asm__("movdqu %%xmm2, %0" : "=m"(sse_dst_pos[-2]));
if (UNROLL_TIMES >= 1) __asm__("movdqu %%xmm3, %0" : "=m"(sse_dst_pos[-1]));
}
istr.position() += size;
}
else
{
istr.readStrict(reinterpret_cast<char*>(&data[offset - size - 1]), size);
}
}
data[offset - 1] = 0;
}
}
void DataTypeString::deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit, double avg_value_size_hint) const
{
ColumnString & column_string = typeid_cast<ColumnString &>(column);
ColumnString::Chars_t & data = column_string.getChars();
ColumnString::Offsets_t & offsets = column_string.getOffsets();
/// Chosen arbitrarily.
constexpr auto avg_value_size_hint_reserve_multiplier = 1.2;
double avg_chars_size = (avg_value_size_hint && avg_value_size_hint > sizeof(offsets[0])
? (avg_value_size_hint - sizeof(offsets[0])) * avg_value_size_hint_reserve_multiplier
: DBMS_APPROX_STRING_SIZE);
data.reserve(data.size() + std::ceil(limit * avg_chars_size));
offsets.reserve(offsets.size() + limit);
if (avg_chars_size >= 64)
deserializeBinarySSE2<4>(data, offsets, istr, limit);
else if (avg_chars_size >= 48)
deserializeBinarySSE2<3>(data, offsets, istr, limit);
else if (avg_chars_size >= 32)
deserializeBinarySSE2<2>(data, offsets, istr, limit);
else
deserializeBinarySSE2<1>(data, offsets, istr, limit);
}
void DataTypeString::serializeText(const Field & field, WriteBuffer & ostr) const
{
writeString(get<const String &>(field), ostr);
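
The hand-written movdqu assembly above exists because (per the NOTE) gcc 4.9.2 kept reusing a single xmm register when the copy loop was written with intrinsics. As a rough standalone sketch of the same unrolled 16-bytes-at-a-time copy, here is the idea expressed with portable SSE2 intrinsics; it is not the committed code, and it assumes the caller guarantees that both buffers may be over-read and over-written up to the next 16 * UNROLL_TIMES boundary, as the real code checks before taking this path:

#include <emmintrin.h>
#include <cstddef>
#include <cstring>

/// Copy `size` bytes, 16 * UNROLL_TIMES bytes per iteration (may overshoot within the allowed slack).
template <int UNROLL_TIMES>
static void copySSE2(char * dst, const char * src, std::size_t size)
{
    const __m128i * sse_src = reinterpret_cast<const __m128i *>(src);
    __m128i * sse_dst = reinterpret_cast<__m128i *>(dst);
    const __m128i * sse_src_end = sse_src + (size + (16 * UNROLL_TIMES - 1)) / 16 / UNROLL_TIMES * UNROLL_TIMES;

    while (sse_src < sse_src_end)
    {
        for (int j = 0; j < UNROLL_TIMES; ++j)
            _mm_storeu_si128(sse_dst + j, _mm_loadu_si128(sse_src + j));
        sse_src += UNROLL_TIMES;
        sse_dst += UNROLL_TIMES;
    }
}

int main()
{
    char src[64] = "hello, unrolled sse2 copy";
    char dst[64] = {};
    copySSE2<2>(dst, src, sizeof(src));
    return std::strlen(dst) == std::strlen(src) ? 0 : 1;
}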

View File

@ -110,7 +110,7 @@ int main(int argc, char ** argv)
{
std::ifstream istr("test");
DB::ReadBufferFromIStream in_buf(istr);
data_type.deserializeBinary(*column, in_buf, n);
data_type.deserializeBinary(*column, in_buf, n, 0);
}
stopwatch.stop();

View File

@ -26,7 +26,7 @@ int main(int argc, char ** argv)
Poco::SharedPtr<DB::ColumnString> column = new DB::ColumnString();
DB::ColumnString::Chars_t & data = column->getChars();
DB::ColumnString::Offsets_t & offsets = column->getOffsets();
data.resize(n * size);
offsets.resize(n);
for (size_t i = 0; i < n; ++i)
@ -52,7 +52,7 @@ int main(int argc, char ** argv)
DB::ReadBufferFromIStream in_buf(istr);
stopwatch.restart();
data_type.deserializeBinary(*column, in_buf, n);
data_type.deserializeBinary(*column, in_buf, n, 0);
stopwatch.stop();
std::cout << "Reading, elapsed: " << static_cast<double>(stopwatch.elapsed()) / 1000000 << std::endl;

View File

@ -206,7 +206,7 @@ void Compiler::compile(
command <<
"LD_LIBRARY_PATH=/usr/share/clickhouse/bin/"
" /usr/share/clickhouse/bin/clang"
" -x c++ -std=gnu++11 -O3 -g -Wall -Werror -Wnon-virtual-dtor -march=native -D NDEBUG"
" -x c++ -std=gnu++1y -O3 -g -Wall -Werror -Wnon-virtual-dtor -march=native -D NDEBUG"
" -shared -fPIC -fvisibility=hidden -fno-implement-inlines"
" -isystem /usr/share/clickhouse/headers/usr/local/include/"
" -isystem /usr/share/clickhouse/headers/usr/include/"

View File

@ -7,6 +7,8 @@
#include <DB/DataStreams/ConcatBlockInputStream.h>
#include <DB/DataStreams/CollapsingFinalBlockInputStream.h>
#include <DB/DataStreams/AddingConstColumnBlockInputStream.h>
#include <DB/DataStreams/CreatingSetsBlockInputStream.h>
#include <DB/DataStreams/NullBlockInputStream.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/Common/VirtualColumnUtils.h>
@ -14,13 +16,9 @@
namespace DB
{
MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(MergeTreeData & data_) : data(data_), log(&Logger::get(data.getLogName() + " (SelectExecutor)"))
MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(MergeTreeData & data_)
: data(data_), log(&Logger::get(data.getLogName() + " (SelectExecutor)"))
{
min_marks_for_seek = (data.settings.min_rows_for_seek + data.index_granularity - 1) / data.index_granularity;
min_marks_for_concurrent_read = (data.settings.min_rows_for_concurrent_read + data.index_granularity - 1) / data.index_granularity;
max_marks_to_use_cache = (data.settings.max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity;
}
/// Build a block consisting only of the possible values of the virtual columns
@ -132,7 +130,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
for (size_t i = 0; i < parts.size(); ++i)
{
MergeTreeData::DataPartPtr & part = parts[i];
MarkRanges ranges = markRangesFromPkRange(part->index, key_condition);
MarkRanges ranges = markRangesFromPkRange(part->index, key_condition, settings);
for (size_t j = 0; j < ranges.size(); ++j)
total_count += ranges[j].end - ranges[j].begin;
@ -253,7 +251,14 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
ExpressionAnalyzer analyzer(select.prewhere_expression, data.context, data.getColumnsList());
prewhere_actions = analyzer.getActions(false);
prewhere_column = select.prewhere_expression->getColumnName();
/// TODO: For subqueries in PREWHERE to work, we could save analyzer.getSetsWithSubqueries() here and execute them later.
SubqueriesForSets prewhere_subqueries = analyzer.getSubqueriesForSets();
/** Compute the subqueries right now.
* NOTE The drawback is that these computations do not fit into the query execution pipeline.
* They are done before the pipeline starts; they cannot be interrupted; no progress packets are sent while they are running.
*/
if (!prewhere_subqueries.empty())
CreatingSetsBlockInputStream(new NullBlockInputStream, prewhere_subqueries, settings.limits).read();
}
RangesInDataParts parts_with_ranges;
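The pattern above, spelled out as a hedged sketch (the helper name and the Limits type are assumptions; the one-line call in the hunk is the authoritative form): reading once from a CreatingSetsBlockInputStream wrapped around a stream that produces nothing executes the subqueries immediately and discards the empty result.
#include <DB/DataStreams/CreatingSetsBlockInputStream.h>
#include <DB/DataStreams/NullBlockInputStream.h>
/// Illustrative helper only: CreatingSetsBlockInputStream builds its sets/joins before
/// returning the first block, so wrapping a stream that yields nothing and reading from it
/// once executes the PREWHERE subqueries right away; the (empty) result block is discarded.
static void executePrewhereSubqueries(DB::SubqueriesForSets & prewhere_subqueries, const DB::Limits & limits)
{
    DB::BlockInputStreamPtr in = new DB::CreatingSetsBlockInputStream(
        new DB::NullBlockInputStream, prewhere_subqueries, limits);
    in->read();
}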
@ -264,7 +269,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
for (auto & part : parts)
{
RangesInDataPart ranges(part, (*part_index)++);
ranges.ranges = markRangesFromPkRange(part->index, key_condition);
ranges.ranges = markRangesFromPkRange(part->index, key_condition, settings);
if (!ranges.ranges.empty())
{
@ -298,7 +303,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
settings.use_uncompressed_cache,
prewhere_actions,
prewhere_column,
virt_column_names);
virt_column_names,
settings);
}
else
{
@ -310,7 +316,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
settings.use_uncompressed_cache,
prewhere_actions,
prewhere_column,
virt_column_names);
virt_column_names,
settings);
}
if (relative_sample_size != 0)
@ -320,6 +327,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
return res;
}
BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
RangesInDataParts parts,
size_t threads,
@ -328,8 +336,12 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
bool use_uncompressed_cache,
ExpressionActionsPtr prewhere_actions,
const String & prewhere_column,
const Names & virt_columns)
const Names & virt_columns,
const Settings & settings)
{
size_t min_marks_for_concurrent_read = (settings.merge_tree_min_rows_for_concurrent_read + data.index_granularity - 1) / data.index_granularity;
size_t max_marks_to_use_cache = (settings.merge_tree_max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity;
/// Shuffle the parts, just in case.
std::random_shuffle(parts.begin(), parts.end());
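The computations at the top of this hunk convert row thresholds from Settings into mark counts by rounding up to whole granules; a small worked example with hypothetical values (not the server defaults):
/// Hypothetical values, for illustration only.
size_t index_granularity = 8192;                          /// rows per mark (granule)
size_t merge_tree_min_rows_for_concurrent_read = 20480;   /// requested row threshold
/// Ceil division: any partially covered granule still costs a whole mark.
size_t min_marks_for_concurrent_read =
    (merge_tree_min_rows_for_concurrent_read + index_granularity - 1) / index_granularity;  /// == 3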
@ -419,6 +431,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
data.getFullPath() + part.data_part->name + '/', max_block_size, column_names, data,
part.data_part, ranges_to_get_from_part, use_uncompressed_cache,
prewhere_actions, prewhere_column));
for (const String & virt_column : virt_columns)
{
if (virt_column == "_part")
@ -451,8 +464,11 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal
bool use_uncompressed_cache,
ExpressionActionsPtr prewhere_actions,
const String & prewhere_column,
const Names & virt_columns)
const Names & virt_columns,
const Settings & settings)
{
size_t max_marks_to_use_cache = (settings.merge_tree_max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity;
size_t sum_marks = 0;
for (size_t i = 0; i < parts.size(); ++i)
for (size_t j = 0; j < parts[i].ranges.size(); ++j)
@ -529,8 +545,11 @@ void MergeTreeDataSelectExecutor::createPositiveSignCondition(ExpressionActionsP
}
/// Returns the set of mark ranges outside of which keys from the specified range cannot be located.
MarkRanges MergeTreeDataSelectExecutor::markRangesFromPkRange(const MergeTreeData::DataPart::Index & index, PKCondition & key_condition)
MarkRanges MergeTreeDataSelectExecutor::markRangesFromPkRange(
const MergeTreeData::DataPart::Index & index, PKCondition & key_condition, const Settings & settings)
{
size_t min_marks_for_seek = (settings.merge_tree_min_rows_for_seek + data.index_granularity - 1) / data.index_granularity;
MarkRanges res;
size_t key_size = data.getSortDescription().size();
@ -575,7 +594,7 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPkRange(const MergeTreeDat
else
{
/// Split the segment and push the results onto the stack from right to left.
size_t step = (range.end - range.begin - 1) / data.settings.coarse_index_granularity + 1;
size_t step = (range.end - range.begin - 1) / settings.merge_tree_coarse_index_granularity + 1;
size_t end;
for (end = range.end; end > range.begin + step; end -= step)
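The splitting scheme used here, as a self-contained sketch (the struct and the driver function are illustrative; the real code operates on the ranges_stack inside markRangesFromPkRange):
#include <vector>
#include <cstddef>

struct MarkRange { size_t begin; size_t end; };

/// A non-matching large range is cut into at most `coarse_granularity` pieces and pushed
/// onto the stack from right to left, so the leftmost piece is popped and examined first.
static void splitRange(std::vector<MarkRange> & stack, MarkRange range, size_t coarse_granularity)
{
    size_t step = (range.end - range.begin - 1) / coarse_granularity + 1;
    size_t end;
    for (end = range.end; end > range.begin + step; end -= step)
        stack.push_back({end - step, end});
    stack.push_back({range.begin, end});   /// leftmost piece ends up on top of the stack
}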

View File

@ -71,11 +71,10 @@ protected:
if (!buffer.data)
return res;
for (size_t i = 0, size = buffer.data.columns(); i < size; ++i)
for (const auto & name : column_names)
{
auto & col = buffer.data.unsafeGetByPosition(i);
if (column_names.count(col.name))
res.insert(col);
auto & col = buffer.data.getByName(name);
res.insert(ColumnWithNameAndType(col.column->clone(), col.type, name));
}
return res;
@ -213,14 +212,17 @@ private:
void insertIntoBuffer(const Block & block, StorageBuffer::Buffer & buffer, std::unique_lock<std::mutex> && lock)
{
/// Sort the columns in the block. This makes it easier to concatenate blocks later.
Block sorted_block = block.sortColumns();
if (!buffer.data)
{
buffer.first_write_time = time(0);
buffer.data = block.cloneEmpty();
buffer.data = sorted_block.cloneEmpty();
}
/// If the thresholds would be exceeded after inserting into the buffer, flush the buffer.
if (storage.checkThresholds(buffer, time(0), block.rowsInFirstColumn(), block.bytes()))
if (storage.checkThresholds(buffer, time(0), sorted_block.rowsInFirstColumn(), sorted_block.bytes()))
{
/// Take the block out of the buffer and replace the buffer with an empty one. After that, the mutex can be unlocked.
Block block_to_write;
@ -230,13 +232,13 @@ private:
if (!storage.no_destination)
{
appendBlock(block, block_to_write);
appendBlock(sorted_block, block_to_write);
storage.writeBlockToDestination(block_to_write,
storage.context.tryGetTable(storage.destination_database, storage.destination_table));
}
}
else
appendBlock(block, buffer.data);
appendBlock(sorted_block, buffer.data);
}
};
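Why sorting the columns helps: once every buffered block keeps its columns in the same name-sorted order, blocks can be concatenated column-by-column by position. A hedged sketch of such a merge (the helper is hypothetical; the real appendBlock in StorageBuffer.cpp may be implemented differently and more efficiently):
#include <DB/Core/Block.h>

/// Illustrative only: with both blocks name-sorted, the same position implies the same
/// name and type, so values can be appended without any lookups by name.
static void appendBlockByPosition(const DB::Block & from, DB::Block & to)
{
    size_t columns = from.columns();
    for (size_t i = 0; i < columns; ++i)
    {
        const DB::ColumnWithNameAndType & src = from.getByPosition(i);
        DB::ColumnWithNameAndType & dst = to.getByPosition(i);
        for (size_t row = 0, rows = src.column->size(); row < rows; ++row)
            dst.column->insert((*src.column)[row]);
    }
}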

View File

@ -338,7 +338,7 @@ void LogBlockInputStream::readData(const String & name, const IDataType & type,
}
}
else
type.deserializeBinary(column, streams[name]->compressed, max_rows_to_read);
type.deserializeBinary(column, streams[name]->compressed, max_rows_to_read, 0); /// TODO Use avg_value_size_hint.
}
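The new trailing argument is passed as 0 for now; per the TODO it is an avg_value_size_hint. A hedged sketch of what such a hint could be used for (the helper and the sizing formula are assumptions, not the actual implementation):
#include <DB/Columns/ColumnString.h>

/// Illustrative only: pre-reserve the destination buffers so that deserializing
/// `limit` string values does not reallocate repeatedly.
static void reserveForStrings(DB::ColumnString & column, size_t limit, double avg_value_size_hint)
{
    if (avg_value_size_hint > 0)
    {
        /// ColumnString stores a terminating zero byte after each value, hence the +1.
        column.getChars().reserve(static_cast<size_t>(limit * (avg_value_size_hint + 1)));
        column.getOffsets().reserve(limit);
    }
}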

View File

@ -23,19 +23,7 @@ SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(StorageSetOrJoinBase & ta
void SetOrJoinBlockOutputStream::write(const Block & block)
{
/// Sort the columns in the block. This is needed because Set and Join rely on the same column order across blocks.
size_t columns = block.columns();
std::vector<std::string> names(columns);
for (size_t i = 0; i < columns; ++i)
names[i] = block.unsafeGetByPosition(i).name;
std::sort(names.begin(), names.end());
Block sorted_block;
for (const auto & name : names)
sorted_block.insert(block.getByName(name)); /// NOTE This could be done slightly more efficiently.
Block sorted_block = block.sortColumns();
table.insertBlock(sorted_block);
backup_stream.write(sorted_block);
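For reference, a free-function equivalent of the new Block::sortColumns(), inferred from the hand-written code it replaces above (the actual member function added in this commit may differ):
#include <algorithm>
#include <string>
#include <vector>
#include <DB/Core/Block.h>

/// Illustrative only: rebuild the block with its columns ordered by name.
static DB::Block sortColumnsByName(const DB::Block & block)
{
    size_t columns = block.columns();
    std::vector<std::string> names(columns);
    for (size_t i = 0; i < columns; ++i)
        names[i] = block.getByPosition(i).name;
    std::sort(names.begin(), names.end());

    DB::Block sorted;
    for (const auto & name : names)
        sorted.insert(block.getByName(name));
    return sorted;
}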

View File

@ -276,7 +276,7 @@ void TinyLogBlockInputStream::readData(const String & name, const IDataType & ty
}
}
else
type.deserializeBinary(column, streams[name]->compressed, limit);
type.deserializeBinary(column, streams[name]->compressed, limit, 0); /// TODO Use avg_value_size_hint.
}

View File

@ -0,0 +1,90 @@
1 2 [3]
2 [3] 1
[3] 1 2
1 [3] 2
2 1 [3]
[3] 2 1
1 2
2 [3]
[3] 1
1 [3]
2 1
[3] 2
1
2
[3]
1 2 [3]
9 8 [7]
2 [3] 1
8 [7] 9
[3] 1 2
[7] 9 8
1 [3] 2
9 [7] 8
2 1 [3]
8 9 [7]
[3] 2 1
[7] 8 9
1 2
9 8
2 [3]
8 [7]
[3] 1
[7] 9
1 [3]
9 [7]
2 1
8 9
[3] 2
[7] 8
1
9
2
8
[3]
[7]
1 2 [3]
9 8 [7]
11 [33]
2 [3] 1
8 [7] 9
[33] 11
[3] 1 2
[7] 9 8
[33] 11
1 [3] 2
9 [7] 8
11 [33]
2 1 [3]
8 9 [7]
11 [33]
[3] 2 1
[7] 8 9
[33] 11
1 2
9 8
11
2 [3]
8 [7]
[33]
[3] 1
[7] 9
[33] 11
1 [3]
9 [7]
11 [33]
2 1
8 9
11
[3] 2
[7] 8
[33]
1
9
11
2
8
[3]
[7]
[33]

View File

@ -0,0 +1,62 @@
DROP TABLE IF EXISTS test.buffer;
DROP TABLE IF EXISTS test.null;
CREATE TABLE test.null (a UInt8, b String, c Array(UInt32)) ENGINE = Null;
CREATE TABLE test.buffer (a UInt8, b String, c Array(UInt32)) ENGINE = Buffer(test, null, 1, 1000, 1000, 1000, 1000, 1000000, 1000000);
INSERT INTO test.buffer VALUES (1, '2', [3]);
SELECT a, b, c FROM test.buffer ORDER BY a, b, c;
SELECT b, c, a FROM test.buffer ORDER BY a, b, c;
SELECT c, a, b FROM test.buffer ORDER BY a, b, c;
SELECT a, c, b FROM test.buffer ORDER BY a, b, c;
SELECT b, a, c FROM test.buffer ORDER BY a, b, c;
SELECT c, b, a FROM test.buffer ORDER BY a, b, c;
SELECT a, b FROM test.buffer ORDER BY a, b, c;
SELECT b, c FROM test.buffer ORDER BY a, b, c;
SELECT c, a FROM test.buffer ORDER BY a, b, c;
SELECT a, c FROM test.buffer ORDER BY a, b, c;
SELECT b, a FROM test.buffer ORDER BY a, b, c;
SELECT c, b FROM test.buffer ORDER BY a, b, c;
SELECT a FROM test.buffer ORDER BY a, b, c;
SELECT b FROM test.buffer ORDER BY a, b, c;
SELECT c FROM test.buffer ORDER BY a, b, c;
INSERT INTO test.buffer (c, b, a) VALUES ([7], '8', 9);
SELECT a, b, c FROM test.buffer ORDER BY a, b, c;
SELECT b, c, a FROM test.buffer ORDER BY a, b, c;
SELECT c, a, b FROM test.buffer ORDER BY a, b, c;
SELECT a, c, b FROM test.buffer ORDER BY a, b, c;
SELECT b, a, c FROM test.buffer ORDER BY a, b, c;
SELECT c, b, a FROM test.buffer ORDER BY a, b, c;
SELECT a, b FROM test.buffer ORDER BY a, b, c;
SELECT b, c FROM test.buffer ORDER BY a, b, c;
SELECT c, a FROM test.buffer ORDER BY a, b, c;
SELECT a, c FROM test.buffer ORDER BY a, b, c;
SELECT b, a FROM test.buffer ORDER BY a, b, c;
SELECT c, b FROM test.buffer ORDER BY a, b, c;
SELECT a FROM test.buffer ORDER BY a, b, c;
SELECT b FROM test.buffer ORDER BY a, b, c;
SELECT c FROM test.buffer ORDER BY a, b, c;
INSERT INTO test.buffer (a, c) VALUES (11, [33]);
SELECT a, b, c FROM test.buffer ORDER BY a, b, c;
SELECT b, c, a FROM test.buffer ORDER BY a, b, c;
SELECT c, a, b FROM test.buffer ORDER BY a, b, c;
SELECT a, c, b FROM test.buffer ORDER BY a, b, c;
SELECT b, a, c FROM test.buffer ORDER BY a, b, c;
SELECT c, b, a FROM test.buffer ORDER BY a, b, c;
SELECT a, b FROM test.buffer ORDER BY a, b, c;
SELECT b, c FROM test.buffer ORDER BY a, b, c;
SELECT c, a FROM test.buffer ORDER BY a, b, c;
SELECT a, c FROM test.buffer ORDER BY a, b, c;
SELECT b, a FROM test.buffer ORDER BY a, b, c;
SELECT c, b FROM test.buffer ORDER BY a, b, c;
SELECT a FROM test.buffer ORDER BY a, b, c;
SELECT b FROM test.buffer ORDER BY a, b, c;
SELECT c FROM test.buffer ORDER BY a, b, c;
DROP TABLE test.buffer;
DROP TABLE test.null;