Merge pull request #4875 from 4ertus2/joins

Fix multithreaded ASOF JOIN + some refactoring
This commit is contained in:
Artem Zuikov 2019-04-03 14:03:43 +03:00 committed by GitHub
commit 97dd0e2aa0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 159 additions and 122 deletions

View File

@ -1,6 +1,7 @@
#pragma once
#include <Common/PODArray.h>
#include <vector>
//#include <Common/PODArray.h>
namespace DB
{
@ -13,36 +14,39 @@ namespace DB
* This way the data only gets sorted once.
*/
template <typename T, size_t INITIAL_SIZE = 4096, typename TAllocator = Allocator<false>>
class SortedLookupPODArray : private PaddedPODArray<T, INITIAL_SIZE, TAllocator>
template <typename T>
class SortedLookupPODArray
{
public:
using Base = PaddedPODArray<T, INITIAL_SIZE, TAllocator>;
using typename Base::PODArray;
using Base::cbegin;
using Base::cend;
using Base = std::vector<T>;
//using Base = PaddedPODArray<T>;
template <typename U, typename ... TAllocatorParams>
void insert(U && x, TAllocatorParams &&... allocator_params)
{
Base::push_back(std::forward<U>(x), std::forward<TAllocatorParams>(allocator_params)...);
array.push_back(std::forward<U>(x), std::forward<TAllocatorParams>(allocator_params)...);
sorted = false;
}
typename Base::const_iterator upper_bound (const T& k)
typename Base::const_iterator upper_bound(const T & k)
{
if (!sorted)
this->sort();
return std::upper_bound(this->cbegin(), this->cend(), k);
}
private:
void sort()
{
std::sort(this->begin(), this->end());
sorted = true;
sort();
return std::upper_bound(array.cbegin(), array.cend(), k);
}
typename Base::const_iterator cbegin() const { return array.cbegin(); }
typename Base::const_iterator cend() const { return array.cend(); }
private:
Base array;
bool sorted = false;
void sort()
{
std::sort(array.begin(), array.end());
sorted = true;
}
};
}

View File

