generalize MinMax index [#CLICKHOUSE-3000]

This commit is contained in:
Alexey Zatelepin 2017-08-21 18:35:29 +03:00 committed by alexey-milovidov
parent 5471979f46
commit 645e4b7a53
10 changed files with 146 additions and 56 deletions

View File

@ -209,6 +209,36 @@ void MergeTreeData::initPartitionKey()
partition_expr = ExpressionAnalyzer(partition_expr_ast, context, nullptr, getColumnsList()).getActions(false);
for (const ASTPtr & ast : partition_expr_ast->children)
partition_expr_columns.emplace_back(ast->getColumnName());
const NamesAndTypesList & minmax_idx_columns_with_types = partition_expr->getRequiredColumnsWithTypes();
minmax_idx_expr = std::make_shared<ExpressionActions>(minmax_idx_columns_with_types, context.getSettingsRef());
minmax_idx_columns.clear();
minmax_idx_column_types.clear();
minmax_idx_sort_descr.clear();
for (const NameAndTypePair & column : minmax_idx_columns_with_types)
{
minmax_idx_columns.emplace_back(column.name);
minmax_idx_column_types.emplace_back(column.type);
minmax_idx_sort_descr.emplace_back(column.name, 1, 1);
}
bool encountered_date_column = false;
for (size_t i = 0; i < minmax_idx_column_types.size(); ++i)
{
if (typeid_cast<const DataTypeDate *>(minmax_idx_column_types[i].get()))
{
if (!encountered_date_column)
{
minmax_idx_date_column_pos = i;
encountered_date_column = true;
}
else
{
/// There is more than one Date column in partition key and we don't know which one to choose.
minmax_idx_date_column_pos = -1;
}
}
}
}
@ -1222,9 +1252,8 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
if (increment)
part->info.min_block = part->info.max_block = increment->get();
// TODO V1-specific
String new_name = MergeTreePartInfo::getPartName(
part->minmax_idx.min_date, part->minmax_idx.max_date, part->info.min_block, part->info.max_block, part->info.level);
part->getMinDate(), part->getMaxDate(), part->info.min_block, part->info.max_block, part->info.level);
LOG_TRACE(log, "Renaming temporary part " << part->relative_path << " to " << new_name << ".");

View File

@ -484,6 +484,12 @@ public:
ExpressionActionsPtr partition_expr;
Names partition_expr_columns;
ExpressionActionsPtr minmax_idx_expr;
Names minmax_idx_columns;
DataTypes minmax_idx_column_types;
Int64 minmax_idx_date_column_pos = -1; /// In a common case minmax index includes a date column.
SortDescription minmax_idx_sort_descr; /// For use with PKCondition.
/// Limiting parallel sends per one table, used in DataPartsExchange
std::atomic_uint current_table_sends {0};

View File

@ -76,22 +76,24 @@ void MergeTreeDataMerger::FuturePart::assign(MergeTreeData::DataPartsVector part
parts = std::move(parts_);
UInt32 max_level = 0;
MinMaxIndex minmax_idx;
for (const auto & part : parts)
{
max_level = std::max(max_level, part->info.level);
minmax_idx.merge(part->minmax_idx);
}
part_info.partition_id = parts.front()->info.partition_id;
part_info.min_block = parts.front()->info.min_block;
part_info.max_block = parts.back()->info.max_block;
part_info.level = max_level + 1;
// TODO V1-specific
DayNum_t min_date = DayNum_t(std::numeric_limits<UInt16>::max());
DayNum_t max_date = DayNum_t(std::numeric_limits<UInt16>::min());
for (const auto & part : parts)
{
min_date = std::min(min_date, part->getMinDate());
max_date = std::max(max_date, part->getMaxDate());
}
name = MergeTreePartInfo::getPartName(
minmax_idx.min_date, minmax_idx.max_date,
part_info.min_block, part_info.max_block, part_info.level);
min_date, max_date, part_info.min_block, part_info.max_block, part_info.level);
}
MergeTreeDataMerger::MergeTreeDataMerger(MergeTreeData & data_, const BackgroundProcessingPool & pool_)
@ -1024,7 +1026,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
rows_written += block.rows();
output_stream->write(block);
data_part->minmax_idx.update(*block.getByName(data.date_column_name).column);
data_part->minmax_idx.update(block, data.minmax_idx_columns);
merge_entry->rows_written = merged_stream->getProfileInfo().rows;
merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes;
@ -1068,11 +1070,8 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
size_t shard_no = entry.first;
MergeTreeData::MutableDataPartPtr & part_from_shard = entry.second;
// TODO V1-specific
std::string new_name = MergeTreePartInfo::getPartName(
part_from_shard->minmax_idx.min_date,
part_from_shard->minmax_idx.max_date,
part_from_shard->getMinDate(), part_from_shard->getMaxDate(),
part_from_shard->info.min_block, part_from_shard->info.max_block, part_from_shard->info.level);
std::string new_relative_path = "reshard/" + toString(shard_no) + "/" + new_name;

