This commit is contained in:
Alexey Milovidov 2016-04-15 20:42:51 +03:00
parent d79174b2ce
commit 3923c7abce
12 changed files with 155 additions and 122 deletions

View File

@ -155,7 +155,7 @@ protected:
QueueWithCollation queue_with_collation; QueueWithCollation queue_with_collation;
/// Эти методы используются в Collapsing/Summing/Aggregating SortedBlockInputStream-ах. /// Эти методы используются в Collapsing/Summing/Aggregating... SortedBlockInputStream-ах.
/// Сохранить строчку, на которую указывает cursor, в row. /// Сохранить строчку, на которую указывает cursor, в row.
template <class TSortCursor> template <class TSortCursor>

View File

@ -186,20 +186,23 @@ public:
using AlterDataPartTransactionPtr = std::unique_ptr<AlterDataPartTransaction>; using AlterDataPartTransactionPtr = std::unique_ptr<AlterDataPartTransaction>;
/// Режим работы. См. выше.
enum Mode
{
Ordinary = 0, /// Числа сохраняются - не меняйте.
Collapsing = 1,
Summing = 2,
Aggregating = 3,
Unsorted = 4,
Replacing = 5,
};
/// Настройки для разных режимов работы. /// Настройки для разных режимов работы.
struct MergingParams struct MergingParams
{ {
/// Режим работы. См. выше.
enum Mode
{
Ordinary = 0, /// Числа сохраняются - не меняйте.
Collapsing = 1,
Summing = 2,
Aggregating = 3,
Unsorted = 4,
Replacing = 5,
};
Mode mode;
/// Для Collapsing режима. /// Для Collapsing режима.
String sign_column; String sign_column;
@ -208,8 +211,12 @@ public:
/// Для Replacing режима. Может быть пустым. /// Для Replacing режима. Может быть пустым.
String version_column; String version_column;
/// Проверить наличие и корректность типов столбцов.
void check(const NamesAndTypesList & columns) const;
}; };
static void doNothing(const String & name) {} static void doNothing(const String & name) {}
/** Подцепить таблицу с соответствующим именем, по соответствующему пути (с / на конце), /** Подцепить таблицу с соответствующим именем, по соответствующему пути (с / на конце),
@ -230,7 +237,6 @@ public:
const String & date_column_name_, const String & date_column_name_,
const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается. const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается.
size_t index_granularity_, size_t index_granularity_,
Mode mode_,
const MergingParams & merging_params_, const MergingParams & merging_params_,
const MergeTreeSettings & settings_, const MergeTreeSettings & settings_,
const String & log_name_, const String & log_name_,
@ -247,9 +253,10 @@ public:
bool supportsFinal() const bool supportsFinal() const
{ {
return mode == Mode::Collapsing return merging_params.mode == MergingParams::Collapsing
|| mode == Mode::Summing || merging_params.mode == MergingParams::Summing
|| mode == Mode::Aggregating; || merging_params.mode == MergingParams::Aggregating
|| merging_params.mode == MergingParams::Replacing;
} }
Int64 getMaxDataPartIndex(); Int64 getMaxDataPartIndex();
@ -447,7 +454,6 @@ public:
const size_t index_granularity; const size_t index_granularity;
/// Режим работы - какие дополнительные действия делать при мердже. /// Режим работы - какие дополнительные действия делать при мердже.
const Mode mode;
const MergingParams merging_params; const MergingParams merging_params;
const MergeTreeSettings settings; const MergeTreeSettings settings;

View File

@ -311,7 +311,7 @@ public:
/// Заканчиваем запись и достаем чексуммы. /// Заканчиваем запись и достаем чексуммы.
MergeTreeData::DataPart::Checksums checksums; MergeTreeData::DataPart::Checksums checksums;
if (storage.mode != MergeTreeData::Unsorted) if (storage.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
{ {
index_stream->next(); index_stream->next();
checksums.files["primary.idx"].file_size = index_stream->count(); checksums.files["primary.idx"].file_size = index_stream->count();
@ -366,7 +366,7 @@ private:
{ {
Poco::File(part_path).createDirectories(); Poco::File(part_path).createDirectories();
if (storage.mode != MergeTreeData::Unsorted) if (storage.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
{ {
index_file_stream = new WriteBufferFromFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY); index_file_stream = new WriteBufferFromFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
index_stream = new HashingWriteBuffer(*index_file_stream); index_stream = new HashingWriteBuffer(*index_file_stream);
@ -444,7 +444,7 @@ private:
/// Пишем индекс. Индекс содержит значение Primary Key для каждой index_granularity строки. /// Пишем индекс. Индекс содержит значение Primary Key для каждой index_granularity строки.
for (size_t i = index_offset; i < rows; i += storage.index_granularity) for (size_t i = index_offset; i < rows; i += storage.index_granularity)
{ {
if (storage.mode != MergeTreeData::Unsorted) if (storage.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
{ {
for (size_t j = 0, size = primary_columns.size(); j < size; ++j) for (size_t j = 0, size = primary_columns.size(); j < size; ++j)
{ {

View File

@ -40,7 +40,6 @@ public:
const String & date_column_name_, const String & date_column_name_,
const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается. const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается.
size_t index_granularity_, size_t index_granularity_,
MergeTreeData::Mode mode_,
const MergeTreeData::MergingParams & merging_params_, const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_); const MergeTreeSettings & settings_);
@ -183,7 +182,6 @@ private:
const String & date_column_name_, const String & date_column_name_,
const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается. const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается.
size_t index_granularity_, size_t index_granularity_,
MergeTreeData::Mode mode_,
const MergeTreeData::MergingParams & merging_params_, const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_); const MergeTreeSettings & settings_);

View File

@ -86,7 +86,6 @@ public:
const String & date_column_name_, const String & date_column_name_,
const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается. const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается.
size_t index_granularity_, size_t index_granularity_,
MergeTreeData::Mode mode_,
const MergeTreeData::MergingParams & merging_params_, const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_); const MergeTreeSettings & settings_);
@ -317,7 +316,6 @@ private:
const String & date_column_name_, const String & date_column_name_,
const ASTPtr & sampling_expression_, const ASTPtr & sampling_expression_,
size_t index_granularity_, size_t index_granularity_,
MergeTreeData::Mode mode_,
const MergeTreeData::MergingParams & merging_params_, const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_); const MergeTreeSettings & settings_);

View File

@ -36,7 +36,6 @@ MergeTreeData::MergeTreeData(
ASTPtr & primary_expr_ast_, ASTPtr & primary_expr_ast_,
const String & date_column_name_, const ASTPtr & sampling_expression_, const String & date_column_name_, const ASTPtr & sampling_expression_,
size_t index_granularity_, size_t index_granularity_,
Mode mode_,
const MergingParams & merging_params_, const MergingParams & merging_params_,
const MergeTreeSettings & settings_, const MergeTreeSettings & settings_,
const String & log_name_, const String & log_name_,
@ -45,7 +44,7 @@ MergeTreeData::MergeTreeData(
: ITableDeclaration{materialized_columns_, alias_columns_, column_defaults_}, context(context_), : ITableDeclaration{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
date_column_name(date_column_name_), sampling_expression(sampling_expression_), date_column_name(date_column_name_), sampling_expression(sampling_expression_),
index_granularity(index_granularity_), index_granularity(index_granularity_),
mode(mode_), merging_params(merging_params_), merging_params(merging_params_),
settings(settings_), primary_expr_ast(primary_expr_ast_ ? primary_expr_ast_->clone() : nullptr), settings(settings_), primary_expr_ast(primary_expr_ast_ ? primary_expr_ast_->clone() : nullptr),
require_part_metadata(require_part_metadata_), require_part_metadata(require_part_metadata_),
full_path(full_path_), columns(columns_), full_path(full_path_), columns(columns_),
@ -75,36 +74,7 @@ MergeTreeData::MergeTreeData(
"Date column (" + date_column_name + ") does not exist in table declaration.", "Date column (" + date_column_name + ") does not exist in table declaration.",
ErrorCodes::NO_SUCH_COLUMN_IN_TABLE}; ErrorCodes::NO_SUCH_COLUMN_IN_TABLE};
/// Проверяем, что столбец sign_column, если нужен, существует, и имеет тип Int8. merging_params.check(*columns);
if (mode == Collapsing)
{
if (merging_params.sign_column.empty())
throw Exception("Logical error: Sign column for storage CollapsingMergeTree is empty", ErrorCodes::LOGICAL_ERROR);
for (const auto & column : *columns)
{
if (column.name == merging_params.sign_column)
{
if (!typeid_cast<const DataTypeInt8 *>(column.type.get()))
throw Exception("Sign column (" + merging_params.sign_column + ")"
" for storage CollapsingMergeTree must have type Int8."
" Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);
break;
}
}
}
/// Если заданы columns_to_sum, проверяем, что такие столбцы существуют.
if (!merging_params.columns_to_sum.empty())
{
if (mode != Summing)
throw Exception("List of columns to sum for MergeTree cannot be specified in all modes except Summing.", ErrorCodes::LOGICAL_ERROR);
for (const auto & column_to_sum : merging_params.columns_to_sum)
if (columns->end() == std::find_if(columns->begin(), columns->end(),
[&](const NameAndTypePair & name_and_type) { return column_to_sum == name_and_type.name; }))
throw Exception("Column " + column_to_sum + " listed in columns to sum does not exist in table declaration.");
}
/// создаём директорию, если её нет /// создаём директорию, если её нет
Poco::File(full_path).createDirectories(); Poco::File(full_path).createDirectories();
@ -130,10 +100,72 @@ MergeTreeData::MergeTreeData(
for (size_t i = 0; i < primary_key_size; ++i) for (size_t i = 0; i < primary_key_size; ++i)
primary_key_data_types[i] = primary_key_sample.unsafeGetByPosition(i).type; primary_key_data_types[i] = primary_key_sample.unsafeGetByPosition(i).type;
} }
else if (mode != Unsorted) else if (merging_params.mode != MergingParams::Unsorted)
throw Exception("Primary key could be empty only for UnsortedMergeTree", ErrorCodes::BAD_ARGUMENTS); throw Exception("Primary key could be empty only for UnsortedMergeTree", ErrorCodes::BAD_ARGUMENTS);
} }
void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) const
{
/// Проверяем, что столбец sign_column, если нужен, существует, и имеет тип Int8.
if (mode == MergingParams::Collapsing)
{
if (sign_column.empty())
throw Exception("Logical error: Sign column for storage CollapsingMergeTree is empty", ErrorCodes::LOGICAL_ERROR);
for (const auto & column : columns)
{
if (column.name == sign_column)
{
if (!typeid_cast<const DataTypeInt8 *>(column.type.get()))
throw Exception("Sign column (" + sign_column + ")"
" for storage CollapsingMergeTree must have type Int8."
" Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);
break;
}
}
}
else if (!sign_column.empty())
throw Exception("Sign column for MergeTree cannot be specified in all modes except Collapsing.", ErrorCodes::LOGICAL_ERROR);
/// Если заданы columns_to_sum, проверяем, что такие столбцы существуют.
if (!columns_to_sum.empty())
{
if (mode != MergingParams::Summing)
throw Exception("List of columns to sum for MergeTree cannot be specified in all modes except Summing.",
ErrorCodes::LOGICAL_ERROR);
for (const auto & column_to_sum : columns_to_sum)
if (columns.end() == std::find_if(columns.begin(), columns.end(),
[&](const NameAndTypePair & name_and_type) { return column_to_sum == name_and_type.name; }))
throw Exception("Column " + column_to_sum + " listed in columns to sum does not exist in table declaration.");
}
/// Проверяем, что столбец version_column, если допустим, имеет тип целого беззнакового числа.
if (!version_column.empty())
{
if (mode != MergingParams::Replacing)
throw Exception("Version column for MergeTree cannot be specified in all modes except Replacing.",
ErrorCodes::LOGICAL_ERROR);
for (const auto & column : columns)
{
if (column.name == version_column)
{
if (!typeid_cast<const DataTypeUInt8 *>(column.type.get())
&& !typeid_cast<const DataTypeUInt16 *>(column.type.get())
&& !typeid_cast<const DataTypeUInt32 *>(column.type.get())
&& !typeid_cast<const DataTypeUInt64 *>(column.type.get()))
throw Exception("Version column (" + version_column + ")"
" for storage ReplacingMergeTree must have type of UInt family."
" Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);
break;
}
}
}
}
Int64 MergeTreeData::getMaxDataPartIndex() Int64 MergeTreeData::getMaxDataPartIndex()
{ {
std::lock_guard<std::mutex> lock_all(all_data_parts_mutex); std::lock_guard<std::mutex> lock_all(all_data_parts_mutex);
@ -147,16 +179,17 @@ Int64 MergeTreeData::getMaxDataPartIndex()
std::string MergeTreeData::getModePrefix() const std::string MergeTreeData::getModePrefix() const
{ {
switch (mode) switch (merging_params.mode)
{ {
case Ordinary: return ""; case MergingParams::Ordinary: return "";
case Collapsing: return "Collapsing"; case MergingParams::Collapsing: return "Collapsing";
case Summing: return "Summing"; case MergingParams::Summing: return "Summing";
case Aggregating: return "Aggregating"; case MergingParams::Aggregating: return "Aggregating";
case Unsorted: return "Unsorted"; case MergingParams::Unsorted: return "Unsorted";
case MergingParams::Replacing: return "Replacing";
default: default:
throw Exception("Unknown mode of operation for MergeTreeData: " + toString(mode), ErrorCodes::LOGICAL_ERROR); throw Exception("Unknown mode of operation for MergeTreeData: " + toString(merging_params.mode), ErrorCodes::LOGICAL_ERROR);
} }
} }

View File

@ -402,7 +402,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
__sync_add_and_fetch(&merge_entry->bytes_read_uncompressed, value.bytes); __sync_add_and_fetch(&merge_entry->bytes_read_uncompressed, value.bytes);
}); });
if (data.mode != MergeTreeData::Unsorted) if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
src_streams.push_back(new MaterializingBlockInputStream{ src_streams.push_back(new MaterializingBlockInputStream{
new ExpressionBlockInputStream(input.release(), data.getPrimaryExpression())}); new ExpressionBlockInputStream(input.release(), data.getPrimaryExpression())});
else else
@ -416,39 +416,39 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
/// то есть (примерного) возрастания времени вставки. /// то есть (примерного) возрастания времени вставки.
std::unique_ptr<IProfilingBlockInputStream> merged_stream; std::unique_ptr<IProfilingBlockInputStream> merged_stream;
switch (data.mode) switch (data.merging_params.mode)
{ {
case MergeTreeData::Ordinary: case MergeTreeData::MergingParams::Ordinary:
merged_stream = std::make_unique<MergingSortedBlockInputStream>( merged_stream = std::make_unique<MergingSortedBlockInputStream>(
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break; break;
case MergeTreeData::Collapsing: case MergeTreeData::MergingParams::Collapsing:
merged_stream = std::make_unique<CollapsingSortedBlockInputStream>( merged_stream = std::make_unique<CollapsingSortedBlockInputStream>(
src_streams, data.getSortDescription(), data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE); src_streams, data.getSortDescription(), data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
break; break;
case MergeTreeData::Summing: case MergeTreeData::MergingParams::Summing:
merged_stream = std::make_unique<SummingSortedBlockInputStream>( merged_stream = std::make_unique<SummingSortedBlockInputStream>(
src_streams, data.getSortDescription(), data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE); src_streams, data.getSortDescription(), data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
break; break;
case MergeTreeData::Aggregating: case MergeTreeData::MergingParams::Aggregating:
merged_stream = std::make_unique<AggregatingSortedBlockInputStream>( merged_stream = std::make_unique<AggregatingSortedBlockInputStream>(
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break; break;
case MergeTreeData::Replacing: /// TODO case MergeTreeData::MergingParams::Replacing: /// TODO
merged_stream = std::make_unique<MergingSortedBlockInputStream>( merged_stream = std::make_unique<MergingSortedBlockInputStream>(
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break; break;
case MergeTreeData::Unsorted: case MergeTreeData::MergingParams::Unsorted:
merged_stream = std::make_unique<ConcatBlockInputStream>(src_streams); merged_stream = std::make_unique<ConcatBlockInputStream>(src_streams);
break; break;
default: default:
throw Exception("Unknown mode of operation for MergeTreeData: " + toString(data.mode), ErrorCodes::LOGICAL_ERROR); throw Exception("Unknown mode of operation for MergeTreeData: " + toString(data.merging_params.mode), ErrorCodes::LOGICAL_ERROR);
} }
String new_part_tmp_path = data.getFullPath() + "tmp_" + merged_name + "/"; String new_part_tmp_path = data.getFullPath() + "tmp_" + merged_name + "/";
@ -612,7 +612,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
__sync_add_and_fetch(&merge_entry->bytes_read_uncompressed, value.bytes); __sync_add_and_fetch(&merge_entry->bytes_read_uncompressed, value.bytes);
}); });
if (data.mode != MergeTreeData::Unsorted) if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
src_streams.push_back(new MaterializingBlockInputStream{ src_streams.push_back(new MaterializingBlockInputStream{
new ExpressionBlockInputStream(input.release(), data.getPrimaryExpression())}); new ExpressionBlockInputStream(input.release(), data.getPrimaryExpression())});
else else
@ -676,39 +676,39 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
/// то есть (примерного) возрастания времени вставки. /// то есть (примерного) возрастания времени вставки.
std::unique_ptr<IProfilingBlockInputStream> merged_stream; std::unique_ptr<IProfilingBlockInputStream> merged_stream;
switch (data.mode) switch (data.merging_params.mode)
{ {
case MergeTreeData::Ordinary: case MergeTreeData::MergingParams::Ordinary:
merged_stream = std::make_unique<MergingSortedBlockInputStream>( merged_stream = std::make_unique<MergingSortedBlockInputStream>(
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break; break;
case MergeTreeData::Collapsing: case MergeTreeData::MergingParams::Collapsing:
merged_stream = std::make_unique<CollapsingSortedBlockInputStream>( merged_stream = std::make_unique<CollapsingSortedBlockInputStream>(
src_streams, data.getSortDescription(), data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE); src_streams, data.getSortDescription(), data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
break; break;
case MergeTreeData::Summing: case MergeTreeData::MergingParams::Summing:
merged_stream = std::make_unique<SummingSortedBlockInputStream>( merged_stream = std::make_unique<SummingSortedBlockInputStream>(
src_streams, data.getSortDescription(), data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE); src_streams, data.getSortDescription(), data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
break; break;
case MergeTreeData::Aggregating: case MergeTreeData::MergingParams::Aggregating:
merged_stream = std::make_unique<AggregatingSortedBlockInputStream>( merged_stream = std::make_unique<AggregatingSortedBlockInputStream>(
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break; break;
case MergeTreeData::Replacing: /// TODO case MergeTreeData::MergingParams::Replacing: /// TODO
merged_stream = std::make_unique<MergingSortedBlockInputStream>( merged_stream = std::make_unique<MergingSortedBlockInputStream>(
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break; break;
case MergeTreeData::Unsorted: case MergeTreeData::MergingParams::Unsorted:
merged_stream = std::make_unique<ConcatBlockInputStream>(src_streams); merged_stream = std::make_unique<ConcatBlockInputStream>(src_streams);
break; break;
default: default:
throw Exception("Unknown mode of operation for MergeTreeData: " + toString(data.mode), ErrorCodes::LOGICAL_ERROR); throw Exception("Unknown mode of operation for MergeTreeData: " + toString(data.merging_params.mode), ErrorCodes::LOGICAL_ERROR);
} }
merged_stream->readPrefix(); merged_stream->readPrefix();

View File

@ -485,7 +485,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
{ {
RangesInDataPart ranges(part, (*inout_part_index)++); RangesInDataPart ranges(part, (*inout_part_index)++);
if (data.mode != MergeTreeData::Unsorted) if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
ranges.ranges = markRangesFromPKRange(part->index, key_condition, settings); ranges.ranges = markRangesFromPKRange(part->index, key_condition, settings);
else else
ranges.ranges = MarkRanges{MarkRange{0, part->size}}; ranges.ranges = MarkRanges{MarkRange{0, part->size}};
@ -815,30 +815,30 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal
{ {
BlockInputStreamPtr merged; BlockInputStreamPtr merged;
switch (data.mode) switch (data.merging_params.mode)
{ {
case MergeTreeData::Ordinary: case MergeTreeData::MergingParams::Ordinary:
merged = new MergingSortedBlockInputStream(to_merge, data.getSortDescription(), max_block_size); merged = new MergingSortedBlockInputStream(to_merge, data.getSortDescription(), max_block_size);
break; break;
case MergeTreeData::Collapsing: case MergeTreeData::MergingParams::Collapsing:
merged = new CollapsingFinalBlockInputStream(to_merge, data.getSortDescription(), data.merging_params.sign_column); merged = new CollapsingFinalBlockInputStream(to_merge, data.getSortDescription(), data.merging_params.sign_column);
break; break;
case MergeTreeData::Summing: case MergeTreeData::MergingParams::Summing:
merged = new SummingSortedBlockInputStream(to_merge, merged = new SummingSortedBlockInputStream(to_merge,
data.getSortDescription(), data.merging_params.columns_to_sum, max_block_size); data.getSortDescription(), data.merging_params.columns_to_sum, max_block_size);
break; break;
case MergeTreeData::Aggregating: case MergeTreeData::MergingParams::Aggregating:
merged = new AggregatingSortedBlockInputStream(to_merge, data.getSortDescription(), max_block_size); merged = new AggregatingSortedBlockInputStream(to_merge, data.getSortDescription(), max_block_size);
break; break;
case MergeTreeData::Replacing: /// TODO case MergeTreeData::MergingParams::Replacing: /// TODO
merged = new MergingSortedBlockInputStream(to_merge, data.getSortDescription(), max_block_size); merged = new MergingSortedBlockInputStream(to_merge, data.getSortDescription(), max_block_size);
break; break;
case MergeTreeData::Unsorted: case MergeTreeData::MergingParams::Unsorted:
throw Exception("UnsortedMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR); throw Exception("UnsortedMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
} }

View File

@ -99,7 +99,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa
new_data_part->is_temp = true; new_data_part->is_temp = true;
/// Если для сортировки надо вычислить некоторые столбцы - делаем это. /// Если для сортировки надо вычислить некоторые столбцы - делаем это.
if (data.mode != MergeTreeData::Unsorted) if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
data.getPrimaryExpression()->execute(block); data.getPrimaryExpression()->execute(block);
SortDescription sort_descr = data.getSortDescription(); SortDescription sort_descr = data.getSortDescription();
@ -107,7 +107,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa
/// Сортируем. /// Сортируем.
IColumn::Permutation * perm_ptr = nullptr; IColumn::Permutation * perm_ptr = nullptr;
IColumn::Permutation perm; IColumn::Permutation perm;
if (data.mode != MergeTreeData::Unsorted) if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
{ {
if (!isAlreadySorted(block, sort_descr)) if (!isAlreadySorted(block, sort_descr))
{ {

View File

@ -352,12 +352,13 @@ StoragePtr StorageFactory::get(
} }
else if (endsWith(name, "MergeTree")) else if (endsWith(name, "MergeTree"))
{ {
/** Движки [Replicated][|Summing|Collapsing|Aggregating|Unsorted]MergeTree (2 * 5 комбинаций) /** Движки [Replicated][|Summing|Collapsing|Aggregating|Unsorted|Replacing]MergeTree (2 * 6 комбинаций)
* В качестве аргумента для движка должно быть указано: * В качестве аргумента для движка должно быть указано:
* - (для Replicated) Путь к таблице в ZooKeeper * - (для Replicated) Путь к таблице в ZooKeeper
* - (для Replicated) Имя реплики в ZooKeeper * - (для Replicated) Имя реплики в ZooKeeper
* - имя столбца с датой; * - имя столбца с датой;
* - (не обязательно) выражение для семплирования (запрос с SAMPLE x будет выбирать строки, у которых в этом столбце значение меньше, чем x*UINT32_MAX); * - (не обязательно) выражение для семплирования
* (запрос с SAMPLE x будет выбирать строки, у которых в этом столбце значение меньше, чем x * UINT32_MAX);
* - выражение для сортировки (либо скалярное выражение, либо tuple из нескольких); * - выражение для сортировки (либо скалярное выражение, либо tuple из нескольких);
* - index_granularity; * - index_granularity;
* - (для Collapsing) имя столбца, содержащего тип строчки с изменением "визита" (принимающего значения 1 и -1). * - (для Collapsing) имя столбца, содержащего тип строчки с изменением "визита" (принимающего значения 1 и -1).
@ -440,16 +441,17 @@ For further info please read the documentation: https://clickhouse.yandex-team.r
if (replicated) if (replicated)
name_part = name_part.substr(strlen("Replicated")); name_part = name_part.substr(strlen("Replicated"));
MergeTreeData::Mode mode = MergeTreeData::Ordinary; MergeTreeData::MergingParams merging_params;
merging_params.mode = MergeTreeData::MergingParams::Ordinary;
if (name_part == "Collapsing") if (name_part == "Collapsing")
mode = MergeTreeData::Collapsing; merging_params.mode = MergeTreeData::MergingParams::Collapsing;
else if (name_part == "Summing") else if (name_part == "Summing")
mode = MergeTreeData::Summing; merging_params.mode = MergeTreeData::MergingParams::Summing;
else if (name_part == "Aggregating") else if (name_part == "Aggregating")
mode = MergeTreeData::Aggregating; merging_params.mode = MergeTreeData::MergingParams::Aggregating;
else if (name_part == "Unsorted") else if (name_part == "Unsorted")
mode = MergeTreeData::Unsorted; merging_params.mode = MergeTreeData::MergingParams::Unsorted;
else if (!name_part.empty()) else if (!name_part.empty())
throw Exception("Unknown storage " + name + verbose_help, ErrorCodes::UNKNOWN_STORAGE); throw Exception("Unknown storage " + name + verbose_help, ErrorCodes::UNKNOWN_STORAGE);
@ -461,9 +463,9 @@ For further info please read the documentation: https://clickhouse.yandex-team.r
args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children; args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
/// NOTE Слегка запутанно. /// NOTE Слегка запутанно.
size_t num_additional_params = (replicated ? 2 : 0) + (mode == MergeTreeData::Collapsing); size_t num_additional_params = (replicated ? 2 : 0) + (merging_params.mode == MergeTreeData::MergingParams::Collapsing);
if (mode == MergeTreeData::Unsorted if (merging_params.mode == MergeTreeData::MergingParams::Unsorted
&& args.size() != num_additional_params + 2) && args.size() != num_additional_params + 2)
{ {
String params; String params;
@ -482,7 +484,8 @@ For further info please read the documentation: https://clickhouse.yandex-team.r
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
} }
if (mode != MergeTreeData::Summing && mode != MergeTreeData::Unsorted if (merging_params.mode != MergeTreeData::MergingParams::Summing
&& merging_params.mode != MergeTreeData::MergingParams::Unsorted
&& args.size() != num_additional_params + 3 && args.size() != num_additional_params + 3
&& args.size() != num_additional_params + 4) && args.size() != num_additional_params + 4)
{ {
@ -499,7 +502,7 @@ For further info please read the documentation: https://clickhouse.yandex-team.r
"\nprimary key expression," "\nprimary key expression,"
"\nindex granularity\n"; "\nindex granularity\n";
if (mode == MergeTreeData::Collapsing) if (merging_params.mode == MergeTreeData::MergingParams::Collapsing)
params += ", sign column"; params += ", sign column";
throw Exception("Storage " + name + " requires " throw Exception("Storage " + name + " requires "
@ -508,7 +511,7 @@ For further info please read the documentation: https://clickhouse.yandex-team.r
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
} }
if (mode == MergeTreeData::Summing if (merging_params.mode == MergeTreeData::MergingParams::Summing
&& args.size() != num_additional_params + 3 && args.size() != num_additional_params + 3
&& args.size() != num_additional_params + 4 && args.size() != num_additional_params + 4
&& args.size() != num_additional_params + 5) && args.size() != num_additional_params + 5)
@ -544,8 +547,6 @@ For further info please read the documentation: https://clickhouse.yandex-team.r
ASTPtr sampling_expression; ASTPtr sampling_expression;
UInt64 index_granularity; UInt64 index_granularity;
MergeTreeData::MergingParams merging_params;
if (replicated) if (replicated)
{ {
auto ast = typeid_cast<ASTLiteral *>(&*args[0]); auto ast = typeid_cast<ASTLiteral *>(&*args[0]);
@ -566,7 +567,7 @@ For further info please read the documentation: https://clickhouse.yandex-team.r
args.erase(args.begin(), args.begin() + 2); args.erase(args.begin(), args.begin() + 2);
} }
if (mode == MergeTreeData::Collapsing) if (merging_params.mode == MergeTreeData::MergingParams::Collapsing)
{ {
if (auto ast = typeid_cast<ASTIdentifier *>(&*args.back())) if (auto ast = typeid_cast<ASTIdentifier *>(&*args.back()))
merging_params.sign_column = ast->name; merging_params.sign_column = ast->name;
@ -575,7 +576,7 @@ For further info please read the documentation: https://clickhouse.yandex-team.r
args.pop_back(); args.pop_back();
} }
else if (mode == MergeTreeData::Summing) else if (merging_params.mode == MergeTreeData::MergingParams::Summing)
{ {
/// Если последний элемент - не index granularity (литерал), то это - список суммируемых столбцов. /// Если последний элемент - не index granularity (литерал), то это - список суммируемых столбцов.
if (!typeid_cast<const ASTLiteral *>(&*args.back())) if (!typeid_cast<const ASTLiteral *>(&*args.back()))
@ -599,7 +600,7 @@ For further info please read the documentation: https://clickhouse.yandex-team.r
else else
throw Exception(String("Date column name must be an unquoted string") + verbose_help, ErrorCodes::BAD_ARGUMENTS); throw Exception(String("Date column name must be an unquoted string") + verbose_help, ErrorCodes::BAD_ARGUMENTS);
if (mode != MergeTreeData::Unsorted) if (merging_params.mode != MergeTreeData::MergingParams::Unsorted)
primary_expr_list = extractPrimaryKey(args[1]); primary_expr_list = extractPrimaryKey(args[1]);
auto ast = typeid_cast<ASTLiteral *>(&*args.back()); auto ast = typeid_cast<ASTLiteral *>(&*args.back());
@ -613,14 +614,14 @@ For further info please read the documentation: https://clickhouse.yandex-team.r
zookeeper_path, replica_name, attach, data_path, database_name, table_name, zookeeper_path, replica_name, attach, data_path, database_name, table_name,
columns, materialized_columns, alias_columns, column_defaults, columns, materialized_columns, alias_columns, column_defaults,
context, primary_expr_list, date_column_name, context, primary_expr_list, date_column_name,
sampling_expression, index_granularity, mode, merging_params, sampling_expression, index_granularity, merging_params,
context.getMergeTreeSettings()); context.getMergeTreeSettings());
else else
return StorageMergeTree::create( return StorageMergeTree::create(
data_path, database_name, table_name, data_path, database_name, table_name,
columns, materialized_columns, alias_columns, column_defaults, columns, materialized_columns, alias_columns, column_defaults,
context, primary_expr_list, date_column_name, context, primary_expr_list, date_column_name,
sampling_expression, index_granularity, mode, merging_params, sampling_expression, index_granularity, merging_params,
context.getMergeTreeSettings()); context.getMergeTreeSettings());
} }
else else

View File

@ -32,7 +32,6 @@ StorageMergeTree::StorageMergeTree(
const String & date_column_name_, const String & date_column_name_,
const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается. const ASTPtr & sampling_expression_, /// nullptr, если семплирование не поддерживается.
size_t index_granularity_, size_t index_granularity_,
MergeTreeData::Mode mode_,
const MergeTreeData::MergingParams & merging_params_, const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_) const MergeTreeSettings & settings_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, : IStorage{materialized_columns_, alias_columns_, column_defaults_},
@ -41,7 +40,7 @@ StorageMergeTree::StorageMergeTree(
data(full_path, columns_, data(full_path, columns_,
materialized_columns_, alias_columns_, column_defaults_, materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name_, context_, primary_expr_ast_, date_column_name_,
sampling_expression_, index_granularity_,mode_, merging_params_, sampling_expression_, index_granularity_, merging_params_,
settings_, database_name_ + "." + table_name, false), settings_, database_name_ + "." + table_name, false),
reader(data), writer(data), merger(data), reader(data), writer(data), merger(data),
increment(0), increment(0),
@ -84,7 +83,6 @@ StoragePtr StorageMergeTree::create(
const String & date_column_name_, const String & date_column_name_,
const ASTPtr & sampling_expression_, const ASTPtr & sampling_expression_,
size_t index_granularity_, size_t index_granularity_,
MergeTreeData::Mode mode_,
const MergeTreeData::MergingParams & merging_params_, const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_) const MergeTreeSettings & settings_)
{ {
@ -92,7 +90,7 @@ StoragePtr StorageMergeTree::create(
path_, database_name_, table_name_, path_, database_name_, table_name_,
columns_, materialized_columns_, alias_columns_, column_defaults_, columns_, materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name_, context_, primary_expr_ast_, date_column_name_,
sampling_expression_, index_granularity_, mode_, merging_params_, settings_ sampling_expression_, index_granularity_, merging_params_, settings_
}; };
StoragePtr res_ptr = res->thisPtr(); StoragePtr res_ptr = res->thisPtr();

View File

@ -189,7 +189,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const String & date_column_name_, const String & date_column_name_,
const ASTPtr & sampling_expression_, const ASTPtr & sampling_expression_,
size_t index_granularity_, size_t index_granularity_,
MergeTreeData::Mode mode_,
const MergeTreeData::MergingParams & merging_params_, const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_) const MergeTreeSettings & settings_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, context(context_), : IStorage{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
@ -200,7 +199,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
data(full_path, columns_, data(full_path, columns_,
materialized_columns_, alias_columns_, column_defaults_, materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name_, context_, primary_expr_ast_, date_column_name_,
sampling_expression_, index_granularity_, mode_, merging_params_, sampling_expression_, index_granularity_, merging_params_,
settings_, database_name_ + "." + table_name, true, settings_, database_name_ + "." + table_name, true,
[this] (const std::string & name) { enqueuePartForCheck(name); }), [this] (const std::string & name) { enqueuePartForCheck(name); }),
reader(data), writer(data), merger(data), fetcher(data), sharded_partition_uploader_client(*this), reader(data), writer(data), merger(data), fetcher(data), sharded_partition_uploader_client(*this),
@ -273,7 +272,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
unreplicated_data.reset(new MergeTreeData(unreplicated_path, columns_, unreplicated_data.reset(new MergeTreeData(unreplicated_path, columns_,
materialized_columns_, alias_columns_, column_defaults_, materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, context_, primary_expr_ast_,
date_column_name_, sampling_expression_, index_granularity_, mode_, merging_params_, settings_, date_column_name_, sampling_expression_, index_granularity_, merging_params_, settings_,
database_name_ + "." + table_name + "[unreplicated]", false)); database_name_ + "." + table_name + "[unreplicated]", false));
unreplicated_data->loadDataParts(skip_sanity_checks); unreplicated_data->loadDataParts(skip_sanity_checks);
@ -331,7 +330,6 @@ StoragePtr StorageReplicatedMergeTree::create(
const String & date_column_name_, const String & date_column_name_,
const ASTPtr & sampling_expression_, const ASTPtr & sampling_expression_,
size_t index_granularity_, size_t index_granularity_,
MergeTreeData::Mode mode_,
const MergeTreeData::MergingParams & merging_params_, const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_) const MergeTreeSettings & settings_)
{ {
@ -340,7 +338,7 @@ StoragePtr StorageReplicatedMergeTree::create(
path_, database_name_, name_, path_, database_name_, name_,
columns_, materialized_columns_, alias_columns_, column_defaults_, columns_, materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name_, context_, primary_expr_ast_, date_column_name_,
sampling_expression_, index_granularity_, mode_, sampling_expression_, index_granularity_,
merging_params_, settings_}; merging_params_, settings_};
StoragePtr res_ptr = res->thisPtr(); StoragePtr res_ptr = res->thisPtr();
@ -415,7 +413,7 @@ namespace
<< "date column: " << data.date_column_name << "\n" << "date column: " << data.date_column_name << "\n"
<< "sampling expression: " << formattedAST(data.sampling_expression) << "\n" << "sampling expression: " << formattedAST(data.sampling_expression) << "\n"
<< "index granularity: " << data.index_granularity << "\n" << "index granularity: " << data.index_granularity << "\n"
<< "mode: " << static_cast<int>(data.mode) << "\n" << "mode: " << static_cast<int>(data.merging_params.mode) << "\n"
<< "sign column: " << data.merging_params.sign_column << "\n" << "sign column: " << data.merging_params.sign_column << "\n"
<< "primary key: " << formattedAST(data.primary_expr_ast) << "\n"; << "primary key: " << formattedAST(data.primary_expr_ast) << "\n";
} }
@ -466,9 +464,10 @@ namespace
int read_mode = 0; int read_mode = 0;
in >> read_mode; in >> read_mode;
if (read_mode != static_cast<int>(data.mode)) if (read_mode != static_cast<int>(data.merging_params.mode))
throw Exception("Existing table metadata in ZooKeeper differs in mode of merge operation." throw Exception("Existing table metadata in ZooKeeper differs in mode of merge operation."
" Stored in ZooKeeper: " + DB::toString(read_mode) + ", local: " + DB::toString(static_cast<int>(data.mode)), " Stored in ZooKeeper: " + DB::toString(read_mode) + ", local: "
+ DB::toString(static_cast<int>(data.merging_params.mode)),
ErrorCodes::METADATA_MISMATCH); ErrorCodes::METADATA_MISMATCH);
in >> "\nsign column: "; in >> "\nsign column: ";