diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionSequenceMatch.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionSequenceMatch.h index aef08825536..90e9e774e2a 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionSequenceMatch.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionSequenceMatch.h @@ -346,17 +346,18 @@ private: } protected: - template - bool match(T1 & events_it, const T2 events_end) const + template + bool match(T & events_it, const T events_end) const { const auto action_begin = std::begin(actions); const auto action_end = std::end(actions); auto action_it = action_begin; + const auto events_begin = events_it; auto base_it = events_it; /// an iterator to action plus an iterator to row in events list plus timestamp at the start of sequence - using backtrack_info = std::tuple; + using backtrack_info = std::tuple; std::stack back_stack; /// backtrack if possible @@ -473,6 +474,9 @@ protected: ++action_it; } + if (events_it == events_begin) + ++events_it; + return action_it == action_end; } diff --git a/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h b/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h index f5a9129ef2c..ad90db1eaeb 100644 --- a/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h +++ b/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h @@ -12,6 +12,19 @@ namespace DB * Это экономит оперативку в случае использования двухуровневой агрегации, где в каждом потоке будет до 256 блоков с частями результата. * * Агрегатные функции в блоках не должны быть финализированы, чтобы их состояния можно было объединить. + * + * Замечания: + * + * На хорошей сети (10Gbit) может работать заметно медленнее, так как чтения блоков с разных + * удалённых серверов делаются последовательно, при этом, чтение упирается в CPU. + * Это несложно исправить. + * + * Также, чтения и вычисления (слияние состояний) делаются по очереди. + * Есть возможность делать чтения асинхронно - при этом будет расходоваться в два раза больше памяти, но всё-равно немного. + * Это можно сделать с помощью UnionBlockInputStream. + * + * Можно держать в памяти не по одному блоку из каждого источника, а по несколько, и распараллелить мердж. + * При этом будет расходоваться кратно больше оперативки. */ class MergingAggregatedMemoryEfficientBlockInputStream : public IProfilingBlockInputStream { @@ -25,12 +38,12 @@ public: children = inputs_; } - String getName() const override { return "MergingAggregatedMemorySavvy"; } + String getName() const override { return "MergingAggregatedMemoryEfficient"; } String getID() const override { std::stringstream res; - res << "MergingAggregatedMemorySavvy(" << aggregator.getID(); + res << "MergingAggregatedMemoryEfficient(" << aggregator.getID(); for (size_t i = 0, size = children.size(); i < size; ++i) res << ", " << children.back()->getID(); res << ")"; @@ -79,35 +92,45 @@ protected: if (input.block.info.bucket_num >= current_bucket_num) continue; - //std::cerr << "reading block\n"; - Block block = input.stream->read(); - - if (!block) + /// Если придёт блок не с основными данными, а с overflows, то запомним его и повторим чтение. + while (true) { - //std::cerr << "input is exhausted\n"; - input.is_exhausted = true; - continue; - } + // std::cerr << "reading block\n"; + Block block = input.stream->read(); - if (block.info.bucket_num != -1) - { - //std::cerr << "block for bucket " << block.info.bucket_num << "\n"; + if (!block) + { + // std::cerr << "input is exhausted\n"; + input.is_exhausted = true; + break; + } - has_two_level = true; - input.block = block; - } - else if (block.info.is_overflows) - { - //std::cerr << "block for overflows\n"; + if (block.info.bucket_num != -1) + { + /// Один из разрезанных блоков для двухуровневых данных. + // std::cerr << "block for bucket " << block.info.bucket_num << "\n"; - has_overflows = true; - input.overflow_block = block; - } - else - { - //std::cerr << "block without bucket\n"; + has_two_level = true; + input.block = block; + } + else if (block.info.is_overflows) + { + // std::cerr << "block for overflows\n"; - input.block = block; + has_overflows = true; + input.overflow_block = block; + + continue; + } + else + { + /// Блок для неразрезанных (одноуровневых) данных. + // std::cerr << "block without bucket\n"; + + input.block = block; + } + + break; } } @@ -115,11 +138,12 @@ protected: { if (current_bucket_num == NUM_BUCKETS) { - //std::cerr << "at end\n"; + /// Обработали все основные данные. Остались, возможно, только overflows-блоки. + // std::cerr << "at end\n"; if (has_overflows) { - //std::cerr << "merging overflows\n"; + // std::cerr << "merging overflows\n"; has_overflows = false; BlocksList blocks_to_merge; @@ -135,23 +159,35 @@ protected: } else if (has_two_level) { - //std::cerr << "has two level\n"; + /** Есть двухуровневые данные. + * Будем обрабатывать номера корзин по возрастанию. + * Найдём минимальный номер корзины, для которой есть данные, + * затем померджим эти данные. + */ + // std::cerr << "has two level\n"; int min_bucket_num = NUM_BUCKETS; for (auto & input : inputs) { + /// Изначально разрезанные (двухуровневые) блоки. if (input.block.info.bucket_num != -1 && input.block.info.bucket_num < min_bucket_num) min_bucket_num = input.block.info.bucket_num; + /// Ещё не разрезанный по корзинам блок. Разрезаем его и кладём результат в splitted_blocks. if (input.block.info.bucket_num == -1 && input.block && input.splitted_blocks.empty()) { - //std::cerr << "having block without bucket; will split\n"; + LOG_TRACE(&Logger::get("MergingAggregatedMemoryEfficient"), "Having block without bucket: will split."); input.splitted_blocks = aggregator.convertBlockToTwoLevel(input.block); - /// Нельзя уничтожать исходный блок. + + /** Нельзя уничтожать исходный блок. + * Потому что он владеет Arena с состояниями агрегатных функций, + * а splitted_blocks ей не владеют, но ссылаются на эти состояния. + */ } + /// Блоки, которые мы получили разрезанием одноуровневых блоков. if (!input.splitted_blocks.empty()) { for (const auto & block : input.splitted_blocks) @@ -167,25 +203,27 @@ protected: current_bucket_num = min_bucket_num; - //std::cerr << "current_bucket_num = " << current_bucket_num << "\n"; + // std::cerr << "current_bucket_num = " << current_bucket_num << "\n"; + /// Блоков с основными данными больше нет. if (current_bucket_num == NUM_BUCKETS) continue; + /// Теперь собираем блоки для current_bucket_num, чтобы их померджить. BlocksList blocks_to_merge; for (auto & input : inputs) { if (input.block.info.bucket_num == current_bucket_num) { - //std::cerr << "having block for current_bucket_num\n"; + // std::cerr << "having block for current_bucket_num\n"; blocks_to_merge.emplace_back(std::move(input.block)); input.block = Block(); } else if (!input.splitted_blocks.empty() && input.splitted_blocks[min_bucket_num]) { - //std::cerr << "having splitted data for bucket\n"; + // std::cerr << "having splitted data for bucket\n"; blocks_to_merge.emplace_back(std::move(input.splitted_blocks[min_bucket_num])); input.splitted_blocks[min_bucket_num] = Block(); @@ -196,7 +234,8 @@ protected: } else { - //std::cerr << "don't have two level\n"; + /// Есть только одноуровневые данные. Просто мерджим их. + // std::cerr << "don't have two level\n"; BlocksList blocks_to_merge; diff --git a/dbms/include/DB/Functions/FunctionsRandom.h b/dbms/include/DB/Functions/FunctionsRandom.h index efe6c06e376..92e97df6694 100644 --- a/dbms/include/DB/Functions/FunctionsRandom.h +++ b/dbms/include/DB/Functions/FunctionsRandom.h @@ -24,6 +24,8 @@ namespace DB * rand - linear congruental generator 0 .. 2^32 - 1. * rand64 - комбинирует несколько значений rand, чтобы получить значения из диапазона 0 .. 2^64 - 1. * + * randConstant - служебная функция, выдаёт константный столбец со случайным значением. + * * В качестве затравки используют время. * Замечание: переинициализируется на каждый блок. * Это значит, что таймер должен быть достаточного разрешения, чтобы выдавать разные значения на каждый блок. @@ -182,11 +184,60 @@ public: }; -struct NameRand { static constexpr auto name = "rand"; }; -struct NameRand64 { static constexpr auto name = "rand64"; }; +template +class FunctionRandomConstant : public IFunction +{ +private: + typedef typename Impl::ReturnType ToType; + + /// Значение одно для разных блоков. + bool is_initialized = false; + ToType value; + +public: + static constexpr auto name = Name::name; + static IFunction * create(const Context & context) { return new FunctionRandomConstant; } + + /// Получить имя функции. + String getName() const + { + return name; + } + + /// Получить тип результата по типам аргументов. Если функция неприменима для данных аргументов - кинуть исключение. + DataTypePtr getReturnType(const DataTypes & arguments) const + { + if (arguments.size() > 1) + throw Exception("Number of arguments for function " + getName() + " doesn't match: passed " + + toString(arguments.size()) + ", should be 0 or 1.", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + + return new typename DataTypeFromFieldType::Type; + } + + /// Выполнить функцию над блоком. + void execute(Block & block, const ColumnNumbers & arguments, size_t result) + { + if (!is_initialized) + { + is_initialized = true; + typename ColumnVector::Container_t vec_to(1); + Impl::execute(vec_to); + value = vec_to[0]; + } + + block.getByPosition(result).column = new ColumnConst(block.rowsInFirstColumn(), value); + } +}; + + +struct NameRand { static constexpr auto name = "rand"; }; +struct NameRand64 { static constexpr auto name = "rand64"; }; +struct NameRandConstant { static constexpr auto name = "randConstant"; }; typedef FunctionRandom FunctionRand; typedef FunctionRandom FunctionRand64; +typedef FunctionRandomConstant FunctionRandConstant; } diff --git a/dbms/src/Functions/FunctionsRandom.cpp b/dbms/src/Functions/FunctionsRandom.cpp index 8ca6bb8a22c..d01feedee52 100644 --- a/dbms/src/Functions/FunctionsRandom.cpp +++ b/dbms/src/Functions/FunctionsRandom.cpp @@ -8,6 +8,7 @@ void registerFunctionsRandom(FunctionFactory & factory) { factory.registerFunction(); factory.registerFunction(); + factory.registerFunction(); } } diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 7de6a05d5f9..ab55ee2a3c9 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -1735,17 +1735,62 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final) throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); } - BlocksList merged_block = convertToBlocks(result, final, 1); + BlocksList merged_blocks = convertToBlocks(result, final, 1); - if (merged_block.size() > 1) /// TODO overflows - throw Exception("Logical error: temporary result is not single-level", ErrorCodes::LOGICAL_ERROR); + if (merged_blocks.size() > 1) + { + /** Может быть два блока. Один с is_overflows, другой - нет. + * Если есть непустой блок не is_overflows, то удаляем блок с is_overflows. + * Если есть пустой блок не is_overflows и блок с is_overflows, то удаляем пустой блок. + * + * Это делаем, потому что исходим из допущения, что в функцию передаются + * либо все блоки не is_overflows, либо все блоки is_overflows. + */ + + bool has_nonempty_nonoverflows = false; + bool has_overflows = false; + + for (const auto & block : merged_blocks) + { + if (block && !block.info.is_overflows) + has_nonempty_nonoverflows = true; + else if (block.info.is_overflows) + has_overflows = true; + } + + if (has_nonempty_nonoverflows) + { + for (auto it = merged_blocks.begin(); it != merged_blocks.end(); ++it) + { + if (it->info.is_overflows) + { + merged_blocks.erase(it); + break; + } + } + } + else if (has_overflows) + { + for (auto it = merged_blocks.begin(); it != merged_blocks.end(); ++it) + { + if (!*it) + { + merged_blocks.erase(it); + break; + } + } + } + + if (merged_blocks.size() > 1) + throw Exception("Logical error: temporary result is not single-level", ErrorCodes::LOGICAL_ERROR); + } LOG_TRACE(log, "Merged partially aggregated blocks."); - if (merged_block.empty()) + if (merged_blocks.empty()) return {}; - return merged_block.front(); + return merged_blocks.front(); } diff --git a/dbms/tests/queries/0_stateless/00222_sequence_aggregate_function_family.reference b/dbms/tests/queries/0_stateless/00222_sequence_aggregate_function_family.reference new file mode 100644 index 00000000000..010b7b79019 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00222_sequence_aggregate_function_family.reference @@ -0,0 +1,42 @@ +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 +1 diff --git a/dbms/tests/queries/0_stateless/00222_sequence_aggregate_function_family.sql b/dbms/tests/queries/0_stateless/00222_sequence_aggregate_function_family.sql new file mode 100644 index 00000000000..10b5fdb6428 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00222_sequence_aggregate_function_family.sql @@ -0,0 +1,51 @@ +drop table if exists sequence_test; + +create table sequence_test (time UInt32, data UInt8) engine=Memory; + +insert into sequence_test values (0,0),(1,0),(2,0),(3,0),(4,1),(5,2),(6,0),(7,0),(8,0),(9,0),(10,1),(11,1); + +select 1 = sequenceMatch('')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 1 = sequenceMatch('.')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 1 = sequenceMatch('.*')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 1 = sequenceMatch('(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 1 = sequenceMatch('(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 1 = sequenceMatch('(?3)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 0 = sequenceMatch('(?4)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 1 = sequenceMatch('(?1)(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 1 = sequenceMatch('(?1)(?1)(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 1 = sequenceMatch('(?1)(?1)(?1)(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 0 = sequenceMatch('(?1)(?1)(?1)(?1)(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 1 = sequenceMatch('(?1)(?1)(?1)(?1)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 1 = sequenceMatch('(?1)(?t>10)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 0 = sequenceMatch('(?1)(?t>11)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 1 = sequenceMatch('(?1)(?t<11)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 1 = sequenceMatch('(?1)(?t<3)(?3)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 1 = sequenceMatch('(?1)(?t<=2)(?3)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 0 = sequenceMatch('(?1)(?t<2)(?3)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 1 = sequenceMatch('(?2)(?t>=7)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 0 = sequenceMatch('(?2)(?t>7)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 1 = sequenceMatch('(?2)(?3)(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; + +select count() = sequenceCount('')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select count() = sequenceCount('.')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select count() = sequenceCount('.*')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 8 = sequenceCount('(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 3 = sequenceCount('(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 1 = sequenceCount('(?3)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 0 = sequenceCount('(?4)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 4 = sequenceCount('(?1)(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 2 = sequenceCount('(?1)(?1)(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 2 = sequenceCount('(?1)(?1)(?1)(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 0 = sequenceCount('(?1)(?1)(?1)(?1)(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 2 = sequenceCount('(?1)(?1)(?1)(?1)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 1 = sequenceCount('(?1)(?t>10)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 0 = sequenceCount('(?1)(?t>11)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 2 = sequenceCount('(?1)(?t<11)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 1 = sequenceCount('(?1)(?t<3)(?3)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 1 = sequenceCount('(?1)(?t<=2)(?3)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 0 = sequenceCount('(?1)(?t<2)(?3)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 1 = sequenceCount('(?2)(?t>=7)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 0 = sequenceCount('(?2)(?t>7)(?2)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; +select 1 = sequenceCount('(?2)(?3)(?1)')(toDateTime(time), data = 0, data = 1, data = 2, data = 3) from sequence_test; + +drop table sequence_test; diff --git a/dbms/tests/queries/0_stateless/00223_distributed_aggregation_memory_efficient.reference b/dbms/tests/queries/0_stateless/00223_distributed_aggregation_memory_efficient.reference new file mode 100644 index 00000000000..49acd873311 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00223_distributed_aggregation_memory_efficient.reference @@ -0,0 +1,70 @@ +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 diff --git a/dbms/tests/queries/0_stateless/00223_distributed_aggregation_memory_efficient.sql b/dbms/tests/queries/0_stateless/00223_distributed_aggregation_memory_efficient.sql new file mode 100644 index 00000000000..d4e78ffbb4e --- /dev/null +++ b/dbms/tests/queries/0_stateless/00223_distributed_aggregation_memory_efficient.sql @@ -0,0 +1,104 @@ +SET max_block_size = 1000; + +DROP TABLE IF EXISTS test.numbers_10; +CREATE TABLE test.numbers_10 ENGINE = Memory AS SELECT * FROM system.numbers LIMIT 10000; + +SET distributed_aggregation_memory_efficient = 0; +SET group_by_two_level_threshold = 1000; + +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); + +SET distributed_aggregation_memory_efficient = 0; +SET group_by_two_level_threshold = 7; + +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); + +SET distributed_aggregation_memory_efficient = 1; +SET group_by_two_level_threshold = 1000; + +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); + +SET distributed_aggregation_memory_efficient = 1; +SET group_by_two_level_threshold = 7; + +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); + +SET distributed_aggregation_memory_efficient = 1; +SET group_by_two_level_threshold = 1; + +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); +SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10) FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY number); + +SET distributed_aggregation_memory_efficient = 1; +SET group_by_two_level_threshold = 1000; + +SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number); +SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number); +SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number); +SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number); +SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number); +SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number); +SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number); +SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number); +SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number); +SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number); + +SET distributed_aggregation_memory_efficient = 1; +SET group_by_two_level_threshold = 1; + +SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number); +SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number); +SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number); +SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number); +SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number); +SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number); +SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number); +SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number); +SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number); +SELECT sum(c = 1) IN (0, 10), sum(c = 2) IN (0, 5), sum(c) = 10 FROM (SELECT number, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) AND number >= (randConstant() % 2 ? 0 : 5) GROUP BY number); + +DROP TABLE test.numbers_10; diff --git a/dbms/tests/queries/0_stateless/00224_distributed_aggregation_memory_efficient_and_overflows.reference b/dbms/tests/queries/0_stateless/00224_distributed_aggregation_memory_efficient_and_overflows.reference new file mode 100644 index 00000000000..db336f3d78f --- /dev/null +++ b/dbms/tests/queries/0_stateless/00224_distributed_aggregation_memory_efficient_and_overflows.reference @@ -0,0 +1,24 @@ +0 2 +1 2 +2 2 +3 2 +4 2 +5 2 +6 2 +7 2 +8 2 +9 2 + +0 200000 +0 2 +1 2 +2 2 +3 2 +4 2 +5 2 +6 2 +7 2 +8 2 +9 2 + +0 200000 diff --git a/dbms/tests/queries/0_stateless/00224_distributed_aggregation_memory_efficient_and_overflows.sql b/dbms/tests/queries/0_stateless/00224_distributed_aggregation_memory_efficient_and_overflows.sql new file mode 100644 index 00000000000..acd05003c2a --- /dev/null +++ b/dbms/tests/queries/0_stateless/00224_distributed_aggregation_memory_efficient_and_overflows.sql @@ -0,0 +1,14 @@ +DROP TABLE IF EXISTS test.numbers_100k_log; +CREATE TABLE test.numbers_100k_log ENGINE = Log AS SELECT * FROM system.numbers LIMIT 100000; + +SELECT number, count() FROM remote('127.0.0.{1,2}', test.numbers_100k_log) GROUP BY number WITH TOTALS ORDER BY number LIMIT 10; + +SET distributed_aggregation_memory_efficient = 1, + group_by_two_level_threshold = 1000, + group_by_overflow_mode = 'any', + max_rows_to_group_by = 1000, + totals_mode = 'after_having_auto'; + +SELECT number, count() FROM remote('127.0.0.{1,2}', test.numbers_100k_log) GROUP BY number WITH TOTALS ORDER BY number LIMIT 10; + +DROP TABLE test.numbers_100k_log;