Preparation [#METR-20644].

This commit is contained in:
Alexey Milovidov 2016-04-15 20:13:51 +03:00
parent bde00e80cc
commit d79174b2ce
9 changed files with 73 additions and 58 deletions

View File

@ -64,6 +64,8 @@ namespace ErrorCodes
* - Collapsing - при склейке кусков "схлопывать"
* пары записей с разными значениями sign_column для одного значения первичного ключа.
* (см. CollapsingSortedBlockInputStream.h)
* - Replacing - при склейке кусков, при совпадении PK, оставлять только одну строчку
* - последнюю, либо, если задан столбец "версия" - последнюю среди строчек с максимальной версией.
* - Summing - при склейке кусков, при совпадении PK суммировать все числовые столбцы, не входящие в PK.
* - Aggregating - при склейке кусков, при совпадении PK, делается слияние состояний столбцов-агрегатных функций.
* - Unsorted - при склейке кусков, данные не упорядочиваются, а всего лишь конкатенируются;
@ -192,6 +194,20 @@ public:
Summing = 2,
Aggregating = 3,
Unsorted = 4,
Replacing = 5,
};
/// Настройки для разных режимов работы.
struct MergingParams
{
/// Для Collapsing режима.
String sign_column;
/// Для Summing режима. Если пустое - то выбирается автоматически.
Names columns_to_sum;
/// Для Replacing режима. Может быть пустым.
String version_column;
};
static void doNothing(const String & name) {}
@ -215,8 +231,7 @@ public:
const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается.
size_t index_granularity_,
Mode mode_,
const String & sign_column_, /// Для Collapsing режима.
const Names & columns_to_sum_, /// Для Summing режима. Если пустое - то выбирается автоматически.
const MergingParams & merging_params_,
const MergeTreeSettings & settings_,
const String & log_name_,
bool require_part_metadata_,
@ -433,10 +448,7 @@ public:
/// Режим работы - какие дополнительные действия делать при мердже.
const Mode mode;
/// Для схлопывания записей об изменениях, если используется Collapsing режим работы.
const String sign_column;
/// Для суммирования, если используется Summing режим работы.
const Names columns_to_sum;
const MergingParams merging_params;
const MergeTreeSettings settings;

View File

