dbms: FINAL: tiny modifications [#METR-2944].

This commit is contained in:
Alexey Milovidov 2015-03-12 03:01:14 +03:00
parent 78e12a7563
commit 9c9162bf94
2 changed files with 41 additions and 48 deletions

View File

@ -15,10 +15,8 @@ class CollapsingFinalBlockInputStream : public IProfilingBlockInputStream
{
public:
CollapsingFinalBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_,
const String & sign_column_)
: description(description_), sign_column(sign_column_),
log(&Logger::get("CollapsingFinalBlockInputStream")),
first(true), count_positive(0), count_negative(0), count_incorrect_data(0), blocks_fetched(0), blocks_output(0)
const String & sign_column_name_)
: description(description_), sign_column_name(sign_column_name_)
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
}
@ -40,7 +38,7 @@ public:
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ", sign_column, " << sign_column << ")";
res << ", sign_column, " << sign_column_name << ")";
return res.str();
}
@ -55,10 +53,10 @@ private:
{
MergingBlock(Block block_,
size_t stream_index_,
SortDescription desc_,
const SortDescription & desc,
String sign_column_name,
BlockPlainPtrs * output_blocks)
: block(block_), stream_index(stream_index_), desc(desc_), refcount(0), output_blocks(output_blocks)
: block(block_), stream_index(stream_index_), output_blocks(output_blocks)
{
sort_columns.resize(desc.size());
for (size_t i = 0; i < desc.size(); ++i)
@ -86,8 +84,6 @@ private:
/// Строки с одинаковым ключом будут упорядочены по возрастанию stream_index.
size_t stream_index;
SortDescription desc;
size_t rows;
/// Какие строки нужно оставить. Заполняется при слиянии потоков.
@ -98,7 +94,7 @@ private:
const ColumnInt8 * sign_column;
/// Когда достигает нуля, блок можно выдавать в ответ.
int refcount;
int refcount = 0;
/// Куда положить блок, когда он готов попасть в ответ.
BlockPlainPtrs * output_blocks;
@ -181,17 +177,17 @@ private:
Cursor() {}
explicit Cursor(MergingBlockPtr block_, size_t pos_ = 0) : block(block_), pos(pos_) {}
bool operator<(const Cursor & rhs) const
bool operator< (const Cursor & rhs) const
{
for (size_t i = 0; i < block->sort_columns.size(); ++i)
{
int direction = block->desc[i].direction;
int res = direction * block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), direction);
int res = block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), 1);
if (res > 0)
return true;
if (res < 0)
return false;
}
return block->stream_index > rhs.block->stream_index;
}
@ -203,7 +199,7 @@ private:
for (size_t i = 0; i < block->sort_columns.size(); ++i)
{
int res = block->desc[i].direction * block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), 1);
int res = block->sort_columns[i]->compareAt(pos, rhs.pos, *(rhs.block->sort_columns[i]), 1);
if (res != 0)
return false;
}
@ -235,12 +231,12 @@ private:
typedef std::priority_queue<Cursor> Queue;
SortDescription description;
String sign_column;
const SortDescription description;
String sign_column_name;
Logger * log;
Logger * log = &Logger::get("CollapsingFinalBlockInputStream");
bool first;
bool first = true;
BlockPlainPtrs output_blocks;
@ -249,15 +245,15 @@ private:
Cursor previous; /// Текущий первичный ключ.
Cursor last_positive; /// Последняя положительная строка для текущего первичного ключа.
size_t count_positive; /// Количество положительных строк для текущего первичного ключа.
size_t count_negative; /// Количество отрицательных строк для текущего первичного ключа.
bool last_is_positive; /// true, если последняя строка для текущего первичного ключа положительная.
size_t count_positive = 0; /// Количество положительных строк для текущего первичного ключа.
size_t count_negative = 0; /// Количество отрицательных строк для текущего первичного ключа.
bool last_is_positive = false; /// true, если последняя строка для текущего первичного ключа положительная.
size_t count_incorrect_data; /// Чтобы не писать в лог слишком много сообщений об ошибке.
size_t count_incorrect_data = 0; /// Чтобы не писать в лог слишком много сообщений об ошибке.
/// Посчитаем, сколько блоков получили на вход и отдали на выход.
size_t blocks_fetched;
size_t blocks_output;
size_t blocks_fetched = 0;
size_t blocks_output = 0;
void fetchNextBlock(size_t input_index);
void commitCurrent();

View File

@ -12,14 +12,14 @@ CollapsingFinalBlockInputStream::~CollapsingFinalBlockInputStream()
/// Нужно обезвредить все MergingBlockPtr, чтобы они не пытались класть блоки в output_blocks.
previous.block.cancel();
last_positive.block.cancel();
while (!queue.empty())
{
Cursor c = queue.top();
queue.pop();
c.block.cancel();
}
for (size_t i = 0; i < output_blocks.size(); ++i)
delete output_blocks[i];
}
@ -43,7 +43,7 @@ void CollapsingFinalBlockInputStream::fetchNextBlock(size_t input_index)
Block block = stream->read();
if (!block)
return;
MergingBlockPtr merging_block(new MergingBlock(block, input_index, description, sign_column, &output_blocks));
MergingBlockPtr merging_block(new MergingBlock(block, input_index, description, sign_column_name, &output_blocks));
++blocks_fetched;
queue.push(Cursor(merging_block));
}
@ -56,18 +56,18 @@ void CollapsingFinalBlockInputStream::commitCurrent()
{
last_positive.addToFilter();
}
if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1))
{
if (count_incorrect_data < MAX_ERROR_MESSAGES)
reportBadCounts();
++count_incorrect_data;
}
last_positive = Cursor();
previous = Cursor();
}
count_negative = 0;
count_positive = 0;
}
@ -81,7 +81,7 @@ Block CollapsingFinalBlockInputStream::readImpl()
first = false;
}
/// Будем формировать блоки для ответа, пока не получится непустой блок.
while (true)
{
@ -89,10 +89,10 @@ Block CollapsingFinalBlockInputStream::readImpl()
{
Cursor current = queue.top();
queue.pop();
bool has_next = !queue.empty();
Cursor next = has_next ? queue.top() : Cursor();
/// Будем продвигаться в текущем блоке, не используя очередь, пока возможно.
while (true)
{
@ -101,7 +101,7 @@ Block CollapsingFinalBlockInputStream::readImpl()
commitCurrent();
previous = current;
}
Int8 sign = current.getSign();
if (sign == 1)
{
@ -116,53 +116,50 @@ Block CollapsingFinalBlockInputStream::readImpl()
}
else
reportBadSign(sign);
if (current.isLast())
{
fetchNextBlock(current.block->stream_index);
/// Все потоки кончились. Обработаем последний ключ.
if (!has_next)
{
commitCurrent();
}
break;
}
else
{
current.next();
if (has_next && !(next < current))
{
queue.push(current);
break;
}
}
}
}
/// Конец потока.
if (output_blocks.empty())
{
if (blocks_fetched != blocks_output)
LOG_ERROR(log, "Logical error: CollapsingFinalBlockInputStream has output " << blocks_output << " blocks instead of " << blocks_fetched);
return Block();
}
MergingBlock * merging_block = output_blocks.back();
Block block = merging_block->block;
for (size_t i = 0; i < block.columns(); ++i)
block.getByPosition(i).column = block.getByPosition(i).column->filter(merging_block->filter);
output_blocks.pop_back();
delete merging_block;
++blocks_output;
if (block)
return block;
}