diff --git a/cmake/find/cxx.cmake b/cmake/find/cxx.cmake index 4f2430228d4..839b49e1889 100644 --- a/cmake/find/cxx.cmake +++ b/cmake/find/cxx.cmake @@ -1,7 +1,5 @@ -if (COMPILER_CLANG) - option (USE_LIBCXX "Use libc++ and libc++abi instead of libstdc++" ON) - option (USE_INTERNAL_LIBCXX_LIBRARY "Set to FALSE to use system libcxx and libcxxabi libraries instead of bundled" ${NOT_UNBUNDLED}) -endif() +option (USE_LIBCXX "Use libc++ and libc++abi instead of libstdc++" ${NOT_UNBUNDLED}) +option (USE_INTERNAL_LIBCXX_LIBRARY "Set to FALSE to use system libcxx and libcxxabi libraries instead of bundled" ${NOT_UNBUNDLED}) if (USE_LIBCXX) set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -D_LIBCPP_DEBUG=0") # More checks in debug build. diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index 295124c6ada..45d12f8ed93 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -77,9 +77,7 @@ if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang") elseif (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") # Add compiler options only to c++ compiler function(add_cxx_compile_options option) - add_compile_options( - "$<$,CXX>:${option}>" - ) + add_compile_options("$<$,CXX>:${option}>") endfunction() # Warn about boolean expression compared with an integer value different from true/false add_cxx_compile_options(-Wbool-compare) @@ -113,8 +111,8 @@ elseif (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") add_cxx_compile_options(-Wnon-virtual-dtor) # Obvious add_cxx_compile_options(-Wno-return-local-addr) - # Obvious - add_cxx_compile_options(-Wnull-dereference) + # This warning is disabled due to false positives if compiled with libc++: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=90037 + #add_cxx_compile_options(-Wnull-dereference) # Obvious add_cxx_compile_options(-Wodr) # Obvious diff --git a/dbms/src/Core/SortCursor.h b/dbms/src/Core/SortCursor.h index c898ee71b8f..5b4db43024f 100644 --- a/dbms/src/Core/SortCursor.h +++ b/dbms/src/Core/SortCursor.h @@ -1,5 +1,9 @@ #pragma once +#include +#include 
+#include + #include #include #include @@ -98,9 +102,12 @@ struct SortCursorImpl bool isFirst() const { return pos == 0; } bool isLast() const { return pos + 1 >= rows; } + bool isValid() const { return pos < rows; } void next() { ++pos; } }; +using SortCursorImpls = std::vector; + /// For easy copying. struct SortCursor @@ -203,4 +210,102 @@ struct SortCursorWithCollation } }; + +/** Allows fetching data from multiple sort cursors in sorted order (merging sorted data streams). + */ +template +class SortingHeap +{ +public: + SortingHeap() = default; + + template + SortingHeap(Cursors & cursors) + { + size_t size = cursors.size(); + queue.reserve(size); + for (size_t i = 0; i < size; ++i) + queue.emplace_back(&cursors[i]); + std::make_heap(queue.begin(), queue.end()); + } + + bool isValid() const { return !queue.empty(); } + + Cursor & current() { return queue.front(); } + + void next() + { + assert(isValid()); + + if (!current()->isLast()) + { + current()->next(); + updateTop(); + } + else + removeTop(); + } + +private: + using Container = std::vector; + Container queue; + + /// This is an adapted version of the function __sift_down from libc++. + /// Why not simply use std::priority_queue? + /// - because it doesn't support updating the top element and requires pop and push instead. + void updateTop() + { + size_t size = queue.size(); + if (size < 2) + return; + + size_t child_idx = 1; + auto begin = queue.begin(); + auto child_it = begin + 1; + + /// Right child exists and is greater than left child. + if (size > 2 && *child_it < *(child_it + 1)) + { + ++child_it; + ++child_idx; + } + + /// Check if we are in order. + if (*child_it < *begin) + return; + + auto curr_it = begin; + auto top(std::move(*begin)); + do + { + /// We are not in heap-order, swap the parent with its largest child.
+ *curr_it = std::move(*child_it); + curr_it = child_it; + + if ((size - 2) / 2 < child_idx) + break; + + // recompute the child based off of the updated parent + child_idx = 2 * child_idx + 1; + child_it = begin + child_idx; + + if ((child_idx + 1) < size && *child_it < *(child_it + 1)) + { + /// Right child exists and is greater than left child. + ++child_it; + ++child_idx; + } + + /// Check if we are in order. + } while (!(*child_it < top)); + *curr_it = std::move(top); + } + + void removeTop() + { + std::pop_heap(queue.begin(), queue.end()); + queue.pop_back(); + } +}; + } diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 9f6f8173cde..1c50316fc3f 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -1,4 +1,3 @@ -#include #include #include #include @@ -152,15 +151,9 @@ MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream( blocks.swap(nonempty_blocks); if (!has_collation) - { - for (size_t i = 0; i < cursors.size(); ++i) - queue_without_collation.push(SortCursor(&cursors[i])); - } + queue_without_collation = SortingHeap(cursors); else - { - for (size_t i = 0; i < cursors.size(); ++i) - queue_with_collation.push(SortCursorWithCollation(&cursors[i])); - } + queue_with_collation = SortingHeap(cursors); } @@ -177,52 +170,50 @@ Block MergeSortingBlocksBlockInputStream::readImpl() } return !has_collation - ? mergeImpl(queue_without_collation) - : mergeImpl(queue_with_collation); + ? 
+ mergeImpl(queue_without_collation) + : mergeImpl(queue_with_collation); } -template -Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue & queue) +template +Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue) { - size_t num_columns = blocks[0].columns(); + size_t num_columns = header.columns(); - MutableColumns merged_columns = blocks[0].cloneEmptyColumns(); + MutableColumns merged_columns = header.cloneEmptyColumns(); /// TODO: reserve (in each column) /// Take rows from queue in right order and push to 'merged'. size_t merged_rows = 0; - while (!queue.empty()) + while (queue.isValid()) { - TSortCursor current = queue.top(); - queue.pop(); + auto current = queue.current(); + /// Append a row from queue. for (size_t i = 0; i < num_columns; ++i) merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); - if (!current->isLast()) - { - current->next(); - queue.push(current); - } - ++total_merged_rows; + ++merged_rows; + + /// We don't need more rows because the limit has been reached. if (limit && total_merged_rows == limit) { - auto res = blocks[0].cloneWithColumns(std::move(merged_columns)); blocks.clear(); - return res; + break; } - ++merged_rows; + queue.next(); + + /// It's enough for the current output block, but we will continue.
if (merged_rows == max_merged_block_size) - return blocks[0].cloneWithColumns(std::move(merged_columns)); + break; } if (merged_rows == 0) return {}; - return blocks[0].cloneWithColumns(std::move(merged_columns)); + return header.cloneWithColumns(std::move(merged_columns)); } diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.h b/dbms/src/DataStreams/MergeSortingBlockInputStream.h index a8b8e8cfd3b..9492bdb074b 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.h +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.h @@ -1,7 +1,5 @@ #pragma once -#include - #include #include @@ -56,19 +54,18 @@ private: UInt64 limit; size_t total_merged_rows = 0; - using CursorImpls = std::vector; - CursorImpls cursors; + SortCursorImpls cursors; bool has_collation = false; - std::priority_queue queue_without_collation; - std::priority_queue queue_with_collation; + SortingHeap queue_without_collation; + SortingHeap queue_with_collation; /** Two different cursors are supported - with and without Collation. * Templates are used (instead of virtual functions in SortCursor) for zero-overhead. */ - template - Block mergeImpl(std::priority_queue & queue); + template + Block mergeImpl(TSortingHeap & queue); };