diff --git a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp index 5c64c0c858f..44a5889338b 100644 --- a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp @@ -28,6 +28,9 @@ String MergingAggregatedMemoryEfficientBlockInputStream::getID() const Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl() { /// Если child - RemoteBlockInputStream, то отправляет запрос на все удалённые серверы, инициируя вычисления. + /** NOTE: Если соединения ещё не установлены, то устанавливает их последовательно. + * И отправляет запрос последовательно. Это медленно. + */ if (!started) { started = true; @@ -154,11 +157,7 @@ Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl() LOG_TRACE(&Logger::get("MergingAggregatedMemoryEfficient"), "Having block without bucket: will split."); input.splitted_blocks = aggregator.convertBlockToTwoLevel(input.block); - - /** Нельзя уничтожать исходный блок. - * Потому что он владеет Arena с состояниями агрегатных функций, - * а splitted_blocks ей не владеют, но ссылаются на эти состояния. - */ + input.block = Block(); } /// Блоки, которые мы получили разрезанием одноуровневых блоков. diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 47088d1a8cf..ce615fb53d4 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -1814,6 +1814,9 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl( size_t rows = source.rowsInFirstColumn(); size_t columns = source.columns(); + /// Для каждого номера корзины создадим фильтр, где будут отмечены строки, относящиеся к этой корзине. + std::vector filters(destinations.size()); + /// Для всех строчек. for (size_t i = 0; i < rows; ++i) { @@ -1826,15 +1829,33 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl( /// Этот ключ нам больше не нужен. method.onExistingKey(key, keys, *pool); + auto & filter = filters[bucket]; + + if (unlikely(filter.empty())) + filter.resize_fill(rows); + + filter[i] = 1; + } + + for (size_t bucket = 0, size = destinations.size(); bucket < size; ++bucket) + { + const auto & filter = filters[bucket]; + + if (filter.empty()) + continue; + Block & dst = destinations[bucket]; - if (unlikely(!dst)) - { - dst = source.cloneEmpty(); - dst.info.bucket_num = bucket; - } + dst.info.bucket_num = bucket; for (size_t j = 0; j < columns; ++j) - dst.unsafeGetByPosition(j).column.get()->insertFrom(*source.unsafeGetByPosition(j).column.get(), i); + { + const ColumnWithTypeAndName & src_col = source.unsafeGetByPosition(j); + dst.insert({src_col.column->filter(filter), src_col.type, src_col.name}); + + /** Вставленные в блок столбцы типа ColumnAggregateFunction будут владеть состояниями агрегатных функций + * путём удержания SharedPtr-а на исходный столбец. См. ColumnAggregateFunction.h + */ + } } } diff --git a/dbms/tests/queries/0_stateless/00223_distributed_aggregation_memory_efficient.reference b/dbms/tests/queries/0_stateless/00223_distributed_aggregation_memory_efficient.reference index fb3ea660581..8e01654f052 100644 --- a/dbms/tests/queries/0_stateless/00223_distributed_aggregation_memory_efficient.reference +++ b/dbms/tests/queries/0_stateless/00223_distributed_aggregation_memory_efficient.reference @@ -78,4 +78,14 @@ 1 1 1 1 1 1 1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 +1 1 1 2 diff --git a/dbms/tests/queries/0_stateless/00223_distributed_aggregation_memory_efficient.sql b/dbms/tests/queries/0_stateless/00223_distributed_aggregation_memory_efficient.sql index cfb35cf1cc8..3462e48a58b 100644 --- a/dbms/tests/queries/0_stateless/00223_distributed_aggregation_memory_efficient.sql +++ b/dbms/tests/queries/0_stateless/00223_distributed_aggregation_memory_efficient.sql @@ -115,6 +115,17 @@ SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10), sum(c) IN (10, 15, 20) FROM SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10), sum(c) IN (10, 15, 20) FROM (SELECT number AS k1, number + 1 AS k2, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2); SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10), sum(c) IN (10, 15, 20) FROM (SELECT number AS k1, number + 1 AS k2, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2); +SELECT sum(c = 20) IN (5, 10), sum(c = 10) IN (0, 5), sum(u != 10) = 0 FROM (SELECT intDiv(number, 10) AS k1, k1 + 1 AS k2, count() AS c, uniq(number) AS u FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 50 : 100) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2); +SELECT sum(c = 20) IN (5, 10), sum(c = 10) IN (0, 5), sum(u != 10) = 0 FROM (SELECT intDiv(number, 10) AS k1, k1 + 1 AS k2, count() AS c, uniq(number) AS u FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 50 : 100) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2); +SELECT sum(c = 20) IN (5, 10), sum(c = 10) IN (0, 5), sum(u != 10) = 0 FROM (SELECT intDiv(number, 10) AS k1, k1 + 1 AS k2, count() AS c, uniq(number) AS u FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 50 : 100) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2); +SELECT sum(c = 20) IN (5, 10), sum(c = 10) IN (0, 5), sum(u != 10) = 0 FROM (SELECT intDiv(number, 10) AS k1, k1 + 1 AS k2, count() AS c, uniq(number) AS u FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 50 : 100) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2); +SELECT sum(c = 20) IN (5, 10), sum(c = 10) IN (0, 5), sum(u != 10) = 0 FROM (SELECT intDiv(number, 10) AS k1, k1 + 1 AS k2, count() AS c, uniq(number) AS u FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 50 : 100) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2); +SELECT sum(c = 20) IN (5, 10), sum(c = 10) IN (0, 5), sum(u != 10) = 0 FROM (SELECT intDiv(number, 10) AS k1, k1 + 1 AS k2, count() AS c, uniq(number) AS u FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 50 : 100) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2); +SELECT sum(c = 20) IN (5, 10), sum(c = 10) IN (0, 5), sum(u != 10) = 0 FROM (SELECT intDiv(number, 10) AS k1, k1 + 1 AS k2, count() AS c, uniq(number) AS u FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 50 : 100) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2); +SELECT sum(c = 20) IN (5, 10), sum(c = 10) IN (0, 5), sum(u != 10) = 0 FROM (SELECT intDiv(number, 10) AS k1, k1 + 1 AS k2, count() AS c, uniq(number) AS u FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 50 : 100) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2); +SELECT sum(c = 20) IN (5, 10), sum(c = 10) IN (0, 5), sum(u != 10) = 0 FROM (SELECT intDiv(number, 10) AS k1, k1 + 1 AS k2, count() AS c, uniq(number) AS u FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 50 : 100) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2); +SELECT sum(c = 20) IN (5, 10), sum(c = 10) IN (0, 5), sum(u != 10) = 0 FROM (SELECT intDiv(number, 10) AS k1, k1 + 1 AS k2, count() AS c, uniq(number) AS u FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 50 : 100) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2); + DROP TABLE test.numbers_10; SELECT count() FROM remote('127.0.0.{1,2}', system.one);