@ -301,9 +301,8 @@ void Join::setSampleBlock(const Block & block)
const IColumn * asof_column = key_columns.back();
size_t asof_size;
if (auto t = AsofRowRefs::getTypeSize(asof_column))
std::tie(asof_type, asof_size) = *t;
else
asof_type = AsofRowRefs::getTypeSize(asof_column, asof_size);
if (!asof_type)
{
std::string msg = "ASOF join not supported for type";
msg += asof_column->getFamilyName();
@ -416,21 +415,22 @@ namespace
template <typename Map, typename KeyGetter>
struct Inserter<ASTTableJoin::Strictness::Asof, Map, KeyGetter>
{
static ALWAYS_INLINE void insert(const Join & join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, const IColumn * asof_column)
static ALWAYS_INLINE void insert(Join & join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool,
const IColumn * asof_column)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
typename Map::mapped_type * time_series_map = &emplace_result.getMapped();
if (emplace_result.isInserted())
time_series_map = new (time_series_map) typename Map::mapped_type(join.getAsofType());
time_series_map->insert(asof_column, stored_block, i, pool);
time_series_map = new (time_series_map) typename Map::mapped_type();
time_series_map->insert(join.getAsofType(), join.getAsofData(), asof_column, stored_block, i);
}
};
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map>
void NO_INLINE insertFromBlockImplTypeCase(
const Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{
const IColumn * asof_column [[maybe_unused]] = nullptr;
@ -454,7 +454,7 @@ namespace
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map>
void insertFromBlockImplType(
const Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{
if (null_map)
@ -466,7 +466,7 @@ namespace
template <ASTTableJoin::Strictness STRICTNESS, typename Maps>
void insertFromBlockImpl(
const Join & join, Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns,
Join & join, Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{
switch (type)
@ -687,7 +687,7 @@ void addNotFoundRow(AddedColumns & added [[maybe_unused]], IColumn::Offset & cur
/// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS).
template <bool _add_missing, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool _has_null_map>
std::unique_ptr<IColumn::Offsets> NO_INLINE joinRightIndexedColumns(
const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
const Join & join, const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
AddedColumns & added_columns, ConstNullMapPtr null_map, IColumn::Filter & filter)
{
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
@ -720,7 +720,7 @@ std::unique_ptr<IColumn::Offsets> NO_INLINE joinRightIndexedColumns(
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
{
if (const RowRef * found = mapped.findAsof(asof_column, i, pool))
if (const RowRef * found = mapped.findAsof(join.getAsofType(), join.getAsofData(), asof_column, i))
{
filter[i] = 1;
mapped.setUsed();
@ -749,7 +749,7 @@ std::unique_ptr<IColumn::Offsets> NO_INLINE joinRightIndexedColumns(
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map>
IColumn::Filter joinRightColumns(
const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
const Join & join, const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
AddedColumns & added_columns, ConstNullMapPtr null_map, std::unique_ptr<IColumn::Offsets> & offsets_to_replicate)
{
constexpr bool left_or_full = static_in_v<KIND, ASTTableJoin::Kind::Left, ASTTableJoin::Kind::Full>;
@ -758,17 +758,17 @@ IColumn::Filter joinRightColumns(
if (null_map)
offsets_to_replicate = joinRightIndexedColumns<left_or_full, STRICTNESS, KeyGetter, Map, true>(
map, rows, key_columns, key_sizes, added_columns, null_map, filter);
join, map, rows, key_columns, key_sizes, added_columns, null_map, filter);
else
offsets_to_replicate = joinRightIndexedColumns<left_or_full, STRICTNESS, KeyGetter, Map, false>(
map, rows, key_columns, key_sizes, added_columns, null_map, filter);
join, map, rows, key_columns, key_sizes, added_columns, null_map, filter);
return filter;
}
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps>
IColumn::Filter switchJoinRightColumns(
Join::Type type,
Join::Type type, const Join & join,
const Maps & maps_, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
AddedColumns & added_columns, ConstNullMapPtr null_map,
std::unique_ptr<IColumn::Offsets> & offsets_to_replicate)
@ -778,7 +778,7 @@ IColumn::Filter switchJoinRightColumns(
#define M(TYPE) \
case Join::Type::TYPE: \
return joinRightColumns<KIND, STRICTNESS, typename KeyGetterForType<Join::Type::TYPE, const std::remove_reference_t<decltype(*maps_.TYPE)>>::Type>(\
*maps_.TYPE, rows, key_columns, key_sizes, added_columns, null_map, offsets_to_replicate);
join, *maps_.TYPE, rows, key_columns, key_sizes, added_columns, null_map, offsets_to_replicate);
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
@ -852,7 +852,7 @@ void Join::joinBlockImpl(
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
IColumn::Filter row_filter = switchJoinRightColumns<KIND, STRICTNESS>(
type, maps_, block.rows(), key_columns, key_sizes, added, null_map, offsets_to_replicate);
type, *this, maps_, block.rows(), key_columns, key_sizes, added, null_map, offsets_to_replicate);
for (size_t i = 0; i < added.size(); ++i)
block.insert(added.moveColumn(i));

View File

@ -131,7 +131,9 @@ public:
size_t getTotalByteCount() const;
ASTTableJoin::Kind getKind() const { return kind; }
AsofRowRefs::Type getAsofType() const { return asof_type; }
AsofRowRefs::Type getAsofType() const { return *asof_type; }
AsofRowRefs::LookupLists & getAsofData() { return asof_lookup_lists; }
const AsofRowRefs::LookupLists & getAsofData() const { return asof_lookup_lists; }
/** Depending on template parameter, adds or doesn't add a flag, that element was used (row was joined).
* Depending on template parameter, decide whether to overwrite existing values when encountering the same key again
@ -366,7 +368,8 @@ private:
private:
Type type = Type::EMPTY;
AsofRowRefs::Type asof_type = AsofRowRefs::Type::EMPTY;
std::optional<AsofRowRefs::Type> asof_type;
AsofRowRefs::LookupLists asof_lookup_lists;
static Type chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes);

View File

@ -1,74 +1,111 @@
#include <Interpreters/RowRefs.h>
#include <Common/typeid_cast.h>
#include <Common/ColumnsHashing.h>
#include <Core/Block.h>
#include <Columns/IColumn.h>
#include <optional>
namespace DB
{
void AsofRowRefs::Lookups::create(AsofRowRefs::Type which)
namespace
{
/// maps enum values to types
template <typename F>
void callWithType(AsofRowRefs::Type which, F && f)
{
switch (which)
{
case Type::EMPTY: break;
#define M(NAME, TYPE) \
case Type::NAME: NAME = std::make_unique<typename decltype(NAME)::element_type>(); break;
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
}
}
template<typename T>
using AsofGetterType = ColumnsHashing::HashMethodOneNumber<T, T, T, false>;
void AsofRowRefs::insert(const IColumn * asof_column, const Block * block, size_t row_num, Arena & pool)
{
switch (type)
{
case Type::EMPTY: break;
#define M(NAME, TYPE) \
case Type::NAME: { \
auto asof_getter = AsofGetterType<TYPE>(asof_column); \
auto entry = Entry<TYPE>(asof_getter.getKey(row_num, pool), RowRef(block, row_num)); \
lookups.NAME->insert(entry); \
break; \
}
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
}
}
const RowRef * AsofRowRefs::findAsof(const IColumn * asof_column, size_t row_num, Arena & pool) const
{
switch (type)
{
case Type::EMPTY: return nullptr;
#define M(NAME, TYPE) \
case Type::NAME: { \
auto asof_getter = AsofGetterType<TYPE>(asof_column); \
TYPE key = asof_getter.getKey(row_num, pool); \
auto it = lookups.NAME->upper_bound(Entry<TYPE>(key)); \
if (it == lookups.NAME->cbegin()) \
return nullptr; \
return &((--it)->row_ref); \
}
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
case AsofRowRefs::Type::key32: return f(UInt32());
case AsofRowRefs::Type::key64: return f(UInt64());
case AsofRowRefs::Type::keyf32: return f(Float32());
case AsofRowRefs::Type::keyf64: return f(Float64());
}
__builtin_unreachable();
}
std::optional<std::pair<AsofRowRefs::Type, size_t>> AsofRowRefs::getTypeSize(const IColumn * asof_column)
} // namespace
void AsofRowRefs::insert(Type type, LookupLists & lookup_data, const IColumn * asof_column, const Block * block, size_t row_num)
{
#define M(NAME, TYPE) \
if (strcmp(#TYPE, asof_column->getFamilyName()) == 0) \
return std::make_pair(Type::NAME,sizeof(TYPE));
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
auto call = [&](const auto & t)
{
using T = std::decay_t<decltype(t)>;
using LookupType = typename Entry<T>::LookupType;
auto * column = typeid_cast<const ColumnVector<T> *>(asof_column);
T key = column->getElement(row_num);
auto entry = Entry<T>(key, RowRef(block, row_num));
std::lock_guard<std::mutex> lock(lookup_data.mutex);
if (!lookups)
{
lookup_data.lookups.push_back(Lookups());
lookup_data.lookups.back() = LookupType();
lookups = &lookup_data.lookups.back();
}
std::get<LookupType>(*lookups).insert(entry);
};
callWithType(type, call);
}
const RowRef * AsofRowRefs::findAsof(Type type, const LookupLists & lookup_data, const IColumn * asof_column, size_t row_num) const
{
const RowRef * out = nullptr;
auto call = [&](const auto & t)
{
using T = std::decay_t<decltype(t)>;
using LookupType = typename Entry<T>::LookupType;
auto * column = typeid_cast<const ColumnVector<T> *>(asof_column);
T key = column->getElement(row_num);
std::lock_guard<std::mutex> lock(lookup_data.mutex);
if (!lookups)
return;
auto & typed_lookup = std::get<LookupType>(*lookups);
auto it = typed_lookup.upper_bound(Entry<T>(key));
if (it != typed_lookup.cbegin())
out = &((--it)->row_ref);
};
callWithType(type, call);
return out;
}
std::optional<AsofRowRefs::Type> AsofRowRefs::getTypeSize(const IColumn * asof_column, size_t & size)
{
if (typeid_cast<const ColumnVector<UInt32> *>(asof_column))
{
size = sizeof(UInt32);
return Type::key32;
}
else if (typeid_cast<const ColumnVector<UInt64> *>(asof_column))
{
size = sizeof(UInt64);
return Type::key64;
}
else if (typeid_cast<const ColumnVector<Float32> *>(asof_column))
{
size = sizeof(Float32);
return Type::keyf32;
}
else if (typeid_cast<const ColumnVector<Float64> *>(asof_column))
{
size = sizeof(Float64);
return Type::keyf64;
}
size = 0;
return {};
}

View File

@ -4,6 +4,9 @@
#include <Common/SortedLookupPODArray.h>
#include <optional>
#include <variant>
#include <list>
#include <mutex>
namespace DB
{
@ -32,60 +35,50 @@ struct RowRefList : RowRef
class AsofRowRefs
{
public:
/// Different types of asof join keys
#define APPLY_FOR_ASOF_JOIN_VARIANTS(M) \
M(key32, UInt32) \
M(key64, UInt64) \
M(keyf32, Float32) \
M(keyf64, Float64)
enum class Type
{
EMPTY,
#define M(NAME, TYPE) NAME,
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
};
static std::optional<std::pair<Type, size_t>> getTypeSize(const IColumn * asof_column);
template<typename T>
template <typename T>
struct Entry
{
using LookupType = SortedLookupPODArray<Entry<T>>;
T asof_value;
RowRef row_ref;
Entry(T v) : asof_value(v) {}
Entry(T v, RowRef rr) : asof_value(v), row_ref(rr) {}
bool operator< (const Entry& o) const
bool operator < (const Entry & o) const
{
return asof_value < o.asof_value;
}
};
struct Lookups
{
#define M(NAME, TYPE) \
std::unique_ptr<SortedLookupPODArray<Entry<TYPE>>> NAME;
APPLY_FOR_ASOF_JOIN_VARIANTS(M)
#undef M
using Lookups = std::variant<
Entry<UInt32>::LookupType,
Entry<UInt64>::LookupType,
Entry<Float32>::LookupType,
Entry<Float64>::LookupType>;
void create(Type which);
struct LookupLists
{
mutable std::mutex mutex;
std::list<Lookups> lookups;
};
AsofRowRefs() : type(Type::EMPTY) {}
AsofRowRefs(Type t) : type(t)
enum class Type
{
lookups.create(t);
}
key32,
key64,
keyf32,
keyf64,
};
void insert(const IColumn * asof_column, const Block * block, size_t row_num, Arena & pool);
const RowRef * findAsof(const IColumn * asof_column, size_t row_num, Arena & pool) const;
static std::optional<Type> getTypeSize(const IColumn * asof_column, size_t & type_size);
void insert(Type type, LookupLists &, const IColumn * asof_column, const Block * block, size_t row_num);
const RowRef * findAsof(Type type, const LookupLists &, const IColumn * asof_column, size_t row_num) const;
private:
const Type type;
mutable Lookups lookups;
Lookups * lookups = nullptr;
};
}