dbms: more scalable aggregator: development [#METR-2944].

This commit is contained in:
Alexey Milovidov 2015-01-03 06:18:49 +03:00
parent 7cf0bca8af
commit 221efeb82a
18 changed files with 188 additions and 85 deletions

View File

@ -5,6 +5,7 @@
#include <list>
#include <initializer_list>
#include <DB/Core/BlockInfo.h>
#include <DB/Core/ColumnWithNameAndType.h>
#include <DB/Core/NamesAndTypes.h>
#include <DB/Core/Exception.h>
@ -35,6 +36,8 @@ private:
IndexByName_t index_by_name;
public:
BlockInfo info;
Block() = default;
Block(std::initializer_list<ColumnWithNameAndType> il) : data{il}
{

View File

@ -0,0 +1,87 @@
#pragma once
#include <DB/Core/Types.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/VarInt.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
namespace DB
{
/** Дополнительная информация о блоке.
*/
struct BlockInfo
{
/** is_overflows:
* После выполнения GROUP BY ... WITH TOTALS с настройками max_rows_to_group_by и group_by_overflow_mode = 'any',
* в отдельный блок засовывается строчка с аргегированными значениями, не прошедшими max_rows_to_group_by.
* Если это такой блок, то для него is_overflows выставляется в true.
*/
/** bucket_num:
* При использовании двухуровневого метода агрегации, данные с разными группами ключей раскидываются по разным корзинам.
* В таком случае здесь указывается номер корзины. Он используется для оптимизации слияния при распределённой аргегации.
* Иначе - -1.
*/
#define APPLY_FOR_INFO_FIELDS(M) \
M(bool, is_overflows, false, 1) \
M(Int32, bucket_num, -1, 2)
#define DECLARE_FIELD(TYPE, NAME, DEFAULT, FIELD_NUM) \
TYPE NAME = DEFAULT;
APPLY_FOR_INFO_FIELDS(DECLARE_FIELD)
#undef DECLARE_FIELD
/// Записать значения в бинарном виде. NOTE: Можно было бы использовать protobuf, но он был бы overkill для данного случая.
void write(WriteBuffer & out) const
{
/// Набор пар FIELD_NUM, значение в бинарном виде. Затем 0.
#define WRITE_FIELD(TYPE, NAME, DEFAULT, FIELD_NUM) \
writeVarUInt(FIELD_NUM, out); \
writeBinary(NAME, out);
APPLY_FOR_INFO_FIELDS(WRITE_FIELD);
#undef WRITE_FIELD
writeVarUInt(0, out);
}
/// Прочитать значения в бинарном виде.
void read(ReadBuffer & in)
{
UInt64 field_num = 0;
while (true)
{
readVarUInt(field_num, in);
if (field_num == 0)
break;
switch (field_num)
{
#define READ_FIELD(TYPE, NAME, DEFAULT, FIELD_NUM) \
case FIELD_NUM: \
readBinary(NAME, in); \
break;
APPLY_FOR_INFO_FIELDS(READ_FIELD);
#undef READ_FIELD
default:
throw Exception("Unknown BlockInfo field number: " + toString(field_num), ErrorCodes::UNKNOWN_BLOCK_INFO_FIELD);
}
}
}
#undef APPLY_FOR_INFO_FIELDS
};
}

View File

@ -65,6 +65,7 @@
#define DBMS_MIN_REVISION_WITH_STRING_QUERY_ID 39002
#define DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES 50264
#define DBMS_MIN_REVISION_WITH_TOTAL_ROWS_IN_PROGRESS 51554
#define DBMS_MIN_REVISION_WITH_BLOCK_INFO 51903
#define DBMS_DISTRIBUTED_DIRECTORY_MONITOR_SLEEP_TIME_MS 100

View File

@ -268,6 +268,7 @@ namespace ErrorCodes
UNION_ALL_RESULT_STRUCTURES_MISMATCH,
UNION_ALL_COLUMN_ALIAS_MISMATCH,
CLIENT_OUTPUT_FORMAT_SPECIFIED,
UNKNOWN_BLOCK_INFO_FIELD,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -17,15 +17,15 @@ class MergingAggregatedBlockInputStream : public IProfilingBlockInputStream
{
public:
MergingAggregatedBlockInputStream(BlockInputStreamPtr input_, const ColumnNumbers & keys_,
const AggregateDescriptions & aggregates_, bool overflow_row_, bool final_)
: aggregator(new Aggregator(keys_, aggregates_, overflow_row_)), final(final_)
const AggregateDescriptions & aggregates_, bool overflow_row_, bool final_, size_t max_threads_)
: aggregator(new Aggregator(keys_, aggregates_, overflow_row_)), final(final_), max_threads(max_threads_)
{
children.push_back(input_);
}
MergingAggregatedBlockInputStream(BlockInputStreamPtr input_, const Names & keys_names_,
const AggregateDescriptions & aggregates_, bool overflow_row_, bool final_)
: aggregator(new Aggregator(keys_names_, aggregates_, overflow_row_)), final(final_)
const AggregateDescriptions & aggregates_, bool overflow_row_, bool final_, size_t max_threads_)
: aggregator(new Aggregator(keys_names_, aggregates_, overflow_row_)), final(final_), max_threads(max_threads_)
{
children.push_back(input_);
}
@ -45,6 +45,7 @@ protected:
private:
SharedPtr<Aggregator> aggregator;
bool final;
size_t max_threads;
bool executed = false;
BlocksList blocks;

View File

@ -13,8 +13,11 @@ namespace DB
class NativeBlockInputStream : public IProfilingBlockInputStream
{
public:
NativeBlockInputStream(ReadBuffer & istr_, const DataTypeFactory & data_type_factory_)
: istr(istr_), data_type_factory(data_type_factory_) {}
/** В случае указания ненулевой server_revision, может ожидаться и считываться дополнительная информация о блоке,
* в зависимости от поддерживаемой для указанной ревизии.
*/
NativeBlockInputStream(ReadBuffer & istr_, const DataTypeFactory & data_type_factory_, UInt64 server_revision_ = 0)
: istr(istr_), data_type_factory(data_type_factory_), server_revision(server_revision_) {}
String getName() const override { return "NativeBlockInputStream"; }
@ -31,6 +34,7 @@ protected:
private:
ReadBuffer & istr;
const DataTypeFactory & data_type_factory;
UInt64 server_revision;
};
}

