From d5ffbd9b6d28719174a3658d3589ba7a0ad54ce9 Mon Sep 17 00:00:00 2001 From: chertus Date: Tue, 2 Apr 2019 21:50:35 +0300 Subject: [PATCH] fix multithreaded ASOF JOIN crash --- dbms/src/Common/SortedLookupPODArray.h | 6 ++-- dbms/src/Interpreters/Join.cpp | 29 +++++++++-------- dbms/src/Interpreters/Join.h | 3 ++ dbms/src/Interpreters/RowRefs.cpp | 45 +++++++++++++------------- dbms/src/Interpreters/RowRefs.h | 33 +++++++++---------- 5 files changed, 59 insertions(+), 57 deletions(-) diff --git a/dbms/src/Common/SortedLookupPODArray.h b/dbms/src/Common/SortedLookupPODArray.h index ce96e8e2839..d9b03f5704d 100644 --- a/dbms/src/Common/SortedLookupPODArray.h +++ b/dbms/src/Common/SortedLookupPODArray.h @@ -1,6 +1,7 @@ #pragma once -#include +#include +//#include namespace DB { @@ -17,7 +18,8 @@ template class SortedLookupPODArray { public: - using Base = PaddedPODArray; + using Base = std::vector; + //using Base = PaddedPODArray; template void insert(U && x, TAllocatorParams &&... allocator_params) diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index be4284004ef..7faaac5f607 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -415,21 +415,22 @@ namespace template struct Inserter { - static ALWAYS_INLINE void insert(const Join & join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, const IColumn * asof_column) + static ALWAYS_INLINE void insert(Join & join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, + const IColumn * asof_column) { auto emplace_result = key_getter.emplaceKey(map, i, pool); typename Map::mapped_type * time_series_map = &emplace_result.getMapped(); if (emplace_result.isInserted()) - time_series_map = new (time_series_map) typename Map::mapped_type(join.getAsofType()); - time_series_map->insert(asof_column, stored_block, i); + time_series_map = new (time_series_map) typename Map::mapped_type(); + time_series_map->insert(join.getAsofType(), join.getAsofData(), asof_column, stored_block, i); } }; template void NO_INLINE insertFromBlockImplTypeCase( - const Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns, + Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool) { const IColumn * asof_column [[maybe_unused]] = nullptr; @@ -453,7 +454,7 @@ namespace template void insertFromBlockImplType( - const Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns, + Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool) { if (null_map) @@ -465,7 +466,7 @@ namespace template void insertFromBlockImpl( - const Join & join, Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns, + Join & join, Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool) { switch (type) @@ -686,7 +687,7 @@ void addNotFoundRow(AddedColumns & added [[maybe_unused]], IColumn::Offset & cur /// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS). template std::unique_ptr NO_INLINE joinRightIndexedColumns( - const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, + const Join & join, const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, AddedColumns & added_columns, ConstNullMapPtr null_map, IColumn::Filter & filter) { std::unique_ptr offsets_to_replicate; @@ -719,7 +720,7 @@ std::unique_ptr NO_INLINE joinRightIndexedColumns( if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof) { - if (const RowRef * found = mapped.findAsof(asof_column, i)) + if (const RowRef * found = mapped.findAsof(join.getAsofType(), join.getAsofData(), asof_column, i)) { filter[i] = 1; mapped.setUsed(); @@ -748,7 +749,7 @@ std::unique_ptr NO_INLINE joinRightIndexedColumns( template IColumn::Filter joinRightColumns( - const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, + const Join & join, const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, AddedColumns & added_columns, ConstNullMapPtr null_map, std::unique_ptr & offsets_to_replicate) { constexpr bool left_or_full = static_in_v; @@ -757,17 +758,17 @@ IColumn::Filter joinRightColumns( if (null_map) offsets_to_replicate = joinRightIndexedColumns( - map, rows, key_columns, key_sizes, added_columns, null_map, filter); + join, map, rows, key_columns, key_sizes, added_columns, null_map, filter); else offsets_to_replicate = joinRightIndexedColumns( - map, rows, key_columns, key_sizes, added_columns, null_map, filter); + join, map, rows, key_columns, key_sizes, added_columns, null_map, filter); return filter; } template IColumn::Filter switchJoinRightColumns( - Join::Type type, + Join::Type type, const Join & join, const Maps & maps_, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, AddedColumns & added_columns, ConstNullMapPtr null_map, std::unique_ptr & offsets_to_replicate) @@ -777,7 +778,7 @@ IColumn::Filter switchJoinRightColumns( #define M(TYPE) \ case Join::Type::TYPE: \ return joinRightColumns>::Type>(\ - *maps_.TYPE, rows, key_columns, key_sizes, added_columns, null_map, offsets_to_replicate); + join, *maps_.TYPE, rows, key_columns, key_sizes, added_columns, null_map, offsets_to_replicate); APPLY_FOR_JOIN_VARIANTS(M) #undef M @@ -851,7 +852,7 @@ void Join::joinBlockImpl( std::unique_ptr offsets_to_replicate; IColumn::Filter row_filter = switchJoinRightColumns( - type, maps_, block.rows(), key_columns, key_sizes, added, null_map, offsets_to_replicate); + type, *this, maps_, block.rows(), key_columns, key_sizes, added, null_map, offsets_to_replicate); for (size_t i = 0; i < added.size(); ++i) block.insert(added.moveColumn(i)); diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 7a223f46b35..85255aaaaa0 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -132,6 +132,8 @@ public: ASTTableJoin::Kind getKind() const { return kind; } AsofRowRefs::Type getAsofType() const { return *asof_type; } + AsofRowRefs::LookupLists & getAsofData() { return asof_lookup_lists; } + const AsofRowRefs::LookupLists & getAsofData() const { return asof_lookup_lists; } /** Depending on template parameter, adds or doesn't add a flag, that element was used (row was joined). * Depending on template parameter, decide whether to overwrite existing values when encountering the same key again @@ -367,6 +369,7 @@ private: private: Type type = Type::EMPTY; std::optional asof_type; + AsofRowRefs::LookupLists asof_lookup_lists; static Type chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes); diff --git a/dbms/src/Interpreters/RowRefs.cpp b/dbms/src/Interpreters/RowRefs.cpp index 95d2c796b23..9fea9819132 100644 --- a/dbms/src/Interpreters/RowRefs.cpp +++ b/dbms/src/Interpreters/RowRefs.cpp @@ -30,56 +30,55 @@ void callWithType(AsofRowRefs::Type which, F && f) } // namespace -void AsofRowRefs::createLookup(AsofRowRefs::Type which) +void AsofRowRefs::insert(Type type, LookupLists & lookup_data, const IColumn * asof_column, const Block * block, size_t row_num) { auto call = [&](const auto & t) { using T = std::decay_t; using LookupType = typename Entry::LookupType; - lookups = std::make_unique(); - }; - - callWithType(which, call); -} - - -void AsofRowRefs::insert(const IColumn * asof_column, const Block * block, size_t row_num) -{ - auto call = [&](const auto & t) - { - using T = std::decay_t; - using LookupPtr = typename Entry::LookupPtr; - auto * column = typeid_cast *>(asof_column); T key = column->getElement(row_num); auto entry = Entry(key, RowRef(block, row_num)); - std::get(lookups)->insert(entry); + std::lock_guard lock(lookup_data.mutex); + + if (!lookups) + { + lookup_data.lookups.push_back(Lookups()); + lookup_data.lookups.back() = LookupType(); + lookups = &lookup_data.lookups.back(); + } + std::get(*lookups).insert(entry); }; - callWithType(*type, call); + callWithType(type, call); } -const RowRef * AsofRowRefs::findAsof(const IColumn * asof_column, size_t row_num) const +const RowRef * AsofRowRefs::findAsof(Type type, const LookupLists & lookup_data, const IColumn * asof_column, size_t row_num) const { const RowRef * out = nullptr; auto call = [&](const auto & t) { using T = std::decay_t; - using LookupPtr = typename Entry::LookupPtr; + using LookupType = typename Entry::LookupType; auto * column = typeid_cast *>(asof_column); T key = column->getElement(row_num); - auto & typed_lookup = std::get(lookups); - auto it = typed_lookup->upper_bound(Entry(key)); - if (it != typed_lookup->cbegin()) + std::lock_guard lock(lookup_data.mutex); + + if (!lookups) + return; + + auto & typed_lookup = std::get(*lookups); + auto it = typed_lookup.upper_bound(Entry(key)); + if (it != typed_lookup.cbegin()) out = &((--it)->row_ref); }; - callWithType(*type, call); + callWithType(type, call); return out; } diff --git a/dbms/src/Interpreters/RowRefs.h b/dbms/src/Interpreters/RowRefs.h index f476b4146e4..227fba965b3 100644 --- a/dbms/src/Interpreters/RowRefs.h +++ b/dbms/src/Interpreters/RowRefs.h @@ -5,6 +5,8 @@ #include #include +#include +#include namespace DB { @@ -37,7 +39,6 @@ public: struct Entry { using LookupType = SortedLookupPODArray>; - using LookupPtr = std::unique_ptr; T asof_value; RowRef row_ref; @@ -52,10 +53,16 @@ public: }; using Lookups = std::variant< - Entry::LookupPtr, - Entry::LookupPtr, - Entry::LookupPtr, - Entry::LookupPtr>; + Entry::LookupType, + Entry::LookupType, + Entry::LookupType, + Entry::LookupType>; + + struct LookupLists + { + mutable std::mutex mutex; + std::list lookups; + }; enum class Type { @@ -67,21 +74,11 @@ public: static std::optional getTypeSize(const IColumn * asof_column, size_t & type_size); - AsofRowRefs() = default; - AsofRowRefs(Type t) - : type(t) - { - createLookup(t); - } - - void insert(const IColumn * asof_column, const Block * block, size_t row_num); - const RowRef * findAsof(const IColumn * asof_column, size_t row_num) const; + void insert(Type type, LookupLists &, const IColumn * asof_column, const Block * block, size_t row_num); + const RowRef * findAsof(Type type, const LookupLists &, const IColumn * asof_column, size_t row_num) const; private: - const std::optional type = {}; - mutable Lookups lookups; - - void createLookup(Type which); + Lookups * lookups = nullptr; }; }