View File

@ -14,7 +14,6 @@
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsNumber.h>
#include <Poco/File.h>
#include <Poco/Path.h>
@ -287,22 +286,55 @@ const MergeTreeDataPartChecksums::Checksum * MergeTreeDataPart::tryGetBinChecksu
}
void MinMaxIndex::update(const IColumn & column)
void MinMaxIndex::update(const Block & block, const Names & column_names)
{
Field min_value;
Field max_value;
const auto & date_column = typeid_cast<const ColumnUInt16 &>(column);
date_column.getExtremes(min_value, max_value);
min_date = std::min(min_date, static_cast<DayNum_t>(get<UInt64>(min_value)));
max_date = std::max(max_date, static_cast<DayNum_t>(get<UInt64>(max_value)));
if (!initialized)
{
min_column_values.resize(column_names.size(), /* dont_init_elems = */ true);
max_column_values.resize(column_names.size(), /* dont_init_elems = */ true);
}
for (size_t i = 0; i < column_names.size(); ++i)
{
Field min_value;
Field max_value;
const ColumnWithTypeAndName & column = block.getByName(column_names[i]);
column.column->getExtremes(min_value, max_value);
if (!initialized)
{
new (min_column_values.place(i)) Field(min_value);
new (max_column_values.place(i)) Field(max_value);
}
else
{
min_column_values[i] = std::min(min_column_values[i], min_value);
max_column_values[i] = std::max(max_column_values[i], max_value);
}
}
initialized = true;
}
void MinMaxIndex::merge(const MinMaxIndex & other)
{
if (other.min_date < min_date)
min_date = other.min_date;
if (other.max_date > max_date)
max_date = other.max_date;
if (!other.initialized)
return;
if (!initialized)
{
min_column_values.assign(other.min_column_values);
max_column_values.assign(other.max_column_values);
initialized = true;
}
else
{
for (size_t i = 0; i < min_column_values.size(); ++i)
{
min_column_values[i] = std::min(min_column_values[i], other.min_column_values[i]);
max_column_values[i] = std::max(max_column_values[i], other.max_column_values[i]);
}
}
}
@ -407,6 +439,24 @@ String MergeTreeDataPart::getNameWithPrefix() const
}
DayNum_t MergeTreeDataPart::getMinDate() const
{
if (storage.minmax_idx_date_column_pos != -1)
return DayNum_t(minmax_idx.min_column_values[storage.minmax_idx_date_column_pos].get<UInt64>());
else
return DayNum_t();
}
DayNum_t MergeTreeDataPart::getMaxDate() const
{
if (storage.minmax_idx_date_column_pos != -1)
return DayNum_t(minmax_idx.max_column_values[storage.minmax_idx_date_column_pos].get<UInt64>());
else
return DayNum_t();
}
MergeTreeDataPart::~MergeTreeDataPart()
{
if (is_temp)
@ -608,9 +658,16 @@ void MergeTreeDataPart::loadIndex()
void MergeTreeDataPart::loadPartitionAndMinMaxIndex()
{
MergeTreePartInfo::parseMinMaxDatesFromPartName(name, minmax_idx.min_date, minmax_idx.max_date);
DayNum_t min_date;
DayNum_t max_date;
MergeTreePartInfo::parseMinMaxDatesFromPartName(name, min_date, max_date);
const auto & date_lut = DateLUT::instance();
partition = static_cast<UInt64>(date_lut.toNumYYYYMM(DayNum_t(minmax_idx.min_date)));
partition = Row(1, static_cast<UInt64>(date_lut.toNumYYYYMM(min_date)));
minmax_idx.min_column_values = Row(1, static_cast<UInt64>(min_date));
minmax_idx.max_column_values = Row(1, static_cast<UInt64>(max_date));
minmax_idx.initialized = true;
}
void MergeTreeDataPart::loadChecksums(bool require)

View File

@ -1,6 +1,7 @@
#pragma once
#include <Core/Row.h>
#include <Core/Block.h>
#include <Core/NamesAndTypes.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Columns/IColumn.h>
@ -81,11 +82,12 @@ struct MergeTreeDataPartChecksums
struct MinMaxIndex
{
void update(const IColumn & column);
void update(const Block & block, const Names & column_names);
void merge(const MinMaxIndex & other);
DayNum_t min_date = DayNum_t(std::numeric_limits<UInt16>::max());
DayNum_t max_date = DayNum_t(std::numeric_limits<UInt16>::min());
bool initialized = false;
Row min_column_values;
Row max_column_values;
};
@ -130,6 +132,9 @@ struct MergeTreeDataPart
bool contains(const MergeTreeDataPart & other) const { return info.contains(other.info); }
/// If the partition key includes date column (a common case), these functions will return min and max values for this column.
DayNum_t getMinDate() const;
DayNum_t getMaxDate() const;
MergeTreeData & storage;

View File

@ -202,9 +202,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
PKCondition key_condition(query_info, context, available_real_and_virtual_columns, sort_descr,
data.getPrimaryExpression());
PKCondition date_condition(query_info, context, available_real_and_virtual_columns,
SortDescription(1, SortColumnDescription(data.date_column_name, 1, 1)),
std::make_shared<ExpressionActions>(date_columns, settings));
if (settings.force_primary_key && key_condition.alwaysUnknownOrTrue())
{
@ -217,15 +214,17 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
throw Exception(exception_message.str(), ErrorCodes::INDEX_NOT_USED);
}
if (settings.force_index_by_date && date_condition.alwaysUnknownOrTrue())
PKCondition minmax_idx_condition(
query_info, context, available_real_and_virtual_columns,
data.minmax_idx_sort_descr, data.minmax_idx_expr);
if (settings.force_index_by_date && minmax_idx_condition.alwaysUnknownOrTrue())
throw Exception("Index by date (" + data.date_column_name + ") is not used and setting 'force_index_by_date' is set.",
ErrorCodes::INDEX_NOT_USED);
/// Select the parts in which there can be data that satisfy `date_condition` and that match the condition on `_part`,
/// Select the parts in which there can be data that satisfy `minmax_idx_condition` and that match the condition on `_part`,
/// as well as `max_block_number_to_read`.
{
const DataTypes data_types_date { std::make_shared<DataTypeDate>() };
auto prev_parts = parts;
parts.clear();
@ -234,11 +233,10 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
if (part_values.find(part->name) == part_values.end())
continue;
// TODO V1-specific
Field left = static_cast<UInt64>(part->minmax_idx.min_date);
Field right = static_cast<UInt64>(part->minmax_idx.max_date);
if (!date_condition.mayBeTrueInRange(1, &left, &right, data_types_date))
if (!minmax_idx_condition.mayBeTrueInRange(
data.minmax_idx_columns.size(),
&part->minmax_idx.min_column_values[0], &part->minmax_idx.max_column_values[0],
data.minmax_idx_column_types))
continue;
if (max_block_number_to_read && part->info.max_block > max_block_number_to_read)
@ -477,7 +475,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
}
LOG_DEBUG(log, "Key condition: " << key_condition.toString());
LOG_DEBUG(log, "Date condition: " << date_condition.toString());
LOG_DEBUG(log, "MinMax index condition: " << minmax_idx_condition.toString());
/// PREWHERE
ExpressionActionsPtr prewhere_actions;

View File

@ -132,11 +132,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
Int64 temp_index = data.insert_increment.get();
MinMaxIndex minmax_idx;
// TODO V1-specific
ColumnPlainPtrs minmax_columns{block.getByName(data.date_column_name).column.get()};
minmax_idx.update(minmax_columns);
DayNum_t min_date = minmax_idx.min_date;
DayNum_t max_date = minmax_idx.max_date;
minmax_idx.update(block, data.minmax_idx_columns);
DayNum_t min_date(minmax_idx.min_column_values[data.minmax_idx_date_column_pos].get<UInt64>());
DayNum_t max_date(minmax_idx.max_column_values[data.minmax_idx_date_column_pos].get<UInt64>());
const auto & date_lut = DateLUT::instance();
@ -152,7 +151,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(
data, part_name, MergeTreePartInfo(new_partition_id, temp_index, temp_index, 0));
new_data_part->partition = std::move(block_with_partition.partition);
new_data_part->minmax_idx = minmax_idx;
new_data_part->minmax_idx = std::move(minmax_idx);
new_data_part->relative_path = TMP_PREFIX + part_name;
new_data_part->is_temp = true;

View File

@ -175,9 +175,8 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
part->info.max_block = block_number;
part->info.level = 0;
// TODO V1-specific
String part_name = MergeTreePartInfo::getPartName(
part->minmax_idx.min_date, part->minmax_idx.max_date, block_number, block_number, 0);
part->getMinDate(), part->getMaxDate(), block_number, block_number, 0);
part->name = part_name;

View File

@ -2568,8 +2568,6 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
/// The name of an imaginary part covering all possible parts in the specified partition with numbers in the range from zero to specified right bound.
static String getFakePartNameCoveringPartRange(const String & partition_id, UInt64 left, UInt64 right)
{
// TODO V1-specific
/// The date range is all month long.
const auto & lut = DateLUT::instance();
time_t start_time = lut.YYYYMMDDToDate(parse<UInt32>(partition_id + "01"));

View File

@ -225,8 +225,8 @@ BlockInputStreams StorageSystemParts::read(
/// For convenience, in returned refcount, don't add references that was due to local variables in this method: all_parts, active_parts.
block.getByPosition(i++).column->insert(part.use_count() - (active_parts.count(part) ? 2 : 1));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->minmax_idx.min_date));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->minmax_idx.max_date));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->getMinDate()));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->getMaxDate()));
block.getByPosition(i++).column->insert(part->info.min_block);
block.getByPosition(i++).column->insert(part->info.max_block);
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->info.level));