mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
asof refactoring (searching for the crash reason)
This commit is contained in:
parent
886e6883e6
commit
04efcf2bdc
@ -13,36 +13,38 @@ namespace DB
|
||||
* This way the data only gets sorted once.
|
||||
*/
|
||||
|
||||
template <typename T, size_t INITIAL_SIZE = 4096, typename TAllocator = Allocator<false>>
|
||||
class SortedLookupPODArray : private PaddedPODArray<T, INITIAL_SIZE, TAllocator>
|
||||
template <typename T>
|
||||
class SortedLookupPODArray
|
||||
{
|
||||
public:
|
||||
using Base = PaddedPODArray<T, INITIAL_SIZE, TAllocator>;
|
||||
using typename Base::PODArray;
|
||||
using Base::cbegin;
|
||||
using Base::cend;
|
||||
using Base = PaddedPODArray<T>;
|
||||
|
||||
template <typename U, typename ... TAllocatorParams>
|
||||
void insert(U && x, TAllocatorParams &&... allocator_params)
|
||||
{
|
||||
Base::push_back(std::forward<U>(x), std::forward<TAllocatorParams>(allocator_params)...);
|
||||
array.push_back(std::forward<U>(x), std::forward<TAllocatorParams>(allocator_params)...);
|
||||
sorted = false;
|
||||
}
|
||||
|
||||
typename Base::const_iterator upper_bound (const T& k)
|
||||
typename Base::const_iterator upper_bound(const T & k)
|
||||
{
|
||||
if (!sorted)
|
||||
this->sort();
|
||||
return std::upper_bound(this->cbegin(), this->cend(), k);
|
||||
}
|
||||
private:
|
||||
void sort()
|
||||
{
|
||||
std::sort(this->begin(), this->end());
|
||||
sorted = true;
|
||||
sort();
|
||||
return std::upper_bound(array.cbegin(), array.cend(), k);
|
||||
}
|
||||
|
||||
typename Base::const_iterator cbegin() const { return array.cbegin(); }
|
||||
typename Base::const_iterator cend() const { return array.cend(); }
|
||||
|
||||
private:
|
||||
Base array;
|
||||
bool sorted = false;
|
||||
|
||||
void sort()
|
||||
{
|
||||
std::sort(array.begin(), array.end());
|
||||
sorted = true;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -422,7 +422,7 @@ namespace
|
||||
|
||||
if (emplace_result.isInserted())
|
||||
time_series_map = new (time_series_map) typename Map::mapped_type(join.getAsofType());
|
||||
time_series_map->insert(asof_column, stored_block, i, pool);
|
||||
time_series_map->insert(asof_column, stored_block, i);
|
||||
}
|
||||
};
|
||||
|
||||
@ -719,7 +719,7 @@ std::unique_ptr<IColumn::Offsets> NO_INLINE joinRightIndexedColumns(
|
||||
|
||||
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
|
||||
{
|
||||
if (const RowRef * found = mapped.findAsof(asof_column, i, pool))
|
||||
if (const RowRef * found = mapped.findAsof(asof_column, i))
|
||||
{
|
||||
filter[i] = 1;
|
||||
mapped.setUsed();
|
||||
|
@ -18,10 +18,10 @@ void callWithType(AsofRowRefs::Type which, F && f)
|
||||
{
|
||||
switch (which)
|
||||
{
|
||||
case AsofRowRefs::Type::key32: return f(AsofRowRefs::LookupTypes<UInt32>());
|
||||
case AsofRowRefs::Type::key64: return f(AsofRowRefs::LookupTypes<UInt64>());
|
||||
case AsofRowRefs::Type::keyf32: return f(AsofRowRefs::LookupTypes<Float32>());
|
||||
case AsofRowRefs::Type::keyf64: return f(AsofRowRefs::LookupTypes<Float64>());
|
||||
case AsofRowRefs::Type::key32: return f(UInt32());
|
||||
case AsofRowRefs::Type::key64: return f(UInt64());
|
||||
case AsofRowRefs::Type::keyf32: return f(Float32());
|
||||
case AsofRowRefs::Type::keyf64: return f(Float64());
|
||||
}
|
||||
|
||||
__builtin_unreachable();
|
||||
@ -32,52 +32,49 @@ void callWithType(AsofRowRefs::Type which, F && f)
|
||||
|
||||
void AsofRowRefs::createLookup(AsofRowRefs::Type which)
|
||||
{
|
||||
auto call = [&](const auto & types)
|
||||
auto call = [&](const auto & t)
|
||||
{
|
||||
using Types = std::decay_t<decltype(types)>;
|
||||
using SearcherType = typename Types::SearcherType;
|
||||
using T = std::decay_t<decltype(t)>;
|
||||
using LookupType = typename Entry<T>::LookupType;
|
||||
|
||||
lookups = std::make_unique<SearcherType>();
|
||||
lookups = std::make_unique<LookupType>();
|
||||
};
|
||||
|
||||
callWithType(which, call);
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
using AsofGetterType = ColumnsHashing::HashMethodOneNumber<T, T, T, false>;
|
||||
|
||||
void AsofRowRefs::insert(const IColumn * asof_column, const Block * block, size_t row_num, Arena & pool)
|
||||
void AsofRowRefs::insert(const IColumn * asof_column, const Block * block, size_t row_num)
|
||||
{
|
||||
auto call = [&](const auto & types)
|
||||
auto call = [&](const auto & t)
|
||||
{
|
||||
using Types = std::decay_t<decltype(types)>;
|
||||
using ElementType = typename Types::ElementType;
|
||||
using SearcherPtr = typename Types::Ptr;
|
||||
using T = std::decay_t<decltype(t)>;
|
||||
using LookupPtr = typename Entry<T>::LookupPtr;
|
||||
|
||||
auto asof_getter = AsofGetterType<ElementType>(asof_column);
|
||||
auto entry = Entry<ElementType>(asof_getter.getKey(row_num, pool), RowRef(block, row_num));
|
||||
auto * column = typeid_cast<const ColumnVector<T> *>(asof_column);
|
||||
T key = column->getElement(row_num);
|
||||
auto entry = Entry<T>(key, RowRef(block, row_num));
|
||||
|
||||
std::get<SearcherPtr>(lookups)->insert(entry);
|
||||
std::get<LookupPtr>(lookups)->insert(entry);
|
||||
};
|
||||
|
||||
callWithType(*type, call);
|
||||
}
|
||||
|
||||
const RowRef * AsofRowRefs::findAsof(const IColumn * asof_column, size_t row_num, Arena & pool) const
|
||||
const RowRef * AsofRowRefs::findAsof(const IColumn * asof_column, size_t row_num) const
|
||||
{
|
||||
const RowRef * out = nullptr;
|
||||
|
||||
auto call = [&](const auto & types)
|
||||
auto call = [&](const auto & t)
|
||||
{
|
||||
using Types = std::decay_t<decltype(types)>;
|
||||
using ElementType = typename Types::ElementType;
|
||||
using SearcherPtr = typename Types::Ptr;
|
||||
using T = std::decay_t<decltype(t)>;
|
||||
using LookupPtr = typename Entry<T>::LookupPtr;
|
||||
|
||||
auto asof_getter = AsofGetterType<ElementType>(asof_column);
|
||||
ElementType key = asof_getter.getKey(row_num, pool);
|
||||
auto & typed_lookup = std::get<SearcherPtr>(lookups);
|
||||
auto * column = typeid_cast<const ColumnVector<T> *>(asof_column);
|
||||
T key = column->getElement(row_num);
|
||||
|
||||
auto it = typed_lookup->upper_bound(Entry<ElementType>(key));
|
||||
auto & typed_lookup = std::get<LookupPtr>(lookups);
|
||||
auto it = typed_lookup->upper_bound(Entry<T>(key));
|
||||
if (it != typed_lookup->cbegin())
|
||||
out = &((--it)->row_ref);
|
||||
};
|
||||
|
@ -33,34 +33,29 @@ struct RowRefList : RowRef
|
||||
class AsofRowRefs
|
||||
{
|
||||
public:
|
||||
template<typename T>
|
||||
template <typename T>
|
||||
struct Entry
|
||||
{
|
||||
using LookupType = SortedLookupPODArray<Entry<T>>;
|
||||
using LookupPtr = std::unique_ptr<LookupType>;
|
||||
|
||||
T asof_value;
|
||||
RowRef row_ref;
|
||||
|
||||
Entry(T v) : asof_value(v) {}
|
||||
Entry(T v, RowRef rr) : asof_value(v), row_ref(rr) {}
|
||||
|
||||
bool operator< (const Entry& o) const
|
||||
bool operator < (const Entry & o) const
|
||||
{
|
||||
return asof_value < o.asof_value;
|
||||
}
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
struct LookupTypes
|
||||
{
|
||||
using ElementType = T;
|
||||
using SearcherType = SortedLookupPODArray<Entry<T>>;
|
||||
using Ptr = std::unique_ptr<SearcherType>;
|
||||
};
|
||||
|
||||
using Lookups = std::variant<
|
||||
LookupTypes<UInt32>::Ptr,
|
||||
LookupTypes<UInt64>::Ptr,
|
||||
LookupTypes<Float32>::Ptr,
|
||||
LookupTypes<Float64>::Ptr>;
|
||||
Entry<UInt32>::LookupPtr,
|
||||
Entry<UInt64>::LookupPtr,
|
||||
Entry<Float32>::LookupPtr,
|
||||
Entry<Float64>::LookupPtr>;
|
||||
|
||||
enum class Type
|
||||
{
|
||||
@ -79,8 +74,8 @@ public:
|
||||
createLookup(t);
|
||||
}
|
||||
|
||||
void insert(const IColumn * asof_column, const Block * block, size_t row_num, Arena & pool);
|
||||
const RowRef * findAsof(const IColumn * asof_column, size_t row_num, Arena & pool) const;
|
||||
void insert(const IColumn * asof_column, const Block * block, size_t row_num);
|
||||
const RowRef * findAsof(const IColumn * asof_column, size_t row_num) const;
|
||||
|
||||
private:
|
||||
const std::optional<Type> type = {};
|
||||
|
Loading…
Reference in New Issue
Block a user