dbms: fixed error [#METR-17536].

This commit is contained in:
Alexey Milovidov 2015-09-10 04:58:37 +03:00
parent 34a5cf045f
commit 540c411aa8
4 changed files with 52 additions and 11 deletions

View File

@ -28,6 +28,9 @@ String MergingAggregatedMemoryEfficientBlockInputStream::getID() const
Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl() Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl()
{ {
/// Если child - RemoteBlockInputStream, то отправляет запрос на все удалённые серверы, инициируя вычисления. /// Если child - RemoteBlockInputStream, то отправляет запрос на все удалённые серверы, инициируя вычисления.
/** NOTE: Если соединения ещё не установлены, то устанавливает их последовательно.
* И отправляет запрос последовательно. Это медленно.
*/
if (!started) if (!started)
{ {
started = true; started = true;
@ -154,11 +157,7 @@ Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl()
LOG_TRACE(&Logger::get("MergingAggregatedMemoryEfficient"), "Having block without bucket: will split."); LOG_TRACE(&Logger::get("MergingAggregatedMemoryEfficient"), "Having block without bucket: will split.");
input.splitted_blocks = aggregator.convertBlockToTwoLevel(input.block); input.splitted_blocks = aggregator.convertBlockToTwoLevel(input.block);
input.block = Block();
/** Нельзя уничтожать исходный блок.
* Потому что он владеет Arena с состояниями агрегатных функций,
* а splitted_blocks ей не владеют, но ссылаются на эти состояния.
*/
} }
/// Блоки, которые мы получили разрезанием одноуровневых блоков. /// Блоки, которые мы получили разрезанием одноуровневых блоков.

View File

@ -1814,6 +1814,9 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
size_t rows = source.rowsInFirstColumn(); size_t rows = source.rowsInFirstColumn();
size_t columns = source.columns(); size_t columns = source.columns();
/// Для каждого номера корзины создадим фильтр, где будут отмечены строки, относящиеся к этой корзине.
std::vector<IColumn::Filter> filters(destinations.size());
/// Для всех строчек. /// Для всех строчек.
for (size_t i = 0; i < rows; ++i) for (size_t i = 0; i < rows; ++i)
{ {
@ -1826,15 +1829,33 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
/// Этот ключ нам больше не нужен. /// Этот ключ нам больше не нужен.
method.onExistingKey(key, keys, *pool); method.onExistingKey(key, keys, *pool);
auto & filter = filters[bucket];
if (unlikely(filter.empty()))
filter.resize_fill(rows);
filter[i] = 1;
}
for (size_t bucket = 0, size = destinations.size(); bucket < size; ++bucket)
{
const auto & filter = filters[bucket];
if (filter.empty())
continue;
Block & dst = destinations[bucket]; Block & dst = destinations[bucket];
if (unlikely(!dst)) dst.info.bucket_num = bucket;
{
dst = source.cloneEmpty();
dst.info.bucket_num = bucket;
}
for (size_t j = 0; j < columns; ++j) for (size_t j = 0; j < columns; ++j)
dst.unsafeGetByPosition(j).column.get()->insertFrom(*source.unsafeGetByPosition(j).column.get(), i); {
const ColumnWithTypeAndName & src_col = source.unsafeGetByPosition(j);
dst.insert({src_col.column->filter(filter), src_col.type, src_col.name});
/** Вставленные в блок столбцы типа ColumnAggregateFunction будут владеть состояниями агрегатных функций
* путём удержания SharedPtr-а на исходный столбец. См. ColumnAggregateFunction.h
*/
}
} }
} }

View File

@ -78,4 +78,14 @@
1 1 1 1 1 1
1 1 1 1 1 1
1 1 1 1 1 1
1 1 1
1 1 1
1 1 1
1 1 1
1 1 1
1 1 1
1 1 1
1 1 1
1 1 1
1 1 1
2 2

View File

@ -115,6 +115,17 @@ SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10), sum(c) IN (10, 15, 20) FROM
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10), sum(c) IN (10, 15, 20) FROM (SELECT number AS k1, number + 1 AS k2, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2); SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10), sum(c) IN (10, 15, 20) FROM (SELECT number AS k1, number + 1 AS k2, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2);
SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10), sum(c) IN (10, 15, 20) FROM (SELECT number AS k1, number + 1 AS k2, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2); SELECT sum(c = 1) IN (0, 5), sum(c = 2) IN (5, 10), sum(c) IN (10, 15, 20) FROM (SELECT number AS k1, number + 1 AS k2, count() AS c FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 5 : 10) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2);
SELECT sum(c = 20) IN (5, 10), sum(c = 10) IN (0, 5), sum(u != 10) = 0 FROM (SELECT intDiv(number, 10) AS k1, k1 + 1 AS k2, count() AS c, uniq(number) AS u FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 50 : 100) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2);
SELECT sum(c = 20) IN (5, 10), sum(c = 10) IN (0, 5), sum(u != 10) = 0 FROM (SELECT intDiv(number, 10) AS k1, k1 + 1 AS k2, count() AS c, uniq(number) AS u FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 50 : 100) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2);
SELECT sum(c = 20) IN (5, 10), sum(c = 10) IN (0, 5), sum(u != 10) = 0 FROM (SELECT intDiv(number, 10) AS k1, k1 + 1 AS k2, count() AS c, uniq(number) AS u FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 50 : 100) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2);
SELECT sum(c = 20) IN (5, 10), sum(c = 10) IN (0, 5), sum(u != 10) = 0 FROM (SELECT intDiv(number, 10) AS k1, k1 + 1 AS k2, count() AS c, uniq(number) AS u FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 50 : 100) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2);
SELECT sum(c = 20) IN (5, 10), sum(c = 10) IN (0, 5), sum(u != 10) = 0 FROM (SELECT intDiv(number, 10) AS k1, k1 + 1 AS k2, count() AS c, uniq(number) AS u FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 50 : 100) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2);
SELECT sum(c = 20) IN (5, 10), sum(c = 10) IN (0, 5), sum(u != 10) = 0 FROM (SELECT intDiv(number, 10) AS k1, k1 + 1 AS k2, count() AS c, uniq(number) AS u FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 50 : 100) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2);
SELECT sum(c = 20) IN (5, 10), sum(c = 10) IN (0, 5), sum(u != 10) = 0 FROM (SELECT intDiv(number, 10) AS k1, k1 + 1 AS k2, count() AS c, uniq(number) AS u FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 50 : 100) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2);
SELECT sum(c = 20) IN (5, 10), sum(c = 10) IN (0, 5), sum(u != 10) = 0 FROM (SELECT intDiv(number, 10) AS k1, k1 + 1 AS k2, count() AS c, uniq(number) AS u FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 50 : 100) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2);
SELECT sum(c = 20) IN (5, 10), sum(c = 10) IN (0, 5), sum(u != 10) = 0 FROM (SELECT intDiv(number, 10) AS k1, k1 + 1 AS k2, count() AS c, uniq(number) AS u FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 50 : 100) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2);
SELECT sum(c = 20) IN (5, 10), sum(c = 10) IN (0, 5), sum(u != 10) = 0 FROM (SELECT intDiv(number, 10) AS k1, k1 + 1 AS k2, count() AS c, uniq(number) AS u FROM remote('127.0.0.{1,2}', test.numbers_10) WHERE number < (randConstant() % 2 ? 50 : 100) GROUP BY k1, k2 HAVING count() > 0 ORDER BY k1, k2);
DROP TABLE test.numbers_10; DROP TABLE test.numbers_10;
SELECT count() FROM remote('127.0.0.{1,2}', system.one); SELECT count() FROM remote('127.0.0.{1,2}', system.one);