View File

@ -12,13 +12,18 @@ namespace DB
class NativeBlockOutputStream : public IBlockOutputStream
{
public:
NativeBlockOutputStream(WriteBuffer & ostr_) : ostr(ostr_) {}
/** В случае указания ненулевой client_revision, может записываться дополнительная информация о блоке,
* в зависимости от поддерживаемой для указанной ревизии.
*/
NativeBlockOutputStream(WriteBuffer & ostr_, UInt64 client_revision_ = 0)
: ostr(ostr_), client_revision(client_revision_) {}
void write(const Block & block) override;
void flush() override { ostr.next(); }
private:
WriteBuffer & ostr;
UInt64 client_revision;
};
}

View File

@ -632,7 +632,7 @@ public:
*/
BlocksList convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads);
/** Объединить несколько структур данных агрегации в одну. (В первый непустой элемент массива.) Все варианты агрегации должны быть одинаковыми!
/** Объединить несколько структур данных агрегации в одну. (В первый непустой элемент массива.)
* После объединения, все стркутуры агрегации (а не только те, в которую они будут слиты) должны жить,
* пока не будет вызвана функция convertToBlocks.
* Это нужно, так как в слитом результате могут остаться указатели на память в пуле, которым владеют другие структуры агрегации.
@ -641,9 +641,8 @@ public:
/** Объединить несколько агрегированных блоков в одну структуру данных.
* (Доагрегировать несколько блоков, которые представляют собой результат независимых агрегаций с удалённых серверов.)
* Если overflow_row = true, то предполагается, что агрегаты для строк, не попавших в max_rows_to_group_by, расположены в первой строке каждого блока.
*/
void merge(BlockInputStreamPtr stream, AggregatedDataVariants & result);
void merge(BlockInputStreamPtr stream, AggregatedDataVariants & result, size_t max_threads);
/// Для IBlockInputStream.
String getID() const;
@ -740,7 +739,6 @@ protected:
void mergeStreamsImpl(
Method & method,
Arena * aggregates_pool,
size_t start_row,
size_t rows,
ConstColumnPlainPtrs & key_columns,
AggregateColumnsData & aggregate_columns,

View File

@ -269,7 +269,7 @@ void Connection::sendData(const Block & block, const String & name)
else
maybe_compressed_out = out;
block_out = new NativeBlockOutputStream(*maybe_compressed_out);
block_out = new NativeBlockOutputStream(*maybe_compressed_out, server_revision);
}
writeVarUInt(Protocol::Client::Data, *out);
@ -444,7 +444,7 @@ void Connection::initBlockInput()
else
maybe_compressed_in = in;
block_in = new NativeBlockInputStream(*maybe_compressed_in, data_type_factory);
block_in = new NativeBlockInputStream(*maybe_compressed_in, data_type_factory, server_revision);
}
}

