mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
working multi type asof join columns
This commit is contained in:
parent
89515861df
commit
389f108807
@ -362,23 +362,18 @@ void Join::setSampleBlock(const Block & block)
|
||||
convertColumnToNullable(sample_block_with_columns_to_add.getByPosition(i));
|
||||
}
|
||||
|
||||
void Join::AsofRowRefs::AsofLookups::create(Join::AsofRowRefs::AsofType which)
|
||||
void Join::AsofRowRefs::Lookups::create(Join::AsofRowRefs::AsofType which)
|
||||
{
|
||||
switch (which)
|
||||
{
|
||||
case AsofType::EMPTY: break;
|
||||
#define M(NAME, TYPE) \
|
||||
case AsofType::NAME: NAME = std::make_unique<typename decltype(NAME)::element_type>(); break;
|
||||
case AsofType::NAME: NAME = std::make_unique<typename decltype(NAME)::element_type>(); break;
|
||||
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
|
||||
#undef M
|
||||
}
|
||||
}
|
||||
|
||||
void Join::AsofRowRefs::create(AsofType which)
|
||||
{
|
||||
type = which;
|
||||
lookups.create(which);
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
using AsofGetterType = ColumnsHashing::HashMethodOneNumber<T, T, T, false>;
|
||||
|
||||
@ -387,13 +382,14 @@ void Join::AsofRowRefs::insert(const IColumn * asof_column, const Block * block,
|
||||
assert(!sorted);
|
||||
switch (type)
|
||||
{
|
||||
case AsofType::EMPTY: break;
|
||||
#define M(NAME, TYPE) \
|
||||
case AsofType::NAME: { \
|
||||
auto asof_getter = AsofGetterType<TYPE>(asof_column); \
|
||||
auto entry = AsofEntry<TYPE>(asof_getter.getKey(row_num, pool), RowRef(block, row_num)); \
|
||||
lookups.NAME->push_back(entry); \
|
||||
break; \
|
||||
}
|
||||
case AsofType::NAME: { \
|
||||
auto asof_getter = AsofGetterType<TYPE>(asof_column); \
|
||||
auto entry = Entry<TYPE>(asof_getter.getKey(row_num, pool), RowRef(block, row_num)); \
|
||||
lookups.NAME->push_back(entry); \
|
||||
break; \
|
||||
}
|
||||
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
|
||||
#undef M
|
||||
}
|
||||
@ -406,6 +402,7 @@ const Join::RowRef * Join::AsofRowRefs::findAsof(const IColumn * asof_column, si
|
||||
// sort whenever needed
|
||||
switch (type)
|
||||
{
|
||||
case AsofType::EMPTY: break;
|
||||
#define M(NAME, TYPE) \
|
||||
case AsofType::NAME: std::sort(lookups.NAME->begin(), lookups.NAME->end()); break;
|
||||
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
|
||||
@ -416,11 +413,12 @@ const Join::RowRef * Join::AsofRowRefs::findAsof(const IColumn * asof_column, si
|
||||
|
||||
switch (type)
|
||||
{
|
||||
case AsofType::EMPTY: return nullptr;
|
||||
#define M(NAME, TYPE) \
|
||||
case AsofType::NAME: { \
|
||||
auto asof_getter = AsofGetterType<TYPE>(asof_column); \
|
||||
TYPE key = asof_getter.getKey(row_num, pool); \
|
||||
auto it = std::upper_bound(lookups.NAME->cbegin(), lookups.NAME->cend(), AsofEntry<TYPE>(key)); \
|
||||
auto it = std::upper_bound(lookups.NAME->cbegin(), lookups.NAME->cend(), Entry<TYPE>(key)); \
|
||||
if (it == lookups.NAME->cbegin()) \
|
||||
return nullptr; \
|
||||
return &((--it)->row_ref); \
|
||||
@ -446,6 +444,7 @@ size_t Join::AsofRowRefs::getSize(Join::AsofRowRefs::AsofType type)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case AsofType::EMPTY: return 0;
|
||||
#define M(NAME, TYPE) \
|
||||
case AsofType::NAME: return sizeof(TYPE);
|
||||
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
|
||||
@ -461,13 +460,13 @@ namespace
|
||||
template <ASTTableJoin::Strictness STRICTNESS, typename Map, typename KeyGetter>
|
||||
struct Inserter
|
||||
{
|
||||
static void insert(Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool);
|
||||
static void insert(const Join *, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool);
|
||||
};
|
||||
|
||||
template <typename Map, typename KeyGetter>
|
||||
struct Inserter<ASTTableJoin::Strictness::Any, Map, KeyGetter>
|
||||
{
|
||||
static ALWAYS_INLINE void insert(Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
|
||||
static ALWAYS_INLINE void insert(const Join *, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
|
||||
{
|
||||
auto emplace_result = key_getter.emplaceKey(map, i, pool);
|
||||
|
||||
@ -479,7 +478,7 @@ namespace
|
||||
template <typename Map, typename KeyGetter>
|
||||
struct Inserter<ASTTableJoin::Strictness::All, Map, KeyGetter>
|
||||
{
|
||||
static ALWAYS_INLINE void insert(Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
|
||||
static ALWAYS_INLINE void insert(const Join *, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
|
||||
{
|
||||
auto emplace_result = key_getter.emplaceKey(map, i, pool);
|
||||
|
||||
@ -505,18 +504,13 @@ namespace
|
||||
template <typename Map, typename KeyGetter>
|
||||
struct Inserter<ASTTableJoin::Strictness::Asof, Map, KeyGetter>
|
||||
{
|
||||
static ALWAYS_INLINE void insert(Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, const IColumn * asof_column)
|
||||
static ALWAYS_INLINE void insert(const Join * join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, const IColumn * asof_column)
|
||||
{
|
||||
auto emplace_result = key_getter.emplaceKey(map, i, pool);
|
||||
typename Map::mapped_type * time_series_map = &emplace_result.getMapped();
|
||||
|
||||
if (emplace_result.isInserted())
|
||||
{
|
||||
time_series_map = new (time_series_map) typename Map::mapped_type();
|
||||
// TODO extract this from either the column type or from the main join object
|
||||
time_series_map->create(Join::AsofRowRefs::AsofType::key32);
|
||||
}
|
||||
|
||||
time_series_map = new (time_series_map) typename Map::mapped_type(join->getAsofType());
|
||||
time_series_map->insert(asof_column, stored_block, i, pool);
|
||||
}
|
||||
};
|
||||
@ -524,7 +518,7 @@ namespace
|
||||
|
||||
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map>
|
||||
void NO_INLINE insertFromBlockImplTypeCase(
|
||||
Map & map, size_t rows, const ColumnRawPtrs & key_columns,
|
||||
const Join * join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
|
||||
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
|
||||
{
|
||||
const IColumn * asof_column [[maybe_unused]] = nullptr;
|
||||
@ -539,28 +533,28 @@ namespace
|
||||
continue;
|
||||
|
||||
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
|
||||
Inserter<STRICTNESS, Map, KeyGetter>::insert(map, key_getter, stored_block, i, pool, asof_column);
|
||||
Inserter<STRICTNESS, Map, KeyGetter>::insert(join, map, key_getter, stored_block, i, pool, asof_column);
|
||||
else
|
||||
Inserter<STRICTNESS, Map, KeyGetter>::insert(map, key_getter, stored_block, i, pool);
|
||||
Inserter<STRICTNESS, Map, KeyGetter>::insert(join, map, key_getter, stored_block, i, pool);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map>
|
||||
void insertFromBlockImplType(
|
||||
Map & map, size_t rows, const ColumnRawPtrs & key_columns,
|
||||
const Join * join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
|
||||
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
|
||||
{
|
||||
if (null_map)
|
||||
insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, true>(map, rows, key_columns, key_sizes, stored_block, null_map, pool);
|
||||
insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, true>(join, map, rows, key_columns, key_sizes, stored_block, null_map, pool);
|
||||
else
|
||||
insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, false>(map, rows, key_columns, key_sizes, stored_block, null_map, pool);
|
||||
insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, false>(join, map, rows, key_columns, key_sizes, stored_block, null_map, pool);
|
||||
}
|
||||
|
||||
|
||||
template <ASTTableJoin::Strictness STRICTNESS, typename Maps>
|
||||
void insertFromBlockImpl(
|
||||
Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns,
|
||||
const Join * join, Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns,
|
||||
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
|
||||
{
|
||||
switch (type)
|
||||
@ -571,7 +565,7 @@ namespace
|
||||
#define M(TYPE) \
|
||||
case Join::Type::TYPE: \
|
||||
insertFromBlockImplType<STRICTNESS, typename KeyGetterForType<Join::Type::TYPE, std::remove_reference_t<decltype(*maps.TYPE)>>::Type>(\
|
||||
*maps.TYPE, rows, key_columns, key_sizes, stored_block, null_map, pool); \
|
||||
join, *maps.TYPE, rows, key_columns, key_sizes, stored_block, null_map, pool); \
|
||||
break;
|
||||
APPLY_FOR_JOIN_VARIANTS(M)
|
||||
#undef M
|
||||
@ -658,7 +652,7 @@ bool Join::insertFromBlock(const Block & block)
|
||||
{
|
||||
dispatch([&](auto, auto strictness_, auto & map)
|
||||
{
|
||||
insertFromBlockImpl<strictness_>(type, map, rows, key_columns, key_sizes, stored_block, null_map, pool);
|
||||
insertFromBlockImpl<strictness_>(this, type, map, rows, key_columns, key_sizes, stored_block, null_map, pool);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -160,6 +160,7 @@ public:
|
||||
|
||||
enum class AsofType
|
||||
{
|
||||
EMPTY,
|
||||
#define M(NAME, TYPE) NAME,
|
||||
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
|
||||
#undef M
|
||||
@ -169,43 +170,47 @@ public:
|
||||
static size_t getSize(AsofType type);
|
||||
|
||||
template<typename T>
|
||||
struct AsofEntry
|
||||
struct Entry
|
||||
{
|
||||
T asof_value;
|
||||
RowRef row_ref;
|
||||
|
||||
AsofEntry(T v) : asof_value(v) {}
|
||||
AsofEntry(T v, RowRef rr) : asof_value(v), row_ref(rr) {}
|
||||
Entry(T v) : asof_value(v) {}
|
||||
Entry(T v, RowRef rr) : asof_value(v), row_ref(rr) {}
|
||||
|
||||
bool operator< (const AsofEntry& o) const
|
||||
bool operator< (const Entry& o) const
|
||||
{
|
||||
return asof_value < o.asof_value;
|
||||
}
|
||||
};
|
||||
|
||||
struct AsofLookups
|
||||
struct Lookups
|
||||
{
|
||||
#define M(NAME, TYPE) \
|
||||
std::unique_ptr<PODArray<AsofEntry<TYPE>>> NAME;
|
||||
std::unique_ptr<PODArray<Entry<TYPE>>> NAME;
|
||||
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
|
||||
#undef M
|
||||
|
||||
void create(AsofType which);
|
||||
};
|
||||
|
||||
AsofRowRefs() {}
|
||||
AsofRowRefs() : type(AsofType::EMPTY) {}
|
||||
AsofRowRefs(AsofType t) : type(t) {
|
||||
lookups.create(t);
|
||||
}
|
||||
|
||||
void create(AsofType which);
|
||||
void insert(const IColumn * asof_column, const Block * block, size_t row_num, Arena & pool);
|
||||
|
||||
const RowRef * findAsof(const IColumn * asof_column, size_t row_num, Arena & pool) const;
|
||||
|
||||
private:
|
||||
AsofType type;
|
||||
mutable AsofLookups lookups;
|
||||
const AsofType type;
|
||||
mutable Lookups lookups;
|
||||
mutable bool sorted = false;
|
||||
};
|
||||
|
||||
AsofRowRefs::AsofType getAsofType() const { return asof_type; }
|
||||
|
||||
|
||||
/** Depending on template parameter, adds or doesn't add a flag, that element was used (row was joined).
|
||||
* Depending on template parameter, decide whether to overwrite existing values when encountering the same key again
|
||||
* with_used is for implementation of RIGHT and FULL JOINs.
|
||||
|
Loading…
Reference in New Issue
Block a user