diff --git a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp index 2606658e828..5da53d8eea5 100644 --- a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp @@ -182,10 +182,12 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns /// Within all rows with same key, we should leave only one row with maximum version; /// and for rows with same maximum version - only last row. - UInt64 next_version = next_cursor->all_columns[version_column_num]->get64(next_cursor->pos); - if (is_new_key || next_version >= current_subgroup_max_version) + if (is_new_key + || next_cursor->all_columns[version_column_num]->compareAt( + next_cursor->pos, current_subgroup_newest_row.row_num, + *(*current_subgroup_newest_row.columns)[version_column_num], + /* nan_direction_hint = */ 1) >= 0) { - current_subgroup_max_version = next_version; setRowRef(current_subgroup_newest_row, next_cursor); /// Small hack: group and subgroups have the same path, so we can set current_group_path here instead of startNextGroup @@ -244,7 +246,8 @@ void GraphiteRollupSortedBlockInputStream::finishCurrentGroup(MutableColumns & m { /// Insert calculated values of the columns `time`, `value`, `version`. merged_columns[time_column_num]->insert(UInt64(current_time_rounded)); - merged_columns[version_column_num]->insert(current_subgroup_max_version); + merged_columns[version_column_num]->insertFrom( + *(*current_subgroup_newest_row.columns)[version_column_num], current_subgroup_newest_row.row_num); if (aggregate_state_created) { diff --git a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h index 99d9466408d..c256d27064d 100644 --- a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h +++ b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h @@ -184,7 +184,6 @@ private: /// Last row with maximum version for current primary key (time bucket). RowRef current_subgroup_newest_row; - UInt64 current_subgroup_max_version = 0; /// Time of last read row time_t current_time = 0; diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.h b/dbms/src/DataStreams/MergingSortedBlockInputStream.h index a8e0c4bf488..6391f52dcd5 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.h @@ -105,6 +105,12 @@ protected: return !(*this == other); } + void reset() + { + RowRef empty; + swap(empty); + } + bool empty() const { return columns == nullptr; } size_t size() const { return empty() ? 0 : columns->size(); } }; diff --git a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp index 96ac8a98355..197fac9c22d 100644 --- a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp @@ -73,10 +73,6 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std if (current_key.empty()) setPrimaryKeyRef(current_key, current); - UInt64 version = version_column_number != -1 - ? current->all_columns[version_column_number]->get64(current->pos) - : 0; - setPrimaryKeyRef(next_key, current); bool key_differs = next_key != current_key; @@ -89,9 +85,9 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std if (key_differs) { - max_version = 0; /// Write the data for the previous primary key. insertRow(merged_columns, merged_rows); + selected_row.reset(); current_key.swap(next_key); } @@ -101,9 +97,13 @@ void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std current_row_sources.emplace_back(current.impl->order, true); /// A non-strict comparison, since we select the last row for the same version values. - if (version >= max_version) + if (version_column_number == -1 + || selected_row.empty() + || current->all_columns[version_column_number]->compareAt( + current->pos, selected_row.row_num, + *(*selected_row.columns)[version_column_number], + /* nan_direction_hint = */ 1) >= 0) { - max_version = version; max_pos = current_pos; setRowRef(selected_row, current); } diff --git a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h index e64100b2207..b8592a0e5b6 100644 --- a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h @@ -43,8 +43,6 @@ private: RowRef next_key; /// Last row with maximum version for current primary key. RowRef selected_row; - /// Max version for current primary key. - UInt64 max_version = 0; /// The position (into current_row_sources) of the row with the highest version. size_t max_pos = 0; diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index db830d77bf5..6a8b2f5deed 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -111,7 +111,8 @@ MergeTreeData::MergeTreeData( data_parts_by_info(data_parts_indexes.get()), data_parts_by_state_and_info(data_parts_indexes.get()) { - merging_params.check(columns); + /// NOTE: using the same columns list as is read when performing actual merges. + merging_params.check(getColumnsList()); if (!primary_expr_ast) throw Exception("Primary key cannot be empty", ErrorCodes::BAD_ARGUMENTS); @@ -355,10 +356,11 @@ void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) cons { if (column.name == version_column) { - if (!column.type->isUnsignedInteger() && !column.type->isDateOrDateTime()) - throw Exception("Version column (" + version_column + ")" - " for storage " + storage + " must have type of UInt family or Date or DateTime." - " Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD); + if (!column.type->canBeUsedAsVersion()) + throw Exception("The column " + version_column + + " cannot be used as a version column for storage " + storage + + " because it is of type " + column.type->getName() + + " (must use an integer or Date or DateTime)", ErrorCodes::BAD_TYPE_OF_FIELD); miss_column = false; break; }