@ -41,8 +41,7 @@ public:
const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается.
size_t index_granularity_,
MergeTreeData::Mode mode_,
const String & sign_column_, /// Для Collapsing режима.
const Names & columns_to_sum_, /// Для Summing режима.
const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_);
void shutdown() override;
@ -185,8 +184,7 @@ private:
const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается.
size_t index_granularity_,
MergeTreeData::Mode mode_,
const String & sign_column_,
const Names & columns_to_sum_,
const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_);
/** Определяет, какие куски нужно объединять, и объединяет их.

View File

@ -87,8 +87,7 @@ public:
const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается.
size_t index_granularity_,
MergeTreeData::Mode mode_,
const String & sign_column_, /// Для Collapsing режима.
const Names & columns_to_sum_, /// Для Summing режима.
const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_);
void shutdown() override;
@ -319,8 +318,7 @@ private:
const ASTPtr & sampling_expression_,
size_t index_granularity_,
MergeTreeData::Mode mode_,
const String & sign_column_,
const Names & columns_to_sum_,
const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_);
/// Инициализация.

View File

@ -37,8 +37,7 @@ MergeTreeData::MergeTreeData(
const String & date_column_name_, const ASTPtr & sampling_expression_,
size_t index_granularity_,
Mode mode_,
const String & sign_column_,
const Names & columns_to_sum_,
const MergingParams & merging_params_,
const MergeTreeSettings & settings_,
const String & log_name_,
bool require_part_metadata_,
@ -46,7 +45,7 @@ MergeTreeData::MergeTreeData(
: ITableDeclaration{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
date_column_name(date_column_name_), sampling_expression(sampling_expression_),
index_granularity(index_granularity_),
mode(mode_), sign_column(sign_column_), columns_to_sum(columns_to_sum_),
mode(mode_), merging_params(merging_params_),
settings(settings_), primary_expr_ast(primary_expr_ast_ ? primary_expr_ast_->clone() : nullptr),
require_part_metadata(require_part_metadata_),
full_path(full_path_), columns(columns_),
@ -79,15 +78,16 @@ MergeTreeData::MergeTreeData(
/// Проверяем, что столбец sign_column, если нужен, существует, и имеет тип Int8.
if (mode == Collapsing)
{
if (sign_column.empty())
if (merging_params.sign_column.empty())
throw Exception("Logical error: Sign column for storage CollapsingMergeTree is empty", ErrorCodes::LOGICAL_ERROR);
for (const auto & column : *columns)
{
if (column.name == sign_column)
if (column.name == merging_params.sign_column)
{
if (!typeid_cast<const DataTypeInt8 *>(column.type.get()))
throw Exception("Sign column (" + sign_column + ") for storage CollapsingMergeTree must have type Int8."
throw Exception("Sign column (" + merging_params.sign_column + ")"
" for storage CollapsingMergeTree must have type Int8."
" Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);
break;
}
@ -95,12 +95,12 @@ MergeTreeData::MergeTreeData(
}
/// Если заданы columns_to_sum, проверяем, что такие столбцы существуют.
if (!columns_to_sum.empty())
if (!merging_params.columns_to_sum.empty())
{
if (mode != Summing)
throw Exception("List of columns to sum for MergeTree cannot be specified in all modes except Summing.", ErrorCodes::LOGICAL_ERROR);
for (const auto & column_to_sum : columns_to_sum)
for (const auto & column_to_sum : merging_params.columns_to_sum)
if (columns->end() == std::find_if(columns->begin(), columns->end(),
[&](const NameAndTypePair & name_and_type) { return column_to_sum == name_and_type.name; }))
throw Exception("Column " + column_to_sum + " listed in columns to sum does not exist in table declaration.");
@ -463,7 +463,7 @@ void MergeTreeData::checkAlter(const AlterCommands & params)
if (primary_expr)
keys = primary_expr->getRequiredColumns();
keys.push_back(sign_column);
keys.push_back(merging_params.sign_column);
std::sort(keys.begin(), keys.end());

View File

@ -425,12 +425,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
case MergeTreeData::Collapsing:
merged_stream = std::make_unique<CollapsingSortedBlockInputStream>(
src_streams, data.getSortDescription(), data.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
src_streams, data.getSortDescription(), data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Summing:
merged_stream = std::make_unique<SummingSortedBlockInputStream>(
src_streams, data.getSortDescription(), data.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
src_streams, data.getSortDescription(), data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Aggregating:
@ -438,6 +438,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Replacing: /// TODO
merged_stream = std::make_unique<MergingSortedBlockInputStream>(
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Unsorted:
merged_stream = std::make_unique<ConcatBlockInputStream>(src_streams);
break;
@ -680,12 +685,12 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
case MergeTreeData::Collapsing:
merged_stream = std::make_unique<CollapsingSortedBlockInputStream>(
src_streams, data.getSortDescription(), data.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
src_streams, data.getSortDescription(), data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Summing:
merged_stream = std::make_unique<SummingSortedBlockInputStream>(
src_streams, data.getSortDescription(), data.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
src_streams, data.getSortDescription(), data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Aggregating:
@ -693,6 +698,11 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Replacing: /// TODO
merged_stream = std::make_unique<MergingSortedBlockInputStream>(
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::Unsorted:
merged_stream = std::make_unique<ConcatBlockInputStream>(src_streams);
break;

View File

@ -518,8 +518,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
std::vector<String> add_columns = data.getPrimaryExpression()->getRequiredColumns();
column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
if (!data.sign_column.empty())
column_names_to_read.push_back(data.sign_column);
if (!data.merging_params.sign_column.empty())
column_names_to_read.push_back(data.merging_params.sign_column);
std::sort(column_names_to_read.begin(), column_names_to_read.end());
column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());
@ -799,7 +799,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal
BlockInputStreams res;
if (to_merge.size() == 1)
{
if (!data.sign_column.empty())
if (!data.merging_params.sign_column.empty())
{
ExpressionActionsPtr sign_filter_expression;
String sign_filter_column;
@ -822,17 +822,22 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal
break;
case MergeTreeData::Collapsing:
merged = new CollapsingFinalBlockInputStream(to_merge, data.getSortDescription(), data.sign_column);
merged = new CollapsingFinalBlockInputStream(to_merge, data.getSortDescription(), data.merging_params.sign_column);
break;
case MergeTreeData::Summing:
merged = new SummingSortedBlockInputStream(to_merge, data.getSortDescription(), data.columns_to_sum, max_block_size);
merged = new SummingSortedBlockInputStream(to_merge,
data.getSortDescription(), data.merging_params.columns_to_sum, max_block_size);
break;
case MergeTreeData::Aggregating:
merged = new AggregatingSortedBlockInputStream(to_merge, data.getSortDescription(), max_block_size);
break;
case MergeTreeData::Replacing: /// TODO
merged = new MergingSortedBlockInputStream(to_merge, data.getSortDescription(), max_block_size);
break;
case MergeTreeData::Unsorted:
throw Exception("UnsortedMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
}
@ -866,7 +871,7 @@ void MergeTreeDataSelectExecutor::createPositiveSignCondition(
arguments->children.push_back(sign_ptr);
arguments->children.push_back(one_ptr);
sign->name = data.sign_column;
sign->name = data.merging_params.sign_column;
sign->kind = ASTIdentifier::Column;
one->value = Field(static_cast<Int64>(1));

View File

@ -544,11 +544,7 @@ For further info please read the documentation: https://clickhouse.yandex-team.r
ASTPtr sampling_expression;
UInt64 index_granularity;
/// Для Collapsing.
String sign_column_name;
/// Для Summing.
Names columns_to_sum;
MergeTreeData::MergingParams merging_params;
if (replicated)
{
@ -573,7 +569,7 @@ For further info please read the documentation: https://clickhouse.yandex-team.r
if (mode == MergeTreeData::Collapsing)
{
if (auto ast = typeid_cast<ASTIdentifier *>(&*args.back()))
sign_column_name = ast->name;
merging_params.sign_column = ast->name;
else
throw Exception(String("Sign column name must be an unquoted string") + verbose_help, ErrorCodes::BAD_ARGUMENTS);
@ -584,7 +580,7 @@ For further info please read the documentation: https://clickhouse.yandex-team.r
/// Если последний элемент - не index granularity (литерал), то это - список суммируемых столбцов.
if (!typeid_cast<const ASTLiteral *>(&*args.back()))
{
columns_to_sum = extractColumnNames(args.back());
merging_params.columns_to_sum = extractColumnNames(args.back());
args.pop_back();
}
}
@ -617,14 +613,14 @@ For further info please read the documentation: https://clickhouse.yandex-team.r
zookeeper_path, replica_name, attach, data_path, database_name, table_name,
columns, materialized_columns, alias_columns, column_defaults,
context, primary_expr_list, date_column_name,
sampling_expression, index_granularity, mode, sign_column_name, columns_to_sum,
sampling_expression, index_granularity, mode, merging_params,
context.getMergeTreeSettings());
else
return StorageMergeTree::create(
data_path, database_name, table_name,
columns, materialized_columns, alias_columns, column_defaults,
context, primary_expr_list, date_column_name,
sampling_expression, index_granularity, mode, sign_column_name, columns_to_sum,
sampling_expression, index_granularity, mode, merging_params,
context.getMergeTreeSettings());
}
else

View File

@ -33,8 +33,7 @@ StorageMergeTree::StorageMergeTree(
const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается.
size_t index_granularity_,
MergeTreeData::Mode mode_,
const String & sign_column_,
const Names & columns_to_sum_,
const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
path(path_), database_name(database_name_), table_name(table_name_), full_path(path + escapeForFileName(table_name) + '/'),
@ -42,7 +41,7 @@ StorageMergeTree::StorageMergeTree(
data(full_path, columns_,
materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name_,
sampling_expression_, index_granularity_,mode_, sign_column_, columns_to_sum_,
sampling_expression_, index_granularity_,mode_, merging_params_,
settings_, database_name_ + "." + table_name, false),
reader(data), writer(data), merger(data),
increment(0),
@ -86,15 +85,14 @@ StoragePtr StorageMergeTree::create(
const ASTPtr & sampling_expression_,
size_t index_granularity_,
MergeTreeData::Mode mode_,
const String & sign_column_,
const Names & columns_to_sum_,
const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_)
{
auto res = new StorageMergeTree{
path_, database_name_, table_name_,
columns_, materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name_,
sampling_expression_, index_granularity_, mode_, sign_column_, columns_to_sum_, settings_
sampling_expression_, index_granularity_, mode_, merging_params_, settings_
};
StoragePtr res_ptr = res->thisPtr();

View File

@ -190,8 +190,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const ASTPtr & sampling_expression_,
size_t index_granularity_,
MergeTreeData::Mode mode_,
const String & sign_column_,
const Names & columns_to_sum_,
const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
current_zookeeper(context.getZooKeeper()), database_name(database_name_),
@ -201,7 +200,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
data(full_path, columns_,
materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name_,
sampling_expression_, index_granularity_, mode_, sign_column_, columns_to_sum_,
sampling_expression_, index_granularity_, mode_, merging_params_,
settings_, database_name_ + "." + table_name, true,
[this] (const std::string & name) { enqueuePartForCheck(name); }),
reader(data), writer(data), merger(data), fetcher(data), sharded_partition_uploader_client(*this),
@ -274,7 +273,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
unreplicated_data.reset(new MergeTreeData(unreplicated_path, columns_,
materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_,
date_column_name_, sampling_expression_, index_granularity_, mode_, sign_column_, columns_to_sum_, settings_,
date_column_name_, sampling_expression_, index_granularity_, mode_, merging_params_, settings_,
database_name_ + "." + table_name + "[unreplicated]", false));
unreplicated_data->loadDataParts(skip_sanity_checks);
@ -333,8 +332,7 @@ StoragePtr StorageReplicatedMergeTree::create(
const ASTPtr & sampling_expression_,
size_t index_granularity_,
MergeTreeData::Mode mode_,
const String & sign_column_,
const Names & columns_to_sum_,
const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_)
{
auto res = new StorageReplicatedMergeTree{
@ -343,7 +341,7 @@ StoragePtr StorageReplicatedMergeTree::create(
columns_, materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name_,
sampling_expression_, index_granularity_, mode_,
sign_column_, columns_to_sum_, settings_};
merging_params_, settings_};
StoragePtr res_ptr = res->thisPtr();
@ -418,7 +416,7 @@ namespace
<< "sampling expression: " << formattedAST(data.sampling_expression) << "\n"
<< "index granularity: " << data.index_granularity << "\n"
<< "mode: " << static_cast<int>(data.mode) << "\n"
<< "sign column: " << data.sign_column << "\n"
<< "sign column: " << data.merging_params.sign_column << "\n"
<< "primary key: " << formattedAST(data.primary_expr_ast) << "\n";
}
@ -477,9 +475,9 @@ namespace
String read_sign_column;
in >> read_sign_column;
if (read_sign_column != data.sign_column)
if (read_sign_column != data.merging_params.sign_column)
throw Exception("Existing table metadata in ZooKeeper differs in sign column."
" Stored in ZooKeeper: " + read_sign_column + ", local: " + data.sign_column,
" Stored in ZooKeeper: " + read_sign_column + ", local: " + data.merging_params.sign_column,
ErrorCodes::METADATA_MISMATCH);
in >> "\nprimary key: ";