diff --git a/contrib/zstd b/contrib/zstd index f4340f46b23..255597502c3 160000 --- a/contrib/zstd +++ b/contrib/zstd @@ -1 +1 @@ -Subproject commit f4340f46b2387bc8de7d5320c0b83bb1499933ad +Subproject commit 255597502c3a4ef150abc964e376d4202a8c2929 diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp index 37ff3078d75..10b167657e4 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -759,41 +759,6 @@ void ZooKeeper::sendThread() tryLogCurrentException(__PRETTY_FUNCTION__); finalize(true, false); } - - /// Drain queue - RequestInfo info; - while (requests_queue.tryPop(info)) - { - if (info.callback) - { - ResponsePtr response = info.request->makeResponse(); - response->error = ZSESSIONEXPIRED; - try - { - info.callback(*response); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - - } - if (info.watch) - { - WatchResponse response; - response.type = SESSION; - response.state = EXPIRED_SESSION; - response.error = ZSESSIONEXPIRED; - try - { - info.watch(response); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } - } } @@ -1030,102 +995,139 @@ void ZooKeeper::receiveEvent() void ZooKeeper::finalize(bool error_send, bool error_receive) { - bool expired_prev = false; - if (expired.compare_exchange_strong(expired_prev, true)) + std::lock_guard lock(finalize_mutex); + + if (expired) + return; + expired = true; + + active_session_metric_increment.destroy(); + + try { - active_session_metric_increment.destroy(); - - try + if (!error_send) { - if (!error_send) - { - /// Send close event. This also signals sending thread to wakeup and then stop. - try - { - close(); - } - catch (...) - { - /// This happens for example, when "Cannot push request to queue within operation timeout". - tryLogCurrentException(__PRETTY_FUNCTION__); - } - send_thread.join(); - } - + /// Send close event. This also signals sending thread to wakeup and then stop. try { - /// This will also wakeup the receiving thread. - socket.shutdown(); + close(); } catch (...) { - /// We must continue to execute all callbacks, because the user is waiting for them. + /// This happens for example, when "Cannot push request to queue within operation timeout". tryLogCurrentException(__PRETTY_FUNCTION__); } + send_thread.join(); + } - if (!error_receive) - receive_thread.join(); + try + { + /// This will also wakeup the receiving thread. + socket.shutdown(); + } + catch (...) + { + /// We must continue to execute all callbacks, because the user is waiting for them. + tryLogCurrentException(__PRETTY_FUNCTION__); + } + if (!error_receive) + receive_thread.join(); + + { + std::lock_guard lock(operations_mutex); + + for (auto & op : operations) { - std::lock_guard lock(operations_mutex); - - for (auto & op : operations) + RequestInfo & request_info = op.second; + ResponsePtr response = request_info.request->makeResponse(); + response->error = ZSESSIONEXPIRED; + if (request_info.callback) { - RequestInfo & request_info = op.second; - ResponsePtr response = request_info.request->makeResponse(); - response->error = ZSESSIONEXPIRED; - if (request_info.callback) + try + { + request_info.callback(*response); + } + catch (...) + { + /// We must continue to all other callbacks, because the user is waiting for them. + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } + } + + CurrentMetrics::sub(CurrentMetrics::ZooKeeperRequest, operations.size()); + operations.clear(); + } + + { + std::lock_guard lock(watches_mutex); + + for (auto & path_watches : watches) + { + WatchResponse response; + response.type = SESSION; + response.state = EXPIRED_SESSION; + response.error = ZSESSIONEXPIRED; + + for (auto & callback : path_watches.second) + { + if (callback) { try { - request_info.callback(*response); + callback(response); } catch (...) { - /// We must continue to all other callbacks, because the user is waiting for them. tryLogCurrentException(__PRETTY_FUNCTION__); } } } - - CurrentMetrics::sub(CurrentMetrics::ZooKeeperRequest, operations.size()); - operations.clear(); } + CurrentMetrics::sub(CurrentMetrics::ZooKeeperWatch, watches.size()); + watches.clear(); + } + + /// Drain queue + RequestInfo info; + while (requests_queue.tryPop(info)) + { + if (info.callback) { - std::lock_guard lock(watches_mutex); - - for (auto & path_watches : watches) + ResponsePtr response = info.request->makeResponse(); + response->error = ZSESSIONEXPIRED; + try { - WatchResponse response; - response.type = SESSION; - response.state = EXPIRED_SESSION; - response.error = ZSESSIONEXPIRED; - - for (auto & callback : path_watches.second) - { - if (callback) - { - try - { - callback(response); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } - } + info.callback(*response); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); } - CurrentMetrics::sub(CurrentMetrics::ZooKeeperWatch, watches.size()); - watches.clear(); + } + if (info.watch) + { + WatchResponse response; + response.type = SESSION; + response.state = EXPIRED_SESSION; + response.error = ZSESSIONEXPIRED; + try + { + info.watch(response); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } } } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); } } @@ -1329,10 +1331,6 @@ void ZooKeeper::MultiResponse::readImpl(ReadBuffer & in) void ZooKeeper::pushRequest(RequestInfo && info) { - /// If the request is close request, we push it even after session is expired - because it will signal sending thread to stop. - if (expired && info.request->xid != close_xid) - throw Exception("Session expired", ZSESSIONEXPIRED); - try { info.request->addRootPath(root_path); @@ -1346,7 +1344,15 @@ void ZooKeeper::pushRequest(RequestInfo && info) throw Exception("XID overflow", ZSESSIONEXPIRED); } - ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions); + /// We must serialize 'pushRequest' and 'finalize' (from sendThread, receiveThread) calls + /// to avoid forgotten operations in the queue when session is expired. + /// Invariant: when expired, no new operations will be pushed to the queue in 'pushRequest' + /// and the queue will be drained in 'finalize'. + std::lock_guard lock(finalize_mutex); + + /// If the request is close request, we push it even after session is expired - because it will signal sending thread to stop. + if (expired) + throw Exception("Session expired", ZSESSIONEXPIRED); if (!requests_queue.tryPush(std::move(info), operation_timeout.totalMilliseconds())) throw Exception("Cannot push request to queue within operation timeout", ZOPERATIONTIMEOUT); @@ -1356,6 +1362,8 @@ void ZooKeeper::pushRequest(RequestInfo && info) finalize(false, false); throw; } + + ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions); } @@ -1522,7 +1530,9 @@ void ZooKeeper::close() RequestInfo request_info; request_info.request = std::make_shared(std::move(request)); - pushRequest(std::move(request_info)); + if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds())) + throw Exception("Cannot push close request to queue within operation timeout", ZOPERATIONTIMEOUT); + ProfileEvents::increment(ProfileEvents::ZooKeeperClose); } diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h index b86e2bbf1e2..8a65d09b529 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -570,7 +570,10 @@ private: std::optional out; int64_t session_id = 0; + std::atomic xid {1}; + std::atomic expired {false}; + std::mutex finalize_mutex; using clock = std::chrono::steady_clock; @@ -601,8 +604,6 @@ private: std::thread send_thread; std::thread receive_thread; - std::atomic expired {false}; - void connect( const Addresses & addresses, Poco::Timespan connection_timeout); diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 9995dde25f5..2506810be80 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -71,7 +71,9 @@ MergeSortingBlockInputStream::MergeSortingBlockInputStream( max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_) { children.push_back(input); - header = getHeader(); + header = children.at(0)->getHeader(); + header_without_constants = header; + removeConstantsFromBlock(header_without_constants); removeConstantsFromSortDescription(header, description); } @@ -110,7 +112,7 @@ Block MergeSortingBlockInputStream::readImpl() const std::string & path = temporary_files.back()->path(); WriteBufferFromFile file_buf(path); CompressedWriteBuffer compressed_buf(file_buf); - NativeBlockOutputStream block_out(compressed_buf, 0, block.cloneEmpty()); + NativeBlockOutputStream block_out(compressed_buf, 0, header_without_constants); MergeSortingBlocksBlockInputStream block_in(blocks, description, max_merged_block_size, limit); LOG_INFO(log, "Sorting and writing part of data into temporary file " + path); @@ -140,7 +142,7 @@ Block MergeSortingBlockInputStream::readImpl() /// Create sorted streams to merge. for (const auto & file : temporary_files) { - temporary_inputs.emplace_back(std::make_unique(file->path(), header)); + temporary_inputs.emplace_back(std::make_unique(file->path(), header_without_constants)); inputs_to_merge.emplace_back(temporary_inputs.back()->block_in); } diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.h b/dbms/src/DataStreams/MergeSortingBlockInputStream.h index feb882effb0..7a3172912b7 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.h +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.h @@ -81,7 +81,7 @@ public: bool isSortedOutput() const override { return true; } const SortDescription & getSortDescription() const override { return description; } - Block getHeader() const override { return children.at(0)->getHeader(); } + Block getHeader() const override { return header; } protected: Block readImpl() override; @@ -104,6 +104,7 @@ private: /// (to avoid excessive virtual function calls and because constants cannot be serialized in Native format for temporary files) /// Save original block structure here. Block header; + Block header_without_constants; /// Everything below is for external sorting. std::vector> temporary_files; diff --git a/dbms/src/Interpreters/tests/CMakeLists.txt b/dbms/src/Interpreters/tests/CMakeLists.txt index 48793cbcf8d..ad3a24d8608 100644 --- a/dbms/src/Interpreters/tests/CMakeLists.txt +++ b/dbms/src/Interpreters/tests/CMakeLists.txt @@ -15,7 +15,7 @@ target_include_directories (hash_map BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR}) target_link_libraries (hash_map dbms) add_executable (hash_map3 hash_map3.cpp) -target_link_libraries (hash_map3 dbms) +target_link_libraries (hash_map3 dbms ${FARMHASH_LIBRARIES} ${METROHASH_LIBRARIES}) add_executable (hash_map_string hash_map_string.cpp) target_include_directories (hash_map_string BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR}) @@ -25,7 +25,7 @@ add_executable (hash_map_string_2 hash_map_string_2.cpp) target_link_libraries (hash_map_string_2 dbms) add_executable (hash_map_string_3 hash_map_string_3.cpp) -target_link_libraries (hash_map_string_3 dbms) +target_link_libraries (hash_map_string_3 dbms ${FARMHASH_LIBRARIES} ${METROHASH_LIBRARIES}) target_include_directories (hash_map_string_3 BEFORE PRIVATE ${ClickHouse_SOURCE_DIR}/contrib/libfarmhash) target_include_directories (hash_map_string_3 BEFORE PRIVATE ${ClickHouse_SOURCE_DIR}/contrib/libmetrohash/src)