From 89515861dff53f74ea2ccd9660bc578373a7f588 Mon Sep 17 00:00:00 2001 From: Martijn Bakker Date: Fri, 29 Mar 2019 21:20:23 +0000 Subject: [PATCH 1/7] asof join without using std::map, but still only on u32 --- dbms/src/Interpreters/Join.cpp | 149 ++++++++++++++++++++++----------- dbms/src/Interpreters/Join.h | 66 ++++++++++++--- 2 files changed, 152 insertions(+), 63 deletions(-) diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index ddc2173d75b..f8af06bf6a8 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -298,12 +298,17 @@ void Join::setSampleBlock(const Block & block) if (kind != ASTTableJoin::Kind::Left and kind != ASTTableJoin::Kind::Inner) throw Exception("ASOF only supports LEFT and INNER as base joins", ErrorCodes::NOT_IMPLEMENTED); - if (key_columns.back()->sizeOfValueIfFixed() != sizeof(ASOFTimeType)) + const IColumn * asof_column = key_columns.back(); + + if (auto t = AsofRowRefs::getType(asof_column)) + asof_type = *t; + else { - std::string msg = "ASOF join column needs to have size "; - msg += std::to_string(sizeof(ASOFTimeType)); + std::string msg = "ASOF join not supported for type"; + msg += asof_column->getFamilyName(); throw Exception(msg, ErrorCodes::BAD_TYPE_OF_FIELD); } + key_columns.pop_back(); if (key_columns.empty()) @@ -314,7 +319,7 @@ void Join::setSampleBlock(const Block & block) /// Therefore, add it back in such that it can be extracted appropriately from the full stored /// key_columns and key_sizes init(chooseMethod(key_columns, key_sizes)); - key_sizes.push_back(sizeof(ASOFTimeType)); + key_sizes.push_back(AsofRowRefs::getSize(asof_type)); } else { @@ -357,34 +362,99 @@ void Join::setSampleBlock(const Block & block) convertColumnToNullable(sample_block_with_columns_to_add.getByPosition(i)); } -void Join::TSRowRef::insert(Join::ASOFTimeType t, const Block * block, size_t row_num) +void Join::AsofRowRefs::AsofLookups::create(Join::AsofRowRefs::AsofType which) { - ts.insert(std::pair(t, RowRef(block, row_num))); + switch (which) + { + #define M(NAME, TYPE) \ + case AsofType::NAME: NAME = std::make_unique(); break; + APPLY_FOR_ASOF_JOIN_VARIANTS(M) + #undef M + } } -std::string Join::TSRowRef::dumpStructure() const +void Join::AsofRowRefs::create(AsofType which) { - std::stringstream ss; + type = which; + lookups.create(which); +} - for (auto const& x : ts) +template +using AsofGetterType = ColumnsHashing::HashMethodOneNumber; + +void Join::AsofRowRefs::insert(const IColumn * asof_column, const Block * block, size_t row_num, Arena & pool) +{ + assert(!sorted); + switch (type) { - ss << "(t=" << x.first << " row_num=" << x.second.row_num << " ptr=" << x.second.block << "),"; + #define M(NAME, TYPE) \ + case AsofType::NAME: { \ + auto asof_getter = AsofGetterType(asof_column); \ + auto entry = AsofEntry(asof_getter.getKey(row_num, pool), RowRef(block, row_num)); \ + lookups.NAME->push_back(entry); \ + break; \ + } + APPLY_FOR_ASOF_JOIN_VARIANTS(M) + #undef M + } +} + +const Join::RowRef * Join::AsofRowRefs::findAsof(const IColumn * asof_column, size_t row_num, Arena & pool) const +{ + if (!sorted) + { + // sort whenever needed + switch (type) + { + #define M(NAME, TYPE) \ + case AsofType::NAME: std::sort(lookups.NAME->begin(), lookups.NAME->end()); break; + APPLY_FOR_ASOF_JOIN_VARIANTS(M) + #undef M + } + sorted = true; } - return ss.str(); + switch (type) + { + #define M(NAME, TYPE) \ + case AsofType::NAME: { \ + auto asof_getter = AsofGetterType(asof_column); \ + TYPE key = asof_getter.getKey(row_num, pool); \ + auto it = std::upper_bound(lookups.NAME->cbegin(), lookups.NAME->cend(), AsofEntry(key)); \ + if (it == lookups.NAME->cbegin()) \ + return nullptr; \ + return &((--it)->row_ref); \ + } + APPLY_FOR_ASOF_JOIN_VARIANTS(M) + #undef M + } + + __builtin_unreachable(); } -size_t Join::TSRowRef::size() const + +std::optional Join::AsofRowRefs::getType(const IColumn * asof_column) { - return ts.size(); + #define M(NAME, TYPE) \ + if (strcmp(#TYPE, asof_column->getFamilyName()) == 0) \ + return AsofType::NAME; + APPLY_FOR_ASOF_JOIN_VARIANTS(M) + #undef M + return {}; } -std::optional> Join::TSRowRef::findAsof(Join::ASOFTimeType t) const + +size_t Join::AsofRowRefs::getSize(Join::AsofRowRefs::AsofType type) { - auto it = ts.upper_bound(t); - if (it == ts.cbegin()) - return {}; - return *(--it); + switch (type) + { + #define M(NAME, TYPE) \ + case AsofType::NAME: return sizeof(TYPE); + APPLY_FOR_ASOF_JOIN_VARIANTS(M) + #undef M + } + __builtin_unreachable(); } + namespace { /// Inserting an element into a hash table of the form `key -> reference to a string`, which will then be used by JOIN. @@ -435,8 +505,7 @@ namespace template struct Inserter { - template - static ALWAYS_INLINE void insert(Map & map, KeyGetter & key_getter, AsofGetter & asof_getter, Block * stored_block, size_t i, Arena & pool) + static ALWAYS_INLINE void insert(Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, const IColumn * asof_column) { auto emplace_result = key_getter.emplaceKey(map, i, pool); typename Map::mapped_type * time_series_map = &emplace_result.getMapped(); @@ -444,10 +513,11 @@ namespace if (emplace_result.isInserted()) { time_series_map = new (time_series_map) typename Map::mapped_type(); + // TODO extract this from either the column type or from the main join object + time_series_map->create(Join::AsofRowRefs::AsofType::key32); } - auto k = asof_getter.getKey(i, pool); - time_series_map->insert(k, stored_block, i); -// std::cout << "inserted key into time series map=" << k << " result=" << time_series_map->dumpStructure() << std::endl; + + time_series_map->insert(asof_column, stored_block, i, pool); } }; @@ -469,10 +539,8 @@ namespace continue; if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof) - { - auto asof_getter = Join::AsofGetterType(asof_column); - Inserter::insert(map, key_getter, asof_getter, stored_block, i, pool); - } else + Inserter::insert(map, key_getter, stored_block, i, pool, asof_column); + else Inserter::insert(map, key_getter, stored_block, i, pool); } } @@ -678,20 +746,6 @@ void addFoundRow(const typename Map::mapped_type & mapped, AddedColumns & added, } }; -template -bool addFoundRowAsof(const typename Map::mapped_type & mapped, AddedColumns & added, IColumn::Offset & current_offset [[maybe_unused]], Join::ASOFTimeType asof_key) -{ - if (auto v = mapped.findAsof(asof_key)) - { - std::pair res = *v; -// std::cout << "Adder::addFound" << " to_add" << num_columns_to_add << " i=" << i << " asof_key=" << asof_key << " found=" << res.first << std::endl; - added.appendFromBlock(*res.second.block, res.second.row_num); - return true; - } -// std::cout << "Adder::addFound" << " not found in map" << num_columns_to_add << " i=" << i << " asof_key=" << asof_key << std::endl; - return false; -} - template void addNotFoundRow(AddedColumns & added [[maybe_unused]], IColumn::Offset & current_offset [[maybe_unused]]) { @@ -739,19 +793,14 @@ std::unique_ptr NO_INLINE joinRightIndexedColumns( auto & mapped = find_result.getMapped(); if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof) - { - Join::AsofGetterType asof_getter(asof_column); - auto asof_key = asof_getter.getKey(i, pool); - bool actually_found = addFoundRowAsof(mapped, added_columns, current_offset, asof_key); - - if (actually_found) + if (const Join::RowRef * found = mapped.findAsof(asof_column, i, pool)) { - filter[i] = 1; - mapped.setUsed(); + filter[i] = 1; + mapped.setUsed(); + added_columns.appendFromBlock(*found->block, found->row_num); } else addNotFoundRow<_add_missing>(added_columns, current_offset); - } else { filter[i] = 1; diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 426ea93a365..1b716d422ac 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -151,20 +151,59 @@ public: RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {} }; - /// Map for a time series - using ASOFTimeType = UInt32; - using AsofGetterType = ColumnsHashing::HashMethodOneNumber; - struct TSRowRef + struct AsofRowRefs { - // TODO use the arena allocator to get memory for this - // This would require ditching std::map because std::allocator is incompatible with the arena allocator - std::map ts; + /// Different types of asof join keys + #define APPLY_FOR_ASOF_JOIN_VARIANTS(M) \ + M(key32, UInt32) \ + M(key64, UInt64) - TSRowRef() {} - void insert(ASOFTimeType t, const Block * block, size_t row_num); - std::optional> findAsof(ASOFTimeType t) const; - std::string dumpStructure() const; - size_t size() const; + enum class AsofType + { + #define M(NAME, TYPE) NAME, + APPLY_FOR_ASOF_JOIN_VARIANTS(M) + #undef M + }; + + static std::optional getType(const IColumn * asof_column); + static size_t getSize(AsofType type); + + template + struct AsofEntry + { + T asof_value; + RowRef row_ref; + + AsofEntry(T v) : asof_value(v) {} + AsofEntry(T v, RowRef rr) : asof_value(v), row_ref(rr) {} + + bool operator< (const AsofEntry& o) const + { + return asof_value < o.asof_value; + } + }; + + struct AsofLookups + { + #define M(NAME, TYPE) \ + std::unique_ptr>> NAME; + APPLY_FOR_ASOF_JOIN_VARIANTS(M) + #undef M + + void create(AsofType which); + }; + + AsofRowRefs() {} + + void create(AsofType which); + void insert(const IColumn * asof_column, const Block * block, size_t row_num, Arena & pool); + + const RowRef * findAsof(const IColumn * asof_column, size_t row_num, Arena & pool) const; + + private: + AsofType type; + mutable AsofLookups lookups; + mutable bool sorted = false; }; /** Depending on template parameter, adds or doesn't add a flag, that element was used (row was joined). @@ -297,7 +336,7 @@ public: using MapsAnyFull = MapsTemplate>; using MapsAnyFullOverwrite = MapsTemplate>; using MapsAllFull = MapsTemplate>; - using MapsAsof = MapsTemplate>; + using MapsAsof = MapsTemplate>; template struct KindTrait @@ -400,6 +439,7 @@ private: private: Type type = Type::EMPTY; + AsofRowRefs::AsofType asof_type; static Type chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes); From 389f1088076d56a45def73198de8ff0d66974020 Mon Sep 17 00:00:00 2001 From: Martijn Bakker Date: Sat, 30 Mar 2019 01:32:08 +0000 Subject: [PATCH 2/7] working multi type asof join columns --- dbms/src/Interpreters/Join.cpp | 62 +++++++++++++++------------------- dbms/src/Interpreters/Join.h | 27 +++++++++------ 2 files changed, 44 insertions(+), 45 deletions(-) diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index f8af06bf6a8..d09e0f10a66 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -362,23 +362,18 @@ void Join::setSampleBlock(const Block & block) convertColumnToNullable(sample_block_with_columns_to_add.getByPosition(i)); } -void Join::AsofRowRefs::AsofLookups::create(Join::AsofRowRefs::AsofType which) +void Join::AsofRowRefs::Lookups::create(Join::AsofRowRefs::AsofType which) { switch (which) { + case AsofType::EMPTY: break; #define M(NAME, TYPE) \ - case AsofType::NAME: NAME = std::make_unique(); break; + case AsofType::NAME: NAME = std::make_unique(); break; APPLY_FOR_ASOF_JOIN_VARIANTS(M) #undef M } } -void Join::AsofRowRefs::create(AsofType which) -{ - type = which; - lookups.create(which); -} - template using AsofGetterType = ColumnsHashing::HashMethodOneNumber; @@ -387,13 +382,14 @@ void Join::AsofRowRefs::insert(const IColumn * asof_column, const Block * block, assert(!sorted); switch (type) { + case AsofType::EMPTY: break; #define M(NAME, TYPE) \ - case AsofType::NAME: { \ - auto asof_getter = AsofGetterType(asof_column); \ - auto entry = AsofEntry(asof_getter.getKey(row_num, pool), RowRef(block, row_num)); \ - lookups.NAME->push_back(entry); \ - break; \ - } + case AsofType::NAME: { \ + auto asof_getter = AsofGetterType(asof_column); \ + auto entry = Entry(asof_getter.getKey(row_num, pool), RowRef(block, row_num)); \ + lookups.NAME->push_back(entry); \ + break; \ + } APPLY_FOR_ASOF_JOIN_VARIANTS(M) #undef M } @@ -406,6 +402,7 @@ const Join::RowRef * Join::AsofRowRefs::findAsof(const IColumn * asof_column, si // sort whenever needed switch (type) { + case AsofType::EMPTY: break; #define M(NAME, TYPE) \ case AsofType::NAME: std::sort(lookups.NAME->begin(), lookups.NAME->end()); break; APPLY_FOR_ASOF_JOIN_VARIANTS(M) @@ -416,11 +413,12 @@ const Join::RowRef * Join::AsofRowRefs::findAsof(const IColumn * asof_column, si switch (type) { + case AsofType::EMPTY: return nullptr; #define M(NAME, TYPE) \ case AsofType::NAME: { \ auto asof_getter = AsofGetterType(asof_column); \ TYPE key = asof_getter.getKey(row_num, pool); \ - auto it = std::upper_bound(lookups.NAME->cbegin(), lookups.NAME->cend(), AsofEntry(key)); \ + auto it = std::upper_bound(lookups.NAME->cbegin(), lookups.NAME->cend(), Entry(key)); \ if (it == lookups.NAME->cbegin()) \ return nullptr; \ return &((--it)->row_ref); \ @@ -446,6 +444,7 @@ size_t Join::AsofRowRefs::getSize(Join::AsofRowRefs::AsofType type) { switch (type) { + case AsofType::EMPTY: return 0; #define M(NAME, TYPE) \ case AsofType::NAME: return sizeof(TYPE); APPLY_FOR_ASOF_JOIN_VARIANTS(M) @@ -461,13 +460,13 @@ namespace template struct Inserter { - static void insert(Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool); + static void insert(const Join *, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool); }; template struct Inserter { - static ALWAYS_INLINE void insert(Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool) + static ALWAYS_INLINE void insert(const Join *, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool) { auto emplace_result = key_getter.emplaceKey(map, i, pool); @@ -479,7 +478,7 @@ namespace template struct Inserter { - static ALWAYS_INLINE void insert(Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool) + static ALWAYS_INLINE void insert(const Join *, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool) { auto emplace_result = key_getter.emplaceKey(map, i, pool); @@ -505,18 +504,13 @@ namespace template struct Inserter { - static ALWAYS_INLINE void insert(Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, const IColumn * asof_column) + static ALWAYS_INLINE void insert(const Join * join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, const IColumn * asof_column) { auto emplace_result = key_getter.emplaceKey(map, i, pool); typename Map::mapped_type * time_series_map = &emplace_result.getMapped(); if (emplace_result.isInserted()) - { - time_series_map = new (time_series_map) typename Map::mapped_type(); - // TODO extract this from either the column type or from the main join object - time_series_map->create(Join::AsofRowRefs::AsofType::key32); - } - + time_series_map = new (time_series_map) typename Map::mapped_type(join->getAsofType()); time_series_map->insert(asof_column, stored_block, i, pool); } }; @@ -524,7 +518,7 @@ namespace template void NO_INLINE insertFromBlockImplTypeCase( - Map & map, size_t rows, const ColumnRawPtrs & key_columns, + const Join * join, Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool) { const IColumn * asof_column [[maybe_unused]] = nullptr; @@ -539,28 +533,28 @@ namespace continue; if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof) - Inserter::insert(map, key_getter, stored_block, i, pool, asof_column); + Inserter::insert(join, map, key_getter, stored_block, i, pool, asof_column); else - Inserter::insert(map, key_getter, stored_block, i, pool); + Inserter::insert(join, map, key_getter, stored_block, i, pool); } } template void insertFromBlockImplType( - Map & map, size_t rows, const ColumnRawPtrs & key_columns, + const Join * join, Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool) { if (null_map) - insertFromBlockImplTypeCase(map, rows, key_columns, key_sizes, stored_block, null_map, pool); + insertFromBlockImplTypeCase(join, map, rows, key_columns, key_sizes, stored_block, null_map, pool); else - insertFromBlockImplTypeCase(map, rows, key_columns, key_sizes, stored_block, null_map, pool); + insertFromBlockImplTypeCase(join, map, rows, key_columns, key_sizes, stored_block, null_map, pool); } template void insertFromBlockImpl( - Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns, + const Join * join, Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool) { switch (type) @@ -571,7 +565,7 @@ namespace #define M(TYPE) \ case Join::Type::TYPE: \ insertFromBlockImplType>::Type>(\ - *maps.TYPE, rows, key_columns, key_sizes, stored_block, null_map, pool); \ + join, *maps.TYPE, rows, key_columns, key_sizes, stored_block, null_map, pool); \ break; APPLY_FOR_JOIN_VARIANTS(M) #undef M @@ -658,7 +652,7 @@ bool Join::insertFromBlock(const Block & block) { dispatch([&](auto, auto strictness_, auto & map) { - insertFromBlockImpl(type, map, rows, key_columns, key_sizes, stored_block, null_map, pool); + insertFromBlockImpl(this, type, map, rows, key_columns, key_sizes, stored_block, null_map, pool); }); } diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 1b716d422ac..dcd4ea6e9d4 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -160,6 +160,7 @@ public: enum class AsofType { + EMPTY, #define M(NAME, TYPE) NAME, APPLY_FOR_ASOF_JOIN_VARIANTS(M) #undef M @@ -169,43 +170,47 @@ public: static size_t getSize(AsofType type); template - struct AsofEntry + struct Entry { T asof_value; RowRef row_ref; - AsofEntry(T v) : asof_value(v) {} - AsofEntry(T v, RowRef rr) : asof_value(v), row_ref(rr) {} + Entry(T v) : asof_value(v) {} + Entry(T v, RowRef rr) : asof_value(v), row_ref(rr) {} - bool operator< (const AsofEntry& o) const + bool operator< (const Entry& o) const { return asof_value < o.asof_value; } }; - struct AsofLookups + struct Lookups { #define M(NAME, TYPE) \ - std::unique_ptr>> NAME; + std::unique_ptr>> NAME; APPLY_FOR_ASOF_JOIN_VARIANTS(M) #undef M void create(AsofType which); }; - AsofRowRefs() {} + AsofRowRefs() : type(AsofType::EMPTY) {} + AsofRowRefs(AsofType t) : type(t) { + lookups.create(t); + } - void create(AsofType which); void insert(const IColumn * asof_column, const Block * block, size_t row_num, Arena & pool); - const RowRef * findAsof(const IColumn * asof_column, size_t row_num, Arena & pool) const; private: - AsofType type; - mutable AsofLookups lookups; + const AsofType type; + mutable Lookups lookups; mutable bool sorted = false; }; + AsofRowRefs::AsofType getAsofType() const { return asof_type; } + + /** Depending on template parameter, adds or doesn't add a flag, that element was used (row was joined). * Depending on template parameter, decide whether to overwrite existing values when encountering the same key again * with_used is for implementation of RIGHT and FULL JOINs. From 4a94545882ae9c9e261c87c6a07c91deedbb4249 Mon Sep 17 00:00:00 2001 From: Martijn Bakker Date: Sat, 30 Mar 2019 12:52:48 +0000 Subject: [PATCH 3/7] add test for multiple supported asof types --- dbms/src/Interpreters/Join.cpp | 24 +++++-------------- dbms/src/Interpreters/Join.h | 7 +++--- .../00927_asof_join_other_types.reference | 12 ++++++++++ .../00927_asof_join_other_types.sh | 22 +++++++++++++++++ 4 files changed, 44 insertions(+), 21 deletions(-) create mode 100644 dbms/tests/queries/0_stateless/00927_asof_join_other_types.reference create mode 100755 dbms/tests/queries/0_stateless/00927_asof_join_other_types.sh diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 234a56f7183..b93dcbf0cab 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -299,9 +299,10 @@ void Join::setSampleBlock(const Block & block) throw Exception("ASOF only supports LEFT and INNER as base joins", ErrorCodes::NOT_IMPLEMENTED); const IColumn * asof_column = key_columns.back(); + size_t asof_size; - if (auto t = AsofRowRefs::getType(asof_column)) - asof_type = *t; + if (auto t = AsofRowRefs::getTypeSize(asof_column)) + std::tie(asof_type, asof_size) = *t; else { std::string msg = "ASOF join not supported for type"; @@ -319,7 +320,7 @@ void Join::setSampleBlock(const Block & block) /// Therefore, add it back in such that it can be extracted appropriately from the full stored /// key_columns and key_sizes init(chooseMethod(key_columns, key_sizes)); - key_sizes.push_back(AsofRowRefs::getSize(asof_type)); + key_sizes.push_back(asof_size); } else { @@ -430,29 +431,16 @@ const Join::RowRef * Join::AsofRowRefs::findAsof(const IColumn * asof_column, si __builtin_unreachable(); } -std::optional Join::AsofRowRefs::getType(const IColumn * asof_column) +std::optional> Join::AsofRowRefs::getTypeSize(const IColumn * asof_column) { #define M(NAME, TYPE) \ if (strcmp(#TYPE, asof_column->getFamilyName()) == 0) \ - return AsofType::NAME; + return std::make_pair(AsofType::NAME,sizeof(TYPE)); APPLY_FOR_ASOF_JOIN_VARIANTS(M) #undef M return {}; } -size_t Join::AsofRowRefs::getSize(Join::AsofRowRefs::AsofType type) -{ - switch (type) - { - case AsofType::EMPTY: return 0; - #define M(NAME, TYPE) \ - case AsofType::NAME: return sizeof(TYPE); - APPLY_FOR_ASOF_JOIN_VARIANTS(M) - #undef M - } - __builtin_unreachable(); -} - namespace { diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index dcd4ea6e9d4..d8d54cdac30 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -156,7 +156,9 @@ public: /// Different types of asof join keys #define APPLY_FOR_ASOF_JOIN_VARIANTS(M) \ M(key32, UInt32) \ - M(key64, UInt64) + M(key64, UInt64) \ + M(keyf32, Float32) \ + M(keyf64, Float64) enum class AsofType { @@ -166,8 +168,7 @@ public: #undef M }; - static std::optional getType(const IColumn * asof_column); - static size_t getSize(AsofType type); + static std::optional> getTypeSize(const IColumn * asof_column); template struct Entry diff --git a/dbms/tests/queries/0_stateless/00927_asof_join_other_types.reference b/dbms/tests/queries/0_stateless/00927_asof_join_other_types.reference new file mode 100644 index 00000000000..674df7e4845 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00927_asof_join_other_types.reference @@ -0,0 +1,12 @@ +2 1 1 0 +2 3 3 3 +2 5 5 3 +2 1 1 0 +2 3 3 3 +2 5 5 3 +2 1 1 0 +2 3 3 3 +2 5 5 3 +2 1 1 0 +2 3 3 3 +2 5 5 3 diff --git a/dbms/tests/queries/0_stateless/00927_asof_join_other_types.sh b/dbms/tests/queries/0_stateless/00927_asof_join_other_types.sh new file mode 100755 index 00000000000..3cf1791bcfe --- /dev/null +++ b/dbms/tests/queries/0_stateless/00927_asof_join_other_types.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash + +set -e + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. $CURDIR/../shell_config.sh + +$CLICKHOUSE_CLIENT -q "USE test;" + +for typename in "UInt32" "UInt64" "Float64" "Float32" +do + $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS A;" + $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS B;" + + $CLICKHOUSE_CLIENT -q "CREATE TABLE A(k UInt32, t ${typename}, a Float64) ENGINE = MergeTree() ORDER BY (k, t);" + $CLICKHOUSE_CLIENT -q "INSERT INTO A(k,t,a) VALUES (2,1,1),(2,3,3),(2,5,5);" + + $CLICKHOUSE_CLIENT -q "CREATE TABLE B(k UInt32, t ${typename}, b Float64) ENGINE = MergeTree() ORDER BY (k, t);" + $CLICKHOUSE_CLIENT -q "INSERT INTO B(k,t,b) VALUES (2,3,3);" + + $CLICKHOUSE_CLIENT -q "SELECT k, t, a, b FROM A ASOF LEFT JOIN B USING(k,t) ORDER BY (k,t);" +done \ No newline at end of file From 20e5fb61c476fffbaeade7891447b31b7460b2a7 Mon Sep 17 00:00:00 2001 From: Martijn Bakker Date: Sat, 30 Mar 2019 13:02:30 +0000 Subject: [PATCH 4/7] fix style --- dbms/src/Interpreters/Join.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index d8d54cdac30..910c235c70f 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -196,7 +196,8 @@ public: }; AsofRowRefs() : type(AsofType::EMPTY) {} - AsofRowRefs(AsofType t) : type(t) { + AsofRowRefs(AsofType t) : type(t) + { lookups.create(t); } @@ -210,7 +211,7 @@ public: }; AsofRowRefs::AsofType getAsofType() const { return asof_type; } - + /** Depending on template parameter, adds or doesn't add a flag, that element was used (row was joined). * Depending on template parameter, decide whether to overwrite existing values when encountering the same key again From 3ac66dfdb656363a0463e94ac9897d8ced09b0a7 Mon Sep 17 00:00:00 2001 From: Martijn Bakker Date: Sat, 30 Mar 2019 13:09:03 +0000 Subject: [PATCH 5/7] set default asof type value --- dbms/src/Interpreters/Join.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 910c235c70f..876fd2a70b1 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -446,7 +446,7 @@ private: private: Type type = Type::EMPTY; - AsofRowRefs::AsofType asof_type; + AsofRowRefs::AsofType asof_type = AsofRowRefs::AsofType::EMPTY; static Type chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes); From 4709b744bb8e4d3f9f4493e5c0e6fc35f38c4bee Mon Sep 17 00:00:00 2001 From: Martijn Bakker Date: Sat, 30 Mar 2019 21:30:21 +0000 Subject: [PATCH 6/7] address the code review issues --- dbms/src/Common/SortedLookupPODArray.h | 48 +++++++++++ dbms/src/Interpreters/Join.cpp | 107 ++++--------------------- dbms/src/Interpreters/Join.h | 86 +------------------- dbms/src/Interpreters/RowRefs.cpp | 75 +++++++++++++++++ dbms/src/Interpreters/RowRefs.h | 91 +++++++++++++++++++++ 5 files changed, 232 insertions(+), 175 deletions(-) create mode 100644 dbms/src/Common/SortedLookupPODArray.h create mode 100644 dbms/src/Interpreters/RowRefs.cpp create mode 100644 dbms/src/Interpreters/RowRefs.h diff --git a/dbms/src/Common/SortedLookupPODArray.h b/dbms/src/Common/SortedLookupPODArray.h new file mode 100644 index 00000000000..60b3529c69e --- /dev/null +++ b/dbms/src/Common/SortedLookupPODArray.h @@ -0,0 +1,48 @@ +#pragma once + +#include + +namespace DB { + +/** + * This class is intended to push sortable data into. + * When looking up values the container ensures that it is sorted for log(N) lookup + * + * Note, this is only efficient when the insertions happen in one stage, followed by all retrievals + * This way the data only gets sorted once. + */ + +template > +class SortedLookupPODArray : private PaddedPODArray +{ +public: + using Base = PaddedPODArray; + using Base::PODArray; + using Base::cbegin; + using Base::cend; + + template + void insert(U && x, TAllocatorParams &&... allocator_params) + { + Base::push_back(std::forward(x), std::forward(allocator_params)...); + sorted = false; + } + + typename Base::const_iterator upper_bound (const T& k) + { + if (!sorted) + this->sort(); + return std::upper_bound(this->cbegin(), this->cend(), k); + } +private: + void sort() + { + std::sort(this->begin(), this->end()); + sorted = true; + } + + bool sorted = false; +}; + + +} \ No newline at end of file diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index b93dcbf0cab..2d959be98f2 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -363,98 +363,19 @@ void Join::setSampleBlock(const Block & block) convertColumnToNullable(sample_block_with_columns_to_add.getByPosition(i)); } -void Join::AsofRowRefs::Lookups::create(Join::AsofRowRefs::AsofType which) -{ - switch (which) - { - case AsofType::EMPTY: break; - #define M(NAME, TYPE) \ - case AsofType::NAME: NAME = std::make_unique(); break; - APPLY_FOR_ASOF_JOIN_VARIANTS(M) - #undef M - } -} - -template -using AsofGetterType = ColumnsHashing::HashMethodOneNumber; - -void Join::AsofRowRefs::insert(const IColumn * asof_column, const Block * block, size_t row_num, Arena & pool) -{ - assert(!sorted); - switch (type) - { - case AsofType::EMPTY: break; - #define M(NAME, TYPE) \ - case AsofType::NAME: { \ - auto asof_getter = AsofGetterType(asof_column); \ - auto entry = Entry(asof_getter.getKey(row_num, pool), RowRef(block, row_num)); \ - lookups.NAME->push_back(entry); \ - break; \ - } - APPLY_FOR_ASOF_JOIN_VARIANTS(M) - #undef M - } -} - -const Join::RowRef * Join::AsofRowRefs::findAsof(const IColumn * asof_column, size_t row_num, Arena & pool) const -{ - if (!sorted) - { - // sort whenever needed - switch (type) - { - case AsofType::EMPTY: break; - #define M(NAME, TYPE) \ - case AsofType::NAME: std::sort(lookups.NAME->begin(), lookups.NAME->end()); break; - APPLY_FOR_ASOF_JOIN_VARIANTS(M) - #undef M - } - sorted = true; - } - - switch (type) - { - case AsofType::EMPTY: return nullptr; - #define M(NAME, TYPE) \ - case AsofType::NAME: { \ - auto asof_getter = AsofGetterType(asof_column); \ - TYPE key = asof_getter.getKey(row_num, pool); \ - auto it = std::upper_bound(lookups.NAME->cbegin(), lookups.NAME->cend(), Entry(key)); \ - if (it == lookups.NAME->cbegin()) \ - return nullptr; \ - return &((--it)->row_ref); \ - } - APPLY_FOR_ASOF_JOIN_VARIANTS(M) - #undef M - } - - __builtin_unreachable(); -} - -std::optional> Join::AsofRowRefs::getTypeSize(const IColumn * asof_column) -{ - #define M(NAME, TYPE) \ - if (strcmp(#TYPE, asof_column->getFamilyName()) == 0) \ - return std::make_pair(AsofType::NAME,sizeof(TYPE)); - APPLY_FOR_ASOF_JOIN_VARIANTS(M) - #undef M - return {}; -} - - namespace { /// Inserting an element into a hash table of the form `key -> reference to a string`, which will then be used by JOIN. template struct Inserter { - static void insert(const Join *, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool); + static void insert(const Join &, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool); }; template struct Inserter { - static ALWAYS_INLINE void insert(const Join *, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool) + static ALWAYS_INLINE void insert(const Join &, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool) { auto emplace_result = key_getter.emplaceKey(map, i, pool); @@ -466,7 +387,7 @@ namespace template struct Inserter { - static ALWAYS_INLINE void insert(const Join *, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool) + static ALWAYS_INLINE void insert(const Join &, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool) { auto emplace_result = key_getter.emplaceKey(map, i, pool); @@ -492,13 +413,13 @@ namespace template struct Inserter { - static ALWAYS_INLINE void insert(const Join * join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, const IColumn * asof_column) + static ALWAYS_INLINE void insert(const Join & join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, const IColumn * asof_column) { auto emplace_result = key_getter.emplaceKey(map, i, pool); typename Map::mapped_type * time_series_map = &emplace_result.getMapped(); if (emplace_result.isInserted()) - time_series_map = new (time_series_map) typename Map::mapped_type(join->getAsofType()); + time_series_map = new (time_series_map) typename Map::mapped_type(join.getAsofType()); time_series_map->insert(asof_column, stored_block, i, pool); } }; @@ -506,7 +427,7 @@ namespace template void NO_INLINE insertFromBlockImplTypeCase( - const Join * join, Map & map, size_t rows, const ColumnRawPtrs & key_columns, + const Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool) { const IColumn * asof_column [[maybe_unused]] = nullptr; @@ -530,7 +451,7 @@ namespace template void insertFromBlockImplType( - const Join * join, Map & map, size_t rows, const ColumnRawPtrs & key_columns, + const Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool) { if (null_map) @@ -542,7 +463,7 @@ namespace template void insertFromBlockImpl( - const Join * join, Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns, + const Join & join, Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool) { switch (type) @@ -640,7 +561,7 @@ bool Join::insertFromBlock(const Block & block) { dispatch([&](auto, auto strictness_, auto & map) { - insertFromBlockImpl(this, type, map, rows, key_columns, key_sizes, stored_block, null_map, pool); + insertFromBlockImpl(*this, type, map, rows, key_columns, key_sizes, stored_block, null_map, pool); }); } @@ -775,14 +696,16 @@ std::unique_ptr NO_INLINE joinRightIndexedColumns( auto & mapped = find_result.getMapped(); if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof) - if (const Join::RowRef * found = mapped.findAsof(asof_column, i, pool)) + { + if (const RowRef * found = mapped.findAsof(asof_column, i, pool)) { - filter[i] = 1; - mapped.setUsed(); - added_columns.appendFromBlock(*found->block, found->row_num); + filter[i] = 1; + mapped.setUsed(); + added_columns.appendFromBlock(*found->block, found->row_num); } else addNotFoundRow<_add_missing>(added_columns, current_offset); + } else { filter[i] = 1; diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 876fd2a70b1..f6ddaf87af0 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -6,6 +6,7 @@ #include #include +#include #include #include @@ -130,88 +131,7 @@ public: size_t getTotalByteCount() const; ASTTableJoin::Kind getKind() const { return kind; } - - - /// Reference to the row in block. - struct RowRef - { - const Block * block = nullptr; - size_t row_num = 0; - - RowRef() {} - RowRef(const Block * block_, size_t row_num_) : block(block_), row_num(row_num_) {} - }; - - /// Single linked list of references to rows. Used for ALL JOINs (non-unique JOINs) - struct RowRefList : RowRef - { - RowRefList * next = nullptr; - - RowRefList() {} - RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {} - }; - - struct AsofRowRefs - { - /// Different types of asof join keys - #define APPLY_FOR_ASOF_JOIN_VARIANTS(M) \ - M(key32, UInt32) \ - M(key64, UInt64) \ - M(keyf32, Float32) \ - M(keyf64, Float64) - - enum class AsofType - { - EMPTY, - #define M(NAME, TYPE) NAME, - APPLY_FOR_ASOF_JOIN_VARIANTS(M) - #undef M - }; - - static std::optional> getTypeSize(const IColumn * asof_column); - - template - struct Entry - { - T asof_value; - RowRef row_ref; - - Entry(T v) : asof_value(v) {} - Entry(T v, RowRef rr) : asof_value(v), row_ref(rr) {} - - bool operator< (const Entry& o) const - { - return asof_value < o.asof_value; - } - }; - - struct Lookups - { - #define M(NAME, TYPE) \ - std::unique_ptr>> NAME; - APPLY_FOR_ASOF_JOIN_VARIANTS(M) - #undef M - - void create(AsofType which); - }; - - AsofRowRefs() : type(AsofType::EMPTY) {} - AsofRowRefs(AsofType t) : type(t) - { - lookups.create(t); - } - - void insert(const IColumn * asof_column, const Block * block, size_t row_num, Arena & pool); - const RowRef * findAsof(const IColumn * asof_column, size_t row_num, Arena & pool) const; - - private: - const AsofType type; - mutable Lookups lookups; - mutable bool sorted = false; - }; - - AsofRowRefs::AsofType getAsofType() const { return asof_type; } - + AsofRowRefs::Type getAsofType() const { return asof_type; } /** Depending on template parameter, adds or doesn't add a flag, that element was used (row was joined). * Depending on template parameter, decide whether to overwrite existing values when encountering the same key again @@ -446,7 +366,7 @@ private: private: Type type = Type::EMPTY; - AsofRowRefs::AsofType asof_type = AsofRowRefs::AsofType::EMPTY; + AsofRowRefs::Type asof_type = AsofRowRefs::Type::EMPTY; static Type chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes); diff --git a/dbms/src/Interpreters/RowRefs.cpp b/dbms/src/Interpreters/RowRefs.cpp new file mode 100644 index 00000000000..18b144e984e --- /dev/null +++ b/dbms/src/Interpreters/RowRefs.cpp @@ -0,0 +1,75 @@ +#include + +#include +#include +#include + +#include + +namespace DB +{ + +void AsofRowRefs::Lookups::create(AsofRowRefs::Type which) +{ + switch (which) + { + case Type::EMPTY: break; + #define M(NAME, TYPE) \ + case Type::NAME: NAME = std::make_unique(); break; + APPLY_FOR_ASOF_JOIN_VARIANTS(M) + #undef M + } +} + +template +using AsofGetterType = ColumnsHashing::HashMethodOneNumber; + +void AsofRowRefs::insert(const IColumn * asof_column, const Block * block, size_t row_num, Arena & pool) +{ + switch (type) + { + case Type::EMPTY: break; + #define M(NAME, TYPE) \ + case Type::NAME: { \ + auto asof_getter = AsofGetterType(asof_column); \ + auto entry = Entry(asof_getter.getKey(row_num, pool), RowRef(block, row_num)); \ + lookups.NAME->insert(entry); \ + break; \ + } + APPLY_FOR_ASOF_JOIN_VARIANTS(M) + #undef M + } +} + +const RowRef * AsofRowRefs::findAsof(const IColumn * asof_column, size_t row_num, Arena & pool) const +{ + switch (type) + { + case Type::EMPTY: return nullptr; + #define M(NAME, TYPE) \ + case Type::NAME: { \ + auto asof_getter = AsofGetterType(asof_column); \ + TYPE key = asof_getter.getKey(row_num, pool); \ + auto it = lookups.NAME->upper_bound(Entry(key)); \ + if (it == lookups.NAME->cbegin()) \ + return nullptr; \ + return &((--it)->row_ref); \ + } + APPLY_FOR_ASOF_JOIN_VARIANTS(M) + #undef M + } + + __builtin_unreachable(); +} + +std::optional> AsofRowRefs::getTypeSize(const IColumn * asof_column) +{ + #define M(NAME, TYPE) \ + if (strcmp(#TYPE, asof_column->getFamilyName()) == 0) \ + return std::make_pair(Type::NAME,sizeof(TYPE)); + APPLY_FOR_ASOF_JOIN_VARIANTS(M) + #undef M + return {}; +} + +} \ No newline at end of file diff --git a/dbms/src/Interpreters/RowRefs.h b/dbms/src/Interpreters/RowRefs.h new file mode 100644 index 00000000000..84f2a91af34 --- /dev/null +++ b/dbms/src/Interpreters/RowRefs.h @@ -0,0 +1,91 @@ +#pragma once + +#include +#include + +#include + +namespace DB +{ + +class Block; + +/// Reference to the row in block. +struct RowRef +{ + const Block * block = nullptr; + size_t row_num = 0; + + RowRef() {} + RowRef(const Block * block_, size_t row_num_) : block(block_), row_num(row_num_) {} +}; + +/// Single linked list of references to rows. Used for ALL JOINs (non-unique JOINs) +struct RowRefList : RowRef +{ + RowRefList * next = nullptr; + + RowRefList() {} + RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {} +}; + +class AsofRowRefs +{ +public: + /// Different types of asof join keys + #define APPLY_FOR_ASOF_JOIN_VARIANTS(M) \ + M(key32, UInt32) \ + M(key64, UInt64) \ + M(keyf32, Float32) \ + M(keyf64, Float64) + + enum class Type + { + EMPTY, + #define M(NAME, TYPE) NAME, + APPLY_FOR_ASOF_JOIN_VARIANTS(M) + #undef M + }; + + static std::optional> getTypeSize(const IColumn * asof_column); + + template + struct Entry + { + T asof_value; + RowRef row_ref; + + Entry(T v) : asof_value(v) {} + Entry(T v, RowRef rr) : asof_value(v), row_ref(rr) {} + + bool operator< (const Entry& o) const + { + return asof_value < o.asof_value; + } + }; + + struct Lookups + { + #define M(NAME, TYPE) \ + std::unique_ptr>> NAME; + APPLY_FOR_ASOF_JOIN_VARIANTS(M) + #undef M + + void create(Type which); + }; + + AsofRowRefs() : type(Type::EMPTY) {} + AsofRowRefs(Type t) : type(t) + { + lookups.create(t); + } + + void insert(const IColumn * asof_column, const Block * block, size_t row_num, Arena & pool); + const RowRef * findAsof(const IColumn * asof_column, size_t row_num, Arena & pool) const; + +private: + const Type type; + mutable Lookups lookups; +}; + +} \ No newline at end of file From 6695e304afd49215e9aad226b6025324a0fa2834 Mon Sep 17 00:00:00 2001 From: Martijn Bakker Date: Sat, 30 Mar 2019 21:55:40 +0000 Subject: [PATCH 7/7] fix style --- dbms/src/Common/SortedLookupPODArray.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dbms/src/Common/SortedLookupPODArray.h b/dbms/src/Common/SortedLookupPODArray.h index 60b3529c69e..72cb5648735 100644 --- a/dbms/src/Common/SortedLookupPODArray.h +++ b/dbms/src/Common/SortedLookupPODArray.h @@ -2,7 +2,8 @@ #include -namespace DB { +namespace DB +{ /** * This class is intended to push sortable data into.