Merge pull request #19451 from azat/safe-writes

Do not silently ignore write errors
This commit is contained in:
alexey-milovidov 2021-02-11 21:19:11 +03:00 committed by GitHub
commit dc3ffd3fe2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 116 additions and 134 deletions

View File

@ -24,8 +24,8 @@ namespace
/// ///
/// - when it is explicitly blocked with LockExceptionInThread /// - when it is explicitly blocked with LockExceptionInThread
/// ///
/// - to avoid std::terminate(), when stack unwinding is current in progress in /// - to avoid std::terminate(), when stack unwinding is currently in progress
/// this thread. /// in this thread.
/// ///
/// NOTE: that since C++11 destructor marked with noexcept by default, and /// NOTE: that since C++11 destructor marked with noexcept by default, and
/// this means that any throw from destructor (that is not marked with /// this means that any throw from destructor (that is not marked with

View File

@ -331,7 +331,7 @@ public:
class IKeeper class IKeeper
{ {
public: public:
virtual ~IKeeper() {} virtual ~IKeeper() = default;
/// If expired, you can only destroy the object. All other methods will throw exception. /// If expired, you can only destroy the object. All other methods will throw exception.
virtual bool isExpired() const = 0; virtual bool isExpired() const = 0;

View File

@ -8,6 +8,7 @@
#include <Compression/CompressionFactory.h> #include <Compression/CompressionFactory.h>
#include <Common/MemorySanitizer.h> #include <Common/MemorySanitizer.h>
#include <Common/MemoryTracker.h>
namespace DB namespace DB
@ -49,14 +50,9 @@ CompressedWriteBuffer::CompressedWriteBuffer(
CompressedWriteBuffer::~CompressedWriteBuffer() CompressedWriteBuffer::~CompressedWriteBuffer()
{ {
try /// FIXME move final flush into the caller
{ MemoryTracker::LockExceptionInThread lock;
next(); next();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
} }
} }

View File

@ -57,7 +57,7 @@ public:
*/ */
virtual std::string getContentType() const { return "text/plain; charset=UTF-8"; } virtual std::string getContentType() const { return "text/plain; charset=UTF-8"; }
virtual ~IBlockOutputStream() {} virtual ~IBlockOutputStream() = default;
/** Don't let to alter table while instance of stream is alive. /** Don't let to alter table while instance of stream is alive.
*/ */

View File

@ -1,10 +1,8 @@
#pragma once #pragma once
#include <math.h>
#include <vector> #include <vector>
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <Common/MemoryTracker.h>
#include <IO/WriteBuffer.h> #include <IO/WriteBuffer.h>
@ -53,18 +51,14 @@ public:
~AsynchronousWriteBuffer() override ~AsynchronousWriteBuffer() override
{ {
try /// FIXME move final flush into the caller
{ MemoryTracker::LockExceptionInThread lock;
if (started)
pool.wait();
swapBuffers(); if (started)
out.next(); pool.wait();
}
catch (...) swapBuffers();
{ out.next();
tryLogCurrentException(__PRETTY_FUNCTION__);
}
} }
/// That is executed in a separate thread /// That is executed in a separate thread

View File

@ -6,6 +6,8 @@
# include <IO/BrotliWriteBuffer.h> # include <IO/BrotliWriteBuffer.h>
# include <brotli/encode.h> # include <brotli/encode.h>
#include <Common/MemoryTracker.h>
namespace DB namespace DB
{ {
@ -47,14 +49,9 @@ BrotliWriteBuffer::BrotliWriteBuffer(std::unique_ptr<WriteBuffer> out_, int comp
BrotliWriteBuffer::~BrotliWriteBuffer() BrotliWriteBuffer::~BrotliWriteBuffer()
{ {
try /// FIXME move final flush into the caller
{ MemoryTracker::LockExceptionInThread lock;
finish(); finish();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
} }
void BrotliWriteBuffer::nextImpl() void BrotliWriteBuffer::nextImpl()

View File

@ -1,6 +1,6 @@
#include <common/types.h> #include <common/types.h>
#include <Common/hex.h> #include <Common/hex.h>
#include <Common/Exception.h> #include <Common/MemoryTracker.h>
#include <IO/HexWriteBuffer.h> #include <IO/HexWriteBuffer.h>
@ -22,14 +22,9 @@ void HexWriteBuffer::nextImpl()
HexWriteBuffer::~HexWriteBuffer() HexWriteBuffer::~HexWriteBuffer()
{ {
try /// FIXME move final flush into the caller
{ MemoryTracker::LockExceptionInThread lock;
nextImpl(); nextImpl();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
} }
} }

View File

@ -17,7 +17,7 @@ struct IReadableWriteBuffer
return getReadBufferImpl(); return getReadBufferImpl();
} }
virtual ~IReadableWriteBuffer() {} virtual ~IReadableWriteBuffer() = default;
protected: protected:

View File

@ -1,4 +1,5 @@
#include <IO/LZMADeflatingWriteBuffer.h> #include <IO/LZMADeflatingWriteBuffer.h>
#include <Common/MemoryTracker.h>
#if !defined(ARCADIA_BUILD) #if !defined(ARCADIA_BUILD)
@ -48,16 +49,11 @@ LZMADeflatingWriteBuffer::LZMADeflatingWriteBuffer(
LZMADeflatingWriteBuffer::~LZMADeflatingWriteBuffer() LZMADeflatingWriteBuffer::~LZMADeflatingWriteBuffer()
{ {
try /// FIXME move final flush into the caller
{ MemoryTracker::LockExceptionInThread lock;
finish();
lzma_end(&lstr); finish();
} lzma_end(&lstr);
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
} }
void LZMADeflatingWriteBuffer::nextImpl() void LZMADeflatingWriteBuffer::nextImpl()

View File

@ -3,6 +3,7 @@
#include <errno.h> #include <errno.h>
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
#include <Common/MemoryTracker.h>
#include <IO/WriteBufferFromFile.h> #include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
@ -77,14 +78,10 @@ WriteBufferFromFile::~WriteBufferFromFile()
if (fd < 0) if (fd < 0)
return; return;
try /// FIXME move final flush into the caller
{ MemoryTracker::LockExceptionInThread lock;
next();
} next();
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
::close(fd); ::close(fd);
} }

View File

@ -8,6 +8,7 @@
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <Common/MemoryTracker.h>
#include <IO/WriteBufferFromFileDescriptor.h> #include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
@ -90,17 +91,15 @@ WriteBufferFromFileDescriptor::WriteBufferFromFileDescriptor(
WriteBufferFromFileDescriptor::~WriteBufferFromFileDescriptor() WriteBufferFromFileDescriptor::~WriteBufferFromFileDescriptor()
{ {
try if (fd < 0)
{ {
if (fd >= 0) assert(!offset() && "attempt to write after close");
next(); return;
else
assert(!offset() && "attempt to write after close");
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
} }
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
next();
} }

View File

@ -7,6 +7,7 @@
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/NetException.h> #include <Common/NetException.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <Common/MemoryTracker.h>
#if !defined(ARCADIA_BUILD) #if !defined(ARCADIA_BUILD)
# include <Common/config.h> # include <Common/config.h>
@ -206,14 +207,9 @@ void WriteBufferFromHTTPServerResponse::finalize()
WriteBufferFromHTTPServerResponse::~WriteBufferFromHTTPServerResponse() WriteBufferFromHTTPServerResponse::~WriteBufferFromHTTPServerResponse()
{ {
try /// FIXME move final flush into the caller
{ MemoryTracker::LockExceptionInThread lock;
finalize(); finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
} }
} }

View File

@ -1,5 +1,5 @@
#include <IO/WriteBufferFromOStream.h> #include <IO/WriteBufferFromOStream.h>
#include <Common/Exception.h> #include <Common/MemoryTracker.h>
namespace DB namespace DB
@ -42,14 +42,9 @@ WriteBufferFromOStream::WriteBufferFromOStream(
WriteBufferFromOStream::~WriteBufferFromOStream() WriteBufferFromOStream::~WriteBufferFromOStream()
{ {
try /// FIXME move final flush into the caller
{ MemoryTracker::LockExceptionInThread lock;
next(); next();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
} }
} }

View File

@ -5,6 +5,7 @@
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/NetException.h> #include <Common/NetException.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <Common/MemoryTracker.h>
namespace ProfileEvents namespace ProfileEvents
@ -70,14 +71,9 @@ WriteBufferFromPocoSocket::WriteBufferFromPocoSocket(Poco::Net::Socket & socket_
WriteBufferFromPocoSocket::~WriteBufferFromPocoSocket() WriteBufferFromPocoSocket::~WriteBufferFromPocoSocket()
{ {
try /// FIXME move final flush into the caller
{ MemoryTracker::LockExceptionInThread lock;
next(); next();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
} }
} }

View File

@ -4,6 +4,7 @@
# include <IO/WriteBufferFromS3.h> # include <IO/WriteBufferFromS3.h>
# include <IO/WriteHelpers.h> # include <IO/WriteHelpers.h>
# include <Common/MemoryTracker.h>
# include <aws/s3/S3Client.h> # include <aws/s3/S3Client.h>
# include <aws/s3/model/CreateMultipartUploadRequest.h> # include <aws/s3/model/CreateMultipartUploadRequest.h>
@ -78,6 +79,8 @@ void WriteBufferFromS3::nextImpl()
void WriteBufferFromS3::finalize() void WriteBufferFromS3::finalize()
{ {
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
finalizeImpl(); finalizeImpl();
} }
@ -104,14 +107,7 @@ void WriteBufferFromS3::finalizeImpl()
WriteBufferFromS3::~WriteBufferFromS3() WriteBufferFromS3::~WriteBufferFromS3()
{ {
try finalizeImpl();
{
finalizeImpl();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
} }
void WriteBufferFromS3::createMultipartUpload() void WriteBufferFromS3::createMultipartUpload()

View File

@ -3,6 +3,7 @@
#include <vector> #include <vector>
#include <IO/WriteBuffer.h> #include <IO/WriteBuffer.h>
#include <Common/MemoryTracker.h>
namespace DB namespace DB
@ -93,14 +94,9 @@ public:
~WriteBufferFromVector() override ~WriteBufferFromVector() override
{ {
try /// FIXME move final flush into the caller
{ MemoryTracker::LockExceptionInThread lock;
finalize(); finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
} }
}; };

View File

@ -1,5 +1,6 @@
#include <Poco/UTF8Encoding.h> #include <Poco/UTF8Encoding.h>
#include <IO/WriteBufferValidUTF8.h> #include <IO/WriteBufferValidUTF8.h>
#include <Common/MemoryTracker.h>
#include <common/types.h> #include <common/types.h>
#ifdef __SSE2__ #ifdef __SSE2__
@ -136,14 +137,9 @@ void WriteBufferValidUTF8::finish()
WriteBufferValidUTF8::~WriteBufferValidUTF8() WriteBufferValidUTF8::~WriteBufferValidUTF8()
{ {
try /// FIXME move final flush into the caller
{ MemoryTracker::LockExceptionInThread lock;
finish(); finish();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
} }
} }

View File

@ -1,5 +1,7 @@
#include <IO/ZlibDeflatingWriteBuffer.h> #include <IO/ZlibDeflatingWriteBuffer.h>
#include <Common/MemorySanitizer.h> #include <Common/MemorySanitizer.h>
#include <Common/MemoryTracker.h>
#include <Common/Exception.h>
namespace DB namespace DB
@ -46,16 +48,21 @@ ZlibDeflatingWriteBuffer::ZlibDeflatingWriteBuffer(
ZlibDeflatingWriteBuffer::~ZlibDeflatingWriteBuffer() ZlibDeflatingWriteBuffer::~ZlibDeflatingWriteBuffer()
{ {
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
finish();
try try
{ {
finish();
int rc = deflateEnd(&zstr); int rc = deflateEnd(&zstr);
if (rc != Z_OK) if (rc != Z_OK)
throw Exception(std::string("deflateEnd failed: ") + zError(rc), ErrorCodes::ZLIB_DEFLATE_FAILED); throw Exception(std::string("deflateEnd failed: ") + zError(rc), ErrorCodes::ZLIB_DEFLATE_FAILED);
} }
catch (...) catch (...)
{ {
/// It is OK not to terminate under an error from deflateEnd()
/// since all data already written to the stream.
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(__PRETTY_FUNCTION__);
} }
} }

View File

@ -1,4 +1,6 @@
#include <IO/ZstdDeflatingWriteBuffer.h> #include <IO/ZstdDeflatingWriteBuffer.h>
#include <Common/MemoryTracker.h>
#include <Common/Exception.h>
namespace DB namespace DB
{ {
@ -28,14 +30,22 @@ ZstdDeflatingWriteBuffer::ZstdDeflatingWriteBuffer(
ZstdDeflatingWriteBuffer::~ZstdDeflatingWriteBuffer() ZstdDeflatingWriteBuffer::~ZstdDeflatingWriteBuffer()
{ {
/// FIXME move final flush into the caller
MemoryTracker::LockExceptionInThread lock;
finish();
try try
{ {
finish(); int err = ZSTD_freeCCtx(cctx);
/// This is just in case, since it is impossible to get an error by using this wrapper.
ZSTD_freeCCtx(cctx); if (unlikely(err))
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "ZSTD_freeCCtx failed: error code: {}; zstd version: {}", err, ZSTD_VERSION_STRING);
} }
catch (...) catch (...)
{ {
/// It is OK not to terminate under an error from ZSTD_freeCCtx()
/// since all data already written to the stream.
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(__PRETTY_FUNCTION__);
} }
} }

View File

@ -1,6 +1,7 @@
#include <iomanip> #include <iomanip>
#include <ext/scope_guard.h> #include <ext/scope_guard.h>
#include <Poco/Net/NetException.h> #include <Poco/Net/NetException.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Common/CurrentThread.h> #include <Common/CurrentThread.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <Common/NetException.h> #include <Common/NetException.h>
@ -56,6 +57,28 @@ namespace ErrorCodes
extern const int SUPPORT_IS_DISABLED; extern const int SUPPORT_IS_DISABLED;
} }
TCPHandler::TCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_, bool parse_proxy_protocol_, std::string server_display_name_)
: Poco::Net::TCPServerConnection(socket_)
, server(server_)
, parse_proxy_protocol(parse_proxy_protocol_)
, log(&Poco::Logger::get("TCPHandler"))
, connection_context(server.context())
, query_context(server.context())
, server_display_name(std::move(server_display_name_))
{
}
TCPHandler::~TCPHandler()
{
try
{
state.reset();
out->next();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void TCPHandler::runImpl() void TCPHandler::runImpl()
{ {

View File

@ -10,6 +10,7 @@
#include <IO/Progress.h> #include <IO/Progress.h>
#include <DataStreams/BlockIO.h> #include <DataStreams/BlockIO.h>
#include <Interpreters/InternalTextLogsQueue.h> #include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/Context.h>
#include <Client/TimeoutSetter.h> #include <Client/TimeoutSetter.h>
#include "IServer.h" #include "IServer.h"
@ -113,16 +114,8 @@ public:
* Proxy-forwarded (original client) IP address is used for quota accounting if quota is keyed by forwarded IP. * Proxy-forwarded (original client) IP address is used for quota accounting if quota is keyed by forwarded IP.
*/ */
TCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_, bool parse_proxy_protocol_, TCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_, bool parse_proxy_protocol_,
std::string server_display_name_) std::string server_display_name_);
: Poco::Net::TCPServerConnection(socket_) ~TCPHandler() override;
, server(server_)
, parse_proxy_protocol(parse_proxy_protocol_)
, log(&Poco::Logger::get("TCPHandler"))
, connection_context(server.context())
, query_context(server.context())
, server_display_name(std::move(server_display_name_))
{
}
void run() override; void run() override;