dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-08-16 17:02:31 +00:00
parent 395db3be6a
commit f89b460b8f
4 changed files with 163 additions and 11 deletions

View File

@ -121,6 +121,7 @@ namespace ErrorCodes
CANNOT_CLOCK_GETTIME,
UNKNOWN_SETTING,
THERE_IS_NO_DEFAULT_VALUE,
INCORRECT_DATA,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -1,5 +1,6 @@
#pragma once
#include <DB/Core/Row.h>
#include <DB/DataStreams/MergingSortedBlockInputStream.h>
@ -7,31 +8,62 @@ namespace DB
{
/** Соединяет несколько сортированных потоков в один.
* При этом, для каждого значения столбца id_column,
* оставляет первую строку со значением столбца sign_column = -1
* и последнюю строку со значением столбца sign_column = 1
* При этом, для каждой группы идущих подряд одинаковых значений столбца id_column (например, идентификатора визита),
* оставляет не более одной строки со значением столбца sign_column = -1 ("положительной строки")
* и не более одиной строки со значением столбца sign_column = 1 ("отрицательной строки").
* То есть - производит схлопывание записей из лога изменений.
*
* Если количество положительных и отрицательных строк совпадает - то пишет первую отрицательную и последнюю положительную строку.
* Если положительных на 1 больше, чем отрицательных - то пишет только последнюю положительную строку.
* Если отрицательных на 1 больше, чем положительных - то пишет только первую отрицательную строку.
* Иначе - логическая ошибка.
*/
class CollapsingSortedBlockInputStream : public MergingSortedBlockInputStream
{
public:
CollapsingSortedBlockInputStream(BlockInputStreams inputs_, SortDescription & description_, size_t max_block_size_)
: inputs(inputs_), description(description_), max_block_size(max_block_size_), first(true),
num_columns(0), source_blocks(inputs.size()), cursors(inputs.size()), log(&Logger::get("CollapsingSortedBlockInputStream"))
CollapsingSortedBlockInputStream(BlockInputStreams inputs_, SortDescription & description_,
const String & id_column_, const String & sign_column_, size_t max_block_size_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_),
id_column(id_column_), sign_column(sign_column_),
id_column_number(0), sign_column_number(0),
log(&Logger::get("CollapsingSortedBlockInputStream")),
current_id(0), count_positive(0), count_negative(0)
{
children.insert(children.end(), inputs.begin(), inputs.end());
}
Block readImpl();
String getName() const { return "CollapsingSortedBlockInputStream"; }
BlockInputStreamPtr clone() { return new CollapsingSortedBlockInputStream(inputs, description, max_block_size); }
BlockInputStreamPtr clone() { return new CollapsingSortedBlockInputStream(inputs, description, id_column, sign_column, max_block_size); }
private:
SortCursor first_negative;
SortCursor last_positive;
String id_column;
String sign_column;
size_t id_column_number;
size_t sign_column_number;
Logger * log;
UInt64 current_id; /// Текущий идентификатор "визита".
Row first_negative; /// Первая отрицательная строка для текущего идентификатора "визита".
Row last_positive; /// Последняя положительная строка для текущего идентификатора "визита".
size_t count_positive; /// Количество положительных строк для текущего идентификатора "визита".
size_t count_negative; /// Количество отрицательных строк для текущего идентификатора "визита".
/// Сохранить строчку, на которую указывает cursor в row.
void setRow(Row & row, SortCursor & cursor)
{
for (size_t i = 0; i < num_columns; ++i)
row[i] = (*cursor->all_columns[i])[cursor->pos];
}
/// Вставить в результат строки для текущего идентификатора "визита".
void insertRows(ColumnPlainPtrs & merged_columns);
};
}

View File

@ -0,0 +1,119 @@
#include <DB/DataStreams/CollapsingSortedBlockInputStream.h>
namespace DB
{
void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_columns)
{
if (count_positive != 0 && count_negative != 0)
{
if (count_positive == count_negative || count_positive + 1 == count_negative)
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insert(first_negative[i]);
else if (count_positive == count_negative || count_positive == count_negative + 1)
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insert(last_positive[i]);
else
throw Exception("Incorrect data: number of rows with sign = 1 ("
+ Poco::NumberFormatter::format(count_positive) +
") differs with number of rows with sign = -1 ("
+ Poco::NumberFormatter::format(count_negative) +
") by more than one (for id = "
+ Poco::NumberFormatter::format(current_id) + ").",
ErrorCodes::INCORRECT_DATA);
}
}
Block CollapsingSortedBlockInputStream::readImpl()
{
if (!inputs.size())
return Block();
if (inputs.size() == 1)
return inputs[0]->read();
size_t merged_rows = 0;
Block merged_block;
ColumnPlainPtrs merged_columns;
init(merged_block, merged_columns);
if (merged_columns.empty())
return Block();
/// Дополнительная инициализация.
if (first_negative.empty())
{
first_negative.resize(num_columns);
last_positive.resize(num_columns);
id_column_number = merged_block.getPositionByName(id_column);
sign_column_number = merged_block.getPositionByName(sign_column);
}
/// Вынимаем строки в нужном порядке и кладём в merged_block, пока строк не больше max_block_size
while (!queue.empty())
{
SortCursor current = queue.top();
queue.pop();
UInt64 id = boost::get<UInt64>((*current->all_columns[id_column_number])[current->pos]);
Int8 sign = boost::get<Int64>((*current->all_columns[sign_column_number])[current->pos]);
if (id != current_id)
{
current_id = id;
/// Запишем данные для предыдущего визита.
insertRows(merged_columns);
count_negative = 0;
count_positive = 0;
}
if (sign == 1)
{
++count_positive;
setRow(last_positive, current);
}
else if (sign == -1)
{
if (!count_negative)
setRow(first_negative, current);
++count_negative;
}
else
throw Exception("Incorrect data: Sign = " + Poco::NumberFormatter::format(sign) + " (must be 1 or -1).",
ErrorCodes::INCORRECT_DATA);
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insert((*current->all_columns[i])[current->pos]);
if (!current->isLast())
{
current->next();
queue.push(current);
}
else
{
/// Достаём из соответствующего источника следующий блок, если есть.
fetchNextBlock(current);
}
++merged_rows;
if (merged_rows == max_block_size)
return merged_block;
}
/// Запишем данные для последнего визита.
insertRows(merged_columns);
inputs.clear();
return merged_block;
}
}

View File

@ -28,7 +28,7 @@ void MergingSortedBlockInputStream::init(Block & merged_block, ColumnPlainPtrs &
if (!num_columns)
num_columns = source_blocks[0].columns();
cursors[i] = SortCursorImpl(*it, description);
cursors[i] = SortCursorImpl(*it, description, i);
queue.push(SortCursor(&cursors[i]));
}
}