make nulls direction configuable for FullSortingMergeJoin(fix review comments)

This commit is contained in:
nemonlou 2024-03-12 09:55:02 +08:00
parent a1a4bd3514
commit 0b5fc743f2
4 changed files with 22 additions and 53 deletions

View File

@ -891,7 +891,6 @@ class IColumn;
M(Int64, ignore_cold_parts_seconds, 0, "Only available in ClickHouse Cloud. Exclude new data parts from SELECT queries until they're either pre-warmed (see cache_populated_by_fetch) or this many seconds old. Only for Replicated-/SharedMergeTree.", 0) \
M(Int64, prefer_warmed_unmerged_parts_seconds, 0, "Only available in ClickHouse Cloud. If a merged part is less than this many seconds old and is not pre-warmed (see cache_populated_by_fetch), but all its source parts are available and pre-warmed, SELECT queries will read from those parts instead. Only for ReplicatedMergeTree. Note that this only checks whether CacheWarmer processed the part; if the part was fetched into cache by something else, it'll still be considered cold until CacheWarmer gets to it; if it was warmed, then evicted from cache, it'll still be considered warm.", 0) \
M(Bool, iceberg_engine_ignore_schema_evolution, false, "Ignore schema evolution in Iceberg table engine and read all data using latest schema saved on table creation. Note that it can lead to incorrect result", 0) \
M(Bool, nulls_biggest_in_smj, true, "Treat nulls as biggest in sort. Used in sort merge join for compare null keys.", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS, move obsolete settings to OBSOLETE_SETTINGS and obsolete format settings to OBSOLETE_FORMAT_SETTINGS.

View File

@ -1693,10 +1693,9 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
auto add_sorting = [&settings, this] (QueryPlan & plan, const Names & key_names, JoinTableSide join_pos)
{
SortDescription order_descr;
int nulls_direction = settings.nulls_biggest_in_smj ? 1 : -1;
order_descr.reserve(key_names.size());
for (const auto & key_name : key_names)
order_descr.emplace_back(key_name, 1, nulls_direction);
order_descr.emplace_back(key_name);
SortingStep::Settings sort_settings(*context);
@ -1762,15 +1761,13 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
add_sorting(*joined_plan, join_clause.key_names_right, JoinTableSide::Right);
}
int null_direct_hint = settings.nulls_biggest_in_smj ? 1 : -1;
QueryPlanStepPtr join_step = std::make_unique<JoinStep>(
query_plan.getCurrentDataStream(),
joined_plan->getCurrentDataStream(),
expressions.join,
settings.max_block_size,
max_streams,
analysis_result.optimize_read_in_order,
null_direct_hint);
analysis_result.optimize_read_in_order);
join_step->setStepDescription(fmt::format("JOIN {}", expressions.join->pipelineType()));
std::vector<QueryPlanPtr> plans;

View File

