Merge remote-tracking branch 'upstream/master' into fix3

This commit is contained in:
proller 2018-04-08 14:47:48 +03:00
commit 10dd23203e
6 changed files with 129 additions and 115 deletions

2
contrib/zstd vendored

@ -1 +1 @@
Subproject commit f4340f46b2387bc8de7d5320c0b83bb1499933ad
Subproject commit 255597502c3a4ef150abc964e376d4202a8c2929

View File

@ -759,41 +759,6 @@ void ZooKeeper::sendThread()
tryLogCurrentException(__PRETTY_FUNCTION__);
finalize(true, false);
}
/// Drain queue
RequestInfo info;
while (requests_queue.tryPop(info))
{
if (info.callback)
{
ResponsePtr response = info.request->makeResponse();
response->error = ZSESSIONEXPIRED;
try
{
info.callback(*response);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
if (info.watch)
{
WatchResponse response;
response.type = SESSION;
response.state = EXPIRED_SESSION;
response.error = ZSESSIONEXPIRED;
try
{
info.watch(response);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
}
@ -1030,102 +995,139 @@ void ZooKeeper::receiveEvent()
void ZooKeeper::finalize(bool error_send, bool error_receive)
{
bool expired_prev = false;
if (expired.compare_exchange_strong(expired_prev, true))
std::lock_guard lock(finalize_mutex);
if (expired)
return;
expired = true;
active_session_metric_increment.destroy();
try
{
active_session_metric_increment.destroy();
try
if (!error_send)
{
if (!error_send)
{
/// Send close event. This also signals sending thread to wakeup and then stop.
try
{
close();
}
catch (...)
{
/// This happens for example, when "Cannot push request to queue within operation timeout".
tryLogCurrentException(__PRETTY_FUNCTION__);
}
send_thread.join();
}
/// Send close event. This also signals sending thread to wakeup and then stop.
try
{
/// This will also wakeup the receiving thread.
socket.shutdown();
close();
}
catch (...)
{
/// We must continue to execute all callbacks, because the user is waiting for them.
/// This happens for example, when "Cannot push request to queue within operation timeout".
tryLogCurrentException(__PRETTY_FUNCTION__);
}
send_thread.join();
}
if (!error_receive)
receive_thread.join();
try
{
/// This will also wakeup the receiving thread.
socket.shutdown();
}
catch (...)
{
/// We must continue to execute all callbacks, because the user is waiting for them.
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (!error_receive)
receive_thread.join();
{
std::lock_guard lock(operations_mutex);
for (auto & op : operations)
{
std::lock_guard lock(operations_mutex);
for (auto & op : operations)
RequestInfo & request_info = op.second;
ResponsePtr response = request_info.request->makeResponse();
response->error = ZSESSIONEXPIRED;
if (request_info.callback)
{
RequestInfo & request_info = op.second;
ResponsePtr response = request_info.request->makeResponse();
response->error = ZSESSIONEXPIRED;
if (request_info.callback)
try
{
request_info.callback(*response);
}
catch (...)
{
/// We must continue to all other callbacks, because the user is waiting for them.
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
CurrentMetrics::sub(CurrentMetrics::ZooKeeperRequest, operations.size());
operations.clear();
}
{
std::lock_guard lock(watches_mutex);
for (auto & path_watches : watches)
{
WatchResponse response;
response.type = SESSION;
response.state = EXPIRED_SESSION;
response.error = ZSESSIONEXPIRED;
for (auto & callback : path_watches.second)
{
if (callback)
{
try
{
request_info.callback(*response);
callback(response);
}
catch (...)
{
/// We must continue to all other callbacks, because the user is waiting for them.
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
CurrentMetrics::sub(CurrentMetrics::ZooKeeperRequest, operations.size());
operations.clear();
}
CurrentMetrics::sub(CurrentMetrics::ZooKeeperWatch, watches.size());
watches.clear();
}
/// Drain queue
RequestInfo info;
while (requests_queue.tryPop(info))
{
if (info.callback)
{
std::lock_guard lock(watches_mutex);
for (auto & path_watches : watches)
ResponsePtr response = info.request->makeResponse();
response->error = ZSESSIONEXPIRED;
try
{
WatchResponse response;
response.type = SESSION;
response.state = EXPIRED_SESSION;
response.error = ZSESSIONEXPIRED;
for (auto & callback : path_watches.second)
{
if (callback)
{
try
{
callback(response);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
info.callback(*response);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
CurrentMetrics::sub(CurrentMetrics::ZooKeeperWatch, watches.size());
watches.clear();
}
if (info.watch)
{
WatchResponse response;
response.type = SESSION;
response.state = EXPIRED_SESSION;
response.error = ZSESSIONEXPIRED;
try
{
info.watch(response);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
@ -1329,10 +1331,6 @@ void ZooKeeper::MultiResponse::readImpl(ReadBuffer & in)
void ZooKeeper::pushRequest(RequestInfo && info)
{
/// If the request is close request, we push it even after session is expired - because it will signal sending thread to stop.
if (expired && info.request->xid != close_xid)
throw Exception("Session expired", ZSESSIONEXPIRED);
try
{
info.request->addRootPath(root_path);
@ -1346,7 +1344,15 @@ void ZooKeeper::pushRequest(RequestInfo && info)
throw Exception("XID overflow", ZSESSIONEXPIRED);
}
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
/// We must serialize 'pushRequest' and 'finalize' (from sendThread, receiveThread) calls
/// to avoid forgotten operations in the queue when session is expired.
/// Invariant: when expired, no new operations will be pushed to the queue in 'pushRequest'
/// and the queue will be drained in 'finalize'.
std::lock_guard lock(finalize_mutex);
/// If the request is close request, we push it even after session is expired - because it will signal sending thread to stop.
if (expired)
throw Exception("Session expired", ZSESSIONEXPIRED);
if (!requests_queue.tryPush(std::move(info), operation_timeout.totalMilliseconds()))
throw Exception("Cannot push request to queue within operation timeout", ZOPERATIONTIMEOUT);
@ -1356,6 +1362,8 @@ void ZooKeeper::pushRequest(RequestInfo && info)
finalize(false, false);
throw;
}
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
}
@ -1522,7 +1530,9 @@ void ZooKeeper::close()
RequestInfo request_info;
request_info.request = std::make_shared<CloseRequest>(std::move(request));
pushRequest(std::move(request_info));
if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds()))
throw Exception("Cannot push close request to queue within operation timeout", ZOPERATIONTIMEOUT);
ProfileEvents::increment(ProfileEvents::ZooKeeperClose);
}

View File

@ -570,7 +570,10 @@ private:
std::optional<WriteBufferFromPocoSocket> out;
int64_t session_id = 0;
std::atomic<XID> xid {1};
std::atomic<bool> expired {false};
std::mutex finalize_mutex;
using clock = std::chrono::steady_clock;
@ -601,8 +604,6 @@ private:
std::thread send_thread;
std::thread receive_thread;
std::atomic<bool> expired {false};
void connect(
const Addresses & addresses,
Poco::Timespan connection_timeout);

View File

@ -71,7 +71,9 @@ MergeSortingBlockInputStream::MergeSortingBlockInputStream(
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
{
children.push_back(input);
header = getHeader();
header = children.at(0)->getHeader();
header_without_constants = header;
removeConstantsFromBlock(header_without_constants);
removeConstantsFromSortDescription(header, description);
}
@ -110,7 +112,7 @@ Block MergeSortingBlockInputStream::readImpl()
const std::string & path = temporary_files.back()->path();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);
NativeBlockOutputStream block_out(compressed_buf, 0, block.cloneEmpty());
NativeBlockOutputStream block_out(compressed_buf, 0, header_without_constants);
MergeSortingBlocksBlockInputStream block_in(blocks, description, max_merged_block_size, limit);
LOG_INFO(log, "Sorting and writing part of data into temporary file " + path);
@ -140,7 +142,7 @@ Block MergeSortingBlockInputStream::readImpl()
/// Create sorted streams to merge.
for (const auto & file : temporary_files)
{
temporary_inputs.emplace_back(std::make_unique<TemporaryFileStream>(file->path(), header));
temporary_inputs.emplace_back(std::make_unique<TemporaryFileStream>(file->path(), header_without_constants));
inputs_to_merge.emplace_back(temporary_inputs.back()->block_in);
}

View File

@ -81,7 +81,7 @@ public:
bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description; }
Block getHeader() const override { return children.at(0)->getHeader(); }
Block getHeader() const override { return header; }
protected:
Block readImpl() override;
@ -104,6 +104,7 @@ private:
/// (to avoid excessive virtual function calls and because constants cannot be serialized in Native format for temporary files)
/// Save original block structure here.
Block header;
Block header_without_constants;
/// Everything below is for external sorting.
std::vector<std::unique_ptr<Poco::TemporaryFile>> temporary_files;

View File

@ -15,7 +15,7 @@ target_include_directories (hash_map BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
target_link_libraries (hash_map dbms)
add_executable (hash_map3 hash_map3.cpp)
target_link_libraries (hash_map3 dbms)
target_link_libraries (hash_map3 dbms ${FARMHASH_LIBRARIES} ${METROHASH_LIBRARIES})
add_executable (hash_map_string hash_map_string.cpp)
target_include_directories (hash_map_string BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
@ -25,7 +25,7 @@ add_executable (hash_map_string_2 hash_map_string_2.cpp)
target_link_libraries (hash_map_string_2 dbms)
add_executable (hash_map_string_3 hash_map_string_3.cpp)
target_link_libraries (hash_map_string_3 dbms)
target_link_libraries (hash_map_string_3 dbms ${FARMHASH_LIBRARIES} ${METROHASH_LIBRARIES})
target_include_directories (hash_map_string_3 BEFORE PRIVATE ${ClickHouse_SOURCE_DIR}/contrib/libfarmhash)
target_include_directories (hash_map_string_3 BEFORE PRIVATE ${ClickHouse_SOURCE_DIR}/contrib/libmetrohash/src)