Fixed tests

Maksim Kita 2022-06-17 11:51:39 +02:00
parent ef084ad12a
commit 33f4b4d834
7 changed files with 70 additions and 41 deletions

View File

@@ -54,27 +54,25 @@ public:
         sum_blocks_granularity += (block_size * length);
     }

-    void insertFromChunk(Chunk && chunk, size_t limit_rows)
+    void insertChunk(Chunk && chunk, size_t rows_size)
     {
         if (merged_rows)
             throw Exception("Cannot insert to MergedData from Chunk because MergedData is not empty.",
                 ErrorCodes::LOGICAL_ERROR);

-        auto num_rows = chunk.getNumRows();
+        UInt64 num_rows = chunk.getNumRows();
         columns = chunk.mutateColumns();
-        if (limit_rows && num_rows > limit_rows)
+        if (rows_size < num_rows)
         {
-            num_rows = limit_rows;
+            size_t pop_size = num_rows - rows_size;
             for (auto & column : columns)
-                column = IColumn::mutate(column->cut(0, num_rows));
+                column->popBack(pop_size);
         }

         need_flush = true;
-        total_merged_rows += num_rows;
-        merged_rows = num_rows;
-
-        /// We don't care about granularity here. Because, for fast-forward optimization, chunk will be moved as-is.
-        /// sum_blocks_granularity += block_size * num_rows;
+        total_merged_rows += rows_size;
+        merged_rows = rows_size;
     }

     Chunk pull()
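
The functional change in insertChunk is the truncation strategy: the old insertFromChunk allocated a cut copy of the first limit_rows rows of every column, while the new code pops the trailing pop_size rows off each already-mutated column in place, avoiding the copy. A minimal standalone sketch of the two strategies, using std::vector<int> as a stand-in for a real column (the names truncateByCut/truncateByPopBack are illustrative, not the ClickHouse API):

#include <cassert>
#include <cstddef>
#include <vector>

using Column = std::vector<int>;

/// Old strategy: allocate a copy of the first rows_size rows (cut + mutate).
Column truncateByCut(const Column & column, size_t rows_size)
{
    return Column(column.begin(), column.begin() + rows_size);
}

/// New strategy: drop the trailing rows in place (popBack), no allocation.
void truncateByPopBack(Column & column, size_t rows_size)
{
    size_t pop_size = column.size() - rows_size;
    column.resize(column.size() - pop_size);
}

int main()
{
    Column column = {1, 2, 3, 4, 5};
    assert(truncateByCut(column, 3) == Column({1, 2, 3}));
    truncateByPopBack(column, 3);
    assert(column == Column({1, 2, 3}));
}

Both leave the first rows_size rows in place; the in-place version is safe here because insertChunk already owns the mutated columns, so nothing else can observe the dropped tail.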

View File

@@ -7,11 +7,6 @@
 namespace DB
 {

-namespace ErrorCodes
-{
-    extern const int LOGICAL_ERROR;
-}
-
 MergingSortedAlgorithm::MergingSortedAlgorithm(
     Block header_,
     size_t num_inputs,
@@ -113,15 +108,15 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::mergeBatchImpl(TSortingQueue &
         if (merged_data.hasEnoughRows())
             return Status(merged_data.pull());

-        auto [current_ptr, batch_size] = queue.current();
+        auto [current_ptr, initial_batch_size] = queue.current();
         auto current = *current_ptr;

         bool batch_skip_last_row = false;
-        if (current.impl->isLast(batch_size) && current_inputs[current.impl->order].skip_last_row)
+        if (current.impl->isLast(initial_batch_size) && current_inputs[current.impl->order].skip_last_row)
         {
             batch_skip_last_row = true;

-            if (batch_size == 1)
+            if (initial_batch_size == 1)
             {
                 /// Get the next block from the corresponding source, if there is one.
                 queue.removeTop();
@@ -130,27 +125,63 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::mergeBatchImpl(TSortingQueue &
         }

         UInt64 merged_rows = merged_data.mergedRows();
-        if (merged_rows + batch_size > merged_data.maxBlockSize())
+        size_t updated_batch_size = initial_batch_size;
+        if (merged_rows + updated_batch_size > merged_data.maxBlockSize())
         {
             batch_skip_last_row = false;
-            batch_size -= merged_rows + batch_size - merged_data.maxBlockSize();
+            updated_batch_size -= merged_rows + updated_batch_size - merged_data.maxBlockSize();
         }

         bool limit_reached = false;
-        if (limit && merged_rows + batch_size > limit)
+        if (limit && merged_rows + updated_batch_size > limit)
         {
             batch_skip_last_row = false;
-            batch_size -= merged_rows + batch_size - limit;
+            updated_batch_size -= merged_rows + updated_batch_size - limit;
             limit_reached = true;
         }

-        size_t insert_rows_size = batch_size - static_cast<size_t>(batch_skip_last_row);
+        size_t insert_rows_size = updated_batch_size - static_cast<size_t>(batch_skip_last_row);
+
+        if (unlikely(current.impl->isFirst() && current.impl->isLast(initial_batch_size)))
+        {
+            /** This is a special optimization for when the current cursor is totally less than the next cursor.
+              * We want to insert the current cursor chunk directly into the merged data.
+              *
+              * First, if merged_data is not empty, we need to flush it.
+              * We will get into the same condition on the next mergeBatch call.
+              *
+              * Then we can insert the chunk directly into the merged data.
+              */
+
+            if (merged_data.mergedRows() != 0)
+                return Status(merged_data.pull());
+
+            size_t source_num = current.impl->order;
+            merged_data.insertChunk(std::move(current_inputs[source_num].chunk), insert_rows_size);
+            current_inputs[source_num].chunk = Chunk();
+
+            auto status = Status(merged_data.pull(), limit_reached);
+            if (!limit_reached)
+                status.required_source = source_num;
+
+            if (out_row_sources_buf)
+            {
+                RowSourcePart row_source(current.impl->order);
+                for (size_t i = 0; i < insert_rows_size; ++i)
+                    out_row_sources_buf->write(row_source.data);
+            }
+
+            /// We will get the next block from the corresponding source, if there is one.
+            queue.removeTop();
+            return status;
+        }
+
         merged_data.insertRows(current->all_columns, current->getRow(), insert_rows_size, current->rows);

         if (out_row_sources_buf)
         {
             /// Actually, current.impl->order stores the source number (i.e. cursors[current.impl->order] == current.impl)
             RowSourcePart row_source(current.impl->order);
             for (size_t i = 0; i < insert_rows_size; ++i)
@@ -160,9 +191,9 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::mergeBatchImpl(TSortingQueue &
         if (limit_reached)
             break;

-        if (!current->isLast(batch_size))
+        if (!current->isLast(updated_batch_size))
         {
-            queue.next(batch_size);
+            queue.next(updated_batch_size);
         }
         else
         {
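
The new branch above is the heart of this change: when the current cursor is both first and last within its initial batch, i.e. the whole batch precedes the head of every other input, the source chunk is moved into the merged data wholesale (truncated to insert_rows_size via insertChunk) instead of being merged row by row, and required_source then asks that source for its next chunk. The toy merge below sketches just that pass-through idea with plain standard containers; sorted non-empty vectors stand in for cursors, and none of the ClickHouse queue machinery, limits, or row-source bookkeeping is modeled:

#include <algorithm>
#include <cstdio>
#include <deque>
#include <vector>

using Chunk = std::vector<int>;  /// assumed sorted and non-empty

std::vector<int> mergeSorted(std::deque<Chunk> inputs)
{
    std::vector<int> merged;
    while (!inputs.empty())
    {
        /// Pick the input whose head is smallest (the top of the sorting queue).
        auto top = std::min_element(inputs.begin(), inputs.end(),
            [](const Chunk & a, const Chunk & b) { return a.front() < b.front(); });

        /// Fast-forward case: the whole top chunk precedes every other head,
        /// so it can be moved to the output without per-row comparisons.
        bool whole_chunk_first = true;
        for (auto it = inputs.begin(); it != inputs.end(); ++it)
            if (it != top && top->back() > it->front())
                whole_chunk_first = false;

        if (whole_chunk_first)
        {
            merged.insert(merged.end(), top->begin(), top->end());
            inputs.erase(top);
            continue;
        }

        /// Otherwise merge one row at a time.
        merged.push_back(top->front());
        top->erase(top->begin());
        if (top->empty())
            inputs.erase(top);
    }
    return merged;
}

int main()
{
    for (int x : mergeSorted({{1, 2, 3}, {10, 11}, {4, 5}}))
        std::printf("%d ", x);  /// prints: 1 2 3 4 5 10 11
}

In the real algorithm the check costs O(1) rather than a scan, since the sorting queue already tracks the next-best cursor, and the moved chunk may still be cut short by the max block size or the query limit, which is exactly why insertChunk takes rows_size.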

View File

@@ -125,23 +125,23 @@ Selected 2/3 parts by partition key, 2 parts by primary key, 2/2 marks by primar
 --------- tDD ----------------------------
 select uniqExact(_part), count() from tDD where toDate(d)=toDate('2020-09-24');
-1 10000
+1 8192
 Selected 1/4 parts by partition key, 1 parts by primary key, 1/1 marks by primary key, 1 marks to read from 1 ranges
 select uniqExact(_part), count() FROM tDD WHERE toDate(d) = toDate('2020-09-24');
-1 10000
+1 8192
 Selected 1/4 parts by partition key, 1 parts by primary key, 1/1 marks by primary key, 1 marks to read from 1 ranges
 select uniqExact(_part), count() FROM tDD WHERE toDate(d) = '2020-09-24';
-1 10000
+1 8192
 Selected 1/4 parts by partition key, 1 parts by primary key, 1/1 marks by primary key, 1 marks to read from 1 ranges
 select uniqExact(_part), count() FROM tDD WHERE toDate(d) >= '2020-09-23' and toDate(d) <= '2020-09-26';
-3 40000
+3 32768
 Selected 3/4 parts by partition key, 3 parts by primary key, 4/4 marks by primary key, 4 marks to read from 3 ranges
 select uniqExact(_part), count() FROM tDD WHERE toYYYYMMDD(d) >= 20200923 and toDate(d) <= '2020-09-26';
-3 40000
+3 32768
 Selected 3/4 parts by partition key, 3 parts by primary key, 4/4 marks by primary key, 4 marks to read from 3 ranges
 --------- sDD ----------------------------

View File

@@ -1,4 +1,4 @@
-"marks",6
+"marks",7
 "optimize_trivial_count_query",16384
 "max_threads=1",16384
 "max_threads=100",16384

View File

@@ -47,15 +47,15 @@ all_2_2_0 u Default
 174250
 ======
-174250
-58413
-57920
 57917
+57920
+58413
+174250
 ======
-174250
-58413
-57920
 57917
+57920
+58413
+174250
 ======
 508413
 57920

View File

@@ -47,9 +47,9 @@ SELECT id, u, s FROM remote('127.0.0.{1,2}', currentDatabase(), t_sparse_full) O
 SELECT '======';
 SELECT sum(u) FROM t_sparse_full GROUP BY id % 3 AS k WITH TOTALS ORDER BY k;
 SELECT '======';
-SELECT sum(u) FROM t_sparse_full GROUP BY id % 3 AS k WITH ROLLUP ORDER BY k;
+SELECT sum(u) AS value FROM t_sparse_full GROUP BY id % 3 AS k WITH ROLLUP ORDER BY value;
 SELECT '======';
-SELECT sum(u) FROM t_sparse_full GROUP BY id % 3 AS k WITH CUBE ORDER BY k;
+SELECT sum(u) AS value FROM t_sparse_full GROUP BY id % 3 AS k WITH CUBE ORDER BY value;
 SELECT '======';
 SELECT sum(id) FROM t_sparse_full GROUP BY u % 3 AS k ORDER BY k;
 SELECT '======';