Merge pull request #48633 from ClickHouse/vdimir/any_join_single_row

This commit is contained in:
vdimir 2023-06-02 15:05:13 +02:00 committed by GitHub
commit b1f58d765a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 86 additions and 14 deletions

View File

@ -543,13 +543,17 @@ namespace
template <typename Map, typename KeyGetter>
struct Inserter
{
static ALWAYS_INLINE void insertOne(const HashJoin & join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i,
static ALWAYS_INLINE bool insertOne(const HashJoin & join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i,
Arena & pool)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
if (emplace_result.isInserted() || join.anyTakeLastRow())
{
new (&emplace_result.getMapped()) typename Map::mapped_type(stored_block, i);
return true;
}
return false;
}
static ALWAYS_INLINE void insertAll(const HashJoin &, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
@ -582,7 +586,7 @@ namespace
template <JoinStrictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map>
size_t NO_INLINE insertFromBlockImplTypeCase(
HashJoin & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, UInt8ColumnDataPtr join_mask, Arena & pool)
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, UInt8ColumnDataPtr join_mask, Arena & pool, bool & is_inserted)
{
[[maybe_unused]] constexpr bool mapped_one = std::is_same_v<typename Map::mapped_type, RowRef>;
constexpr bool is_asof_join = STRICTNESS == JoinStrictness::Asof;
@ -593,10 +597,18 @@ namespace
auto key_getter = createKeyGetter<KeyGetter, is_asof_join>(key_columns, key_sizes);
/// For ALL and ASOF join always insert values
is_inserted = !mapped_one || is_asof_join;
for (size_t i = 0; i < rows; ++i)
{
if (has_null_map && (*null_map)[i])
{
/// nulls are not inserted into hash table,
/// keep them for RIGHT and FULL joins
is_inserted = true;
continue;
}
/// Check condition for right table from ON section
if (join_mask && !(*join_mask)[i])
@ -605,7 +617,7 @@ namespace
if constexpr (is_asof_join)
Inserter<Map, KeyGetter>::insertAsof(join, map, key_getter, stored_block, i, pool, *asof_column);
else if constexpr (mapped_one)
Inserter<Map, KeyGetter>::insertOne(join, map, key_getter, stored_block, i, pool);
is_inserted |= Inserter<Map, KeyGetter>::insertOne(join, map, key_getter, stored_block, i, pool);
else
Inserter<Map, KeyGetter>::insertAll(join, map, key_getter, stored_block, i, pool);
}
@ -616,32 +628,37 @@ namespace
/// Forwards its arguments to insertFromBlockImplTypeCase and returns its result,
/// selecting the has_null_map template argument: true when null_map is non-null, false otherwise.
template <JoinStrictness STRICTNESS, typename KeyGetter, typename Map>
size_t insertFromBlockImplType(
HashJoin & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, UInt8ColumnDataPtr join_mask, Arena & pool)
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, UInt8ColumnDataPtr join_mask, Arena & pool, bool & is_inserted)
{
if (null_map)
return insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, true>(
join, map, rows, key_columns, key_sizes, stored_block, null_map, join_mask, pool);
join, map, rows, key_columns, key_sizes, stored_block, null_map, join_mask, pool, is_inserted);
else
return insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, false>(
join, map, rows, key_columns, key_sizes, stored_block, null_map, join_mask, pool);
join, map, rows, key_columns, key_sizes, stored_block, null_map, join_mask, pool, is_inserted);
}
template <JoinStrictness STRICTNESS, typename Maps>
size_t insertFromBlockImpl(
HashJoin & join, HashJoin::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, UInt8ColumnDataPtr join_mask, Arena & pool)
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, UInt8ColumnDataPtr join_mask, Arena & pool, bool & is_inserted)
{
switch (type)
{
case HashJoin::Type::EMPTY: return 0;
case HashJoin::Type::CROSS: return 0; /// Do nothing. We have already saved block, and it is enough.
case HashJoin::Type::EMPTY:
[[fallthrough]];
case HashJoin::Type::CROSS:
/// Do nothing. We will only save block, and it is enough
is_inserted = true;
return 0;
#define M(TYPE) \
case HashJoin::Type::TYPE: \
return insertFromBlockImplType<STRICTNESS, typename KeyGetterForType<HashJoin::Type::TYPE, std::remove_reference_t<decltype(*maps.TYPE)>>::Type>(\
join, *maps.TYPE, rows, key_columns, key_sizes, stored_block, null_map, join_mask, pool); \
join, *maps.TYPE, rows, key_columns, key_sizes, stored_block, null_map, join_mask, pool, is_inserted); \
break;
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
}
@ -816,6 +833,7 @@ bool HashJoin::addJoinedBlock(const Block & source_block_, bool check_limits)
}
}
bool is_inserted = false;
if (kind != JoinKind::Cross)
{
joinDispatch(kind, strictness, data->maps[onexpr_idx], [&](auto kind_, auto strictness_, auto & map)
@ -824,28 +842,35 @@ bool HashJoin::addJoinedBlock(const Block & source_block_, bool check_limits)
*this, data->type, map, rows, key_columns, key_sizes[onexpr_idx], stored_block, null_map,
/// If mask is false constant, rows are added to hashmap anyway. It's not a happy-flow, so this case is not optimized
join_mask_col.getData(),
data->pool);
data->pool, is_inserted);
if (multiple_disjuncts)
used_flags.reinit<kind_, strictness_>(stored_block);
else
else if (is_inserted)
/// Number of buckets + 1 value from zero storage
used_flags.reinit<kind_, strictness_>(size + 1);
});
}
if (!multiple_disjuncts && save_nullmap)
if (!multiple_disjuncts && save_nullmap && is_inserted)
{
data->blocks_nullmaps_allocated_size += null_map_holder->allocatedBytes();
data->blocks_nullmaps.emplace_back(stored_block, null_map_holder);
}
if (!multiple_disjuncts && not_joined_map)
if (!multiple_disjuncts && not_joined_map && is_inserted)
{
data->blocks_nullmaps_allocated_size += not_joined_map->allocatedBytes();
data->blocks_nullmaps.emplace_back(stored_block, std::move(not_joined_map));
}
if (!multiple_disjuncts && !is_inserted)
{
LOG_TRACE(log, "Skipping inserting block with {} rows", rows);
data->blocks_allocated_size -= stored_block->allocatedBytes();
data->blocks.pop_back();
}
if (!check_limits)
return true;

