dbms, ███████████: development [#CONV-5417].

This commit is contained in:
Alexey Milovidov 2012-08-20 05:32:50 +00:00
parent 72a6eb812d
commit 88d79b7bb3
6 changed files with 62 additions and 48 deletions

View File

@ -1,5 +1,4 @@
#ifndef DBMS_CORE_COLUMN_ARRAY_H #pragma once
#define DBMS_CORE_COLUMN_ARRAY_H
#include <string.h> // memcpy #include <string.h> // memcpy
@ -249,5 +248,3 @@ protected:
} }
#endif

View File

@ -8,7 +8,7 @@ namespace DB
{ {
/** Соединяет несколько сортированных потоков в один. /** Соединяет несколько сортированных потоков в один.
* При этом, для каждой группы идущих подряд одинаковых значений столбца id_column (например, идентификатора визита), * При этом, для каждой группы идущих подряд одинаковых значений первичного ключа (столбцов, по которым сортируются данные),
* оставляет не более одной строки со значением столбца sign_column = -1 ("отрицательной строки") * оставляет не более одной строки со значением столбца sign_column = -1 ("отрицательной строки")
* и не более одиной строки со значением столбца sign_column = 1 ("положительной строки"). * и не более одиной строки со значением столбца sign_column = 1 ("положительной строки").
* То есть - производит схлопывание записей из лога изменений. * То есть - производит схлопывание записей из лога изменений.
@ -22,12 +22,11 @@ class CollapsingSortedBlockInputStream : public MergingSortedBlockInputStream
{ {
public: public:
CollapsingSortedBlockInputStream(BlockInputStreams inputs_, SortDescription & description_, CollapsingSortedBlockInputStream(BlockInputStreams inputs_, SortDescription & description_,
const String & id_column_, const String & sign_column_, size_t max_block_size_) const String & sign_column_, size_t max_block_size_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_), : MergingSortedBlockInputStream(inputs_, description_, max_block_size_),
id_column(id_column_), sign_column(sign_column_), sign_column(sign_column_), sign_column_number(0),
id_column_number(0), sign_column_number(0),
log(&Logger::get("CollapsingSortedBlockInputStream")), log(&Logger::get("CollapsingSortedBlockInputStream")),
current_id(0), count_positive(0), count_negative(0) count_positive(0), count_negative(0)
{ {
} }
@ -36,24 +35,22 @@ public:
String getName() const { return "CollapsingSortedBlockInputStream"; } String getName() const { return "CollapsingSortedBlockInputStream"; }
BlockInputStreamPtr clone() { return new CollapsingSortedBlockInputStream(inputs, description, id_column, sign_column, max_block_size); } BlockInputStreamPtr clone() { return new CollapsingSortedBlockInputStream(inputs, description, sign_column, max_block_size); }
private: private:
String id_column;
String sign_column; String sign_column;
size_t id_column_number;
size_t sign_column_number; size_t sign_column_number;
Logger * log; Logger * log;
UInt64 current_id; /// Текущий идентификатор "визита". Row current_key; /// Текущий первичный ключ.
Row next_key; /// Первичный ключ следующей строки.
Row first_negative; /// Первая отрицательная строка для текущего идентификатора "визита". Row first_negative; /// Первая отрицательная строка для текущего первичного ключа.
Row last_positive; /// Последняя положительная строка для текущего идентификатора "визита". Row last_positive; /// Последняя положительная строка для текущего первичного ключа.
size_t count_positive; /// Количество положительных строк для текущего идентификатора "визита". size_t count_positive; /// Количество положительных строк для текущего первичного ключа.
size_t count_negative; /// Количество отрицательных строк для текущего идентификатора "визита". size_t count_negative; /// Количество отрицательных строк для текущего первичного ключа.
/// Сохранить строчку, на которую указывает cursor в row. /// Сохранить строчку, на которую указывает cursor в row.
@ -63,8 +60,17 @@ private:
row[i] = (*cursor->all_columns[i])[cursor->pos]; row[i] = (*cursor->all_columns[i])[cursor->pos];
} }
/// Сохранить первичный ключ, на который указывает cursor в row.
void setPrimaryKey(Row & row, SortCursor & cursor)
{
for (size_t i = 0; i < cursor->sort_columns_size; ++i)
row[i] = (*cursor->sort_columns[i])[cursor->pos];
}
/// Вставить в результат строки для текущего идентификатора "визита". /// Вставить в результат строки для текущего идентификатора "визита".
void insertRows(ColumnPlainPtrs & merged_columns, size_t & merged_rows); void insertRows(ColumnPlainPtrs & merged_columns, size_t & merged_rows);
void throwIncorrectData();
}; };
} }

View File

@ -45,8 +45,8 @@ struct Range;
* Column.bin - данные столбца * Column.bin - данные столбца
* Column.mrk - засечки, указывающие, откуда начинать чтение, чтобы пропустить n * k строк. * Column.mrk - засечки, указывающие, откуда начинать чтение, чтобы пропустить n * k строк.
* *
* Если указано id_column и sign_column, то при склейке кусков, также "схлопываются" * Если указано sign_column, то при склейке кусков, также "схлопываются"
* пары записей с разными значениями sign_column для одного значения id_column. * пары записей с разными значениями sign_column для одного значения первичного ключа.
* (см. CollapsingSortedBlockInputStream.h) * (см. CollapsingSortedBlockInputStream.h)
*/ */
class StorageMergeTree : public IStorage class StorageMergeTree : public IStorage
@ -68,7 +68,7 @@ public:
Context & context_, Context & context_,
ASTPtr & primary_expr_ast_, const String & date_column_name_, ASTPtr & primary_expr_ast_, const String & date_column_name_,
size_t index_granularity_, size_t index_granularity_,
const String & id_column_ = "", const String & sign_column_ = "", const String & sign_column_ = "",
size_t delay_time_to_merge_different_level_parts_ = DEFAULT_DELAY_TIME_TO_MERGE_DIFFERENT_LEVEL_PARTS); size_t delay_time_to_merge_different_level_parts_ = DEFAULT_DELAY_TIME_TO_MERGE_DIFFERENT_LEVEL_PARTS);
~StorageMergeTree(); ~StorageMergeTree();
@ -117,7 +117,6 @@ private:
size_t index_granularity; size_t index_granularity;
/// Для схлопывания записей об изменениях, если это требуется. /// Для схлопывания записей об изменениях, если это требуется.
String id_column;
String sign_column; String sign_column;
size_t delay_time_to_merge_different_level_parts; size_t delay_time_to_merge_different_level_parts;