@ -359,7 +359,7 @@ void MergeJoinAlgorithm::consume(Input & input, size_t source_num)
cursors[source_num]->setChunk(std::move(input.chunk));
}
template <JoinKind kind, NullOrder nullOrder>
template <JoinKind kind>
struct AllJoinImpl
{
constexpr static bool enabled = isInner(kind) || isLeft(kind) || isRight(kind) || isFull(kind);
@ -369,7 +369,8 @@ struct AllJoinImpl
size_t max_block_size,
PaddedPODArray<UInt64> & left_map,
PaddedPODArray<UInt64> & right_map,
std::unique_ptr<AllJoinState> & state)
std::unique_ptr<AllJoinState> & state,
int null_direction_hint)
{
right_map.clear();
right_map.reserve(max_block_size);
@ -385,7 +386,7 @@ struct AllJoinImpl
lpos = left_cursor->getRow();
rpos = right_cursor->getRow();
cmp = compareCursors(left_cursor.cursor, right_cursor.cursor, nullDirection(nullOrder));
cmp = compareCursors(left_cursor.cursor, right_cursor.cursor, null_direction_hint);
if (cmp == 0)
{
size_t lnum = nextDistinct(left_cursor.cursor);
@ -435,37 +436,19 @@ struct AllJoinImpl
}
};
template <template<JoinKind, NullOrder> class Impl, typename ... Args>
void dispatchKind(JoinKind kind, int null_direction_hint, Args && ... args)
template <template<JoinKind> class Impl, typename ... Args>
void dispatchKind(JoinKind kind, Args && ... args)
{
if (isSmall(null_direction_hint))
{
if (Impl<JoinKind::Inner, NullOrder::SMALLEST>::enabled && kind == JoinKind::Inner)
return Impl<JoinKind::Inner, NullOrder::SMALLEST>::join(std::forward<Args>(args)...);
else if (Impl<JoinKind::Left, NullOrder::SMALLEST>::enabled && kind == JoinKind::Left)
return Impl<JoinKind::Left, NullOrder::SMALLEST>::join(std::forward<Args>(args)...);
else if (Impl<JoinKind::Right, NullOrder::SMALLEST>::enabled && kind == JoinKind::Right)
return Impl<JoinKind::Right, NullOrder::SMALLEST>::join(std::forward<Args>(args)...);
else if (Impl<JoinKind::Full, NullOrder::SMALLEST>::enabled && kind == JoinKind::Full)
return Impl<JoinKind::Full, NullOrder::SMALLEST>::join(std::forward<Args>(args)...);
if (Impl<JoinKind::Inner>::enabled && kind == JoinKind::Inner)
return Impl<JoinKind::Inner>::join(std::forward<Args>(args)...);
else if (Impl<JoinKind::Left>::enabled && kind == JoinKind::Left)
return Impl<JoinKind::Left>::join(std::forward<Args>(args)...);
else if (Impl<JoinKind::Right>::enabled && kind == JoinKind::Right)
return Impl<JoinKind::Right>::join(std::forward<Args>(args)...);
else if (Impl<JoinKind::Full>::enabled && kind == JoinKind::Full)
return Impl<JoinKind::Full>::join(std::forward<Args>(args)...);
else
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported join kind: \"{}\"", kind);
}
else
{
if (Impl<JoinKind::Inner, NullOrder::BIGGEST>::enabled && kind == JoinKind::Inner)
return Impl<JoinKind::Inner, NullOrder::BIGGEST>::join(std::forward<Args>(args)...);
else if (Impl<JoinKind::Left, NullOrder::BIGGEST>::enabled && kind == JoinKind::Left)
return Impl<JoinKind::Left, NullOrder::BIGGEST>::join(std::forward<Args>(args)...);
else if (Impl<JoinKind::Right, NullOrder::BIGGEST>::enabled && kind == JoinKind::Right)
return Impl<JoinKind::Right, NullOrder::BIGGEST>::join(std::forward<Args>(args)...);
else if (Impl<JoinKind::Full, NullOrder::BIGGEST>::enabled && kind == JoinKind::Full)
return Impl<JoinKind::Full, NullOrder::BIGGEST>::join(std::forward<Args>(args)...);
else
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported join kind: \"{}\"", kind);
}
}
std::optional<MergeJoinAlgorithm::Status> MergeJoinAlgorithm::handleAllJoinState()
@ -538,7 +521,7 @@ MergeJoinAlgorithm::Status MergeJoinAlgorithm::allJoin(JoinKind kind)
{
PaddedPODArray<UInt64> idx_map[2];
dispatchKind<AllJoinImpl>(kind, null_direction_hint, *cursors[0], *cursors[1], max_block_size, idx_map[0], idx_map[1], all_join_state);
dispatchKind<AllJoinImpl>(kind, *cursors[0], *cursors[1], max_block_size, idx_map[0], idx_map[1], all_join_state, null_direction_hint);
assert(idx_map[0].size() == idx_map[1].size());
Chunk result;
@ -588,7 +571,7 @@ MergeJoinAlgorithm::Status MergeJoinAlgorithm::allJoin(JoinKind kind)
}
template <JoinKind kind, NullOrder order>
template <JoinKind kind>
struct AnyJoinImpl
{
constexpr static bool enabled = isInner(kind) || isLeft(kind) || isRight(kind);
@ -597,7 +580,8 @@ struct AnyJoinImpl
FullMergeJoinCursor & right_cursor,
PaddedPODArray<UInt64> & left_map,
PaddedPODArray<UInt64> & right_map,
AnyJoinState & state)
AnyJoinState & state,
int null_direction_hint)
{
assert(enabled);
@ -620,7 +604,7 @@ struct AnyJoinImpl
lpos = left_cursor->getRow();
rpos = right_cursor->getRow();
cmp = compareCursors(left_cursor.cursor, right_cursor.cursor, nullDirection(order));
cmp = compareCursors(left_cursor.cursor, right_cursor.cursor, null_direction_hint);
if (cmp == 0)
{
if constexpr (isLeftOrFull(kind))
@ -744,7 +728,7 @@ MergeJoinAlgorithm::Status MergeJoinAlgorithm::anyJoin(JoinKind kind)
PaddedPODArray<UInt64> idx_map[2];
size_t prev_pos[] = {current_left.getRow(), current_right.getRow()};
dispatchKind<AnyJoinImpl>(kind, null_direction_hint, *cursors[0], *cursors[1], idx_map[0], idx_map[1], any_join_state);
dispatchKind<AnyJoinImpl>(kind, *cursors[0], *cursors[1], idx_map[0], idx_map[1], any_join_state, null_direction_hint);
assert(idx_map[0].empty() || idx_map[1].empty() || idx_map[0].size() == idx_map[1].size());
size_t num_result_rows = std::max(idx_map[0].size(), idx_map[1].size());

View File

@ -220,17 +220,6 @@ private:
bool recieved_all_blocks = false;
};
/// Join method.
enum class NullOrder
{
SMALLEST, /// null is treated as smallest
BIGGEST /// null is treated as biggest
};
inline constexpr bool isSmall(int null_direction) { return null_direction == 1; }
inline constexpr int nullDirection(NullOrder order) {return order == NullOrder::SMALLEST ? 1 : -1;}
/*
* This class is used to join chunks from two sorted streams.
* It is used in MergeJoinTransform.