This commit is contained in:
Alexey Milovidov 2013-09-30 19:54:25 +00:00
parent 87763e0c95
commit c084250925
3 changed files with 70 additions and 17 deletions

View File

@ -37,9 +37,12 @@ namespace DB
* Column.bin - данные столбца
* Column.mrk - засечки, указывающие, откуда начинать чтение, чтобы пропустить n * k строк.
*
* Если указано sign_column, то при склейке кусков, также "схлопываются"
* пары записей с разными значениями sign_column для одного значения первичного ключа.
* (см. CollapsingSortedBlockInputStream.h)
* Имеется несколько режимов работы, определяющих, что делать при мердже:
* - Ordinary - ничего дополнительно не делать;
* - Collapsing - при склейке кусков "схлопывать"
* пары записей с разными значениями sign_column для одного значения первичного ключа.
* (см. CollapsingSortedBlockInputStream.h)
* - Summing - при склейке кусков, при совпадении PK суммировать все числовые столбцы, не входящие в PK.
*/
struct StorageMergeTreeSettings
@ -104,6 +107,14 @@ friend class MergeTreeBlockOutputStream;
friend class MergedBlockOutputStream;
public:
/// Режим работы. См. выше.
enum Mode
{
Ordinary,
Collapsing,
Summing,
};
/** Подцепить таблицу с соответствующим именем, по соответствующему пути (с / на конце),
* (корректность имён и путей не проверяется)
* состоящую из указанных столбцов.
@ -118,13 +129,26 @@ public:
const String & date_column_name_,
const ASTPtr & sampling_expression_, /// NULL, если семплирование не поддерживается.
size_t index_granularity_,
Mode mode_ = Ordinary,
const String & sign_column_ = "",
const StorageMergeTreeSettings & settings_ = StorageMergeTreeSettings());
void shutdown();
~StorageMergeTree();
std::string getName() const { return sign_column.empty() ? "MergeTree" : "CollapsingMergeTree"; }
std::string getName() const
{
switch (mode)
{
case Ordinary: return "MergeTree";
case Collapsing: return "CollapsingMergeTree";
case Summing: return "SummingMergeTree";
default:
throw Exception("Unknown mode of operation for StorageMergeTree: " + toString(mode), ErrorCodes::LOGICAL_ERROR);
}
}
std::string getTableName() const { return name; }
std::string getSignColumnName() const { return sign_column; }
bool supportsSampling() const { return !!sampling_expression; }
@ -180,7 +204,9 @@ private:
size_t min_marks_for_concurrent_read;
size_t max_marks_to_use_cache;
/// Для схлопывания записей об изменениях, если это требуется.
/// Режим работы - какие дополнительные действия делать при мердже.
Mode mode;
/// Для схлопывания записей об изменениях, если используется Collapsing режим работы.
String sign_column;
StorageMergeTreeSettings settings;
@ -306,6 +332,7 @@ private:
const String & date_column_name_,
const ASTPtr & sampling_expression_, /// NULL, если семплирование не поддерживается.
size_t index_granularity_,
Mode mode_ = Ordinary,
const String & sign_column_ = "",
const StorageMergeTreeSettings & settings_ = StorageMergeTreeSettings());

View File

@ -211,7 +211,7 @@ StoragePtr StorageFactory::get(
else
throw Exception("No addresses listed in config", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
}
else if (name == "MergeTree")
else if (name == "MergeTree" || name == "SummingMergeTree")
{
/** В качестве аргумента для движка должно быть указано:
* - имя столбца с датой;
@ -220,18 +220,20 @@ StoragePtr StorageFactory::get(
* - index_granularity.
* Например: ENGINE = MergeTree(EventDate, intHash32(UniqID), (CounterID, EventDate, intHash32(UniqID), EventTime), 8192).
*
* SummingMergeTree - вариант, в котором при слиянии делается суммирование всех числовых столбцов кроме PK
* - для Баннерной Крутилки.
*/
ASTs & args_func = dynamic_cast<ASTFunction &>(*dynamic_cast<ASTCreateQuery &>(*query).storage).children;
if (args_func.size() != 1)
throw Exception("Storage MergeTree requires 3 or 4 parameters"
throw Exception("Storage " + name + " requires 3 or 4 parameters"
" - name of column with date, [name of column for sampling], primary key expression, index granularity.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = dynamic_cast<ASTExpressionList &>(*args_func.at(0)).children;
if (args.size() != 3 && args.size() != 4)
throw Exception("Storage MergeTree requires 3 or 4 parameters"
throw Exception("Storage " + name + " requires 3 or 4 parameters"
" - name of column with date, [name of column for sampling], primary key expression, index granularity.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -243,12 +245,14 @@ StoragePtr StorageFactory::get(
ASTFunction & primary_expr_func = dynamic_cast<ASTFunction &>(*args[arg_offset + 1]);
if (primary_expr_func.name != "tuple")
throw Exception("Primary expression for storage MergeTree must be in parentheses.",
throw Exception("Primary expression for storage " + name + " must be in parentheses.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
ASTPtr primary_expr = primary_expr_func.children.at(0);
return StorageMergeTree::create(data_path, table_name, columns, context, primary_expr, date_column_name, sampling_expression, index_granularity);
return StorageMergeTree::create(
data_path, table_name, columns, context, primary_expr, date_column_name, sampling_expression, index_granularity,
name == "SummingMergeTree" ? StorageMergeTree::Summing : StorageMergeTree::Ordinary);
}
else if (name == "CollapsingMergeTree")
{
@ -288,7 +292,9 @@ StoragePtr StorageFactory::get(
ASTPtr primary_expr = primary_expr_func.children.at(0);
return StorageMergeTree::create(data_path, table_name, columns, context, primary_expr, date_column_name, sampling_expression, index_granularity, sign_column_name);
return StorageMergeTree::create(
data_path, table_name, columns, context, primary_expr, date_column_name,
sampling_expression, index_granularity, StorageMergeTree::Collapsing, sign_column_name);
}
else if (name == "SystemNumbers")
{

View File

@ -61,13 +61,14 @@ StorageMergeTree::StorageMergeTree(
ASTPtr & primary_expr_ast_,
const String & date_column_name_, const ASTPtr & sampling_expression_,
size_t index_granularity_,
Mode mode_,
const String & sign_column_,
const StorageMergeTreeSettings & settings_)
: path(path_), name(name_), full_path(path + escapeForFileName(name) + '/'), columns(columns_),
context(context_), primary_expr_ast(primary_expr_ast_->clone()),
date_column_name(date_column_name_), sampling_expression(sampling_expression_),
index_granularity(index_granularity_),
sign_column(sign_column_),
mode(mode_), sign_column(sign_column_),
settings(settings_),
increment(full_path + "increment.txt"), log(&Logger::get("StorageMergeTree: " + name)), shutdown_called(false),
file_name_regexp("^(\\d{8})_(\\d{8})_(\\d+)_(\\d+)_(\\d+)")
@ -105,10 +106,13 @@ StoragePtr StorageMergeTree::create(
ASTPtr & primary_expr_ast_,
const String & date_column_name_, const ASTPtr & sampling_expression_,
size_t index_granularity_,
Mode mode_,
const String & sign_column_,
const StorageMergeTreeSettings & settings_)
{
return (new StorageMergeTree(path_, name_, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_, index_granularity_, sign_column_, settings_))->thisPtr();
return (new StorageMergeTree(
path_, name_, columns_, context_, primary_expr_ast_, date_column_name_,
sampling_expression_, index_granularity_, mode_, sign_column_, settings_))->thisPtr();
}
@ -894,10 +898,26 @@ void StorageMergeTree::mergeParts(std::vector<DataPartPtr> parts)
/// Порядок потоков важен: при совпадении ключа элементы идут в порядке номера потока-источника.
/// В слитом куске строки с одинаковым ключом должны идти в порядке возрастания идентификатора исходного куска, то есть (примерного) возрастания времени вставки.
BlockInputStreamPtr merged_stream = sign_column.empty()
? new MergingSortedBlockInputStream(src_streams, sort_descr, DEFAULT_MERGE_BLOCK_SIZE)
: new CollapsingSortedBlockInputStream(src_streams, sort_descr, sign_column, DEFAULT_MERGE_BLOCK_SIZE);
BlockInputStreamPtr merged_stream;
switch (mode)
{
case Ordinary:
merged_stream = new MergingSortedBlockInputStream(src_streams, sort_descr, DEFAULT_MERGE_BLOCK_SIZE);
break;
case Collapsing:
merged_stream = new CollapsingSortedBlockInputStream(src_streams, sort_descr, sign_column, DEFAULT_MERGE_BLOCK_SIZE);
break;
case Summing:
/* TODO merged_stream =
break;*/
default:
throw Exception("Unknown mode of operation for StorageMergeTree: " + toString(mode), ErrorCodes::LOGICAL_ERROR);
}
MergedBlockOutputStreamPtr to = new MergedBlockOutputStream(*this,
new_data_part->left_date, new_data_part->right_date, new_data_part->left, new_data_part->right, new_data_part->level);