Merge pull request #2000 from yandex/negative-versions

Fix version column checks, support negative versions
This commit is contained in:
alexey-milovidov 2018-03-06 20:25:18 +03:00 committed by GitHub
commit 71e73c41ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 27 additions and 19 deletions

View File

@ -182,10 +182,12 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns
/// Within all rows with same key, we should leave only one row with maximum version;
/// and for rows with same maximum version - only last row.
UInt64 next_version = next_cursor->all_columns[version_column_num]->get64(next_cursor->pos);
if (is_new_key || next_version >= current_subgroup_max_version)
if (is_new_key
|| next_cursor->all_columns[version_column_num]->compareAt(
next_cursor->pos, current_subgroup_newest_row.row_num,
*(*current_subgroup_newest_row.columns)[version_column_num],
/* nan_direction_hint = */ 1) >= 0)
{
current_subgroup_max_version = next_version;
setRowRef(current_subgroup_newest_row, next_cursor);
/// Small hack: group and subgroups have the same path, so we can set current_group_path here instead of startNextGroup
@ -244,7 +246,8 @@ void GraphiteRollupSortedBlockInputStream::finishCurrentGroup(MutableColumns & m
{
/// Insert calculated values of the columns `time`, `value`, `version`.
merged_columns[time_column_num]->insert(UInt64(current_time_rounded));
merged_columns[version_column_num]->insert(current_subgroup_max_version);
merged_columns[version_column_num]->insertFrom(
*(*current_subgroup_newest_row.columns)[version_column_num], current_subgroup_newest_row.row_num);
if (aggregate_state_created)
{

View File

@ -184,7 +184,6 @@ private:
/// Last row with maximum version for current primary key (time bucket).
RowRef current_subgroup_newest_row;
UInt64 current_subgroup_max_version = 0;
/// Time of last read row
time_t current_time = 0;

View File

@ -105,6 +105,12 @@ protected:
return !(*this == other);
}
void reset()
{
RowRef empty;
swap(empty);
}
bool empty() const { return columns == nullptr; }
size_t size() const { return empty() ? 0 : columns->size(); }
};

View File

@ -73,10 +73,6 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std
if (current_key.empty())
setPrimaryKeyRef(current_key, current);
UInt64 version = version_column_number != -1
? current->all_columns[version_column_number]->get64(current->pos)
: 0;
setPrimaryKeyRef(next_key, current);
bool key_differs = next_key != current_key;
@ -89,9 +85,9 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std
if (key_differs)
{
max_version = 0;
/// Write the data for the previous primary key.
insertRow(merged_columns, merged_rows);
selected_row.reset();
current_key.swap(next_key);
}
@ -101,9 +97,13 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std
current_row_sources.emplace_back(current.impl->order, true);
/// A non-strict comparison, since we select the last row for the same version values.
if (version >= max_version)
if (version_column_number == -1
|| selected_row.empty()
|| current->all_columns[version_column_number]->compareAt(
current->pos, selected_row.row_num,
*(*selected_row.columns)[version_column_number],
/* nan_direction_hint = */ 1) >= 0)
{
max_version = version;
max_pos = current_pos;
setRowRef(selected_row, current);
}

View File

@ -43,8 +43,6 @@ private:
RowRef next_key;
/// Last row with maximum version for current primary key.
RowRef selected_row;
/// Max version for current primary key.
UInt64 max_version = 0;
/// The position (into current_row_sources) of the row with the highest version.
size_t max_pos = 0;

View File

@ -111,7 +111,8 @@ MergeTreeData::MergeTreeData(
data_parts_by_info(data_parts_indexes.get<TagByInfo>()),
data_parts_by_state_and_info(data_parts_indexes.get<TagByStateAndInfo>())
{
merging_params.check(columns);
/// NOTE: using the same columns list as is read when performing actual merges.
merging_params.check(getColumnsList());
if (!primary_expr_ast)
throw Exception("Primary key cannot be empty", ErrorCodes::BAD_ARGUMENTS);
@ -355,10 +356,11 @@ void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) cons
{
if (column.name == version_column)
{
if (!column.type->isUnsignedInteger() && !column.type->isDateOrDateTime())
throw Exception("Version column (" + version_column + ")"
" for storage " + storage + " must have type of UInt family or Date or DateTime."
" Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);
if (!column.type->canBeUsedAsVersion())
throw Exception("The column " + version_column +
" cannot be used as a version column for storage " + storage +
" because it is of type " + column.type->getName() +
" (must use an integer or Date or DateTime)", ErrorCodes::BAD_TYPE_OF_FIELD);
miss_column = false;
break;
}