fix multithreaded ASOF JOIN crash

This commit is contained in:
chertus 2019-04-02 21:50:35 +03:00
parent 04efcf2bdc
commit d5ffbd9b6d
5 changed files with 59 additions and 57 deletions

View File

@ -1,6 +1,7 @@
#pragma once
#include <Common/PODArray.h>
#include <vector>
//#include <Common/PODArray.h>
namespace DB
{
@ -17,7 +18,8 @@ template <typename T>
class SortedLookupPODArray
{
public:
using Base = PaddedPODArray<T>;
using Base = std::vector<T>;
//using Base = PaddedPODArray<T>;
template <typename U, typename ... TAllocatorParams>
void insert(U && x, TAllocatorParams &&... allocator_params)

View File

@ -415,21 +415,22 @@ namespace
template <typename Map, typename KeyGetter>
struct Inserter<ASTTableJoin::Strictness::Asof, Map, KeyGetter>
{
static ALWAYS_INLINE void insert(const Join & join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, const IColumn * asof_column)
static ALWAYS_INLINE void insert(Join & join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool,
const IColumn * asof_column)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
typename Map::mapped_type * time_series_map = &emplace_result.getMapped();
if (emplace_result.isInserted())
time_series_map = new (time_series_map) typename Map::mapped_type(join.getAsofType());
time_series_map->insert(asof_column, stored_block, i);
time_series_map = new (time_series_map) typename Map::mapped_type();
time_series_map->insert(join.getAsofType(), join.getAsofData(), asof_column, stored_block, i);
}
};
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map>
void NO_INLINE insertFromBlockImplTypeCase(
const Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{
const IColumn * asof_column [[maybe_unused]] = nullptr;
@ -453,7 +454,7 @@ namespace
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map>
void insertFromBlockImplType(
const Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{
if (null_map)
@ -465,7 +466,7 @@ namespace
template <ASTTableJoin::Strictness STRICTNESS, typename Maps>
void insertFromBlockImpl(
const Join & join, Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns,
Join & join, Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{
switch (type)
@ -686,7 +687,7 @@ void addNotFoundRow(AddedColumns & added [[maybe_unused]], IColumn::Offset & cur
/// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS).
template <bool _add_missing, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool _has_null_map>
std::unique_ptr<IColumn::Offsets> NO_INLINE joinRightIndexedColumns(
const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
const Join & join, const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
AddedColumns & added_columns, ConstNullMapPtr null_map, IColumn::Filter & filter)
{
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
@ -719,7 +720,7 @@ std::unique_ptr<IColumn::Offsets> NO_INLINE joinRightIndexedColumns(
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
{
if (const RowRef * found = mapped.findAsof(asof_column, i))
if (const RowRef * found = mapped.findAsof(join.getAsofType(), join.getAsofData(), asof_column, i))
{
filter[i] = 1;
mapped.setUsed();
@ -748,7 +749,7 @@ std::unique_ptr<IColumn::Offsets> NO_INLINE joinRightIndexedColumns(
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map>
IColumn::Filter joinRightColumns(
const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
const Join & join, const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
AddedColumns & added_columns, ConstNullMapPtr null_map, std::unique_ptr<IColumn::Offsets> & offsets_to_replicate)
{
constexpr bool left_or_full = static_in_v<KIND, ASTTableJoin::Kind::Left, ASTTableJoin::Kind::Full>;
@ -757,17 +758,17 @@ IColumn::Filter joinRightColumns(
if (null_map)
offsets_to_replicate = joinRightIndexedColumns<left_or_full, STRICTNESS, KeyGetter, Map, true>(
map, rows, key_columns, key_sizes, added_columns, null_map, filter);
join, map, rows, key_columns, key_sizes, added_columns, null_map, filter);
else
offsets_to_replicate = joinRightIndexedColumns<left_or_full, STRICTNESS, KeyGetter, Map, false>(
map, rows, key_columns, key_sizes, added_columns, null_map, filter);
join, map, rows, key_columns, key_sizes, added_columns, null_map, filter);
return filter;
}
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps>
IColumn::Filter switchJoinRightColumns(
Join::Type type,
Join::Type type, const Join & join,
const Maps & maps_, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
AddedColumns & added_columns, ConstNullMapPtr null_map,
std::unique_ptr<IColumn::Offsets> & offsets_to_replicate)
@ -777,7 +778,7 @@ IColumn::Filter switchJoinRightColumns(
#define M(TYPE) \
case Join::Type::TYPE: \
return joinRightColumns<KIND, STRICTNESS, typename KeyGetterForType<Join::Type::TYPE, const std::remove_reference_t<decltype(*maps_.TYPE)>>::Type>(\
*maps_.TYPE, rows, key_columns, key_sizes, added_columns, null_map, offsets_to_replicate);
join, *maps_.TYPE, rows, key_columns, key_sizes, added_columns, null_map, offsets_to_replicate);
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
@ -851,7 +852,7 @@ void Join::joinBlockImpl(
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
IColumn::Filter row_filter = switchJoinRightColumns<KIND, STRICTNESS>(
type, maps_, block.rows(), key_columns, key_sizes, added, null_map, offsets_to_replicate);
type, *this, maps_, block.rows(), key_columns, key_sizes, added, null_map, offsets_to_replicate);
for (size_t i = 0; i < added.size(); ++i)
block.insert(added.moveColumn(i));

View File

@ -132,6 +132,8 @@ public:
ASTTableJoin::Kind getKind() const { return kind; }
AsofRowRefs::Type getAsofType() const { return *asof_type; }
AsofRowRefs::LookupLists & getAsofData() { return asof_lookup_lists; }
const AsofRowRefs::LookupLists & getAsofData() const { return asof_lookup_lists; }
/** Depending on template parameter, adds or doesn't add a flag, that element was used (row was joined).
* Depending on template parameter, decide whether to overwrite existing values when encountering the same key again
@ -367,6 +369,7 @@ private:
private:
Type type = Type::EMPTY;
std::optional<AsofRowRefs::Type> asof_type;
AsofRowRefs::LookupLists asof_lookup_lists;
static Type chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes);

View File

@ -30,56 +30,55 @@ void callWithType(AsofRowRefs::Type which, F && f)
} // namespace
void AsofRowRefs::createLookup(AsofRowRefs::Type which)
void AsofRowRefs::insert(Type type, LookupLists & lookup_data, const IColumn * asof_column, const Block * block, size_t row_num)
{
auto call = [&](const auto & t)
{
using T = std::decay_t<decltype(t)>;
using LookupType = typename Entry<T>::LookupType;
lookups = std::make_unique<LookupType>();
};
callWithType(which, call);
}
void AsofRowRefs::insert(const IColumn * asof_column, const Block * block, size_t row_num)
{
auto call = [&](const auto & t)
{
using T = std::decay_t<decltype(t)>;
using LookupPtr = typename Entry<T>::LookupPtr;
auto * column = typeid_cast<const ColumnVector<T> *>(asof_column);
T key = column->getElement(row_num);
auto entry = Entry<T>(key, RowRef(block, row_num));
std::get<LookupPtr>(lookups)->insert(entry);
std::lock_guard<std::mutex> lock(lookup_data.mutex);
if (!lookups)
{
lookup_data.lookups.push_back(Lookups());
lookup_data.lookups.back() = LookupType();
lookups = &lookup_data.lookups.back();
}
std::get<LookupType>(*lookups).insert(entry);
};
callWithType(*type, call);
callWithType(type, call);
}
const RowRef * AsofRowRefs::findAsof(const IColumn * asof_column, size_t row_num) const
const RowRef * AsofRowRefs::findAsof(Type type, const LookupLists & lookup_data, const IColumn * asof_column, size_t row_num) const
{
const RowRef * out = nullptr;
auto call = [&](const auto & t)
{
using T = std::decay_t<decltype(t)>;
using LookupPtr = typename Entry<T>::LookupPtr;
using LookupType = typename Entry<T>::LookupType;
auto * column = typeid_cast<const ColumnVector<T> *>(asof_column);
T key = column->getElement(row_num);
auto & typed_lookup = std::get<LookupPtr>(lookups);
auto it = typed_lookup->upper_bound(Entry<T>(key));
if (it != typed_lookup->cbegin())
std::lock_guard<std::mutex> lock(lookup_data.mutex);
if (!lookups)
return;
auto & typed_lookup = std::get<LookupType>(*lookups);
auto it = typed_lookup.upper_bound(Entry<T>(key));
if (it != typed_lookup.cbegin())
out = &((--it)->row_ref);
};
callWithType(*type, call);
callWithType(type, call);
return out;
}

View File

@ -5,6 +5,8 @@
#include <optional>
#include <variant>
#include <list>
#include <mutex>
namespace DB
{
@ -37,7 +39,6 @@ public:
struct Entry
{
using LookupType = SortedLookupPODArray<Entry<T>>;
using LookupPtr = std::unique_ptr<LookupType>;
T asof_value;
RowRef row_ref;
@ -52,10 +53,16 @@ public:
};
using Lookups = std::variant<
Entry<UInt32>::LookupPtr,
Entry<UInt64>::LookupPtr,
Entry<Float32>::LookupPtr,
Entry<Float64>::LookupPtr>;
Entry<UInt32>::LookupType,
Entry<UInt64>::LookupType,
Entry<Float32>::LookupType,
Entry<Float64>::LookupType>;
struct LookupLists
{
mutable std::mutex mutex;
std::list<Lookups> lookups;
};
enum class Type
{
@ -67,21 +74,11 @@ public:
static std::optional<Type> getTypeSize(const IColumn * asof_column, size_t & type_size);
AsofRowRefs() = default;
AsofRowRefs(Type t)
: type(t)
{
createLookup(t);
}
void insert(const IColumn * asof_column, const Block * block, size_t row_num);
const RowRef * findAsof(const IColumn * asof_column, size_t row_num) const;
void insert(Type type, LookupLists &, const IColumn * asof_column, const Block * block, size_t row_num);
const RowRef * findAsof(Type type, const LookupLists &, const IColumn * asof_column, size_t row_num) const;
private:
const std::optional<Type> type = {};
mutable Lookups lookups;
void createLookup(Type which);
Lookups * lookups = nullptr;
};
}