From e3c0352b45c7ae14bcb37de73c6c3dd482c98e8d Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Fri, 26 Apr 2013 12:44:45 +0000 Subject: [PATCH] clickhouse: optimized FINAL [#CONV-7363]. --- .../CollapsingFinalBlockInputStream.h | 2 +- .../CollapsingFinalBlockInputStream.cpp | 81 +++++++++++-------- 2 files changed, 49 insertions(+), 34 deletions(-) diff --git a/dbms/include/DB/DataStreams/CollapsingFinalBlockInputStream.h b/dbms/include/DB/DataStreams/CollapsingFinalBlockInputStream.h index 03f14e5162b..960d98f7fa5 100644 --- a/dbms/include/DB/DataStreams/CollapsingFinalBlockInputStream.h +++ b/dbms/include/DB/DataStreams/CollapsingFinalBlockInputStream.h @@ -226,7 +226,7 @@ private: Queue queue; - Cursor current; /// Текущий первичный ключ. + Cursor previous; Cursor first_negative; /// Первая отрицательная строка для текущего первичного ключа. Cursor last_positive; /// Последняя положительная строка для текущего первичного ключа. diff --git a/dbms/src/DataStreams/CollapsingFinalBlockInputStream.cpp b/dbms/src/DataStreams/CollapsingFinalBlockInputStream.cpp index c8538a906b9..04a0dcbc807 100644 --- a/dbms/src/DataStreams/CollapsingFinalBlockInputStream.cpp +++ b/dbms/src/DataStreams/CollapsingFinalBlockInputStream.cpp @@ -7,7 +7,7 @@ namespace DB CollapsingFinalBlockInputStream::~CollapsingFinalBlockInputStream() { /// Нужно обезвредить все MergingBlockPtr, чтобы они не пытались класть блоки в output_blocks. - current.block.cancel(); + previous.block.cancel(); first_negative.block.cancel(); last_positive.block.cancel(); @@ -63,7 +63,7 @@ void CollapsingFinalBlockInputStream::commitCurrent() first_negative = Cursor(); last_positive = Cursor(); - current = Cursor(); + previous = Cursor(); } count_negative = 0; @@ -85,44 +85,59 @@ Block CollapsingFinalBlockInputStream::readImpl() { while (!queue.empty() && output_blocks.empty()) { - Cursor next = queue.top(); + Cursor current = queue.top(); queue.pop(); - if (!next.equal(current)) - { - commitCurrent(); - current = next; - } + bool has_next = !queue.empty(); + Cursor next = has_next ? queue.top() : Cursor(); - Int8 sign = next.getSign(); - if (sign == 1) + /// Будем продвигаться в текущем блоке, не используя очередь, пока возможно. + while (true) { - last_positive = next; - ++count_positive; - } - else if (sign == -1) - { - if (!count_negative) - first_negative = next; - ++count_negative; - } - else - reportBadSign(sign); - - if (next.isLast()) - { - fetchNextBlock(next.block->stream_index); - - /// Все потоки кончились. Обработаем последний ключ. - if (queue.empty()) + if (!current.equal(previous)) { commitCurrent(); + previous = current; + } + + Int8 sign = current.getSign(); + if (sign == 1) + { + last_positive = current; + ++count_positive; + } + else if (sign == -1) + { + if (!count_negative) + first_negative = current; + ++count_negative; + } + else + reportBadSign(sign); + + if (current.isLast()) + { + fetchNextBlock(current.block->stream_index); + + /// Все потоки кончились. Обработаем последний ключ. + if (!has_next) + { + commitCurrent(); + } + + break; + } + else + { + current.next(); + + if (has_next && !(next < current)) + { + queue.push(current); + + break; + } } - } - else - { - next.next(); - queue.push(next); } }