diff --git a/dbms/include/DB/DataStreams/CollapsingFinalBlockInputStream.h b/dbms/include/DB/DataStreams/CollapsingFinalBlockInputStream.h index cc89c946059..11334cd4d3b 100644 --- a/dbms/include/DB/DataStreams/CollapsingFinalBlockInputStream.h +++ b/dbms/include/DB/DataStreams/CollapsingFinalBlockInputStream.h @@ -15,10 +15,8 @@ class CollapsingFinalBlockInputStream : public IProfilingBlockInputStream { public: CollapsingFinalBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, - const String & sign_column_) - : description(description_), sign_column(sign_column_), - log(&Logger::get("CollapsingFinalBlockInputStream")), - first(true), count_positive(0), count_negative(0), count_incorrect_data(0), blocks_fetched(0), blocks_output(0) + const String & sign_column_name_) + : description(description_), sign_column_name(sign_column_name_) { children.insert(children.end(), inputs_.begin(), inputs_.end()); } @@ -40,7 +38,7 @@ public: for (size_t i = 0; i < description.size(); ++i) res << ", " << description[i].getID(); - res << ", sign_column, " << sign_column << ")"; + res << ", sign_column, " << sign_column_name << ")"; return res.str(); } @@ -55,10 +53,10 @@ private: { MergingBlock(Block block_, size_t stream_index_, - SortDescription desc_, + const SortDescription & desc, String sign_column_name, BlockPlainPtrs * output_blocks) - : block(block_), stream_index(stream_index_), desc(desc_), refcount(0), output_blocks(output_blocks) + : block(block_), stream_index(stream_index_), output_blocks(output_blocks) { sort_columns.resize(desc.size()); for (size_t i = 0; i < desc.size(); ++i) @@ -86,8 +84,6 @@ private: /// Строки с одинаковым ключом будут упорядочены по возрастанию stream_index. size_t stream_index; - - SortDescription desc; size_t rows; /// Какие строки нужно оставить. Заполняется при слиянии потоков. @@ -98,7 +94,7 @@ private: const ColumnInt8 * sign_column; /// Когда достигает нуля, блок можно выдавать в ответ. - int refcount; + int refcount = 0; /// Куда положить блок, когда он готов попасть в ответ. BlockPlainPtrs * output_blocks; @@ -181,17 +177,17 @@ private: Cursor() {} explicit Cursor(MergingBlockPtr block_, size_t pos_ = 0) : block(block_), pos(pos_) {} - bool operator<(const Cursor & rhs) const + bool operator< (const Cursor & rhs) const { for (size_t i = 0; i < block->sort_columns.size(); ++i) { - int direction = block->desc[i].direction; - int res = direction * block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), direction); + int res = block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), 1); if (res > 0) return true; if (res < 0) return false; } + return block->stream_index > rhs.block->stream_index; } @@ -203,7 +199,7 @@ private: for (size_t i = 0; i < block->sort_columns.size(); ++i) { - int res = block->desc[i].direction * block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), 1); + int res = block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), 1); if (res != 0) return false; } @@ -235,12 +231,12 @@ private: typedef std::priority_queue Queue; - SortDescription description; - String sign_column; + const SortDescription description; + String sign_column_name; - Logger * log; + Logger * log = &Logger::get("CollapsingFinalBlockInputStream"); - bool first; + bool first = true; BlockPlainPtrs output_blocks; @@ -249,15 +245,15 @@ private: Cursor previous; /// Текущий первичный ключ. Cursor last_positive; /// Последняя положительная строка для текущего первичного ключа. - size_t count_positive; /// Количество положительных строк для текущего первичного ключа. - size_t count_negative; /// Количество отрицательных строк для текущего первичного ключа. - bool last_is_positive; /// true, если последняя строка для текущего первичного ключа положительная. + size_t count_positive = 0; /// Количество положительных строк для текущего первичного ключа. + size_t count_negative = 0; /// Количество отрицательных строк для текущего первичного ключа. + bool last_is_positive = false; /// true, если последняя строка для текущего первичного ключа положительная. - size_t count_incorrect_data; /// Чтобы не писать в лог слишком много сообщений об ошибке. + size_t count_incorrect_data = 0; /// Чтобы не писать в лог слишком много сообщений об ошибке. /// Посчитаем, сколько блоков получили на вход и отдали на выход. - size_t blocks_fetched; - size_t blocks_output; + size_t blocks_fetched = 0; + size_t blocks_output = 0; void fetchNextBlock(size_t input_index); void commitCurrent(); diff --git a/dbms/src/DataStreams/CollapsingFinalBlockInputStream.cpp b/dbms/src/DataStreams/CollapsingFinalBlockInputStream.cpp index 82c6a6a5b9b..5a975e7932c 100644 --- a/dbms/src/DataStreams/CollapsingFinalBlockInputStream.cpp +++ b/dbms/src/DataStreams/CollapsingFinalBlockInputStream.cpp @@ -12,14 +12,14 @@ CollapsingFinalBlockInputStream::~CollapsingFinalBlockInputStream() /// Нужно обезвредить все MergingBlockPtr, чтобы они не пытались класть блоки в output_blocks. previous.block.cancel(); last_positive.block.cancel(); - + while (!queue.empty()) { Cursor c = queue.top(); queue.pop(); c.block.cancel(); } - + for (size_t i = 0; i < output_blocks.size(); ++i) delete output_blocks[i]; } @@ -43,7 +43,7 @@ void CollapsingFinalBlockInputStream::fetchNextBlock(size_t input_index) Block block = stream->read(); if (!block) return; - MergingBlockPtr merging_block(new MergingBlock(block, input_index, description, sign_column, &output_blocks)); + MergingBlockPtr merging_block(new MergingBlock(block, input_index, description, sign_column_name, &output_blocks)); ++blocks_fetched; queue.push(Cursor(merging_block)); } @@ -56,18 +56,18 @@ void CollapsingFinalBlockInputStream::commitCurrent() { last_positive.addToFilter(); } - + if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1)) { if (count_incorrect_data < MAX_ERROR_MESSAGES) reportBadCounts(); ++count_incorrect_data; } - + last_positive = Cursor(); previous = Cursor(); } - + count_negative = 0; count_positive = 0; } @@ -81,7 +81,7 @@ Block CollapsingFinalBlockInputStream::readImpl() first = false; } - + /// Будем формировать блоки для ответа, пока не получится непустой блок. while (true) { @@ -89,10 +89,10 @@ Block CollapsingFinalBlockInputStream::readImpl() { Cursor current = queue.top(); queue.pop(); - + bool has_next = !queue.empty(); Cursor next = has_next ? queue.top() : Cursor(); - + /// Будем продвигаться в текущем блоке, не используя очередь, пока возможно. while (true) { @@ -101,7 +101,7 @@ Block CollapsingFinalBlockInputStream::readImpl() commitCurrent(); previous = current; } - + Int8 sign = current.getSign(); if (sign == 1) { @@ -116,53 +116,50 @@ Block CollapsingFinalBlockInputStream::readImpl() } else reportBadSign(sign); - + if (current.isLast()) { fetchNextBlock(current.block->stream_index); - + /// Все потоки кончились. Обработаем последний ключ. if (!has_next) - { commitCurrent(); - } - + break; } else { current.next(); - + if (has_next && !(next < current)) { queue.push(current); - break; } } } } - + /// Конец потока. if (output_blocks.empty()) { if (blocks_fetched != blocks_output) LOG_ERROR(log, "Logical error: CollapsingFinalBlockInputStream has output " << blocks_output << " blocks instead of " << blocks_fetched); - + return Block(); } - + MergingBlock * merging_block = output_blocks.back(); Block block = merging_block->block; - + for (size_t i = 0; i < block.columns(); ++i) block.getByPosition(i).column = block.getByPosition(i).column->filter(merging_block->filter); - + output_blocks.pop_back(); delete merging_block; - + ++blocks_output; - + if (block) return block; }