Merge pull request #60041 from kitaisreal/parts-splitter-invalid-ranges-for-the-same-part

PartsSplitter invalid ranges for the same part
This commit is contained in:
Nikolai Kochetov 2024-03-01 12:45:43 +01:00 committed by GitHub
commit 258d8ab162
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 251 additions and 64 deletions

View File

@@ -8,6 +8,10 @@
#include <Core/Field.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeVariant.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <Parsers/ASTFunction.h>
@@ -30,6 +34,71 @@ std::string toString(const Values & value)
return fmt::format("({})", fmt::join(value, ", "));
}
/** Returns true if a primary key column of this data type is safe for splitting
  * parts into intersecting and non-intersecting ranges.
  *
  * Unsafe types:
  * - Float32/Float64: NaN breaks the total order required by the splitter.
  * - Nullable: NULL placement breaks range comparison logic.
  * - Object/Variant: values may hold different inner types, so there is no
  *   stable total order for cross-row comparison.
  * Composite types (Array, Tuple, LowCardinality, Map) are safe only if all
  * of their nested types are safe.
  */
bool isSafePrimaryDataKeyType(const IDataType & data_type)
{
    auto type_id = data_type.getTypeId();
    switch (type_id)
    {
        case TypeIndex::Float32:
        case TypeIndex::Float64:
        case TypeIndex::Nullable:
        case TypeIndex::Object:
            return false;
        case TypeIndex::Array:
        {
            const auto & data_type_array = static_cast<const DataTypeArray &>(data_type);
            return isSafePrimaryDataKeyType(*data_type_array.getNestedType());
        }
        case TypeIndex::Tuple:
        {
            const auto & data_type_tuple = static_cast<const DataTypeTuple &>(data_type);
            const auto & data_type_tuple_elements = data_type_tuple.getElements();
            for (const auto & data_type_tuple_element : data_type_tuple_elements)
                if (!isSafePrimaryDataKeyType(*data_type_tuple_element))
                    return false;
            return true;
        }
        case TypeIndex::LowCardinality:
        {
            const auto & data_type_low_cardinality = static_cast<const DataTypeLowCardinality &>(data_type);
            return isSafePrimaryDataKeyType(*data_type_low_cardinality.getDictionaryType());
        }
        case TypeIndex::Map:
        {
            const auto & data_type_map = static_cast<const DataTypeMap &>(data_type);
            return isSafePrimaryDataKeyType(*data_type_map.getKeyType()) && isSafePrimaryDataKeyType(*data_type_map.getValueType());
        }
        case TypeIndex::Variant:
        {
            /// Variant is always treated as unsafe regardless of its inner types.
            /// The previous implementation looped over the variants but returned
            /// false on every path, so the loop was dead code — removed for
            /// clarity; behavior is unchanged.
            return false;
        }
        default:
        {
            break;
        }
    }

    return true;
}
/// Returns true only when every column type of the primary key is safe for
/// the parts-splitting optimization (see isSafePrimaryDataKeyType).
bool isSafePrimaryKey(const KeyDescription & primary_key)
{
    bool is_safe = true;

    for (const auto & key_column_type : primary_key.data_types)
        is_safe = is_safe && isSafePrimaryDataKeyType(*key_column_type);

    return is_safe;
}
int compareValues(const Values & lhs, const Values & rhs)
{
chassert(lhs.size() == rhs.size());
@ -117,8 +186,10 @@ public:
return findLeftmostMarkGreaterThanValueInRange(part_index, value, mark_range.begin, mark_range.end);
}
size_t getMarkRows(size_t part_idx, size_t mark) const { return parts[part_idx].data_part->index_granularity.getMarkRows(mark); }
/// Returns the number of rows covered by the given mark of the given part.
size_t getMarkRows(size_t part_idx, size_t mark) const
{
return parts[part_idx].data_part->index_granularity.getMarkRows(mark);
}
private:
const RangesInDataParts & parts;
};
@ -174,23 +245,20 @@ struct PartsRangesIterator
else if (compare_result == 1)
return false;
if (part_index == other.part_index)
{
/// Within the same part we should process events in order of mark numbers,
/// because they already ordered by value and range ends have greater mark numbers than the beginnings.
/// Otherwise we could get invalid ranges with the right bound that is less than the left bound.
const auto ev_mark = event == EventType::RangeStart ? range.begin : range.end;
const auto other_ev_mark = other.event == EventType::RangeStart ? other.range.begin : other.range.end;
// Start event always before end event
if (ev_mark == other_ev_mark)
return event < other.event;
return ev_mark < other_ev_mark;
}
if (event == other.event)
{
if (part_index == other.part_index)
{
/// Within the same part we should process events in order of mark numbers,
/// because they already ordered by value and range ends have greater mark numbers than the beginnings.
/// Otherwise we could get invalid ranges with the right bound that is less than the left bound.
const auto ev_mark = event == EventType::RangeStart ? range.begin : range.end;
const auto other_ev_mark = other.event == EventType::RangeStart ? other.range.begin : other.range.end;
return ev_mark < other_ev_mark;
}
return part_index < other.part_index;
}
// Start event always before end event
return event < other.event;
@ -216,18 +284,82 @@ struct PartsRangesIterator
return true;
}
/// Writes a one-line, human-readable description of this range event into the
/// buffer. Used by the LOG_TEST diagnostics around parts-ranges sorting.
void dump(WriteBuffer & buffer) const
{
    const char * event_name = (event == PartsRangesIterator::EventType::RangeStart) ? "Range Start" : "Range End";
    buffer << "Part index " << part_index
        << " event " << event_name
        << " range begin " << range.begin
        << " end " << range.end
        << " value " << ::toString(value) << '\n';
}
/// Debug helper: renders this event via dump() into an owned string.
[[maybe_unused]] String toString() const
{
    WriteBufferFromOwnString out;
    dump(out);
    return out.str();
}
Values value;
MarkRange range;
size_t part_index;
EventType event;
};
struct PartRangeIndex
{
explicit PartRangeIndex(PartsRangesIterator & ranges_iterator)
: part_index(ranges_iterator.part_index)
, range(ranges_iterator.range)
{}
bool operator==(const PartRangeIndex & other) const
{
return part_index == other.part_index && range.begin == other.range.begin && range.end == other.range.end;
}
bool operator<(const PartRangeIndex & other) const
{
return part_index < other.part_index && range.begin < other.range.begin && range.end < other.range.end;
}
size_t part_index;
MarkRange range;
};
/// Hash functor for PartRangeIndex so it can key std::unordered_map.
/// Combines the part index and both range bounds; the combination order is
/// kept fixed so hash values stay stable.
struct PartRangeIndexHash
{
    size_t operator()(const PartRangeIndex & part_range_index) const noexcept
    {
        size_t seed = 0;

        boost::hash_combine(seed, part_range_index.part_index);
        boost::hash_combine(seed, part_range_index.range.begin);
        boost::hash_combine(seed, part_range_index.range.end);

        return seed;
    }
};
/// Result of splitPartsRanges: ranges proven not to intersect any other range
/// (they can be read without merging) and the ranges that still intersect.
struct SplitPartsRangesResult
{
RangesInDataParts non_intersecting_parts_ranges;
RangesInDataParts intersecting_parts_ranges;
};
void dump(const std::vector<PartsRangesIterator> & ranges_iterators, WriteBuffer & buffer)
{
for (const auto & range_iterator : ranges_iterators)
range_iterator.dump(buffer);
}
/// Renders the whole list of range events into a single string (used by LOG_TEST).
String toString(const std::vector<PartsRangesIterator> & ranges_iterators)
{
    WriteBufferFromOwnString out;
    dump(ranges_iterators, out);
    return out.str();
}
SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts, const LoggerPtr & logger)
{
/** Split ranges in data parts into intersecting ranges in data parts and non intersecting ranges in data parts.
@ -307,7 +439,11 @@ SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts,
}
}
std::sort(parts_ranges.begin(), parts_ranges.end());
LOG_TEST(logger, "Parts ranges before sort {}", toString(parts_ranges));
::sort(parts_ranges.begin(), parts_ranges.end());
LOG_TEST(logger, "Parts ranges after sort {}", toString(parts_ranges));
RangesInDataPartsBuilder intersecting_ranges_in_data_parts_builder(ranges_in_data_parts);
RangesInDataPartsBuilder non_intersecting_ranges_in_data_parts_builder(ranges_in_data_parts);
@ -324,24 +460,27 @@ SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts,
intersecting_ranges_in_data_parts_builder.addRange(part_index, mark_range);
};
std::unordered_map<size_t, MarkRange> part_index_start_to_range;
std::unordered_map<PartRangeIndex, MarkRange, PartRangeIndexHash> part_index_start_to_range;
chassert(!parts_ranges.empty());
chassert(parts_ranges[0].event == PartsRangesIterator::EventType::RangeStart);
part_index_start_to_range[parts_ranges[0].part_index] = parts_ranges[0].range;
part_index_start_to_range[PartRangeIndex(parts_ranges[0])] = parts_ranges[0].range;
size_t parts_ranges_size = parts_ranges.size();
for (size_t i = 1; i < parts_ranges_size; ++i)
{
auto & previous_part_range = parts_ranges[i - 1];
PartRangeIndex previous_part_range_index(previous_part_range);
auto & current_part_range = parts_ranges[i];
PartRangeIndex current_part_range_index(current_part_range);
size_t intersecting_parts = part_index_start_to_range.size();
bool range_start = current_part_range.event == PartsRangesIterator::EventType::RangeStart;
if (range_start)
{
auto [it, inserted] = part_index_start_to_range.emplace(current_part_range.part_index, current_part_range.range);
chassert(inserted);
auto [it, inserted] = part_index_start_to_range.emplace(current_part_range_index, current_part_range.range);
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "PartsSplitter expected unique range");
if (intersecting_parts != 1)
continue;
@ -365,7 +504,7 @@ SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts,
if (end - begin >= min_number_of_marks_for_non_intersecting_range)
{
part_index_start_to_range[previous_part_range.part_index].begin = end;
part_index_start_to_range[previous_part_range_index].begin = end;
add_non_intersecting_range(previous_part_range.part_index, MarkRange{begin, end});
}
@ -379,8 +518,10 @@ SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts,
break;
}
chassert(other_interval_it != part_index_start_to_range.end());
size_t other_interval_part_index = other_interval_it->first;
if (!(other_interval_it != part_index_start_to_range.end() && other_interval_it != it))
throw Exception(ErrorCodes::LOGICAL_ERROR, "PartsSplitter expected single other interval");
size_t other_interval_part_index = other_interval_it->first.part_index;
MarkRange other_interval_range = other_interval_it->second;
/// If part level is 0, we must process whole other intersecting part because it can contain duplicate primary keys
@ -413,6 +554,7 @@ SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts,
}
chassert(current_part_range.event == PartsRangesIterator::EventType::RangeEnd);
chassert(part_index_start_to_range.contains(current_part_range_index));
/** If there are more than 1 part ranges that we are currently processing
* that means that this part range is intersecting with other range.
@ -421,8 +563,8 @@ SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts,
*/
if (intersecting_parts != 1 || ranges_in_data_parts[current_part_range.part_index].data_part->info.level == 0)
{
add_intersecting_range(current_part_range.part_index, part_index_start_to_range[current_part_range.part_index]);
part_index_start_to_range.erase(current_part_range.part_index);
add_intersecting_range(current_part_range.part_index, part_index_start_to_range[current_part_range_index]);
part_index_start_to_range.erase(current_part_range_index);
continue;
}
@ -432,13 +574,12 @@ SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts,
chassert(current_part_range.range == previous_part_range.range);
/// Case 3 Range End after Range Start
non_intersecting_ranges_in_data_parts_builder.addRange(current_part_range.part_index, current_part_range.range);
part_index_start_to_range.erase(current_part_range.part_index);
add_non_intersecting_range(current_part_range.part_index, current_part_range.range);
part_index_start_to_range.erase(current_part_range_index);
continue;
}
chassert(previous_part_range.event == PartsRangesIterator::EventType::RangeEnd);
chassert(previous_part_range.part_index != current_part_range.part_index);
/// Case 4 Range End after Range End
std::optional<size_t> begin_optional = index_access.findLeftmostMarkGreaterThanValueInRange(current_part_range.part_index,
@ -449,25 +590,25 @@ SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts,
if (begin_optional && end - *begin_optional >= min_number_of_marks_for_non_intersecting_range)
{
size_t begin = *begin_optional;
add_intersecting_range(current_part_range.part_index, MarkRange{part_index_start_to_range[current_part_range.part_index].begin, begin});
add_intersecting_range(current_part_range.part_index, MarkRange{part_index_start_to_range[current_part_range_index].begin, begin});
add_non_intersecting_range(current_part_range.part_index, MarkRange{begin, end});
}
else
{
add_intersecting_range(current_part_range.part_index, MarkRange{part_index_start_to_range[current_part_range.part_index].begin, end});
add_intersecting_range(current_part_range.part_index, MarkRange{part_index_start_to_range[current_part_range_index].begin, end});
}
part_index_start_to_range.erase(current_part_range.part_index);
part_index_start_to_range.erase(current_part_range_index);
}
/// Process parts ranges with undefined value at end mark
bool is_intersecting = part_index_start_to_range.size() > 1;
for (const auto & [part_index, mark_range] : part_index_start_to_range)
for (const auto & [part_range_index, mark_range] : part_index_start_to_range)
{
if (is_intersecting)
add_intersecting_range(part_index, mark_range);
add_intersecting_range(part_range_index.part_index, mark_range);
else
add_non_intersecting_range(part_index, mark_range);
add_non_intersecting_range(part_range_index.part_index, mark_range);
}
auto && non_intersecting_ranges_in_data_parts = std::move(non_intersecting_ranges_in_data_parts_builder.getCurrentRangesInDataParts());
@ -493,35 +634,42 @@ std::pair<std::vector<RangesInDataParts>, std::vector<Values>> splitIntersecting
size_t max_layers,
const LoggerPtr & logger)
{
// We will advance the iterator pointing to the mark with the smallest PK value until
// there will be not less than rows_per_layer rows in the current layer (roughly speaking).
// Then we choose the last observed value as the new border, so the current layer will consists
// of granules with values greater than the previous mark and less or equal than the new border.
/** We will advance the iterator pointing to the mark with the smallest PK value until
* there will be not less than rows_per_layer rows in the current layer (roughly speaking).
* Then we choose the last observed value as the new border, so the current layer will consists
* of granules with values greater than the previous mark and less or equal than the new border.
*
* We use PartRangeIndex to track currently processing ranges, because after sort, RangeStart event is always placed
* before Range End event and it is possible to encounter overlapping Range Start events for the same part.
*/
IndexAccess index_access(intersecting_ranges_in_data_parts);
std::priority_queue<PartsRangesIterator, std::vector<PartsRangesIterator>, std::greater<>> parts_ranges_queue;
using PartsRangesIteratorWithIndex = std::pair<PartsRangesIterator, PartRangeIndex>;
std::priority_queue<PartsRangesIteratorWithIndex, std::vector<PartsRangesIteratorWithIndex>, std::greater<>> parts_ranges_queue;
for (size_t part_index = 0; part_index < intersecting_ranges_in_data_parts.size(); ++part_index)
{
for (const auto & range : intersecting_ranges_in_data_parts[part_index].ranges)
{
const auto & index_granularity = intersecting_ranges_in_data_parts[part_index].data_part->index_granularity;
parts_ranges_queue.push(
{index_access.getValue(part_index, range.begin), range, part_index, PartsRangesIterator::EventType::RangeStart});
PartsRangesIterator parts_range_start{index_access.getValue(part_index, range.begin), range, part_index, PartsRangesIterator::EventType::RangeStart};
PartRangeIndex parts_range_start_index(parts_range_start);
parts_ranges_queue.push({std::move(parts_range_start), std::move(parts_range_start_index)});
const bool value_is_defined_at_end_mark = range.end < index_granularity.getMarksCount();
if (!value_is_defined_at_end_mark)
continue;
parts_ranges_queue.push(
{index_access.getValue(part_index, range.end), range, part_index, PartsRangesIterator::EventType::RangeEnd});
PartsRangesIterator parts_range_end{index_access.getValue(part_index, range.end), range, part_index, PartsRangesIterator::EventType::RangeEnd};
PartRangeIndex parts_range_end_index(parts_range_end);
parts_ranges_queue.push({std::move(parts_range_end), std::move(parts_range_end_index)});
}
}
/// The beginning of currently started (but not yet finished) range of marks of a part in the current layer.
std::unordered_map<size_t, size_t> current_part_range_begin;
std::unordered_map<PartRangeIndex, size_t, PartRangeIndexHash> current_part_range_begin;
/// The current ending of a range of marks of a part in the current layer.
std::unordered_map<size_t, size_t> current_part_range_end;
std::unordered_map<PartRangeIndex, size_t, PartRangeIndexHash> current_part_range_end;
/// Determine borders between layers.
std::vector<Values> borders;
@ -551,17 +699,19 @@ std::pair<std::vector<RangesInDataParts>, std::vector<Values>> splitIntersecting
{
// We're advancing iterators until a new value showed up.
Values last_value;
while (!parts_ranges_queue.empty() && (last_value.empty() || last_value == parts_ranges_queue.top().value))
while (!parts_ranges_queue.empty() && (last_value.empty() || last_value == parts_ranges_queue.top().first.value))
{
auto current = parts_ranges_queue.top();
auto [current, current_range_index] = parts_ranges_queue.top();
PartRangeIndex current_part_range_index(current);
parts_ranges_queue.pop();
const auto part_index = current.part_index;
if (current.event == PartsRangesIterator::EventType::RangeEnd)
{
current_layer_builder.addRange(part_index, MarkRange{current_part_range_begin[part_index], current.range.end});
current_part_range_begin.erase(part_index);
current_part_range_end.erase(part_index);
current_layer_builder.addRange(part_index, MarkRange{current_part_range_begin[current_range_index], current.range.end});
current_part_range_begin.erase(current_range_index);
current_part_range_end.erase(current_range_index);
continue;
}
@ -569,14 +719,14 @@ std::pair<std::vector<RangesInDataParts>, std::vector<Values>> splitIntersecting
rows_in_current_layer += index_access.getMarkRows(part_index, current.range.begin);
++marks_in_current_layer;
current_part_range_begin.try_emplace(part_index, current.range.begin);
current_part_range_end[part_index] = current.range.begin;
current_part_range_begin.try_emplace(current_range_index, current.range.begin);
current_part_range_end[current_range_index] = current.range.begin;
if (current.range.begin + 1 < current.range.end)
{
++current.range.begin;
current.value = index_access.getValue(part_index, current.range.begin);
parts_ranges_queue.push(std::move(current));
parts_ranges_queue.push({std::move(current), current_range_index});
}
}
@ -587,10 +737,10 @@ std::pair<std::vector<RangesInDataParts>, std::vector<Values>> splitIntersecting
borders.push_back(last_value);
}
for (const auto & [part_index, last_mark] : current_part_range_end)
for (const auto & [current_range_index, last_mark] : current_part_range_end)
{
current_layer_builder.addRange(part_index, MarkRange{current_part_range_begin[part_index], last_mark + 1});
current_part_range_begin[part_index] = current_part_range_end[part_index];
current_layer_builder.addRange(current_range_index.part_index, MarkRange{current_part_range_begin[current_range_index], last_mark + 1});
current_part_range_begin[current_range_index] = current_part_range_end[current_range_index];
}
result_layers.back() = std::move(current_layer_builder.getCurrentRangesInDataParts());
@ -658,10 +808,10 @@ ASTs buildFilters(const KeyDescription & primary_key, const std::vector<Values>
{
ASTPtr component_ast = std::make_shared<ASTLiteral>(values[i]);
auto decayed_type = removeNullable(removeLowCardinality(primary_key.data_types.at(i)));
// Values of some types (e.g. Date, DateTime) are stored in columns as numbers and we get them as just numbers from the index.
// So we need an explicit Cast for them.
if (isColumnedAsNumber(decayed_type->getTypeId()) && !isNumber(decayed_type->getTypeId()))
component_ast = makeASTFunction("cast", std::move(component_ast), std::make_shared<ASTLiteral>(decayed_type->getName()));
component_ast = makeASTFunction("cast", std::move(component_ast), std::make_shared<ASTLiteral>(decayed_type->getName()));
values_ast.push_back(std::move(component_ast));
}
@ -730,15 +880,18 @@ SplitPartsWithRangesByPrimaryKeyResult splitPartsWithRangesByPrimaryKey(
bool split_parts_ranges_into_intersecting_and_non_intersecting_final,
bool split_intersecting_parts_ranges_into_layers)
{
if (max_layers <= 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "max_layer should be greater than 1");
auto logger = getLogger("PartsSplitter");
SplitPartsWithRangesByPrimaryKeyResult result;
RangesInDataParts intersecting_parts_ranges = std::move(parts);
if (!isSafePrimaryKey(primary_key))
{
result.merging_pipes.emplace_back(in_order_reading_step_getter(intersecting_parts_ranges));
return result;
}
if (split_parts_ranges_into_intersecting_and_non_intersecting_final)
{
SplitPartsRangesResult split_result = splitPartsRanges(intersecting_parts_ranges, logger);
@ -752,6 +905,9 @@ SplitPartsWithRangesByPrimaryKeyResult splitPartsWithRangesByPrimaryKey(
return result;
}
if (max_layers <= 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "max_layer should be greater than 1");
auto && [layers, borders] = splitIntersectingPartsRangesIntoLayers(intersecting_parts_ranges, max_layers, logger);
auto filters = buildFilters(primary_key, borders);
result.merging_pipes.resize(layers.size());

View File

@@ -0,0 +1,31 @@
-- Regression test: PartsSplitter must not produce invalid mark ranges
-- (right bound less than left bound) for the same part.
DROP TABLE IF EXISTS test_table;
-- index_granularity = 1 maximizes the number of marks/ranges the splitter handles.
CREATE TABLE test_table
(
`eventType` String,
`timestamp` UInt64,
`key` UInt64
)
ENGINE = ReplacingMergeTree
PRIMARY KEY (eventType, timestamp)
ORDER BY (eventType, timestamp, key)
SETTINGS index_granularity = 1;
-- Stop merges so the three inserts below stay as three separate overlapping parts.
SYSTEM STOP MERGES test_table;
INSERT INTO test_table VALUES ('1', 1704472004759, 1), ('3', 1704153600000, 2), ('3', 1704153600000, 3), ('5', 1700161822134, 4);
INSERT INTO test_table VALUES ('1', 1704468357009, 1), ('3', 1704153600000, 2), ('3', 1704153600000, 3), ('5', 1701458520878, 4);
INSERT INTO test_table VALUES ('1', 1704470704762, 1), ('3', 1704153600000, 2), ('3', 1704153600000, 3), ('5', 1702609856302, 4);
-- Same filter with and without FINAL; both must complete without a logical error
-- in the parts splitter (the filter matches no rows by design).
SELECT eventType, timestamp, key FROM test_table
WHERE (eventType IN ('2', '4')) AND
((timestamp >= max2(toInt64('1698938519999'), toUnixTimestamp64Milli(now64() - toIntervalDay(90)))) AND
(timestamp <= (toInt64('1707143315452') - 1)));
SELECT eventType, timestamp, key FROM test_table FINAL
WHERE (eventType IN ('2', '4')) AND
((timestamp >= max2(toInt64('1698938519999'), toUnixTimestamp64Milli(now64() - toIntervalDay(90)))) AND
(timestamp <= (toInt64('1707143315452') - 1)));
DROP TABLE test_table;