GraphiteMergeTree: fixed handling of 'version' column [#CLICKHOUSE-2804].

This commit is contained in:
Alexey Milovidov 2017-02-08 00:20:28 +03:00
parent 083e9cc3aa
commit f0ca756f52
4 changed files with 67 additions and 48 deletions

View File

@ -179,15 +179,16 @@ private:
/// All data has been read.
bool finished = false;
/// This object owns a column while 'current_path' references it.
SharedBlockPtr owned_current_block;
RowRef selected_row; /// Last row with maximum version for current primary key.
UInt64 current_max_version = 0;
bool is_first = true;
StringRef current_path;
time_t current_time;
time_t current_time = 0;
time_t current_time_rounded = 0;
StringRef next_path;
time_t next_time;
UInt64 current_max_version = 0;
time_t next_time = 0;
time_t next_time_rounded = 0;
const Graphite::Pattern * current_pattern = nullptr;
std::vector<char> place_for_aggregate_state;
@ -207,8 +208,7 @@ private:
void finishCurrentRow(ColumnPlainPtrs & merged_columns);
/// Update the state of the aggregate function with the new value.
template <class TSortCursor>
void accumulateRow(TSortCursor & cursor);
void accumulateRow(RowRef & row);
};
}

View File

@ -51,15 +51,15 @@ private:
Logger * log = &Logger::get("ReplacingSortedBlockInputStream");
/// Прочитали до конца.
/// All data has been read.
bool finished = false;
RowRef current_key; /// Текущий первичный ключ.
RowRef next_key; /// Первичный ключ следующей строки.
RowRef current_key; /// Primary key of current row.
RowRef next_key; /// Primary key of next row.
RowRef selected_row; /// Последняя строка с максимальной версией для текущего первичного ключа.
RowRef selected_row; /// Last row with maximum version for current primary key.
UInt64 max_version = 0; /// Максимальная версия для текущего первичного ключа.
UInt64 max_version = 0; /// Max version for current primary key.
template<class TSortCursor>
void merge(ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);

View File

@ -93,6 +93,9 @@ Block GraphiteRollupSortedBlockInputStream::readImpl()
for (size_t i = 0; i < num_columns; ++i)
if (i != time_column_num && i != value_column_num && i != version_column_num)
unmodified_column_numbers.push_back(i);
if (selected_row.empty())
selected_row.columns.resize(num_columns);
}
if (has_collation)
@ -123,19 +126,29 @@ void GraphiteRollupSortedBlockInputStream::merge(ColumnPlainPtrs & merged_column
bool path_differs = is_first || next_path != current_path;
is_first = false;
/// Is new key before rounding.
bool is_new_key = path_differs || next_time != current_time;
UInt64 current_version = current->all_columns[version_column_num]->get64(current->pos);
if (is_new_key)
{
current_max_version = 0;
if (path_differs)
current_pattern = selectPatternForPath(next_path);
if (current_pattern)
{
UInt32 precision = selectPrecision(current_pattern->retentions, next_time);
next_time = roundTimeToPrecision(date_lut, next_time, precision);
next_time_rounded = roundTimeToPrecision(date_lut, next_time, precision);
}
/// If no pattern has matched, there is no need to do rounding.
bool is_new_key = path_differs || next_time != current_time;
/// Key will be new after rounding.
bool will_be_new_key = path_differs || next_time_rounded != current_time_rounded;
if (is_new_key)
if (will_be_new_key)
{
if (merged_rows)
{
@ -148,10 +161,9 @@ void GraphiteRollupSortedBlockInputStream::merge(ColumnPlainPtrs & merged_column
startNextRow(merged_columns, current);
owned_current_block = source_blocks[current.impl->order];
current_path = next_path;
current_time = next_time;
current_max_version = 0;
current_time_rounded = next_time_rounded;
if (prev_pattern)
prev_pattern->function->destroy(place_for_aggregate_state.data());
@ -161,8 +173,16 @@ void GraphiteRollupSortedBlockInputStream::merge(ColumnPlainPtrs & merged_column
++merged_rows;
}
accumulateRow(current);
current_max_version = std::max(current_max_version, current->all_columns[version_column_num]->get64(current->pos));
accumulateRow(selected_row);
current_max_version = current_version;
}
else if (current_version >= current_max_version)
{
/// Among all rows with the same key, we should keep only the row with the maximum version;
/// and among rows with the same maximum version, only the last row.
current_max_version = current_version;
setRowRef(selected_row, current);
}
queue.pop();
@ -210,7 +230,7 @@ void GraphiteRollupSortedBlockInputStream::startNextRow(ColumnPlainPtrs & merged
void GraphiteRollupSortedBlockInputStream::finishCurrentRow(ColumnPlainPtrs & merged_columns)
{
/// Insert the calculated values of the time, value and version columns.
merged_columns[time_column_num]->insert(UInt64(current_time));
merged_columns[time_column_num]->insert(UInt64(current_time_rounded));
merged_columns[version_column_num]->insert(current_max_version);
if (current_pattern)
@ -218,11 +238,10 @@ void GraphiteRollupSortedBlockInputStream::finishCurrentRow(ColumnPlainPtrs & me
}
template <class TSortCursor>
void GraphiteRollupSortedBlockInputStream::accumulateRow(TSortCursor & cursor)
void GraphiteRollupSortedBlockInputStream::accumulateRow(RowRef & row)
{
if (current_pattern)
current_pattern->function->add(place_for_aggregate_state.data(), &cursor->all_columns[value_column_num], cursor->pos, nullptr);
current_pattern->function->add(place_for_aggregate_state.data(), &row.columns[value_column_num], row.row_num, nullptr);
}
}

View File

@ -29,7 +29,7 @@ Block ReplacingSortedBlockInputStream::readImpl()
if (merged_columns.empty())
return Block();
/// Дополнительная инициализация.
/// Additional initialization.
if (selected_row.empty())
{
selected_row.columns.resize(num_columns);