mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Add join_overwrite settings for StorageJoin.
This makes using StorageJoin as dictionary more flexible.
This commit is contained in:
parent
a0d8743c4c
commit
8ea4d7987b
@ -1,3 +1,5 @@
|
||||
#include <array>
|
||||
#include <common/constexpr_helpers.h>
|
||||
#include <common/logger_useful.h>
|
||||
|
||||
#include <Columns/ColumnConst.h>
|
||||
@ -32,10 +34,11 @@ namespace ErrorCodes
|
||||
|
||||
|
||||
Join::Join(const Names & key_names_right_, bool use_nulls_, const SizeLimits & limits,
|
||||
ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_)
|
||||
ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_, bool overwrite_)
|
||||
: kind(kind_), strictness(strictness_),
|
||||
key_names_right(key_names_right_),
|
||||
use_nulls(use_nulls_),
|
||||
overwrite(overwrite_),
|
||||
log(&Logger::get("Join")),
|
||||
limits(limits)
|
||||
{
|
||||
@ -177,21 +180,8 @@ void Join::init(Type type_)
|
||||
|
||||
if (kind == ASTTableJoin::Kind::Cross)
|
||||
return;
|
||||
|
||||
if (!getFullness(kind))
|
||||
{
|
||||
if (strictness == ASTTableJoin::Strictness::Any)
|
||||
initImpl(maps_any, type);
|
||||
else
|
||||
initImpl(maps_all, type);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (strictness == ASTTableJoin::Strictness::Any)
|
||||
initImpl(maps_any_full, type);
|
||||
else
|
||||
initImpl(maps_all_full, type);
|
||||
}
|
||||
dispatch(MapInitTag());
|
||||
dispatch([&](auto, auto, auto & map) { initImpl(map, type); });
|
||||
}
|
||||
|
||||
size_t Join::getTotalRowCount() const
|
||||
@ -205,10 +195,7 @@ size_t Join::getTotalRowCount() const
|
||||
}
|
||||
else
|
||||
{
|
||||
res += getTotalRowCountImpl(maps_any, type);
|
||||
res += getTotalRowCountImpl(maps_all, type);
|
||||
res += getTotalRowCountImpl(maps_any_full, type);
|
||||
res += getTotalRowCountImpl(maps_all_full, type);
|
||||
dispatch([&](auto, auto, auto & map) { res += getTotalRowCountImpl(map, type); });
|
||||
}
|
||||
|
||||
return res;
|
||||
@ -225,10 +212,7 @@ size_t Join::getTotalByteCount() const
|
||||
}
|
||||
else
|
||||
{
|
||||
res += getTotalByteCountImpl(maps_any, type);
|
||||
res += getTotalByteCountImpl(maps_all, type);
|
||||
res += getTotalByteCountImpl(maps_any_full, type);
|
||||
res += getTotalByteCountImpl(maps_all_full, type);
|
||||
dispatch([&](auto, auto, auto & map) { res += getTotalByteCountImpl(map, type); });
|
||||
res += pool.size();
|
||||
}
|
||||
|
||||
@ -326,6 +310,8 @@ namespace
|
||||
KeyGetter::onNewKey(it->first, pool);
|
||||
new (&it->second) typename Map::mapped_type(stored_block, i);
|
||||
}
|
||||
else if (it->second.overwrite)
|
||||
new (&it->second) typename Map::mapped_type(stored_block, i);
|
||||
}
|
||||
};
|
||||
|
||||
@ -482,21 +468,9 @@ bool Join::insertFromBlock(const Block & block)
|
||||
|
||||
if (kind != ASTTableJoin::Kind::Cross)
|
||||
{
|
||||
/// Fill the hash table.
|
||||
if (!getFullness(kind))
|
||||
{
|
||||
if (strictness == ASTTableJoin::Strictness::Any)
|
||||
insertFromBlockImpl<ASTTableJoin::Strictness::Any>(type, maps_any, rows, key_columns, keys_size, key_sizes, stored_block, null_map, pool);
|
||||
else
|
||||
insertFromBlockImpl<ASTTableJoin::Strictness::All>(type, maps_all, rows, key_columns, keys_size, key_sizes, stored_block, null_map, pool);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (strictness == ASTTableJoin::Strictness::Any)
|
||||
insertFromBlockImpl<ASTTableJoin::Strictness::Any>(type, maps_any_full, rows, key_columns, keys_size, key_sizes, stored_block, null_map, pool);
|
||||
else
|
||||
insertFromBlockImpl<ASTTableJoin::Strictness::All>(type, maps_all_full, rows, key_columns, keys_size, key_sizes, stored_block, null_map, pool);
|
||||
}
|
||||
dispatch([&](auto, auto strictness_, auto & map) {
|
||||
insertFromBlockImpl<strictness_>(type, map, rows, key_columns, keys_size, key_sizes, stored_block, null_map, pool);
|
||||
});
|
||||
}
|
||||
|
||||
return limits.check(getTotalRowCount(), getTotalByteCount(), "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
|
||||
@ -505,11 +479,11 @@ bool Join::insertFromBlock(const Block & block)
|
||||
|
||||
namespace
|
||||
{
|
||||
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Map>
|
||||
template <bool pad_left, ASTTableJoin::Strictness STRICTNESS, typename Map>
|
||||
struct Adder;
|
||||
|
||||
template <typename Map>
|
||||
struct Adder<ASTTableJoin::Kind::Left, ASTTableJoin::Strictness::Any, Map>
|
||||
struct Adder<true, ASTTableJoin::Strictness::Any, Map>
|
||||
{
|
||||
static void addFound(const typename Map::const_iterator & it, size_t num_columns_to_add, MutableColumns & added_columns,
|
||||
size_t i, IColumn::Filter * filter, IColumn::Offset & /*current_offset*/, IColumn::Offsets * /*offsets*/,
|
||||
@ -532,7 +506,7 @@ namespace
|
||||
};
|
||||
|
||||
template <typename Map>
|
||||
struct Adder<ASTTableJoin::Kind::Inner, ASTTableJoin::Strictness::Any, Map>
|
||||
struct Adder<false, ASTTableJoin::Strictness::Any, Map>
|
||||
{
|
||||
static void addFound(const typename Map::const_iterator & it, size_t num_columns_to_add, MutableColumns & added_columns,
|
||||
size_t i, IColumn::Filter * filter, IColumn::Offset & /*current_offset*/, IColumn::Offsets * /*offsets*/,
|
||||
@ -551,8 +525,8 @@ namespace
|
||||
}
|
||||
};
|
||||
|
||||
template <ASTTableJoin::Kind KIND, typename Map>
|
||||
struct Adder<KIND, ASTTableJoin::Strictness::All, Map>
|
||||
template <bool pad_left, typename Map>
|
||||
struct Adder<pad_left, ASTTableJoin::Strictness::All, Map>
|
||||
{
|
||||
static void addFound(const typename Map::const_iterator & it, size_t num_columns_to_add, MutableColumns & added_columns,
|
||||
size_t i, IColumn::Filter * filter, IColumn::Offset & current_offset, IColumn::Offsets * offsets,
|
||||
@ -578,7 +552,7 @@ namespace
|
||||
{
|
||||
(*filter)[i] = 0;
|
||||
|
||||
if (KIND == ASTTableJoin::Kind::Inner)
|
||||
if (!pad_left)
|
||||
{
|
||||
(*offsets)[i] = current_offset;
|
||||
}
|
||||
@ -609,7 +583,7 @@ namespace
|
||||
{
|
||||
if (has_null_map && (*null_map)[i])
|
||||
{
|
||||
Adder<KIND, STRICTNESS, Map>::addNotFound(
|
||||
Adder<Join::KindTrait<KIND>::pad_left, STRICTNESS, Map>::addNotFound(
|
||||
num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get());
|
||||
}
|
||||
else
|
||||
@ -620,11 +594,11 @@ namespace
|
||||
if (it != map.end())
|
||||
{
|
||||
it->second.setUsed();
|
||||
Adder<KIND, STRICTNESS, Map>::addFound(
|
||||
Adder<Join::KindTrait<KIND>::pad_left, STRICTNESS, Map>::addFound(
|
||||
it, num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get(), right_indexes);
|
||||
}
|
||||
else
|
||||
Adder<KIND, STRICTNESS, Map>::addNotFound(
|
||||
Adder<Join::KindTrait<KIND>::pad_left, STRICTNESS, Map>::addNotFound(
|
||||
num_columns_to_add, added_columns, i, filter.get(), current_offset, offsets_to_replicate.get());
|
||||
}
|
||||
}
|
||||
@ -655,7 +629,7 @@ void Join::joinBlockImpl(
|
||||
const Names & key_names_left,
|
||||
const NameSet & needed_key_names_right,
|
||||
const Block & block_with_columns_to_add,
|
||||
const Maps & maps) const
|
||||
const Maps & maps_) const
|
||||
{
|
||||
size_t keys_size = key_names_left.size();
|
||||
ColumnRawPtrs key_columns(keys_size);
|
||||
@ -749,7 +723,7 @@ void Join::joinBlockImpl(
|
||||
#define M(TYPE) \
|
||||
case Join::Type::TYPE: \
|
||||
joinBlockImplType<KIND, STRICTNESS, typename KeyGetterForType<Join::Type::TYPE>::Type>(\
|
||||
*maps.TYPE, rows, key_columns, key_sizes, added_columns, null_map, \
|
||||
*maps_.TYPE, rows, key_columns, key_sizes, added_columns, null_map, \
|
||||
filter, current_offset, offsets_to_replicate, right_indexes); \
|
||||
break;
|
||||
APPLY_FOR_JOIN_VARIANTS(M)
|
||||
@ -906,10 +880,10 @@ DataTypePtr Join::joinGetReturnType(const String & column_name) const
|
||||
|
||||
|
||||
template <typename Maps>
|
||||
void Join::joinGetImpl(Block & block, const String & column_name, const Maps & maps) const
|
||||
void Join::joinGetImpl(Block & block, const String & column_name, const Maps & maps_) const
|
||||
{
|
||||
joinBlockImpl<ASTTableJoin::Kind::Left, ASTTableJoin::Strictness::Any>(
|
||||
block, {block.getByPosition(0).name}, {}, {sample_block_with_columns_to_add.getByName(column_name)}, maps);
|
||||
block, {block.getByPosition(0).name}, {}, {sample_block_with_columns_to_add.getByName(column_name)}, maps_);
|
||||
}
|
||||
|
||||
|
||||
@ -926,7 +900,12 @@ void Join::joinGet(Block & block, const String & column_name) const
|
||||
checkTypeOfKey(block, sample_block_with_keys);
|
||||
|
||||
if (kind == ASTTableJoin::Kind::Left && strictness == ASTTableJoin::Strictness::Any)
|
||||
joinGetImpl(block, column_name, maps_any);
|
||||
{
|
||||
if (overwrite)
|
||||
joinGetImpl(block, column_name, std::get<MapsAnyOverwrite>(maps));
|
||||
else
|
||||
joinGetImpl(block, column_name, std::get<MapsAny>(maps));
|
||||
}
|
||||
else
|
||||
throw Exception("joinGet only supports StorageJoin of type Left Any", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
@ -940,30 +919,10 @@ void Join::joinBlock(Block & block, const Names & key_names_left, const NameSet
|
||||
|
||||
checkTypesOfKeys(block, key_names_left, sample_block_with_keys);
|
||||
|
||||
if (kind == ASTTableJoin::Kind::Left && strictness == ASTTableJoin::Strictness::Any)
|
||||
joinBlockImpl<ASTTableJoin::Kind::Left, ASTTableJoin::Strictness::Any>(
|
||||
block, key_names_left, needed_key_names_right, sample_block_with_columns_to_add, maps_any);
|
||||
else if (kind == ASTTableJoin::Kind::Inner && strictness == ASTTableJoin::Strictness::Any)
|
||||
joinBlockImpl<ASTTableJoin::Kind::Inner, ASTTableJoin::Strictness::Any>(
|
||||
block, key_names_left, needed_key_names_right, sample_block_with_columns_to_add, maps_any);
|
||||
else if (kind == ASTTableJoin::Kind::Left && strictness == ASTTableJoin::Strictness::All)
|
||||
joinBlockImpl<ASTTableJoin::Kind::Left, ASTTableJoin::Strictness::All>(
|
||||
block, key_names_left, needed_key_names_right, sample_block_with_columns_to_add, maps_all);
|
||||
else if (kind == ASTTableJoin::Kind::Inner && strictness == ASTTableJoin::Strictness::All)
|
||||
joinBlockImpl<ASTTableJoin::Kind::Inner, ASTTableJoin::Strictness::All>(
|
||||
block, key_names_left, needed_key_names_right, sample_block_with_columns_to_add, maps_all);
|
||||
else if (kind == ASTTableJoin::Kind::Full && strictness == ASTTableJoin::Strictness::Any)
|
||||
joinBlockImpl<ASTTableJoin::Kind::Left, ASTTableJoin::Strictness::Any>(
|
||||
block, key_names_left, needed_key_names_right, sample_block_with_columns_to_add, maps_any_full);
|
||||
else if (kind == ASTTableJoin::Kind::Right && strictness == ASTTableJoin::Strictness::Any)
|
||||
joinBlockImpl<ASTTableJoin::Kind::Inner, ASTTableJoin::Strictness::Any>(
|
||||
block, key_names_left, needed_key_names_right, sample_block_with_columns_to_add, maps_any_full);
|
||||
else if (kind == ASTTableJoin::Kind::Full && strictness == ASTTableJoin::Strictness::All)
|
||||
joinBlockImpl<ASTTableJoin::Kind::Left, ASTTableJoin::Strictness::All>(
|
||||
block, key_names_left, needed_key_names_right, sample_block_with_columns_to_add, maps_all_full);
|
||||
else if (kind == ASTTableJoin::Kind::Right && strictness == ASTTableJoin::Strictness::All)
|
||||
joinBlockImpl<ASTTableJoin::Kind::Inner, ASTTableJoin::Strictness::All>(
|
||||
block, key_names_left, needed_key_names_right, sample_block_with_columns_to_add, maps_all_full);
|
||||
if (dispatch([&](auto kind_, auto strictness_, auto & map) {
|
||||
joinBlockImpl<kind_, strictness_>(block, key_names_left, needed_key_names_right, sample_block_with_columns_to_add, map);
|
||||
}))
|
||||
;
|
||||
else if (kind == ASTTableJoin::Kind::Cross)
|
||||
joinBlockImplCross(block);
|
||||
else
|
||||
@ -1112,12 +1071,12 @@ protected:
|
||||
if (parent.blocks.empty())
|
||||
return Block();
|
||||
|
||||
if (parent.strictness == ASTTableJoin::Strictness::Any)
|
||||
return createBlock<ASTTableJoin::Strictness::Any>(parent.maps_any_full);
|
||||
else if (parent.strictness == ASTTableJoin::Strictness::All)
|
||||
return createBlock<ASTTableJoin::Strictness::All>(parent.maps_all_full);
|
||||
Block block;
|
||||
if (parent.dispatch([&](auto, auto strictness, auto & map) { block = createBlock<strictness>(map); }))
|
||||
;
|
||||
else
|
||||
throw Exception("Logical error: unknown JOIN strictness (must be ANY or ALL)", ErrorCodes::LOGICAL_ERROR);
|
||||
return block;
|
||||
}
|
||||
|
||||
private:
|
||||
|
@ -15,6 +15,8 @@
|
||||
|
||||
#include <DataStreams/SizeLimits.h>
|
||||
#include <DataStreams/IBlockInputStream.h>
|
||||
#include <variant>
|
||||
#include <common/constexpr_helpers.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -221,7 +223,7 @@ class Join
|
||||
{
|
||||
public:
|
||||
Join(const Names & key_names_right_, bool use_nulls_, const SizeLimits & limits,
|
||||
ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_);
|
||||
ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_, bool overwrite_ = false);
|
||||
|
||||
bool empty() { return type == Type::EMPTY; }
|
||||
|
||||
@ -289,15 +291,18 @@ public:
|
||||
|
||||
|
||||
/** Depending on template parameter, adds or doesn't add a flag, that element was used (row was joined).
|
||||
* For implementation of RIGHT and FULL JOINs.
|
||||
* Depending on template parameter, decide whether to overwrite existing values when encountering the same key again
|
||||
* with_used is for implementation of RIGHT and FULL JOINs.
|
||||
* overwrite is for implementation of StorageJoin with overwrite setting enabled
|
||||
* NOTE: It is possible to store the flag in one bit of pointer to block or row_num. It seems not reasonable, because memory saving is minimal.
|
||||
*/
|
||||
template <bool enable, typename Base>
|
||||
struct WithUsedFlag;
|
||||
template <bool with_used, bool overwrite_, typename Base>
|
||||
struct WithFlags;
|
||||
|
||||
template <typename Base>
|
||||
struct WithUsedFlag<true, Base> : Base
|
||||
template <bool overwrite_, typename Base>
|
||||
struct WithFlags<true, overwrite_, Base> : Base
|
||||
{
|
||||
static constexpr bool overwrite = overwrite_;
|
||||
mutable std::atomic<bool> used {};
|
||||
using Base::Base;
|
||||
using Base_t = Base;
|
||||
@ -305,9 +310,10 @@ public:
|
||||
bool getUsed() const { return used; }
|
||||
};
|
||||
|
||||
template <typename Base>
|
||||
struct WithUsedFlag<false, Base> : Base
|
||||
template <bool overwrite_, typename Base>
|
||||
struct WithFlags<false, overwrite_, Base> : Base
|
||||
{
|
||||
static constexpr bool overwrite = overwrite_;
|
||||
using Base::Base;
|
||||
using Base_t = Base;
|
||||
void setUsed() const {}
|
||||
@ -363,10 +369,80 @@ public:
|
||||
std::unique_ptr<HashMap<UInt128, Mapped, UInt128TrivialHash>> hashed;
|
||||
};
|
||||
|
||||
using MapsAny = MapsTemplate<WithUsedFlag<false, RowRef>>;
|
||||
using MapsAll = MapsTemplate<WithUsedFlag<false, RowRefList>>;
|
||||
using MapsAnyFull = MapsTemplate<WithUsedFlag<true, RowRef>>;
|
||||
using MapsAllFull = MapsTemplate<WithUsedFlag<true, RowRefList>>;
|
||||
using MapsAny = MapsTemplate<WithFlags<false, false, RowRef>>;
|
||||
using MapsAnyOverwrite = MapsTemplate<WithFlags<false, true, RowRef>>;
|
||||
using MapsAll = MapsTemplate<WithFlags<false, false, RowRefList>>;
|
||||
using MapsAnyFull = MapsTemplate<WithFlags<true, false, RowRef>>;
|
||||
using MapsAnyFullOverwrite = MapsTemplate<WithFlags<true, true, RowRef>>;
|
||||
using MapsAllFull = MapsTemplate<WithFlags<true, false, RowRefList>>;
|
||||
|
||||
template <ASTTableJoin::Kind KIND>
|
||||
struct KindTrait
|
||||
{
|
||||
// Affects the Adder trait so that when the right part is empty, adding a default value on the left
|
||||
static constexpr bool pad_left = static_in_v<KIND, ASTTableJoin::Kind::Left, ASTTableJoin::Kind::Full>;
|
||||
|
||||
// Affects the Map trait so that a `used` flag is attached to map slots in order to
|
||||
// generate default values on the right when the left part is empty
|
||||
static constexpr bool pad_right = static_in_v<KIND, ASTTableJoin::Kind::Right, ASTTableJoin::Kind::Full>;
|
||||
};
|
||||
|
||||
template <bool pad_right, typename ASTTableJoin::Strictness, bool overwrite>
|
||||
struct MapGetterImpl;
|
||||
|
||||
template <ASTTableJoin::Kind kind, ASTTableJoin::Strictness strictness, bool overwrite>
|
||||
using Map = typename MapGetterImpl<KindTrait<kind>::pad_right, strictness, overwrite>::Map;
|
||||
|
||||
static constexpr std::array<ASTTableJoin::Strictness, 2> STRICTNESSES = {ASTTableJoin::Strictness::Any, ASTTableJoin::Strictness::All};
|
||||
static constexpr std::array<ASTTableJoin::Kind, 4> KINDS
|
||||
= {ASTTableJoin::Kind::Left, ASTTableJoin::Kind::Inner, ASTTableJoin::Kind::Full, ASTTableJoin::Kind::Right};
|
||||
|
||||
struct MapInitTag {};
|
||||
|
||||
template <typename Func>
|
||||
bool dispatch(Func && func)
|
||||
{
|
||||
if (overwrite)
|
||||
return static_for<0, KINDS.size()>([&](auto i) {
|
||||
if (kind == KINDS[i] && strictness == ASTTableJoin::Strictness::Any)
|
||||
{
|
||||
if constexpr (std::is_same_v<Func, MapInitTag>)
|
||||
maps = Map<KINDS[i], ASTTableJoin::Strictness::Any, true>();
|
||||
else
|
||||
func(
|
||||
std::integral_constant<ASTTableJoin::Kind, KINDS[i]>(),
|
||||
std::integral_constant<ASTTableJoin::Strictness, ASTTableJoin::Strictness::Any>(),
|
||||
std::get<Map<KINDS[i], ASTTableJoin::Strictness::Any, true>>(maps));
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
else
|
||||
return static_for<0, KINDS.size() * STRICTNESSES.size()>([&](auto ij) {
|
||||
// NOTE: Avoid using nested static loop as GCC and CLANG have bugs in different ways
|
||||
// See https://stackoverflow.com/questions/44386415/gcc-and-clang-disagree-about-c17-constexpr-lambda-captures
|
||||
constexpr auto i = ij / STRICTNESSES.size();
|
||||
constexpr auto j = ij % STRICTNESSES.size();
|
||||
if (kind == KINDS[i] && strictness == STRICTNESSES[j])
|
||||
{
|
||||
if constexpr (std::is_same_v<Func, MapInitTag>)
|
||||
maps = Map<KINDS[i], STRICTNESSES[j], false>();
|
||||
else
|
||||
func(
|
||||
std::integral_constant<ASTTableJoin::Kind, KINDS[i]>(),
|
||||
std::integral_constant<ASTTableJoin::Strictness, STRICTNESSES[j]>(),
|
||||
std::get<Map<KINDS[i], STRICTNESSES[j], false>>(maps));
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
template <typename Func>
|
||||
bool dispatch(Func && func) const
|
||||
{
|
||||
return const_cast<Join &>(*this).dispatch(std::forward<Func>(func));
|
||||
}
|
||||
|
||||
private:
|
||||
friend class NonJoinedBlockInputStream;
|
||||
@ -381,14 +457,14 @@ private:
|
||||
/// Substitute NULLs for non-JOINed rows.
|
||||
bool use_nulls;
|
||||
|
||||
/// Overwrite existing values when encountering the same key again
|
||||
bool overwrite;
|
||||
|
||||
/** Blocks of "right" table.
|
||||
*/
|
||||
BlocksList blocks;
|
||||
|
||||
MapsAny maps_any; /// For ANY LEFT|INNER JOIN
|
||||
MapsAll maps_all; /// For ALL LEFT|INNER JOIN
|
||||
MapsAnyFull maps_any_full; /// For ANY RIGHT|FULL JOIN
|
||||
MapsAllFull maps_all_full; /// For ALL RIGHT|FULL JOIN
|
||||
std::variant<MapsAny, MapsAnyOverwrite, MapsAll, MapsAnyFull, MapsAnyFullOverwrite, MapsAllFull> maps;
|
||||
|
||||
/// Additional data - strings for string keys and continuation elements of single-linked lists of references to rows.
|
||||
Arena pool;
|
||||
@ -441,5 +517,28 @@ private:
|
||||
using JoinPtr = std::shared_ptr<Join>;
|
||||
using Joins = std::vector<JoinPtr>;
|
||||
|
||||
template <bool overwrite_>
|
||||
struct Join::MapGetterImpl<false, ASTTableJoin::Strictness::Any, overwrite_>
|
||||
{
|
||||
using Map = std::conditional_t<overwrite_, MapsAnyOverwrite, MapsAny>;
|
||||
};
|
||||
|
||||
template <bool overwrite_>
|
||||
struct Join::MapGetterImpl<true, ASTTableJoin::Strictness::Any, overwrite_>
|
||||
{
|
||||
using Map = std::conditional_t<overwrite_, MapsAnyFullOverwrite, MapsAnyFull>;
|
||||
};
|
||||
|
||||
template <>
|
||||
struct Join::MapGetterImpl<false, ASTTableJoin::Strictness::All, false>
|
||||
{
|
||||
using Map = MapsAll;
|
||||
};
|
||||
|
||||
template <>
|
||||
struct Join::MapGetterImpl<true, ASTTableJoin::Strictness::All, false>
|
||||
{
|
||||
using Map = MapsAllFull;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -253,6 +253,7 @@ struct Settings
|
||||
M(SettingUInt64, max_rows_in_join, 0, "Maximum size of the hash table for JOIN (in number of rows).") \
|
||||
M(SettingUInt64, max_bytes_in_join, 0, "Maximum size of the hash table for JOIN (in number of bytes in memory).") \
|
||||
M(SettingOverflowMode<false>, join_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \
|
||||
M(SettingBool, join_overwrite, false, "Whether to overwrite existing values when encountering the same key again.") \
|
||||
\
|
||||
M(SettingUInt64, max_rows_to_transfer, 0, "Maximum size (in rows) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.") \
|
||||
M(SettingUInt64, max_bytes_to_transfer, 0, "Maximum size (in uncompressed bytes) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.") \
|
||||
|
@ -33,7 +33,8 @@ StorageJoin::StorageJoin(
|
||||
SizeLimits limits_,
|
||||
ASTTableJoin::Kind kind_,
|
||||
ASTTableJoin::Strictness strictness_,
|
||||
const ColumnsDescription & columns_)
|
||||
const ColumnsDescription & columns_,
|
||||
bool overwrite)
|
||||
: StorageSetOrJoinBase{path_, name_, columns_}
|
||||
, key_names(key_names_)
|
||||
, use_nulls(use_nulls_)
|
||||
@ -45,7 +46,7 @@ StorageJoin::StorageJoin(
|
||||
if (!getColumns().hasPhysical(key))
|
||||
throw Exception{"Key column (" + key + ") does not exist in table declaration.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE};
|
||||
|
||||
join = std::make_shared<Join>(key_names, use_nulls, limits, kind, strictness);
|
||||
join = std::make_shared<Join>(key_names, use_nulls, limits, kind, strictness, overwrite);
|
||||
join->setSampleBlock(getSampleBlock().sortColumns());
|
||||
restore();
|
||||
}
|
||||
@ -134,6 +135,7 @@ void registerStorageJoin(StorageFactory & factory)
|
||||
auto max_rows_in_join = settings.max_rows_in_join;
|
||||
auto max_bytes_in_join = settings.max_bytes_in_join;
|
||||
auto join_overflow_mode = settings.join_overflow_mode;
|
||||
auto join_overwrite = settings.join_overwrite;
|
||||
|
||||
if (args.storage_def && args.storage_def->settings)
|
||||
{
|
||||
@ -147,6 +149,8 @@ void registerStorageJoin(StorageFactory & factory)
|
||||
max_bytes_in_join.set(setting.value);
|
||||
else if (setting.name == "join_overflow_mode")
|
||||
join_overflow_mode.set(setting.value);
|
||||
else if (setting.name == "join_overwrite")
|
||||
join_overwrite.set(setting.value);
|
||||
else
|
||||
throw Exception(
|
||||
"Unknown setting " + setting.name + " for storage " + args.engine_name,
|
||||
@ -162,7 +166,8 @@ void registerStorageJoin(StorageFactory & factory)
|
||||
SizeLimits{max_rows_in_join.value, max_bytes_in_join.value, join_overflow_mode.value},
|
||||
kind,
|
||||
strictness,
|
||||
args.columns);
|
||||
args.columns,
|
||||
join_overwrite);
|
||||
});
|
||||
}
|
||||
|
||||
@ -224,12 +229,12 @@ protected:
|
||||
if (parent.blocks.empty())
|
||||
return Block();
|
||||
|
||||
if (parent.strictness == ASTTableJoin::Strictness::Any)
|
||||
return createBlock<ASTTableJoin::Strictness::Any>(parent.maps_any);
|
||||
else if (parent.strictness == ASTTableJoin::Strictness::All)
|
||||
return createBlock<ASTTableJoin::Strictness::All>(parent.maps_all);
|
||||
Block block;
|
||||
if (parent.dispatch([&](auto, auto strictness, auto & map) { block = createBlock<strictness>(map); }))
|
||||
;
|
||||
else
|
||||
throw Exception("Logical error: unknown JOIN strictness (must be ANY or ALL)", ErrorCodes::LOGICAL_ERROR);
|
||||
return block;
|
||||
}
|
||||
|
||||
private:
|
||||
|
@ -62,7 +62,8 @@ protected:
|
||||
bool use_nulls_,
|
||||
SizeLimits limits_,
|
||||
ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_,
|
||||
const ColumnsDescription & columns_);
|
||||
const ColumnsDescription & columns_,
|
||||
bool overwrite);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,2 @@
|
||||
2
|
||||
3
|
15
dbms/tests/queries/0_stateless/00830_join_overwrite.sql
Normal file
15
dbms/tests/queries/0_stateless/00830_join_overwrite.sql
Normal file
@ -0,0 +1,15 @@
|
||||
USE test;
|
||||
|
||||
DROP TABLE IF EXISTS kv;
|
||||
|
||||
CREATE TABLE kv (k UInt32, v UInt32) ENGINE Join(Any, Left, k);
|
||||
INSERT INTO kv VALUES (1, 2);
|
||||
INSERT INTO kv VALUES (1, 3);
|
||||
SELECT joinGet('kv', 'v', toUInt32(1));
|
||||
CREATE TABLE kv_overwrite (k UInt32, v UInt32) ENGINE Join(Any, Left, k) SETTINGS join_overwrite = 1;
|
||||
INSERT INTO kv_overwrite VALUES (1, 2);
|
||||
INSERT INTO kv_overwrite VALUES (1, 3);
|
||||
SELECT joinGet('kv_overwrite', 'v', toUInt32(1));
|
||||
|
||||
DROP TABLE kv;
|
||||
DROP TABLE kv_overwrite;
|
@ -42,6 +42,7 @@ add_library (common ${LINK_MODE}
|
||||
include/common/demangle.h
|
||||
include/common/SetTerminalEcho.h
|
||||
include/common/find_symbols.h
|
||||
include/common/constexpr_helpers.h
|
||||
|
||||
include/ext/bit_cast.h
|
||||
include/ext/collection_cast.h
|
||||
|
31
libs/libcommon/include/common/constexpr_helpers.h
Normal file
31
libs/libcommon/include/common/constexpr_helpers.h
Normal file
@ -0,0 +1,31 @@
|
||||
#pragma once
|
||||
|
||||
#include <type_traits>
|
||||
|
||||
template <auto Val, decltype(Val)... List>
|
||||
inline constexpr bool static_in_v = std::disjunction_v<std::bool_constant<Val == List>...>;
|
||||
|
||||
template <typename Func, typename Arg>
|
||||
bool func_wrapper(Func && func, Arg && arg)
|
||||
{
|
||||
if constexpr (std::is_void_v<std::invoke_result_t<Func, Arg>>)
|
||||
{
|
||||
func(arg);
|
||||
return false;
|
||||
}
|
||||
else
|
||||
return func(arg);
|
||||
}
|
||||
|
||||
template <typename T, T Begin, typename Func, T... Is>
|
||||
constexpr bool static_for_impl(Func && f, std::integer_sequence<T, Is...>)
|
||||
{
|
||||
return (func_wrapper(std::forward<Func>(f), std::integral_constant<T, Begin + Is>{}) || ...);
|
||||
}
|
||||
|
||||
template <auto Begin, decltype(Begin) End, typename Func>
|
||||
constexpr bool static_for(Func && f)
|
||||
{
|
||||
using T = decltype(Begin);
|
||||
return static_for_impl<T, Begin>(std::forward<Func>(f), std::make_integer_sequence<T, End - Begin>{});
|
||||
}
|
Loading…
Reference in New Issue
Block a user