Merge pull request #19290 from azat/dist-broken-on-EOF-fix

Do not mark file for distributed send as broken on EOF
This commit is contained in:
alexey-milovidov 2021-01-21 17:00:36 +03:00 committed by GitHub
commit 062f00aa5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 38 additions and 19 deletions

View File

@ -803,6 +803,9 @@ Packet Connection::receivePacket(std::function<void(Poco::Net::Socket &)> async_
} }
catch (Exception & e) catch (Exception & e)
{ {
/// This is to consider ATTEMPT_TO_READ_AFTER_EOF as a remote exception.
e.setRemoteException();
/// Add server address to exception message, if need. /// Add server address to exception message, if need.
if (e.code() != ErrorCodes::UNKNOWN_PACKET_FROM_SERVER) if (e.code() != ErrorCodes::UNKNOWN_PACKET_FROM_SERVER)
e.addMessage("while receiving packet from " + getDescription()); e.addMessage("while receiving packet from " + getDescription());
@ -892,7 +895,7 @@ void Connection::setDescription()
std::unique_ptr<Exception> Connection::receiveException() std::unique_ptr<Exception> Connection::receiveException()
{ {
return std::make_unique<Exception>(readException(*in, "Received from " + getDescription())); return std::make_unique<Exception>(readException(*in, "Received from " + getDescription(), true /* remote */));
} }

View File

@ -50,8 +50,9 @@ void handle_error_code([[maybe_unused]] const std::string & msg, int code)
ErrorCodes::increment(code); ErrorCodes::increment(code);
} }
Exception::Exception(const std::string & msg, int code) Exception::Exception(const std::string & msg, int code, bool remote_)
: Poco::Exception(msg, code) : Poco::Exception(msg, code)
, remote(remote_)
{ {
handle_error_code(msg, code); handle_error_code(msg, code);
} }

View File

@ -25,7 +25,7 @@ class Exception : public Poco::Exception
{ {
public: public:
Exception() = default; Exception() = default;
Exception(const std::string & msg, int code); Exception(const std::string & msg, int code, bool remote_ = false);
Exception(const std::string & msg, const Exception & nested, int code); Exception(const std::string & msg, const Exception & nested, int code);
Exception(int code, const std::string & message) Exception(int code, const std::string & message)
@ -61,12 +61,17 @@ public:
extendedMessage(message); extendedMessage(message);
} }
/// Used to distinguish local exceptions from the one that was received from remote node.
void setRemoteException(bool remote_ = true) { remote = remote_; }
bool isRemoteException() const { return remote; }
std::string getStackTraceString() const; std::string getStackTraceString() const;
private: private:
#ifndef STD_EXCEPTION_HAS_STACK_TRACE #ifndef STD_EXCEPTION_HAS_STACK_TRACE
StackTrace trace; StackTrace trace;
#endif #endif
bool remote = false;
const char * className() const throw() override { return "DB::Exception"; } const char * className() const throw() override { return "DB::Exception"; }
}; };

View File

@ -1014,7 +1014,7 @@ void skipJSONField(ReadBuffer & buf, const StringRef & name_of_field)
} }
Exception readException(ReadBuffer & buf, const String & additional_message) Exception readException(ReadBuffer & buf, const String & additional_message, bool remote_exception)
{ {
int code = 0; int code = 0;
String name; String name;
@ -1041,7 +1041,7 @@ Exception readException(ReadBuffer & buf, const String & additional_message)
if (!stack_trace.empty()) if (!stack_trace.empty())
out << " Stack trace:\n\n" << stack_trace; out << " Stack trace:\n\n" << stack_trace;
return Exception(out.str(), code); return Exception(out.str(), code, remote_exception);
} }
void readAndThrowException(ReadBuffer & buf, const String & additional_message) void readAndThrowException(ReadBuffer & buf, const String & additional_message)

View File