View File

@ -5,6 +5,26 @@ namespace DB
{ {
void CollapsingSortedBlockInputStream::throwIncorrectData()
{
std::stringstream s;
s << "Incorrect data: number of rows with sign = 1 (" << count_positive
<< ") differs with number of rows with sign = -1 (" << count_negative
<< ") by more than one (for key: ";
for (size_t i = 0, size = current_key.size(); i < size; ++i)
{
if (i != 0)
s << ", ";
s << boost::apply_visitor(FieldVisitorToString(), current_key[i]);
}
s << ").";
throw Exception(s.str(), ErrorCodes::INCORRECT_DATA);
}
void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_columns, size_t & merged_rows) void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_columns, size_t & merged_rows)
{ {
if (count_positive != 0 || count_negative != 0) if (count_positive != 0 || count_negative != 0)
@ -24,13 +44,7 @@ void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_colum
} }
if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1)) if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1))
throw Exception("Incorrect data: number of rows with sign = 1 (" throwIncorrectData();
+ Poco::NumberFormatter::format(count_positive) +
") differs with number of rows with sign = -1 ("
+ Poco::NumberFormatter::format(count_negative) +
") by more than one (for id = "
+ Poco::NumberFormatter::format(current_id) + ").",
ErrorCodes::INCORRECT_DATA);
} }
} }
@ -56,8 +70,9 @@ Block CollapsingSortedBlockInputStream::readImpl()
{ {
first_negative.resize(num_columns); first_negative.resize(num_columns);
last_positive.resize(num_columns); last_positive.resize(num_columns);
current_key.resize(description.size());
next_key.resize(description.size());
id_column_number = merged_block.getPositionByName(id_column);
sign_column_number = merged_block.getPositionByName(sign_column); sign_column_number = merged_block.getPositionByName(sign_column);
} }
@ -67,15 +82,15 @@ Block CollapsingSortedBlockInputStream::readImpl()
SortCursor current = queue.top(); SortCursor current = queue.top();
queue.pop(); queue.pop();
UInt64 id = boost::get<UInt64>((*current->all_columns[id_column_number])[current->pos]);
Int8 sign = boost::get<Int64>((*current->all_columns[sign_column_number])[current->pos]); Int8 sign = boost::get<Int64>((*current->all_columns[sign_column_number])[current->pos]);
setPrimaryKey(next_key, current);
if (id != current_id) if (next_key != current_key)
{ {
/// Запишем данные для предыдущего визита. /// Запишем данные для предыдущего визита.
insertRows(merged_columns, merged_rows); insertRows(merged_columns, merged_rows);
current_id = id; current_key = next_key;
count_negative = 0; count_negative = 0;
count_positive = 0; count_positive = 0;
} }

