dbms: fixed error with merging [#METR-16247].

This commit is contained in:
Alexey Milovidov 2015-05-12 07:55:14 +03:00
parent 566b62854b
commit e3b0c97b0f
10 changed files with 226 additions and 32 deletions

View File

@ -50,7 +50,7 @@ protected:
Block readImpl() override;
private:
Logger * log = &Logger::get("SummingSortedBlockInputStream");
Logger * log = &Logger::get("AggregatingSortedBlockInputStream");
/// Прочитали до конца.
bool finished = false;
@ -68,7 +68,7 @@ private:
* Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций.
*/
template<class TSortCursor>
void merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
void merge(ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
/// Вставить в результат первую строку для текущей группы.
void insertCurrentRow(ColumnPlainPtrs & merged_columns);

View File

@ -81,7 +81,7 @@ private:
* Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций.
*/
template<class TSortCursor>
void merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
void merge(ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
/// Вставить в результат строки для текущего идентификатора "визита".
void insertRows(ColumnPlainPtrs & merged_columns, size_t & merged_rows, bool last_in_stream = false);

View File

@ -136,7 +136,7 @@ private:
void initQueue(std::priority_queue<TSortCursor> & queue);
template <typename TSortCursor>
void merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
void merge(ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
Logger * log = &Logger::get("MergingSortedBlockInputStream");

View File

@ -99,7 +99,7 @@ private:
* Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций.
*/
template<class TSortCursor>
void merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
void merge(ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
/// Вставить в результат просуммированную строку для текущей группы.
void insertCurrentRow(ColumnPlainPtrs & merged_columns);

View File

@ -61,16 +61,16 @@ Block AggregatingSortedBlockInputStream::readImpl()
columns_to_aggregate[i] = typeid_cast<ColumnAggregateFunction *>(merged_columns[column_numbers_to_aggregate[i]]);
if (has_collation)
merge(merged_block, merged_columns, queue_with_collation);
merge(merged_columns, queue_with_collation);
else
merge(merged_block, merged_columns, queue);
merge(merged_columns, queue);
return merged_block;
}
template<class TSortCursor>
void AggregatingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
void AggregatingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
{
size_t merged_rows = 0;
@ -81,13 +81,15 @@ void AggregatingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainP
setPrimaryKey(next_key, current);
bool key_differs = next_key != current_key;
/// если накопилось достаточно строк и последняя посчитана полностью
if (next_key != current_key && merged_rows >= max_block_size)
if (key_differs && merged_rows >= max_block_size)
return;
queue.pop();
if (next_key != current_key)
if (key_differs)
{
current_key = std::move(next_key);
next_key.resize(description.size());

View File

@ -104,15 +104,15 @@ Block CollapsingSortedBlockInputStream::readImpl()
}
if (has_collation)
merge(merged_block, merged_columns, queue_with_collation);
merge(merged_columns, queue_with_collation);
else
merge(merged_block, merged_columns, queue);
merge(merged_columns, queue);
return merged_block;
}
template<class TSortCursor>
void CollapsingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
{
size_t merged_rows = 0;
@ -120,12 +120,22 @@ void CollapsingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPt
while (!queue.empty())
{
TSortCursor current = queue.top();
queue.pop();
Int8 sign = get<Int64>((*current->all_columns[sign_column_number])[current->pos]);
setPrimaryKey(next_key, current);
if (next_key != current_key)
bool key_differs = next_key != current_key;
/// если накопилось достаточно строк и последняя посчитана полностью
if (key_differs && merged_rows >= max_block_size)
{
++blocks_written;
return;
}
queue.pop();
if (key_differs)
{
/// Запишем данные для предыдущего визита.
insertRows(merged_columns, merged_rows);
@ -168,12 +178,6 @@ void CollapsingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPt
/// Достаём из соответствующего источника следующий блок, если есть.
fetchNextBlock(current, queue);
}
if (merged_rows >= max_block_size)
{
++blocks_written;
return;
}
}
/// Запишем данные для последнего визита.

View File

@ -111,15 +111,15 @@ Block MergingSortedBlockInputStream::readImpl()
return Block();
if (has_collation)
merge(merged_block, merged_columns, queue_with_collation);
merge(merged_columns, queue_with_collation);
else
merge(merged_block, merged_columns, queue);
merge(merged_columns, queue);
return merged_block;
}
template <typename TSortCursor>
void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
void MergingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
{
size_t merged_rows = 0;

View File

@ -142,16 +142,16 @@ Block SummingSortedBlockInputStream::readImpl()
}
if (has_collation)
merge(merged_block, merged_columns, queue_with_collation);
merge(merged_columns, queue_with_collation);
else
merge(merged_block, merged_columns, queue);
merge(merged_columns, queue);
return merged_block;
}
template<class TSortCursor>
void SummingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
void SummingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
{
size_t merged_rows = 0;
@ -159,11 +159,18 @@ void SummingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs
while (!queue.empty())
{
TSortCursor current = queue.top();
queue.pop();
setPrimaryKey(next_key, current);
if (next_key != current_key)
bool key_differs = next_key != current_key;
/// если накопилось достаточно строк и последняя посчитана полностью
if (key_differs && merged_rows >= max_block_size)
return;
queue.pop();
if (key_differs)
{
/// Запишем данные для предыдущей группы.
if (!current_key[0].isNull() && !current_row_is_zero)
@ -194,9 +201,6 @@ void SummingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs
/// Достаём из соответствующего источника следующий блок, если есть.
fetchNextBlock(current, queue);
}
if (merged_rows >= max_block_size)
return;
}
/// Запишем данные для последней группы, если она ненулевая.

View File

@ -0,0 +1,120 @@
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1

View File

@ -0,0 +1,64 @@
#!/bin/bash
function create {
clickhouse-client --query="DROP TABLE IF EXISTS test.summing"
clickhouse-client --query="DROP TABLE IF EXISTS test.collapsing"
clickhouse-client --query="DROP TABLE IF EXISTS test.aggregating"
clickhouse-client --query="CREATE TABLE test.summing (d Date DEFAULT today(), x UInt64, s UInt64 DEFAULT 1) ENGINE = SummingMergeTree(d, x, 8192)"
clickhouse-client --query="CREATE TABLE test.collapsing (d Date DEFAULT today(), x UInt64, s UInt64 DEFAULT 1) ENGINE = CollapsingMergeTree(d, x, 8192, s)"
clickhouse-client --query="CREATE TABLE test.aggregating (d Date DEFAULT today(), x UInt64, s AggregateFunction(sum, UInt64)) ENGINE = AggregatingMergeTree(d, x, 8192)"
}
function cleanup {
clickhouse-client --query="DROP TABLE test.summing"
clickhouse-client --query="DROP TABLE test.collapsing"
clickhouse-client --query="DROP TABLE test.aggregating"
}
function test {
create
SUM=$(( $1 + $2 ))
MAX=$(( $1 > $2 ? $1 : $2 ))
clickhouse-client --query="INSERT INTO test.summing (x) SELECT number AS x FROM system.numbers LIMIT $1"
clickhouse-client --query="INSERT INTO test.summing (x) SELECT number AS x FROM system.numbers LIMIT $2"
clickhouse-client --query="INSERT INTO test.collapsing (x) SELECT number AS x FROM system.numbers LIMIT $1"
clickhouse-client --query="INSERT INTO test.collapsing (x) SELECT number AS x FROM system.numbers LIMIT $2"
clickhouse-client --query="INSERT INTO test.aggregating (d, x, s) SELECT today() AS d, number AS x, sumState(materialize(toUInt64(1))) AS s FROM (SELECT number FROM system.numbers LIMIT $1) GROUP BY number"
clickhouse-client --query="INSERT INTO test.aggregating (d, x, s) SELECT today() AS d, number AS x, sumState(materialize(toUInt64(1))) AS s FROM (SELECT number FROM system.numbers LIMIT $2) GROUP BY number"
clickhouse-client --query="SELECT count() = $SUM, sum(s) = $SUM FROM test.summing"
clickhouse-client --query="OPTIMIZE TABLE test.summing"
clickhouse-client --query="SELECT count() = $MAX, sum(s) = $SUM FROM test.summing"
echo
clickhouse-client --query="SELECT count() = $SUM, sum(s) = $SUM FROM test.collapsing"
clickhouse-client --query="OPTIMIZE TABLE test.collapsing"
clickhouse-client --query="SELECT count() = $MAX, sum(s) = $MAX FROM test.collapsing"
echo
clickhouse-client --query="SELECT count() = $SUM, sumMerge(s) = $SUM FROM test.aggregating"
clickhouse-client --query="OPTIMIZE TABLE test.aggregating"
clickhouse-client --query="SELECT count() = $MAX, sumMerge(s) = $SUM FROM test.aggregating"
echo
echo
}
test 8191 8191
test 8191 8192
test 8192 8191
test 8192 8192
test 8192 8193
test 8193 8192
test 8193 8193
test 8191 8193
test 8193 8191
test 8193 8194
test 8194 8193
test 8194 8194
cleanup