View File

@ -36,6 +36,7 @@ void Block::addDefaults(const NamesAndTypesList & required_columns)
Block & Block::operator= (const Block & other)
{
info = other.info;
data = other.data;
index_by_position.resize(data.size());
@ -272,6 +273,7 @@ Block Block::cloneEmpty() const
{
Block res;
res.info = info;
for (Container_t::const_iterator it = data.begin(); it != data.end(); ++it)
res.insert(it->cloneEmpty());
@ -363,7 +365,7 @@ bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs)
{
const IDataType & lhs_type = *lhs.getByPosition(i).type;
const IDataType & rhs_type = *rhs.getByPosition(i).type;
if (lhs_type.getName() != rhs_type.getName())
return false;
}
@ -374,6 +376,7 @@ bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs)
void Block::clear()
{
info = BlockInfo();
data.clear();
index_by_name.clear();
index_by_position.clear();
@ -381,6 +384,7 @@ void Block::clear()
void Block::swap(Block & other)
{
std::swap(info, other.info);
data.swap(other.data);
index_by_name.swap(other.index_by_name);
index_by_position.swap(other.index_by_position);

View File

@ -13,8 +13,8 @@ Block MergingAggregatedBlockInputStream::readImpl()
{
executed = true;
AggregatedDataVariants data_variants;
aggregator->merge(children.back(), data_variants);
blocks = aggregator->convertToBlocks(data_variants, final, 1);
aggregator->merge(children.back(), data_variants, max_threads);
blocks = aggregator->convertToBlocks(data_variants, final, max_threads);
it = blocks.begin();
}

View File

@ -72,6 +72,10 @@ Block NativeBlockInputStream::readImpl()
if (istr.eof())
return res;
/// Дополнительная информация о блоке.
if (server_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO)
res.info.read(istr);
/// Размеры
size_t columns = 0;
size_t rows = 0;

View File

@ -49,6 +49,10 @@ static void writeData(const IDataType & type, const IColumn & column, WriteBuffe
void NativeBlockOutputStream::write(const Block & block)
{
/// Дополнительная информация о блоке.
if (client_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO)
block.info.write(ostr);
/// Размеры
size_t columns = block.columns();
size_t rows = block.rows();

View File

@ -56,8 +56,8 @@ Block TotalsHavingBlockInputStream::readImpl()
{
block = children[0]->read();
/// В этом случае, первый блок - блок со значениями, не вошедшими в max_rows_to_group_by. Отложим его.
if (overflow_row && !overflow_aggregates && block)
/// Блок со значениями, не вошедшими в max_rows_to_group_by. Отложим его.
if (overflow_row && block && block.info.is_overflows)
{
overflow_aggregates = block;
continue;

View File

@ -22,6 +22,7 @@
#include <DB/Storages/StorageLog.h>
#include <DB/Interpreters/Context.h>
#include <Yandex/Revision.h>
int main(int argc, char ** argv)
@ -107,7 +108,7 @@ int main(int argc, char ** argv)
SharedPtr<IBlockInputStream> in = table->read(column_names, 0, Context{}, Settings(), stage)[0];
WriteBufferFromOStream out1(std::cout);
CompressedWriteBuffer out2(out1);
NativeBlockOutputStream out3(out2);
NativeBlockOutputStream out3(out2, Revision::get());
copyData(*in, out3);
}
@ -118,7 +119,7 @@ int main(int argc, char ** argv)
ReadBufferFromIStream in1(std::cin);
CompressedReadBuffer in2(in1);
NativeBlockInputStream in3(in2, factory);
NativeBlockInputStream in3(in2, factory, Revision::get());
SharedPtr<IBlockOutputStream> out = table->write(0);
copyData(in3, *out);
}

View File

@ -432,7 +432,6 @@ template <typename Method>
void NO_INLINE Aggregator::mergeStreamsImpl(
Method & method,
Arena * aggregates_pool,
size_t start_row,
size_t rows,
ConstColumnPlainPtrs & key_columns,
AggregateColumnsData & aggregate_columns,
@ -442,7 +441,7 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
method.init(key_columns);
/// Для всех строчек.
for (size_t i = start_row; i < rows; ++i)
for (size_t i = 0; i < rows; ++i)
{
typename Method::iterator it;
bool inserted; /// Вставили новый ключ, или такой ключ уже был?
@ -524,22 +523,6 @@ void NO_INLINE Aggregator::convertToBlockImplNotFinal(
}
}
/*template <typename Method>
void Aggregator::convertToBlocksTwoLevelImpl(
Method & method,
ColumnPlainPtrs & key_columns,
AggregateColumnsData & aggregate_columns,
ColumnPlainPtrs & final_aggregate_columns,
const Sizes & key_sizes,
size_t start_row,
bool final) const
{
if (final)
convertToBlockImplFinal(method, data, key_columns, aggregate_columns, final_aggregate_columns, key_sizes, start_row);
else
convertToBlockImplNotFinal(method, data, key_columns, aggregate_columns, final_aggregate_columns, key_sizes, start_row);
}*/
template <typename Method>
void NO_INLINE Aggregator::destroyImpl(
@ -838,8 +821,12 @@ BlocksList Aggregator::prepareBlocksAndFillWithoutKey(AggregatedDataVariants & d
}
};
Block block = prepareBlockAndFill(data_variants, final, rows, filler);
if (overflow_row)
block.info.is_overflows = true;
BlocksList blocks;
blocks.emplace_back(prepareBlockAndFill(data_variants, final, rows, filler));
blocks.emplace_back(std::move(block));
return blocks;
}
@ -909,7 +896,7 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
{
current_memory_tracker = memory_tracker;
return prepareBlockAndFill(data_variants, final, method.data.impls[bucket].size(),
Block block = prepareBlockAndFill(data_variants, final, method.data.impls[bucket].size(),
[bucket, &filler] (
ColumnPlainPtrs & key_columns,
AggregateColumnsData & aggregate_columns,
@ -919,6 +906,9 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
{
filler(key_columns, aggregate_columns, final_aggregate_columns, key_sizes, final, bucket);
});
block.info.bucket_num = bucket;
return block;
};
/// packaged_task используются, чтобы исключения автоматически прокидывались в основной поток.
@ -996,11 +986,6 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
&& data_variants.isTwoLevel()) /// TODO Использовать общий тред-пул с функцией merge.
thread_pool.reset(new boost::threadpool::pool(max_threads));
/** Если требуется выдать overflow_row
* (то есть, блок со значениями, не поместившимися в max_rows_to_group_by),
* то этот блок должен идти первым (на это рассчитывает TotalsHavingBlockInputStream).
*/
if (data_variants.type == AggregatedDataVariants::Type::without_key || overflow_row)
blocks.splice(blocks.end(), prepareBlocksAndFillWithoutKey(data_variants, final));
@ -1145,7 +1130,7 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
}
void Aggregator::merge(BlockInputStreamPtr stream, AggregatedDataVariants & result)
void Aggregator::merge(BlockInputStreamPtr stream, AggregatedDataVariants & result, size_t max_threads)
{
StringRefs key(keys_size);
ConstColumnPlainPtrs key_columns(keys_size);
@ -1158,7 +1143,15 @@ void Aggregator::merge(BlockInputStreamPtr stream, AggregatedDataVariants & resu
/// result будет уничтожать состояния агрегатных функций в деструкторе
result.aggregator = this;
/// Читаем все данные
/** Если на удалённых серверах использовался двухуровневый метод агрегации,
* то в блоках будет расположена информация о номере корзины.
* Тогда вычисления можно будет распараллелить по корзинам.
* Разложим блоки по указанным в них номерам корзин.
*/
using BucketToBlocks = std::map<Int32, BlocksList>;
BucketToBlocks bucket_to_blocks;
/// Читаем все данные. TODO memory-savvy режим, при котором в один момент времени обрабатывается только одна корзина.
while (Block block = stream->read())
{
LOG_TRACE(log, "Merging aggregated block");
@ -1187,7 +1180,7 @@ void Aggregator::merge(BlockInputStreamPtr stream, AggregatedDataVariants & resu
result.key_sizes = key_sizes;
}
if (result.type == AggregatedDataVariants::Type::without_key || overflow_row)
if (result.type == AggregatedDataVariants::Type::without_key || block.info.is_overflows)
{
AggregatedDataWithoutKey & res = result.without_key;
if (!res)
@ -1201,11 +1194,9 @@ void Aggregator::merge(BlockInputStreamPtr stream, AggregatedDataVariants & resu
aggregate_functions[i]->merge(res + offsets_of_aggregate_states[i], (*aggregate_columns[i])[0]);
}
size_t start_row = overflow_row ? 1 : 0;
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(*result.NAME, result.aggregates_pool, start_row, rows, key_columns, aggregate_columns, key_sizes, key);
mergeStreamsImpl(*result.NAME, result.aggregates_pool, rows, key_columns, aggregate_columns, key_sizes, key);
if (false) {}
APPLY_FOR_AGGREGATED_VARIANTS(M)

View File

@ -73,7 +73,7 @@ void InterpreterSelectQuery::basicInit(BlockInputStreamPtr input_, const NamesAn
{
if (table_column_names.empty())
{
/// Оптимизация: мы считаем, что запрос содержит только один SELECT, даже если это может быть
/// Оптимизация: мы считаем, что запрос содержит только один SELECT, даже если это может быть
/// в самом деле цепочкой UNION ALL. Первый запрос достаточен для определения нужных столбцов.
context.setColumns(InterpreterSelectQuery(query.table, context, to_stage, subquery_depth, nullptr, false).getSampleBlock().getColumnsList());
}
@ -121,7 +121,7 @@ void InterpreterSelectQuery::basicInit(BlockInputStreamPtr input_, const NamesAn
if (isFirstSelectInsideUnionAll())
{
/// Создаем цепочку запросов SELECT и проверяем, что результаты всех запросов SELECT cовместимые.
/// NOTE Мы можем безопасно применить static_cast вместо typeid_cast,
/// NOTE Мы можем безопасно применить static_cast вместо typeid_cast,
/// потому что знаем, что в цепочке UNION ALL имеются только деревья типа SELECT.
InterpreterSelectQuery * interpreter = this;
Block first = interpreter->getSampleBlock();
@ -132,7 +132,7 @@ void InterpreterSelectQuery::basicInit(BlockInputStreamPtr input_, const NamesAn
Block current = interpreter->getSampleBlock();
if (!blocksHaveEqualStructure(first, current))
throw Exception("Result structures mismatch in the SELECT queries of the UNION ALL chain. Found result structure:\n\n" + current.dumpStructure()
+ "\n\nwhile expecting:\n\n" + first.dumpStructure() + "\n\ninstead",
+ "\n\nwhile expecting:\n\n" + first.dumpStructure() + "\n\ninstead",
ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH);
}
}
@ -173,7 +173,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context
context(context_), settings(context.getSettings()), to_stage(to_stage_), subquery_depth(subquery_depth_),
is_union_all_head(true),
log(&Logger::get("InterpreterSelectQuery"))
{
{
init(input_, required_column_names_, table_column_names);
}
@ -189,7 +189,7 @@ bool InterpreterSelectQuery::hasAsterisk() const
if (next_query.hasAsterisk())
return true;
}
return false;
}
@ -207,7 +207,7 @@ void InterpreterSelectQuery::rewriteExpressionList(const Names & required_column
{
if (query.distinct)
return;
if (isFirstSelectInsideUnionAll())
for (IAST * tree = query.next_union_all.get(); tree != nullptr; tree = static_cast<ASTSelectQuery *>(tree)->next_union_all.get())
{
@ -215,9 +215,9 @@ void InterpreterSelectQuery::rewriteExpressionList(const Names & required_column
if (next_query.distinct)
return;
}
query.rewriteSelectExpressionList(required_column_names);
if (isFirstSelectInsideUnionAll())
for (IAST * tree = query.next_union_all.get(); tree != nullptr; tree = static_cast<ASTSelectQuery *>(tree)->next_union_all.get())
{
@ -293,12 +293,12 @@ static inline BlockInputStreamPtr maybeAsynchronous(BlockInputStreamPtr in, bool
BlockInputStreamPtr InterpreterSelectQuery::execute()
{
(void) executeWithoutUnion();
if (streams.empty())
return new NullBlockInputStream;
executeUnion(streams);
/// Ограничения на результат, квота на результат, а также колбек для прогресса.
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(&*streams[0]))
{
@ -318,7 +318,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
stream->setQuota(context.getQuota());
}
}
return streams[0];
}
@ -333,13 +333,13 @@ const BlockInputStreams & InterpreterSelectQuery::executeWithoutUnion()
const auto & others = p->streams;
streams.insert(streams.end(), others.begin(), others.end());
}
for (auto & stream : streams)
stream = new MaterializingBlockInputStream(stream);
}
else
executeSingleQuery();
return streams;
}
@ -353,12 +353,12 @@ void InterpreterSelectQuery::executeSingleQuery()
* Если есть GROUP BY, то выполним все операции до GROUP BY, включительно, параллельно;
* параллельный GROUP BY склеит потоки в один,
* затем выполним остальные операции с одним получившимся потоком.
* Если запрос является членом цепочки UNION ALL и не содержит GROUP BY, ORDER BY, DISTINCT, или LIMIT,
* Если запрос является членом цепочки UNION ALL и не содержит GROUP BY, ORDER BY, DISTINCT, или LIMIT,
* то объединение источников данных выполняется не на этом уровне, а на верхнем уровне.
*/
bool do_execute_union = false;
/** Вынем данные из Storage. from_stage - до какой стадии запрос был выполнен в Storage. */
QueryProcessingStage::Enum from_stage = executeFetchColumns(streams);
@ -370,7 +370,7 @@ void InterpreterSelectQuery::executeSingleQuery()
bool need_aggregate = false;
bool has_having = false;
bool has_order_by = false;
ExpressionActionsPtr array_join;
ExpressionActionsPtr before_where;
ExpressionActionsPtr before_aggregation;
@ -466,10 +466,10 @@ void InterpreterSelectQuery::executeSingleQuery()
to_stage > QueryProcessingStage::WithMergeableState &&
!query.group_by_with_totals;
if (need_aggregate || has_order_by)
do_execute_union = true;
if (first_stage)
{
if (has_where)
@ -495,7 +495,7 @@ void InterpreterSelectQuery::executeSingleQuery()
do_execute_union = true;
}
}
if (second_stage)
{
bool need_second_distinct_pass = true;
@ -540,13 +540,13 @@ void InterpreterSelectQuery::executeSingleQuery()
executePreLimit(streams);
do_execute_union = true;
}
if (need_second_distinct_pass)
do_execute_union = true;
if (do_execute_union)
executeUnion(streams);
/// Если было более одного источника - то нужно выполнить DISTINCT ещё раз после их слияния.
if (need_second_distinct_pass)
executeDistinct(streams, false, Names());
@ -589,7 +589,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu
Names required_columns = query_analyzer->getRequiredColumns();
if (query.table && typeid_cast<ASTSelectQuery *>(&*query.table))
{
{
/** Для подзапроса не действуют ограничения на максимальный размер результата.
* Так как результат поздапроса - ещё не результат всего запроса.
*/
@ -785,7 +785,7 @@ void InterpreterSelectQuery::executeMergeAggregated(BlockInputStreams & streams,
{
/// Если объединять нечего
if (streams.size() == 1)
return;
return;
/// Склеим несколько источников в один
streams[0] = new UnionBlockInputStream(streams, settings.max_threads);
@ -795,7 +795,7 @@ void InterpreterSelectQuery::executeMergeAggregated(BlockInputStreams & streams,
Names key_names;
AggregateDescriptions aggregates;
query_analyzer->getAggregateInfo(key_names, aggregates);
streams[0] = maybeAsynchronous(new MergingAggregatedBlockInputStream(streams[0], key_names, aggregates, overflow_row, final), settings.asynchronous);
streams[0] = maybeAsynchronous(new MergingAggregatedBlockInputStream(streams[0], key_names, aggregates, overflow_row, final, settings.max_threads), settings.asynchronous);
}

View File

@ -17,6 +17,8 @@
#include <DB/IO/copyData.h>
#include <DB/DataStreams/AsynchronousBlockInputStream.h>
#include <DB/DataStreams/NativeBlockInputStream.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
#include <DB/Interpreters/executeQuery.h>
#include <DB/Storages/StorageMemory.h>
@ -597,12 +599,10 @@ void TCPHandler::initBlockInput()
else
state.maybe_compressed_in = in;
state.block_in = query_context.getFormatFactory().getInput(
"Native",
state.block_in = new NativeBlockInputStream(
*state.maybe_compressed_in,
state.io.out_sample,
query_context.getSettingsRef().max_insert_block_size, /// Реально не используется в формате Native.
query_context.getDataTypeFactory());
query_context.getDataTypeFactory(),
client_revision);
}
}
@ -616,10 +616,9 @@ void TCPHandler::initBlockOutput()
else
state.maybe_compressed_out = out;
state.block_out = query_context.getFormatFactory().getOutput(
"Native",
state.block_out = new NativeBlockOutputStream(
*state.maybe_compressed_out,
state.io.in_sample);
client_revision);
}
}