View File

@ -142,28 +142,26 @@ StoragePtr StorageFactory::get(
* - имя столбца с датой; * - имя столбца с датой;
* - выражение для сортировки в скобках; * - выражение для сортировки в скобках;
* - index_granularity; * - index_granularity;
* - имя столбца - идентификатора "визита";
* - имя столбца, содержащего тип строчки с изменением "визита" (принимающего значения 1 и -1). * - имя столбца, содержащего тип строчки с изменением "визита" (принимающего значения 1 и -1).
* Например: ENGINE = CollapsingMergeTree(EventDate, (CounterID, EventDate, intHash32(UniqID), EventTime), 8192, VisitID, Sign). * Например: ENGINE = CollapsingMergeTree(EventDate, (CounterID, EventDate, intHash32(UniqID), VisitID), 8192, Sign).
*/ */
ASTs & args_func = dynamic_cast<ASTFunction &>(*dynamic_cast<ASTCreateQuery &>(*query).storage).children; ASTs & args_func = dynamic_cast<ASTFunction &>(*dynamic_cast<ASTCreateQuery &>(*query).storage).children;
if (args_func.size() != 1) if (args_func.size() != 1)
throw Exception("Storage CollapsingMergeTree requires exactly 5 parameters" throw Exception("Storage CollapsingMergeTree requires exactly 4 parameters"
" - name of column with date, primary key expression, index granularity, id_column, sign_column.", " - name of column with date, primary key expression, index granularity, sign_column.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = dynamic_cast<ASTExpressionList &>(*args_func.at(0)).children; ASTs & args = dynamic_cast<ASTExpressionList &>(*args_func.at(0)).children;
if (args.size() != 5) if (args.size() != 4)
throw Exception("Storage CollapsingMergeTree requires exactly 5 parameters" throw Exception("Storage CollapsingMergeTree requires exactly 4 parameters"
" - name of column with date, primary key expression, index granularity, id_column, sign_column.", " - name of column with date, primary key expression, index granularity, sign_column.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String date_column_name = dynamic_cast<ASTIdentifier &>(*args[0]).name; String date_column_name = dynamic_cast<ASTIdentifier &>(*args[0]).name;
UInt64 index_granularity = boost::get<UInt64>(dynamic_cast<ASTLiteral &>(*args[2]).value); UInt64 index_granularity = boost::get<UInt64>(dynamic_cast<ASTLiteral &>(*args[2]).value);
String id_column_name = dynamic_cast<ASTIdentifier &>(*args[3]).name; String sign_column_name = dynamic_cast<ASTIdentifier &>(*args[3]).name;
String sign_column_name = dynamic_cast<ASTIdentifier &>(*args[4]).name;
ASTFunction & primary_expr_func = dynamic_cast<ASTFunction &>(*args[1]); ASTFunction & primary_expr_func = dynamic_cast<ASTFunction &>(*args[1]);
if (primary_expr_func.name != "tuple") if (primary_expr_func.name != "tuple")
@ -172,8 +170,7 @@ StoragePtr StorageFactory::get(
ASTPtr primary_expr = primary_expr_func.children.at(0); ASTPtr primary_expr = primary_expr_func.children.at(0);
return new StorageMergeTree(data_path, table_name, columns, context, primary_expr, date_column_name, index_granularity, return new StorageMergeTree(data_path, table_name, columns, context, primary_expr, date_column_name, index_granularity, sign_column_name);
id_column_name, sign_column_name);
} }
else if (name == "SystemNumbers") else if (name == "SystemNumbers")
{ {

View File

@ -638,12 +638,12 @@ StorageMergeTree::StorageMergeTree(
Context & context_, Context & context_,
ASTPtr & primary_expr_ast_, const String & date_column_name_, ASTPtr & primary_expr_ast_, const String & date_column_name_,
size_t index_granularity_, size_t index_granularity_,
const String & id_column_, const String & sign_column_, const String & sign_column_,
size_t delay_time_to_merge_different_level_parts_) size_t delay_time_to_merge_different_level_parts_)
: path(path_), name(name_), full_path(path + escapeForFileName(name) + '/'), columns(columns_), : path(path_), name(name_), full_path(path + escapeForFileName(name) + '/'), columns(columns_),
context(context_), primary_expr_ast(primary_expr_ast_->clone()), context(context_), primary_expr_ast(primary_expr_ast_->clone()),
date_column_name(date_column_name_), index_granularity(index_granularity_), date_column_name(date_column_name_), index_granularity(index_granularity_),
id_column(id_column_), sign_column(sign_column_), sign_column(sign_column_),
delay_time_to_merge_different_level_parts(delay_time_to_merge_different_level_parts_), delay_time_to_merge_different_level_parts(delay_time_to_merge_different_level_parts_),
increment(full_path + "increment.txt"), log(&Logger::get("StorageMergeTree: " + name)) increment(full_path + "increment.txt"), log(&Logger::get("StorageMergeTree: " + name))
{ {
@ -1265,9 +1265,9 @@ void StorageMergeTree::mergeParts(DataPartPtr left, DataPartPtr right)
full_path + right->name + '/', DEFAULT_BLOCK_SIZE, all_column_names, *this, right, empty_prefix, empty_range), primary_expr)); full_path + right->name + '/', DEFAULT_BLOCK_SIZE, all_column_names, *this, right, empty_prefix, empty_range), primary_expr));
BlockInputStreamPtr merged_stream = new AddingDefaultBlockInputStream( BlockInputStreamPtr merged_stream = new AddingDefaultBlockInputStream(
(id_column.empty() (sign_column.empty()
? new MergingSortedBlockInputStream(src_streams, sort_descr, DEFAULT_BLOCK_SIZE) ? new MergingSortedBlockInputStream(src_streams, sort_descr, DEFAULT_BLOCK_SIZE)
: new CollapsingSortedBlockInputStream(src_streams, sort_descr, id_column, sign_column, DEFAULT_BLOCK_SIZE)), : new CollapsingSortedBlockInputStream(src_streams, sort_descr, sign_column, DEFAULT_BLOCK_SIZE)),
columns); columns);
BlockOutputStreamPtr to = new MergedBlockOutputStream(*this, BlockOutputStreamPtr to = new MergedBlockOutputStream(*this,