View File

@ -0,0 +1,6 @@
Join(ANY, LEFT, key) 0 1
Join(ANY, LEFT, key) 1 1
Join(ANY, LEFT, key) 1 1
1
1
1

View File

@ -0,0 +1,41 @@
-- Test for Join engine tables with ANY strictness:
--   * Join(ANY, LEFT): inserting rows whose key is already present must not change total_bytes.
--   * Join(ANY, RIGHT): every inserted row must still be returned by an ANY RIGHT JOIN.
DROP TABLE IF EXISTS join_test;
DROP TABLE IF EXISTS join_test_right;
CREATE TABLE join_test ( `key` UInt64, `value` UInt64 ) ENGINE = Join(ANY, LEFT, key);
-- Save the table size (as reported by system.tables) before inserting any rows
CREATE TEMPORARY TABLE initial_table_size AS
SELECT engine_full, total_rows, total_bytes FROM system.tables WHERE (name = 'join_test') AND (database = currentDatabase());
-- Check that the size of the empty table is less than 100K bytes
SELECT engine_full, total_rows, total_bytes < 100_000 FROM initial_table_size;
INSERT INTO join_test (key, value) SELECT 1, number FROM numbers(1);
-- Save the table size after inserting one row
CREATE TEMPORARY TABLE one_row_table_size AS
SELECT engine_full, total_rows, total_bytes FROM system.tables WHERE (name = 'join_test') AND (database = currentDatabase());
-- Check that after inserting one row the table size is less than 2x the initial size
SELECT engine_full, total_rows, total_bytes < 2 * (SELECT total_bytes FROM initial_table_size) FROM one_row_table_size;
-- Insert some more rows with the same key (one more single row, then a block of 10 000 rows)
INSERT INTO join_test (key, value) SELECT 1, number FROM numbers(1);
INSERT INTO join_test (key, value) SELECT 1, number FROM numbers(10_000);
-- Check that rows with the same key are not duplicated:
-- total_bytes must be exactly equal to the size saved after the first row
SELECT engine_full, total_rows, total_bytes == (SELECT total_bytes FROM one_row_table_size) FROM system.tables WHERE (name = 'join_test') AND (database = currentDatabase());
-- For RIGHT join we save all rows from the right table
CREATE TABLE join_test_right ( `key` UInt64, `value` UInt64 ) ENGINE = Join(ANY, RIGHT, key);
INSERT INTO join_test_right (key, value) SELECT 1, number FROM numbers(1);
INSERT INTO join_test_right (key, value) SELECT 1, number FROM numbers(1);
INSERT INTO join_test_right (key, value) SELECT 1, number FROM numbers(1);
-- Three single-row inserts with the same key: all 3 rows are expected in the result
SELECT count() == 3 FROM (SELECT 1 as key) t1 ANY RIGHT JOIN join_test_right ON t1.key = join_test_right.key;
INSERT INTO join_test_right (key, value) SELECT 1, number FROM numbers(7);
-- 3 + 7 = 10 rows expected, both when the left key matches (key = 1) and when it does not (key = 2)
SELECT count() == 10 FROM (SELECT 1 as key) t1 ANY RIGHT JOIN join_test_right ON t1.key = join_test_right.key;
SELECT count() == 10 FROM (SELECT 2 as key) t1 ANY RIGHT JOIN join_test_right ON t1.key = join_test_right.key;
DROP TABLE IF EXISTS join_test;
DROP TABLE IF EXISTS join_test_right;