@ -1073,7 +1073,7 @@ void skipJSONField(ReadBuffer & buf, const StringRef & name_of_field);
* (type is cut to base class, 'message' replaced by 'displayText', and stack trace is appended to 'message') * (type is cut to base class, 'message' replaced by 'displayText', and stack trace is appended to 'message')
* Some additional message could be appended to exception (example: you could add information about from where it was received). * Some additional message could be appended to exception (example: you could add information about from where it was received).
*/ */
Exception readException(ReadBuffer & buf, const String & additional_message = ""); Exception readException(ReadBuffer & buf, const String & additional_message = "", bool remote_exception = false);
void readAndThrowException(ReadBuffer & buf, const String & additional_message = ""); void readAndThrowException(ReadBuffer & buf, const String & additional_message = "");

View File

@ -155,6 +155,27 @@ namespace
return header; return header;
} }
/// remote_error argument is used to decide whether some errors should be
/// ignored or not, in particular:
///
/// - ATTEMPT_TO_READ_AFTER_EOF should not be ignored
/// if we receive it from remote (receiver), since:
/// - the sender will got ATTEMPT_TO_READ_AFTER_EOF when the client just go away,
/// i.e. server had been restarted
/// - since #18853 the file will be checked on the sender locally, and
/// if there is something wrong with the file itself, we will receive
/// ATTEMPT_TO_READ_AFTER_EOF not from the remote at first
/// and mark batch as broken.
bool isFileBrokenErrorCode(int code, bool remote_error)
{
return code == ErrorCodes::CHECKSUM_DOESNT_MATCH
|| code == ErrorCodes::TOO_LARGE_SIZE_COMPRESSED
|| code == ErrorCodes::CANNOT_READ_ALL_DATA
|| code == ErrorCodes::UNKNOWN_CODEC
|| code == ErrorCodes::CANNOT_DECOMPRESS
|| (!remote_error && code == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF);
}
} }
@ -571,7 +592,7 @@ struct StorageDistributedDirectoryMonitor::Batch
} }
catch (const Exception & e) catch (const Exception & e)
{ {
if (isFileBrokenErrorCode(e.code())) if (isFileBrokenErrorCode(e.code(), e.isRemoteException()))
{ {
tryLogCurrentException(parent.log, "Failed to send batch due to"); tryLogCurrentException(parent.log, "Failed to send batch due to");
batch_broken = true; batch_broken = true;
@ -801,16 +822,6 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
} }
} }
bool StorageDistributedDirectoryMonitor::isFileBrokenErrorCode(int code)
{
return code == ErrorCodes::CHECKSUM_DOESNT_MATCH
|| code == ErrorCodes::TOO_LARGE_SIZE_COMPRESSED
|| code == ErrorCodes::CANNOT_READ_ALL_DATA
|| code == ErrorCodes::UNKNOWN_CODEC
|| code == ErrorCodes::CANNOT_DECOMPRESS
|| code == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF;
}
void StorageDistributedDirectoryMonitor::markAsBroken(const std::string & file_path) const void StorageDistributedDirectoryMonitor::markAsBroken(const std::string & file_path) const
{ {
const auto last_path_separator_pos = file_path.rfind('/'); const auto last_path_separator_pos = file_path.rfind('/');
@ -837,7 +848,7 @@ void StorageDistributedDirectoryMonitor::markAsBroken(const std::string & file_p
bool StorageDistributedDirectoryMonitor::maybeMarkAsBroken(const std::string & file_path, const Exception & e) const bool StorageDistributedDirectoryMonitor::maybeMarkAsBroken(const std::string & file_path, const Exception & e) const
{ {
/// mark file as broken if necessary /// mark file as broken if necessary
if (isFileBrokenErrorCode(e.code())) if (isFileBrokenErrorCode(e.code(), e.isRemoteException()))
{ {
markAsBroken(file_path); markAsBroken(file_path);
return true; return true;

View File

@ -70,7 +70,6 @@ private:
void processFile(const std::string & file_path); void processFile(const std::string & file_path);
void processFilesWithBatching(const std::map<UInt64, std::string> & files); void processFilesWithBatching(const std::map<UInt64, std::string> & files);
static bool isFileBrokenErrorCode(int code);
void markAsBroken(const std::string & file_path) const; void markAsBroken(const std::string & file_path) const;
bool maybeMarkAsBroken(const std::string & file_path, const Exception & e) const; bool maybeMarkAsBroken(const std::string & file_path, const Exception & e) const;