cancel or finalize write buffer

This commit is contained in:
Sema Checherinda 2024-09-26 10:27:37 +02:00
parent 2748e76f9f
commit f1ab911910
209 changed files with 1838 additions and 1221 deletions

View File

@ -43,7 +43,7 @@ namespace Net
/// Sets the following default values:
/// - timeout: 60 seconds
/// - keepAlive: true
/// - maxKeepAliveRequests: 0
/// - maxKeepAliveRequests: 100
/// - keepAliveTimeout: 15 seconds
void setServerName(const std::string & serverName);
@ -87,12 +87,12 @@ namespace Net
const Poco::Timespan & getKeepAliveTimeout() const;
/// Returns the connection timeout for HTTP connections.
void setMaxKeepAliveRequests(int maxKeepAliveRequests);
void setMaxKeepAliveRequests(size_t maxKeepAliveRequests);
/// Specifies the maximum number of requests allowed
/// during a persistent connection. 0 means unlimited
/// connections.
int getMaxKeepAliveRequests() const;
size_t getMaxKeepAliveRequests() const;
/// Returns the maximum number of requests allowed
/// during a persistent connection, or 0 if
/// unlimited connections are allowed.
@ -106,7 +106,7 @@ namespace Net
std::string _softwareVersion;
Poco::Timespan _timeout;
bool _keepAlive;
int _maxKeepAliveRequests;
size_t _maxKeepAliveRequests;
Poco::Timespan _keepAliveTimeout;
};
@ -138,7 +138,7 @@ namespace Net
}
inline int HTTPServerParams::getMaxKeepAliveRequests() const
inline size_t HTTPServerParams::getMaxKeepAliveRequests() const
{
return _maxKeepAliveRequests;
}

View File

@ -65,7 +65,7 @@ namespace Net
private:
bool _firstRequest;
Poco::Timespan _keepAliveTimeout;
int _maxKeepAliveRequests;
size_t _maxKeepAliveRequests;
};
@ -74,7 +74,7 @@ namespace Net
//
inline bool HTTPServerSession::canKeepAlive() const
{
return _maxKeepAliveRequests != 0;
return getKeepAlive() && _maxKeepAliveRequests > 0;
}

View File

@ -22,7 +22,7 @@ namespace Net {
HTTPServerParams::HTTPServerParams():
_timeout(60000000),
_keepAlive(true),
_maxKeepAliveRequests(0),
_maxKeepAliveRequests(100),
_keepAliveTimeout(15000000)
{
}
@ -32,12 +32,12 @@ HTTPServerParams::~HTTPServerParams()
{
}
void HTTPServerParams::setServerName(const std::string& serverName)
{
_serverName = serverName;
}
void HTTPServerParams::setSoftwareVersion(const std::string& softwareVersion)
{
@ -50,24 +50,24 @@ void HTTPServerParams::setTimeout(const Poco::Timespan& timeout)
_timeout = timeout;
}
void HTTPServerParams::setKeepAlive(bool keepAlive)
{
_keepAlive = keepAlive;
}
void HTTPServerParams::setKeepAliveTimeout(const Poco::Timespan& timeout)
{
_keepAliveTimeout = timeout;
}
void HTTPServerParams::setMaxKeepAliveRequests(int maxKeepAliveRequests)
void HTTPServerParams::setMaxKeepAliveRequests(size_t maxKeepAliveRequests)
{
poco_assert (maxKeepAliveRequests >= 0);
_maxKeepAliveRequests = maxKeepAliveRequests;
}
} } // namespace Poco::Net

View File

@ -50,14 +50,14 @@ bool HTTPServerSession::hasMoreRequests()
--_maxKeepAliveRequests;
return socket().poll(getTimeout(), Socket::SELECT_READ);
}
else if (_maxKeepAliveRequests != 0 && getKeepAlive())
else if (canKeepAlive())
{
if (_maxKeepAliveRequests > 0)
--_maxKeepAliveRequests;
return buffered() > 0 || socket().poll(_keepAliveTimeout, Socket::SELECT_READ);
}
else
return false;
else
return false;
}

View File

@ -218,6 +218,8 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
to.finalize();
}
}
wb->finalize();
}
catch (...)
{

View File

@ -529,6 +529,13 @@ struct ResultWriter
}
}
}
void finalize()
{
commits.finalize();
file_changes.finalize();
line_changes.finalize();
}
};
@ -1178,6 +1185,8 @@ void processLog(const Options & options)
if (i + num_threads < num_commits)
show_commands[i % num_threads] = gitShow(hashes[i + num_threads]);
}
result.finalize();
}

View File

@ -100,10 +100,14 @@ namespace fs = std::filesystem;
static auto executeScript(const std::string & command, bool throw_on_error = false)
{
auto sh = ShellCommand::execute(command);
WriteBufferFromFileDescriptor wb_stdout(STDOUT_FILENO);
WriteBufferFromFileDescriptor wb_stderr(STDERR_FILENO);
copyData(sh->out, wb_stdout);
wb_stdout.finalize();
WriteBufferFromFileDescriptor wb_stderr(STDERR_FILENO);
copyData(sh->err, wb_stderr);
wb_stderr.finalize();
if (throw_on_error)
{
@ -1125,6 +1129,7 @@ namespace
WriteBufferFromFileDescriptor std_err(STDERR_FILENO);
copyData(sh->err, std_err);
std_err.finalize();
sh->tryWait();
}

View File

@ -34,6 +34,7 @@ String KeeperClient::executeFourLetterCommand(const String & command)
out.write(command.data(), command.size());
out.next();
out.finalize();
String result;
readStringUntilEOF(result, in);

View File

@ -579,34 +579,13 @@ void CatBoostLibraryBridgeRequestHandler::handleRequest(HTTPServerRequest & requ
processError(response, "Unknown library method '" + method + "'");
LOG_ERROR(log, "Unknown library method: '{}'", method);
}
}
catch (...)
{
auto message = getCurrentExceptionMessage(true);
LOG_ERROR(log, "Failed to process request. Error: {}", message);
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR, message); // can't call process_error, because of too soon response sending
try
{
writeStringBinary(message, out);
out.finalize();
}
catch (...)
{
tryLogCurrentException(log);
out.cancel();
}
return;
}
try
{
out.finalize();
}
catch (...)
{
tryLogCurrentException(log);
tryLogCurrentException(log, "Failed to process request");
out.cancelWithException(request, getCurrentExceptionCode(), getCurrentExceptionMessage(true), nullptr);
}
}

View File

@ -1474,6 +1474,8 @@ try
rewind_needed = true;
}
file_out.finalize();
return 0;
}
catch (...)

View File

@ -207,15 +207,8 @@ void ODBCColumnsInfoHandler::handleRequest(HTTPServerRequest & request, HTTPServ
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Columns definition was not returned");
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD);
try
{
writeStringBinary(columns.toString(), out);
out.finalize();
}
catch (...)
{
out.finalize();
}
writeStringBinary(columns.toString(), out);
out.finalize();
}
catch (...)
{

View File

@ -80,15 +80,8 @@ void IdentifierQuoteHandler::handleRequest(HTTPServerRequest & request, HTTPServ
auto identifier = getIdentifierQuote(std::move(connection));
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD);
try
{
writeStringBinary(identifier, out);
out.finalize();
}
catch (...)
{
out.finalize();
}
writeStringBinary(identifier, out);
out.finalize();
}
catch (...)
{

View File

@ -194,33 +194,13 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
CompletedPipelineExecutor executor(pipeline);
executor.execute();
}
}
catch (...)
{
auto message = getCurrentExceptionMessage(true);
response.setStatusAndReason(
Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); // can't call process_error, because of too soon response sending
try
{
writeStringBinary(message, out);
out.finalize();
}
catch (...)
{
tryLogCurrentException(log);
}
tryLogCurrentException(log);
}
try
{
out.finalize();
}
catch (...)
{
tryLogCurrentException(log);
out.cancelWithException(request, getCurrentExceptionCode(), getCurrentExceptionMessage(true), nullptr);
}
}

View File

@ -94,15 +94,8 @@ void SchemaAllowedHandler::handleRequest(HTTPServerRequest & request, HTTPServer
bool result = isSchemaAllowed(std::move(connection));
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD);
try
{
writeBoolText(result, out);
out.finalize();
}
catch (...)
{
out.finalize();
}
writeBoolText(result, out);
out.finalize();
}
catch (...)
{

View File

@ -670,7 +670,7 @@ void ClientBase::initLogsOutputStream()
if (server_logs_file.empty())
{
/// Use stderr by default
out_logs_buf = std::make_unique<WriteBufferFromFileDescriptor>(STDERR_FILENO);
out_logs_buf = std::make_unique<AutoCanceledWriteBuffer<WriteBufferFromFileDescriptor>>(STDERR_FILENO);
wb = out_logs_buf.get();
color_logs = stderr_is_a_tty;
}
@ -683,7 +683,7 @@ void ClientBase::initLogsOutputStream()
else
{
out_logs_buf
= std::make_unique<WriteBufferFromFile>(server_logs_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT);
= std::make_unique<AutoCanceledWriteBuffer<WriteBufferFromFile>>(server_logs_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT);
wb = out_logs_buf.get();
}
}
@ -847,7 +847,7 @@ void ClientBase::initTTYBuffer(ProgressOption progress_option, ProgressOption pr
{
try
{
tty_buf = std::make_unique<WriteBufferFromFile>(tty_file_name, buf_size);
tty_buf = std::make_unique<AutoCanceledWriteBuffer<WriteBufferFromFile>>(tty_file_name, buf_size);
/// It is possible that the terminal file has writeable permissions
/// but we cannot write anything there. Check it with invisible character.
@ -858,8 +858,7 @@ void ClientBase::initTTYBuffer(ProgressOption progress_option, ProgressOption pr
}
catch (const Exception & e)
{
if (tty_buf)
tty_buf.reset();
tty_buf.reset();
if (e.code() != ErrorCodes::CANNOT_OPEN_FILE)
throw;
@ -872,7 +871,7 @@ void ClientBase::initTTYBuffer(ProgressOption progress_option, ProgressOption pr
if (stderr_is_a_tty || progress == ProgressOption::ERR)
{
tty_buf = std::make_unique<WriteBufferFromFileDescriptor>(STDERR_FILENO, buf_size);
tty_buf = std::make_unique<AutoCanceledWriteBuffer<WriteBufferFromFileDescriptor>>(STDERR_FILENO, buf_size);
}
else
{
@ -1475,10 +1474,10 @@ void ClientBase::resetOutput()
logs_out_stream.reset();
if (out_file_buf)
{
out_file_buf->finalize();
out_file_buf.reset();
}
out_file_buf.reset();
out_logs_buf.reset();
if (pager_cmd)
{
@ -1496,12 +1495,6 @@ void ClientBase::resetOutput()
}
pager_cmd = nullptr;
if (out_logs_buf)
{
out_logs_buf->finalize();
out_logs_buf.reset();
}
std_out.next();
}
@ -2664,9 +2657,7 @@ void ClientBase::runInteractive()
catch (const ErrnoException & e)
{
if (e.getErrno() != EEXIST)
{
error_stream << getCurrentExceptionMessage(false) << '\n';
}
}
}

View File

@ -3,6 +3,13 @@
#include <Client/ProgressTable.h>
#include <Client/Suggest.h>
#include <Common/QueryFuzzer.h>
#include <Common/DNSResolver.h>
#include <Common/InterruptListener.h>
#include <Common/ProgressIndication.h>
#include <Common/ShellCommand.h>
#include <Common/Stopwatch.h>
#include <IO/WriteBuffer.h>
#include <Core/ExternalTable.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
@ -310,7 +317,7 @@ protected:
/// Buffer that reads from stdin in batch mode.
ReadBufferFromFileDescriptor std_in;
/// Console output.
WriteBufferFromFileDescriptor std_out;
AutoCanceledWriteBuffer<WriteBufferFromFileDescriptor> std_out;
std::unique_ptr<ShellCommand> pager_cmd;
/// The user can specify to redirect query output to a file.

View File

@ -78,14 +78,8 @@ namespace ErrorCodes
Connection::~Connection()
{
try{
if (connected)
Connection::disconnect();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (connected)
Connection::cancel();
}
Connection::Connection(const String & host_, UInt16 port_,
@ -126,6 +120,7 @@ Connection::Connection(const String & host_, UInt16 port_,
void Connection::connect(const ConnectionTimeouts & timeouts)
{
LOG_DEBUG(getLogger("Connection::connect"), "begin");
try
{
LOG_TRACE(log_wrapper.get(), "Connecting. Database: {}. User: {}{}{}",
@ -142,7 +137,7 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
have_more_addresses_to_connect = it != std::prev(addresses.end());
if (connected)
disconnect();
cancel();
if (static_cast<bool>(secure))
{
@ -293,7 +288,7 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
}
catch (DB::NetException & e)
{
disconnect();
cancel();
/// Remove this possible stale entry from cache
DNSResolver::instance().removeHostFromCache(host);
@ -304,7 +299,7 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
}
catch (Poco::Net::NetException & e)
{
disconnect();
cancel();
/// Remove this possible stale entry from cache
DNSResolver::instance().removeHostFromCache(host);
@ -314,7 +309,7 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
}
catch (Poco::TimeoutException & e)
{
disconnect();
cancel();
/// Remove this possible stale entry from cache
DNSResolver::instance().removeHostFromCache(host);
@ -329,55 +324,50 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
getDescription(/*with_extra*/ true),
connection_timeout.totalMilliseconds());
}
LOG_DEBUG(getLogger("Connection::connect"), "end");
}
void Connection::cancel() noexcept
{
LOG_DEBUG(getLogger("Connection::cancel"), "begin");
if (maybe_compressed_out)
maybe_compressed_out->cancel();
if (out)
out->cancel();
if (socket)
socket->close();
LOG_DEBUG(getLogger("Connection::cancel"), "mdl");
reset();
LOG_DEBUG(getLogger("Connection::cancel"), "end");
}
void Connection::reset() noexcept
{
maybe_compressed_out.reset();
out.reset();
socket.reset();
nonce.reset();
connected = false;
}
void Connection::disconnect()
{
LOG_DEBUG(getLogger("Connection::disconnect"), "begin");
in = nullptr;
last_input_packet_type.reset();
std::exception_ptr finalize_exception;
try
{
// finalize() can write and throw an exception.
if (maybe_compressed_out)
maybe_compressed_out->finalize();
}
catch (...)
{
/// Don't throw an exception here, it will leave Connection in invalid state.
finalize_exception = std::current_exception();
// no point to finalize tcp connections
cancel();
if (out)
{
out->cancel();
out = nullptr;
}
}
maybe_compressed_out = nullptr;
try
{
if (out)
out->finalize();
}
catch (...)
{
/// Don't throw an exception here, it will leave Connection in invalid state.
finalize_exception = std::current_exception();
}
out = nullptr;
if (socket)
socket->close();
socket = nullptr;
connected = false;
nonce.reset();
if (finalize_exception)
std::rethrow_exception(finalize_exception);
LOG_DEBUG(getLogger("Connection::disconnect"), "end");
}
@ -888,7 +878,7 @@ void Connection::sendQuery(
maybe_compressed_in.reset();
if (maybe_compressed_out && maybe_compressed_out != out)
maybe_compressed_out->cancel();
maybe_compressed_out->finalize();
maybe_compressed_out.reset();
block_in.reset();
block_logs_in.reset();

View File

@ -134,7 +134,6 @@ public:
void disconnect() override;
/// Send prepared block of data (serialized and, if need, compressed), that will be read from 'input'.
/// You could pass size of serialized/compressed block.
void sendPreparedData(ReadBuffer & input, size_t size, const String & name = "");
@ -276,6 +275,9 @@ private:
void connect(const ConnectionTimeouts & timeouts);
void sendHello();
void cancel() noexcept;
void reset() noexcept;
#if USE_SSH
void performHandshakeForSSHAuth();
#endif

View File

@ -406,9 +406,10 @@ WeakHash32 ColumnAggregateFunction::getWeakHash32() const
std::vector<UInt8> v;
for (size_t i = 0; i < s; ++i)
{
WriteBufferFromVector<std::vector<UInt8>> wbuf(v);
func->serialize(data[i], wbuf, version);
wbuf.finalize();
{
WriteBufferFromVector<std::vector<UInt8>> wbuf(v);
func->serialize(data[i], wbuf, version);
}
hash_data[i] = ::updateWeakHash32(v.data(), v.size(), hash_data[i]);
}

View File

@ -260,10 +260,11 @@ void ColumnDynamic::insert(const Field & x)
/// We store values in shared variant in binary form with binary encoded type.
auto & shared_variant = getSharedVariant();
auto & chars = shared_variant.getChars();
WriteBufferFromVector<ColumnString::Chars> value_buf(chars, AppendModeTag());
encodeDataType(field_data_type, value_buf);
getVariantSerialization(field_data_type, field_data_type_name)->serializeBinary(x, value_buf, getFormatSettings());
value_buf.finalize();
{
WriteBufferFromVector<ColumnString::Chars> value_buf(chars, AppendModeTag());
encodeDataType(field_data_type, value_buf);
getVariantSerialization(field_data_type, field_data_type_name)->serializeBinary(x, value_buf, getFormatSettings());
}
chars.push_back(0);
shared_variant.getOffsets().push_back(chars.size());
variant_col.getLocalDiscriminators().push_back(variant_col.localDiscriminatorByGlobal(shared_variant_discr));
@ -697,10 +698,11 @@ void ColumnDynamic::serializeValueIntoSharedVariant(
size_t n)
{
auto & chars = shared_variant.getChars();
WriteBufferFromVector<ColumnString::Chars> value_buf(chars, AppendModeTag());
encodeDataType(type, value_buf);
serialization->serializeBinary(src, n, value_buf, getFormatSettings());
value_buf.finalize();
{
WriteBufferFromVector<ColumnString::Chars> value_buf(chars, AppendModeTag());
encodeDataType(type, value_buf);
serialization->serializeBinary(src, n, value_buf, getFormatSettings());
}
chars.push_back(0);
shared_variant.getOffsets().push_back(chars.size());
}

View File

@ -380,9 +380,10 @@ void ColumnObject::insert(const Field & x)
{
shared_data_paths->insertData(path.data(), path.size());
auto & shared_data_values_chars = shared_data_values->getChars();
WriteBufferFromVector<ColumnString::Chars> value_buf(shared_data_values_chars, AppendModeTag());
getDynamicSerialization()->serializeBinary(value_field, value_buf, getFormatSettings());
value_buf.finalize();
{
WriteBufferFromVector<ColumnString::Chars> value_buf(shared_data_values_chars, AppendModeTag());
getDynamicSerialization()->serializeBinary(value_field, value_buf, getFormatSettings());
}
shared_data_values_chars.push_back(0);
shared_data_values->getOffsets().push_back(shared_data_values_chars.size());
}
@ -674,9 +675,10 @@ void ColumnObject::serializePathAndValueIntoSharedData(ColumnString * shared_dat
shared_data_paths->insertData(path.data(), path.size());
auto & shared_data_values_chars = shared_data_values->getChars();
WriteBufferFromVector<ColumnString::Chars> value_buf(shared_data_values_chars, AppendModeTag());
getDynamicSerialization()->serializeBinary(column, n, value_buf, getFormatSettings());
value_buf.finalize();
{
WriteBufferFromVector<ColumnString::Chars> value_buf(shared_data_values_chars, AppendModeTag());
getDynamicSerialization()->serializeBinary(column, n, value_buf, getFormatSettings());
}
shared_data_values_chars.push_back(0);
shared_data_values->getOffsets().push_back(shared_data_values_chars.size());
}

View File

@ -105,6 +105,7 @@ public:
DB::writeIntText(res, wb);
DB::writeChar('\n', wb);
wb.sync();
wb.finalize();
}
locked_callback(res);

View File

@ -52,6 +52,7 @@ void writeSignalIDtoSignalPipe(int sig)
WriteBufferFromFileDescriptor out(signal_pipe.fds_rw[1], signal_pipe_buf_size, buf);
writeBinary(sig, out);
out.next();
out.finalize();
errno = saved_errno;
}
@ -98,7 +99,7 @@ void signalHandler(int sig, siginfo_t * info, void * context)
writeBinary(static_cast<UInt32>(getThreadId()), out);
writePODBinary(current_thread, out);
out.next();
out.finalize();
if (sig != SIGTSTP) /// This signal is used for debugging.
{
@ -153,6 +154,7 @@ void signalHandler(int sig, siginfo_t * info, void * context)
writeBinary(static_cast<UInt32>(getThreadId()), out);
writeBinary(log_message, out);
out.next();
out.finalize();
abort();
}
@ -195,6 +197,7 @@ static DISABLE_SANITIZER_INSTRUMENTATION void sanitizerDeathCallback()
writePODBinary(current_thread, out);
out.next();
out.finalize();
/// The time that is usually enough for separate thread to print info into log.
sleepForSeconds(20);

View File

@ -92,6 +92,7 @@ void TraceSender::send(TraceType trace_type, const StackTrace & stack_trace, Ext
writePODBinary(extras.increment, out);
out.next();
out.finalize();
inside_send = false;
}

View File

@ -306,6 +306,14 @@ void ZooKeeper::flushWriteBuffer()
out->next();
}
void ZooKeeper::cancelWriteBuffer() noexcept
{
if (compressed_out)
compressed_out->cancel();
if (out)
out->cancel();
}
ReadBuffer & ZooKeeper::getReadBuffer()
{
if (compressed_in)
@ -548,6 +556,7 @@ void ZooKeeper::connect(
catch (...)
{
fail_reasons << "\n" << getCurrentExceptionMessage(false) << ", " << node.address->toString();
cancelWriteBuffer();
}
}
@ -1087,6 +1096,8 @@ void ZooKeeper::finalize(bool error_send, bool error_receive, const String & rea
/// Set expired flag after we sent close event
expire_session_if_not_expired();
cancelWriteBuffer();
try
{
/// This will also wakeup the receiving thread.

View File

@ -338,6 +338,7 @@ private:
WriteBuffer & getWriteBuffer();
void flushWriteBuffer();
void cancelWriteBuffer() noexcept;
ReadBuffer & getReadBuffer();
void logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response = nullptr, bool finalize = false, UInt64 elapsed_microseconds = 0);

View File

@ -136,12 +136,14 @@ class CgroupsMemoryUsageObserverFixture : public ::testing::TestWithParam<ICgrou
auto stat_file = WriteBufferFromFile(tmp_dir + "/memory.stat");
stat_file.write(SAMPLE_FILE[version].data(), SAMPLE_FILE[version].size());
stat_file.finalize();
stat_file.sync();
if (GetParam() == ICgroupsReader::CgroupsVersion::V2)
{
auto current_file = WriteBufferFromFile(tmp_dir + "/memory.current");
current_file.write("29645422592", 11);
current_file.finalize();
current_file.sync();
}
}

View File

@ -28,6 +28,7 @@ TEST(Common, ConfigProcessorManyElements)
for (size_t i = 0; i < element_count; ++i)
writeString("<x><name>" + std::to_string(i) + "</name></x>\n", out);
writeString("</clickhouse>\n", out);
out.finalize();
}
Poco::Timestamp load_start;

View File

@ -28,13 +28,14 @@ TEST(FST, SimpleTest)
};
std::vector<UInt8> buffer;
DB::WriteBufferFromVector<std::vector<UInt8>> wbuf(buffer);
DB::FST::FstBuilder builder(wbuf);
{
DB::WriteBufferFromVector<std::vector<UInt8>> wbuf(buffer);
DB::FST::FstBuilder builder(wbuf);
for (auto & [term, output] : indexed_data)
builder.add(term, output);
builder.build();
wbuf.finalize();
for (auto & [term, output] : indexed_data)
builder.add(term, output);
builder.build();
}
DB::FST::FiniteStateTransducer fst(buffer);
for (auto & [term, output] : indexed_data)
@ -61,14 +62,15 @@ TEST(FST, TestForLongTerms)
DB::FST::Output output2 = 200;
std::vector<UInt8> buffer;
DB::WriteBufferFromVector<std::vector<UInt8>> wbuf(buffer);
DB::FST::FstBuilder builder(wbuf);
{
DB::WriteBufferFromVector<std::vector<UInt8>> wbuf(buffer);
DB::FST::FstBuilder builder(wbuf);
builder.add(term1, output1);
builder.add(term2, output2);
builder.add(term1, output1);
builder.add(term2, output2);
builder.build();
wbuf.finalize();
builder.build();
}
DB::FST::FiniteStateTransducer fst(buffer);

View File

@ -26,6 +26,7 @@ inline std::unique_ptr<Poco::File> getFileWithContents(const char *fileName, con
{
WriteBufferFromFile out(config_file->path());
writeString(fileContents, out);
out.finalize();
}
return config_file;

View File

@ -32,6 +32,7 @@ clickhouse:
field2: "2"
)YAML";
writeString(data, out);
out.finalize();
}
auto system_tables_file = std::make_unique<File>(path / "config.d/system_tables.yaml");
@ -49,6 +50,7 @@ clickhouse:
level: debug
)YAML";
writeString(data, out);
out.finalize();
}
@ -96,6 +98,7 @@ clickhouse:
level: debug
)YAML";
writeString(data, out);
out.finalize();
}
auto system_tables_file = std::make_unique<File>(path / "config.d/system_tables.yaml");
@ -115,6 +118,7 @@ clickhouse:
level: debug
)YAML";
writeString(data, out);
out.finalize();
}

View File

@ -68,6 +68,7 @@ void CompressedWriteBuffer::finalizeImpl()
/// Don't try to resize buffer in nextImpl.
use_adaptive_buffer_size = false;
next();
BufferWithOwnMemory<WriteBuffer>::finalizeImpl();
}
CompressedWriteBuffer::CompressedWriteBuffer(
@ -80,11 +81,10 @@ CompressedWriteBuffer::CompressedWriteBuffer(
{
}
CompressedWriteBuffer::~CompressedWriteBuffer()
void CompressedWriteBuffer::cancelImpl() noexcept
{
if (!canceled)
finalize();
BufferWithOwnMemory<WriteBuffer>::cancelImpl();
out.cancel();
}
}

View File

@ -13,7 +13,7 @@
namespace DB
{
class CompressedWriteBuffer final : public BufferWithOwnMemory<WriteBuffer>
class CompressedWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
{
public:
explicit CompressedWriteBuffer(
@ -23,8 +23,6 @@ public:
bool use_adaptive_buffer_size_ = false,
size_t adaptive_buffer_initial_size = DBMS_DEFAULT_INITIAL_ADAPTIVE_BUFFER_SIZE);
~CompressedWriteBuffer() override;
/// The amount of compressed data
size_t getCompressedBytes()
{
@ -46,8 +44,12 @@ public:
}
private:
void nextImpl() override;
void finalizeImpl() override;
void nextImpl() override;
/// finalize call does not affect the out buffer.
/// That is made in order to handle the usecase when several CompressedWriteBuffer's write to the one file
/// cancel call canecels the out buffer
void cancelImpl() noexcept override;
WriteBuffer & out;
CompressionCodecPtr codec;

View File

@ -163,14 +163,20 @@ public:
// we have a file we need to finalize first
if (tryGetFileBaseBuffer() && prealloc_done)
{
finalizeCurrentFile();
assert(current_file_description);
// if we wrote at least 1 log in the log file we can rename the file to reflect correctly the
// contained logs
// file can be deleted from disk earlier by compaction
if (!current_file_description->deleted)
if (current_file_description->deleted)
{
LOG_WARNING(log, "Log {} is already deleted", current_file_description->path);
prealloc_done = false;
cancelCurrentFile();
}
else
{
finalizeCurrentFile();
auto log_disk = current_file_description->disk;
const auto & path = current_file_description->path;
std::string new_path = path;
@ -204,6 +210,10 @@ public:
}
}
}
else
{
cancelCurrentFile();
}
auto latest_log_disk = getLatestLogDisk();
chassert(file_description->disk == latest_log_disk);
@ -348,6 +358,8 @@ public:
{
if (isFileSet() && prealloc_done)
finalizeCurrentFile();
else
cancelCurrentFile();
}
private:
@ -357,17 +369,16 @@ private:
chassert(current_file_description);
// compact can delete the file and we don't need to do anything
if (current_file_description->deleted)
{
LOG_WARNING(log, "Log {} is already deleted", current_file_description->path);
return;
}
chassert(!current_file_description->deleted);
if (log_file_settings.compress_logs)
if (compressed_buffer)
compressed_buffer->finalize();
flush();
if (file_buf)
file_buf->finalize();
const auto * file_buffer = tryGetFileBuffer();
if (log_file_settings.max_size != 0 && file_buffer)
@ -382,16 +393,22 @@ private:
LOG_WARNING(log, "Could not ftruncate file. Error: {}, errno: {}", errnoToString(), errno);
}
if (log_file_settings.compress_logs)
{
compressed_buffer.reset();
}
else
{
chassert(file_buf);
file_buf->finalize();
file_buf.reset();
}
compressed_buffer.reset();
file_buf.reset();
}
void cancelCurrentFile()
{
chassert(!prealloc_done);
if (compressed_buffer)
compressed_buffer->cancel();
if (file_buf)
file_buf->cancel();
compressed_buffer.reset();
file_buf.reset();
}
WriteBuffer & getBuffer()

View File

@ -14,6 +14,7 @@
#include "Coordination/KeeperFeatureFlags.h"
#include <Coordination/Keeper4LWInfo.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <boost/algorithm/string.hpp>
@ -257,7 +258,9 @@ String RuokCommand::run()
namespace
{
void print(IFourLetterCommand::StringBuffer & buf, const String & key, const String & value)
using StringBuffer = DB::WriteBufferFromOwnString;
void print(StringBuffer & buf, const String & key, const String & value)
{
writeText("zk_", buf);
writeText(key, buf);
@ -266,7 +269,7 @@ void print(IFourLetterCommand::StringBuffer & buf, const String & key, const Str
writeText('\n', buf);
}
void print(IFourLetterCommand::StringBuffer & buf, const String & key, uint64_t value)
void print(StringBuffer & buf, const String & key, uint64_t value)
{
print(buf, key, toString(value));
}

View File

@ -12,7 +12,6 @@
namespace DB
{
class WriteBufferFromOwnString;
class KeeperDispatcher;
using String = std::string;
@ -27,7 +26,6 @@ using FourLetterCommandPtr = std::shared_ptr<DB::IFourLetterCommand>;
struct IFourLetterCommand
{
public:
using StringBuffer = DB::WriteBufferFromOwnString;
explicit IFourLetterCommand(KeeperDispatcher & keeper_dispatcher_);
virtual String name() = 0;

View File

@ -23,8 +23,6 @@ class DiskSelector;
class IDisk;
using DiskPtr = std::shared_ptr<IDisk>;
class WriteBufferFromOwnString;
class KeeperContext
{
public:

View File

@ -43,6 +43,8 @@ void WriteBufferFromNuraftBuffer::finalizeImpl()
/// Prevent further writes.
set(nullptr, 0);
WriteBuffer::finalizeImpl();
}
nuraft::ptr<nuraft::buffer> WriteBufferFromNuraftBuffer::getBuffer()

View File

@ -1062,6 +1062,7 @@ TYPED_TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
DB::WriteBufferFromFile plain_buf(
"./logs/changelog_11_15.bin" + this->extension, DB::DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
plain_buf.truncate(0);
plain_buf.finalize();
DB::KeeperLogStore changelog_reader(
DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 5},
@ -1135,6 +1136,7 @@ TYPED_TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
DB::WriteBufferFromFile plain_buf(
"./logs/changelog_1_20.bin" + this->extension, DB::DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
plain_buf.truncate(30);
plain_buf.finalize();
DB::KeeperLogStore changelog_reader(
DB::LogFileSettings{.force_sync = true, .compress_logs = this->enable_compression, .rotate_interval = 20},
@ -1192,6 +1194,7 @@ TYPED_TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate3)
DB::WriteBufferFromFile plain_buf(
"./logs/changelog_1_20.bin", DB::DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
plain_buf.truncate(plain_buf.size() - 30);
plain_buf.finalize();
DB::KeeperLogStore changelog_reader(
DB::LogFileSettings{.force_sync = true, .compress_logs = false, .rotate_interval = 20},
@ -1824,7 +1827,7 @@ TYPED_TEST(CoordinationTest, TestStorageSnapshotBroken)
DB::WriteBufferFromFile plain_buf(
"./snapshots/snapshot_50.bin" + this->extension, DB::DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
plain_buf.truncate(34);
plain_buf.sync();
plain_buf.finalize();
EXPECT_THROW(manager.restoreFromLatestSnapshot(), DB::Exception);
}

View File

@ -769,18 +769,22 @@ void getBlocksDifference(const Block & lhs, const Block & rhs, std::string & out
while (r > 0)
right_columns.push_back(rhs.safeGetByPosition(--r));
WriteBufferFromString lhs_diff_writer(out_lhs_diff);
WriteBufferFromString rhs_diff_writer(out_rhs_diff);
for (auto it = left_columns.rbegin(); it != left_columns.rend(); ++it)
{
lhs_diff_writer << it->dumpStructure();
lhs_diff_writer << ", position: " << lhs.getPositionByName(it->name) << '\n';
WriteBufferFromString lhs_diff_writer(out_lhs_diff);
for (auto it = left_columns.rbegin(); it != left_columns.rend(); ++it)
{
lhs_diff_writer << it->dumpStructure();
lhs_diff_writer << ", position: " << lhs.getPositionByName(it->name) << '\n';
}
}
for (auto it = right_columns.rbegin(); it != right_columns.rend(); ++it)
{
rhs_diff_writer << it->dumpStructure();
rhs_diff_writer << ", position: " << rhs.getPositionByName(it->name) << '\n';
WriteBufferFromString rhs_diff_writer(out_rhs_diff);
for (auto it = right_columns.rbegin(); it != right_columns.rend(); ++it)
{
rhs_diff_writer << it->dumpStructure();
rhs_diff_writer << ", position: " << rhs.getPositionByName(it->name) << '\n';
}
}
}

View File

@ -86,7 +86,7 @@ void Native41::authenticate(
{
if (!auth_response)
{
packet_endpoint->sendPacket(AuthSwitchRequest(getName(), scramble), true);
packet_endpoint->sendPacket(AuthSwitchRequest(getName(), scramble));
AuthSwitchResponse response;
packet_endpoint->receivePacket(response);
auth_response = response.value;
@ -123,7 +123,7 @@ void Sha256Password::authenticate(
{
if (!auth_response)
{
packet_endpoint->sendPacket(AuthSwitchRequest(getName(), scramble), true);
packet_endpoint->sendPacket(AuthSwitchRequest(getName(), scramble));
if (packet_endpoint->in->eof())
throw Exception(ErrorCodes::MYSQL_CLIENT_INSUFFICIENT_CAPABILITIES,
@ -158,7 +158,7 @@ void Sha256Password::authenticate(
LOG_TRACE(log, "Key: {}", pem);
AuthMoreData data(pem);
packet_endpoint->sendPacket(data, true);
packet_endpoint->sendPacket(data);
sent_public_key = true;
AuthSwitchResponse response;

View File

@ -7,6 +7,7 @@
#include <Core/MySQL/PacketsReplication.h>
#include <Core/MySQL/MySQLReplication.h>
#include <Common/DNSResolver.h>
#include <IO/WriteBuffer.h>
#include <Poco/String.h>
@ -55,7 +56,7 @@ void MySQLClient::connect()
connected = true;
in = std::make_shared<ReadBufferFromPocoSocket>(*socket);
out = std::make_shared<WriteBufferFromPocoSocket>(*socket);
out = std::make_shared<AutoCanceledWriteBuffer<WriteBufferFromPocoSocket>>(*socket);
packet_endpoint = std::make_shared<MySQLProtocol::PacketEndpoint>(*in, *out, sequence_id);
handshake();
@ -88,7 +89,7 @@ void MySQLClient::handshake()
HandshakeResponse handshake_response(
client_capabilities, MAX_PACKET_LENGTH, charset_utf8, user, "", auth_plugin_data, mysql_native_password);
packet_endpoint->sendPacket<HandshakeResponse>(handshake_response, true);
packet_endpoint->sendPacket<HandshakeResponse>(handshake_response);
ResponsePacket packet_response(client_capabilities, true);
packet_endpoint->receivePacket(packet_response);
@ -103,7 +104,7 @@ void MySQLClient::handshake()
void MySQLClient::writeCommand(char command, String query)
{
WriteCommand write_command(command, query);
packet_endpoint->sendPacket<WriteCommand>(write_command, true);
packet_endpoint->sendPacket<WriteCommand>(write_command);
ResponsePacket packet_response(client_capabilities);
packet_endpoint->receivePacket(packet_response);
@ -122,7 +123,7 @@ void MySQLClient::writeCommand(char command, String query)
void MySQLClient::registerSlaveOnMaster(UInt32 slave_id)
{
RegisterSlave register_slave(slave_id);
packet_endpoint->sendPacket<RegisterSlave>(register_slave, true);
packet_endpoint->sendPacket<RegisterSlave>(register_slave);
ResponsePacket packet_response(client_capabilities);
packet_endpoint->receivePacket(packet_response);
@ -167,7 +168,7 @@ void MySQLClient::startBinlogDumpGTID(UInt32 slave_id, String replicate_db, std:
replication.setReplicateTables(replicate_tables);
BinlogDumpGTID binlog_dump(slave_id, gtid_sets.toPayload());
packet_endpoint->sendPacket<BinlogDumpGTID>(binlog_dump, true);
packet_endpoint->sendPacket<BinlogDumpGTID>(binlog_dump);
}
BinlogEventPtr MySQLClient::readOneBinlogEvent(UInt64 milliseconds)

View File

@ -39,7 +39,7 @@ public:
void resetSequenceId();
template<class T>
void sendPacket(const T & packet, bool flush = false)
void sendPacket(const T & packet, bool flush = true)
{
static_assert(std::is_base_of<IMySQLWritePacket, T>());
packet.writePayload(*out, sequence_id);

View File

@ -61,6 +61,10 @@ UUID loadServerUUID(const fs::path & server_uuid_file, Poco::Logger * log)
out.finalize();
return new_uuid;
}
catch (ErrnoException &)
{
throw;
}
catch (...)
{
throw Exception(ErrorCodes::CANNOT_CREATE_FILE, "Caught Exception {} while writing the Server UUID file {}",

View File

@ -17,6 +17,7 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/Operators.h>
#include <IO/WriteBuffer.h>
#include <random>
@ -252,7 +253,7 @@ void BinlogFromSocket::connect(const String & host, UInt16 port, const String &
connected = true;
in = std::make_unique<ReadBufferFromPocoSocket>(*socket);
out = std::make_unique<WriteBufferFromPocoSocket>(*socket);
out = std::make_unique<AutoCanceledWriteBuffer<WriteBufferFromPocoSocket>>(*socket);
packet_endpoint = std::make_shared<MySQLProtocol::PacketEndpoint>(*in, *out, sequence_id);
handshake(user, password);
@ -294,7 +295,7 @@ void BinlogFromSocket::handshake(const String & user, const String & password)
const UInt8 charset_utf8 = 33;
HandshakeResponse handshake_response(
client_capabilities, MAX_PACKET_LENGTH, charset_utf8, user, "", auth_plugin_data, mysql_native_password);
packet_endpoint->sendPacket<HandshakeResponse>(handshake_response, true);
packet_endpoint->sendPacket<HandshakeResponse>(handshake_response);
ResponsePacket packet_response(client_capabilities, true);
packet_endpoint->receivePacket(packet_response);
@ -309,7 +310,7 @@ void BinlogFromSocket::handshake(const String & user, const String & password)
void BinlogFromSocket::writeCommand(char command, const String & query)
{
WriteCommand write_command(command, query);
packet_endpoint->sendPacket<WriteCommand>(write_command, true);
packet_endpoint->sendPacket<WriteCommand>(write_command);
ResponsePacket packet_response(client_capabilities);
packet_endpoint->receivePacket(packet_response);
@ -328,7 +329,7 @@ void BinlogFromSocket::writeCommand(char command, const String & query)
void BinlogFromSocket::registerSlaveOnMaster(UInt32 slave_id)
{
RegisterSlave register_slave(slave_id);
packet_endpoint->sendPacket<RegisterSlave>(register_slave, true);
packet_endpoint->sendPacket<RegisterSlave>(register_slave);
ResponsePacket packet_response(client_capabilities);
packet_endpoint->receivePacket(packet_response);
@ -358,7 +359,7 @@ void BinlogFromSocket::start(UInt32 slave_id, const String & executed_gtid_set)
position.gtid_sets.parse(executed_gtid_set);
BinlogDumpGTID binlog_dump(slave_id, position.gtid_sets.toPayload());
packet_endpoint->sendPacket<BinlogDumpGTID>(binlog_dump, true);
packet_endpoint->sendPacket<BinlogDumpGTID>(binlog_dump);
}
class ReadPacketFromSocket : public IMySQLReadPacket

View File

@ -13,9 +13,10 @@ BinlogClientPtr BinlogClientFactory::getClient(const String & host, UInt16 port,
{
std::lock_guard lock(mutex);
String binlog_client_name;
WriteBufferFromString stream(binlog_client_name);
stream << user << "@" << host << ":" << port;
stream.finalize();
{
WriteBufferFromString stream(binlog_client_name);
stream << user << "@" << host << ":" << port;
}
String binlog_client_key = binlog_client_name + ":" + password;
auto it = clients.find(binlog_client_key);
BinlogClientPtr ret = it != clients.end() ? it->second.lock() : nullptr;

View File

@ -669,6 +669,7 @@ void DiskLocal::setup()
{
auto buf = writeFile(disk_checker_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, {});
writeIntBinary(magic_number, *buf);
buf->finalize();
}
disk_checker_magic_number = magic_number;
}

View File

@ -292,6 +292,7 @@ void WriteFileOperation::undo(std::unique_lock<SharedMutex> &)
{
auto buf = disk.writeFile(path);
writeString(prev_data, *buf);
buf->finalize();
}
}

View File

@ -4,6 +4,7 @@
#include <IO/CascadeWriteBuffer.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteBuffer.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/copyData.h>
#include <Common/typeid_cast.h>
@ -55,9 +56,8 @@ static void testCascadeBufferRedability(
cascade.write(data.data(), data.size());
EXPECT_EQ(cascade.count(), data.size());
std::vector<WriteBufferPtr> write_buffers;
ConcatReadBuffer concat;
cascade.getResultBuffers(write_buffers);
auto write_buffers = cascade.getResultBuffers();
for (WriteBufferPtr & wbuf : write_buffers)
{
@ -222,7 +222,7 @@ TEST(MemoryWriteBuffer, WriteAndReread)
if (s > 1)
{
MemoryWriteBuffer buf(s - 1);
EXPECT_THROW(buf.write(data.data(), data.size()), MemoryWriteBuffer::CurrentBufferExhausted);
EXPECT_THROW(buf.write(data.data(), data.size()), WriteBuffer::CurrentBufferExhausted);
buf.finalize();
}
}

View File

@ -45,6 +45,7 @@ TEST_F(DiskTest, writeFile)
{
std::unique_ptr<DB::WriteBuffer> out = disk->writeFile("test_file");
writeString("test data", *out);
out->finalize();
}
String data;
@ -63,6 +64,7 @@ TEST_F(DiskTest, readFile)
{
std::unique_ptr<DB::WriteBuffer> out = disk->writeFile("test_file");
writeString("test data", *out);
out->finalize();
}
auto read_settings = DB::getReadSettings();

View File

@ -362,9 +362,10 @@ public:
auto & col_str = assert_cast<ColumnString &>(column);
auto & chars = col_str.getChars();
WriteBufferFromVector<ColumnString::Chars> buf(chars, AppendModeTag());
jsonElementToString<JSONParser>(element, buf, format_settings);
buf.finalize();
{
WriteBufferFromVector<ColumnString::Chars> buf(chars, AppendModeTag());
jsonElementToString<JSONParser>(element, buf, format_settings);
}
chars.push_back(0);
col_str.getOffsets().push_back(chars.size());
}

View File

@ -522,9 +522,10 @@ namespace
{
write_function = [this](NumberType value)
{
WriteBufferFromString buf{text_buffer};
writeText(value, buf);
buf.finalize();
{
WriteBufferFromString buf{text_buffer};
writeText(value, buf);
}
writeStr(text_buffer);
};
@ -894,7 +895,7 @@ namespace
template <typename NumberType>
void toStringAppend(NumberType value, PaddedPODArray<UInt8> & str)
{
WriteBufferFromVector buf{str, AppendModeTag{}};
auto buf = WriteBufferFromVector<PaddedPODArray<UInt8>>(str, AppendModeTag{});
writeText(value, buf);
}

View File

@ -39,6 +39,7 @@ TemporaryFileStreamLegacy::Stat TemporaryFileStreamLegacy::write(const std::stri
output.write(block);
compressed_buf.finalize();
file_buf.finalize();
return Stat{compressed_buf.getCompressedBytes(), compressed_buf.getUncompressedBytes()};
}

View File

@ -138,9 +138,8 @@ public:
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "File is not inside {}", user_files_absolute_path.string());
ReadBufferFromFile in(file_path);
WriteBufferFromVector out(res_chars, AppendModeTag{});
auto out = WriteBufferFromVector<ColumnString::Chars>(res_chars, AppendModeTag{});
copyData(in, out);
out.finalize();
}
catch (...)
{

View File

@ -415,9 +415,10 @@ ColumnPtr FunctionGenerateRandomStructure::executeImpl(const ColumnsWithTypeAndN
auto col_res = ColumnString::create();
auto & string_column = assert_cast<ColumnString &>(*col_res);
auto & chars = string_column.getChars();
WriteBufferFromVector buf(chars);
writeRandomStructure(rng, number_of_columns, buf, allow_suspicious_lc_types);
buf.finalize();
{
auto buf = WriteBufferFromVector<ColumnString::Chars>(chars);
writeRandomStructure(rng, number_of_columns, buf, allow_suspicious_lc_types);
}
chars.push_back(0);
string_column.getOffsets().push_back(chars.size());
return ColumnConst::create(std::move(col_res), input_rows_count);

View File

@ -953,9 +953,10 @@ public:
{
ColumnString & col_str = assert_cast<ColumnString &>(dest);
auto & chars = col_str.getChars();
WriteBufferFromVector<ColumnString::Chars> buf(chars, AppendModeTag());
jsonElementToString<JSONParser>(element, buf, format_settings);
buf.finalize();
{
WriteBufferFromVector<ColumnString::Chars> buf(chars, AppendModeTag());
jsonElementToString<JSONParser>(element, buf, format_settings);
}
chars.push_back(0);
col_str.getOffsets().push_back(chars.size());
return true;

View File

@ -59,7 +59,7 @@ public:
{
auto col_str = ColumnString::create();
ColumnString::Chars & vec = col_str->getChars();
WriteBufferFromVector buffer(vec);
WriteBufferFromVector<ColumnString::Chars> buffer(vec);
ColumnString::Offsets & offsets = col_str->getOffsets();
offsets.resize(input_rows_count);

View File

@ -88,9 +88,10 @@ public:
auto columns_list = parseColumnsListFromString(structure, context);
auto col_res = ColumnString::create();
auto & data = assert_cast<ColumnString &>(*col_res).getChars();
WriteBufferFromVector buf(data);
Impl::writeSchema(buf, message_name, columns_list.getAll());
buf.finalize();
{
auto buf = WriteBufferFromVector<ColumnString::Chars>(data);
Impl::writeSchema(buf, message_name, columns_list.getAll());
}
auto & offsets = assert_cast<ColumnString &>(*col_res).getOffsets();
offsets.push_back(data.size());
return ColumnConst::create(std::move(col_res), input_rows_count);

View File

@ -281,10 +281,12 @@ namespace
size_t size_to_stage = task.part_size;
PODArray<char> memory;
memory.resize(size_to_stage);
WriteBufferFromVector<PODArray<char>> wb(memory);
{
memory.resize(size_to_stage);
WriteBufferFromVector<PODArray<char>> wb(memory);
copyData(*read_buffer, wb, size_to_stage);
}
copyData(*read_buffer, wb, size_to_stage);
Azure::Core::IO::MemoryBodyStream stream(reinterpret_cast<const uint8_t *>(memory.data()), size_to_stage);
const auto & block_id = task.block_ids.emplace_back(getRandomASCIIString(64));

View File

@ -36,7 +36,7 @@ void CascadeWriteBuffer::nextImpl()
curr_buffer->position() = position();
curr_buffer->next();
}
catch (const MemoryWriteBuffer::CurrentBufferExhausted &)
catch (const WriteBuffer::CurrentBufferExhausted &)
{
if (curr_buffer_num < num_sources)
{
@ -54,46 +54,50 @@ void CascadeWriteBuffer::nextImpl()
}
void CascadeWriteBuffer::getResultBuffers(WriteBufferPtrs & res)
CascadeWriteBuffer::WriteBufferPtrs CascadeWriteBuffer::getResultBuffers()
{
finalize();
/// Sync position with underlying buffer before invalidating
curr_buffer->position() = position();
res = std::move(prepared_sources);
auto result = std::move(prepared_sources);
curr_buffer = nullptr;
curr_buffer_num = num_sources = 0;
prepared_sources.clear();
lazy_sources.clear();
// we do not need this object any more
cancel();
return result;
}
void CascadeWriteBuffer::finalizeImpl()
{
WriteBuffer::finalizeImpl();
if (curr_buffer)
curr_buffer->position() = position();
for (auto & buf : prepared_sources)
{
if (buf)
{
buf->finalize();
}
}
}
void CascadeWriteBuffer::cancelImpl() noexcept
{
WriteBuffer::cancelImpl();
if (curr_buffer)
curr_buffer->position() = position();
for (auto & buf : prepared_sources)
{
if (buf)
{
buf->cancel();
}
}
}
@ -103,7 +107,7 @@ WriteBuffer * CascadeWriteBuffer::setNextBuffer()
{
if (!prepared_sources[curr_buffer_num])
{
WriteBufferPtr prev_buf = (curr_buffer_num > 0) ? prepared_sources[curr_buffer_num - 1] : nullptr;
auto prev_buf = (curr_buffer_num > 0) ? prepared_sources[curr_buffer_num - 1] : nullptr;
prepared_sources[curr_buffer_num] = lazy_sources[curr_buffer_num - first_lazy_source_num](prev_buf);
}
}
@ -121,13 +125,4 @@ WriteBuffer * CascadeWriteBuffer::setNextBuffer()
return res;
}
CascadeWriteBuffer::~CascadeWriteBuffer()
{
/// Sync position with underlying buffer before exit
if (curr_buffer)
curr_buffer->position() = position();
}
}

View File

@ -1,4 +1,5 @@
#pragma once
#include <functional>
#include <IO/WriteBuffer.h>
@ -27,24 +28,22 @@ class CascadeWriteBuffer : public WriteBuffer
{
public:
using WriteBufferPtrs = std::vector<WriteBufferPtr>;
using WriteBufferConstructor = std::function<WriteBufferPtr (const WriteBufferPtr & prev_buf)>;
using WriteBufferConstructors = std::vector<WriteBufferConstructor>;
using WriteBufferPtrs = std::vector<WriteBufferPtr>;
explicit CascadeWriteBuffer(WriteBufferPtrs && prepared_sources_, WriteBufferConstructors && lazy_sources_ = {});
void nextImpl() override;
/// Should be called once
void getResultBuffers(WriteBufferPtrs & res);
WriteBufferPtrs getResultBuffers();
const WriteBuffer * getCurrentBuffer() const
{
return curr_buffer;
}
~CascadeWriteBuffer() override;
private:
void finalizeImpl() override;

View File

@ -45,16 +45,20 @@ void ForkWriteBuffer::nextImpl()
void ForkWriteBuffer::finalizeImpl()
{
WriteBuffer::finalizeImpl();
for (const WriteBufferPtr & buffer : sources)
{
buffer->finalize();
}
}
ForkWriteBuffer::~ForkWriteBuffer()
void ForkWriteBuffer::cancelImpl() noexcept
{
finalize();
WriteBuffer::cancelImpl();
for (const WriteBufferPtr & buffer : sources)
{
buffer->cancel();
}
}
}

View File

@ -5,10 +5,6 @@
namespace DB
{
namespace ErrorCodes
{
}
/** ForkWriteBuffer takes a vector of WriteBuffer and writes data to all of them
* If the vector of WriteBufferPts is empty, then it throws an error
* It uses the buffer of the first element as its buffer and copies data from
@ -17,15 +13,13 @@ namespace ErrorCodes
class ForkWriteBuffer : public WriteBuffer
{
public:
using WriteBufferPtrs = std::vector<WriteBufferPtr>;
explicit ForkWriteBuffer(WriteBufferPtrs && sources_);
~ForkWriteBuffer() override;
protected:
void nextImpl() override;
void finalizeImpl() override;
void cancelImpl() noexcept override;
private:
WriteBufferPtrs sources;

View File

@ -112,7 +112,7 @@ void MemoryWriteBuffer::addChunk()
if (0 == next_chunk_size)
{
set(position(), 0);
throw MemoryWriteBuffer::CurrentBufferExhausted();
throw WriteBuffer::CurrentBufferExhausted();
}
}

View File

@ -16,13 +16,6 @@ namespace DB
class MemoryWriteBuffer : public WriteBuffer, public IReadableWriteBuffer, boost::noncopyable, private Allocator<false>
{
public:
/// Special exception to throw when the current MemoryWriteBuffer cannot receive data
class CurrentBufferExhausted : public std::exception
{
public:
const char * what() const noexcept override { return "WriteBuffer limit is exhausted"; }
};
/// Use max_total_size_ = 0 for unlimited storage
explicit MemoryWriteBuffer(
size_t max_total_size_ = 0,

View File

@ -1,16 +1,21 @@
#include <IO/NullWriteBuffer.h>
namespace DB
{
NullWriteBuffer::NullWriteBuffer()
: WriteBuffer(data, sizeof(data))
: WriteBufferFromPointer(data, sizeof(data))
{
}
NullWriteBuffer::~NullWriteBuffer()
{
cancel();
}
void NullWriteBuffer::nextImpl()
{
// no op
}
}

View File

@ -1,17 +1,17 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <boost/noncopyable.hpp>
namespace DB
{
/// Simply do nothing, can be used to measure amount of written bytes.
class NullWriteBuffer : public WriteBuffer, boost::noncopyable
class NullWriteBuffer final : public WriteBufferFromPointer
{
public:
NullWriteBuffer();
~NullWriteBuffer() override;
void nextImpl() override;
private:

View File

@ -23,8 +23,6 @@ ProtobufZeroCopyOutputStreamFromWriteBuffer::ProtobufZeroCopyOutputStreamFromWri
out_holder = std::move(out_);
}
ProtobufZeroCopyOutputStreamFromWriteBuffer::~ProtobufZeroCopyOutputStreamFromWriteBuffer() = default;
bool ProtobufZeroCopyOutputStreamFromWriteBuffer::Next(void ** data, int * size)
{
*data = out->position();

View File

@ -16,8 +16,6 @@ public:
explicit ProtobufZeroCopyOutputStreamFromWriteBuffer(WriteBuffer & out_);
explicit ProtobufZeroCopyOutputStreamFromWriteBuffer(std::unique_ptr<WriteBuffer> out_);
~ProtobufZeroCopyOutputStreamFromWriteBuffer() override;
// Obtains a buffer into which data can be written.
bool Next(void ** data, int * size) override;

View File

@ -31,10 +31,12 @@ bool SnappyReadBuffer::nextImpl()
{
if (compress_buffer.empty() && uncompress_buffer.empty())
{
WriteBufferFromString wb(compress_buffer);
copyData(*in, wb);
{
WriteBufferFromString wb(compress_buffer);
copyData(*in, wb);
}
bool success = snappy::Uncompress(compress_buffer.data(), wb.count(), &uncompress_buffer);
bool success = snappy::Uncompress(compress_buffer.data(), compress_buffer.size(), &uncompress_buffer);
if (!success)
{
throw Exception(ErrorCodes::SNAPPY_UNCOMPRESS_FAILED, "snappy uncomress failed: ");

View File

@ -26,11 +26,6 @@ SnappyWriteBuffer::SnappyWriteBuffer(WriteBuffer & out_, size_t buf_size, char *
{
}
SnappyWriteBuffer::~SnappyWriteBuffer()
{
finish();
}
void SnappyWriteBuffer::nextImpl()
{
if (!offset())
@ -45,28 +40,12 @@ void SnappyWriteBuffer::nextImpl()
void SnappyWriteBuffer::finish()
{
if (finished)
return;
try
{
finishImpl();
out->finalize();
finished = true;
}
catch (...)
{
/// Do not try to flush next time after exception.
out->position() = out->buffer().begin();
finished = true;
throw;
}
finishImpl();
out->finalize();
}
void SnappyWriteBuffer::finishImpl()
{
next();
bool success = snappy::Compress(uncompress_buffer.data(), uncompress_buffer.size(), &compress_buffer);
if (!success)
{
@ -92,7 +71,18 @@ void SnappyWriteBuffer::finishImpl()
}
}
void SnappyWriteBuffer::cancelImpl() noexcept
{
BASE::cancelImpl();
out->cancel();
}
void SnappyWriteBuffer::finalizeImpl()
{
BASE::finalizeImpl();
finish();
}
}
#endif

View File

@ -11,6 +11,7 @@ namespace DB
/// Performs compression using snappy library and write compressed data to the underlying buffer.
class SnappyWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
{
using BASE = BufferWithOwnMemory<WriteBuffer>;
public:
explicit SnappyWriteBuffer(
std::unique_ptr<WriteBuffer> out_,
@ -24,9 +25,7 @@ public:
char * existing_memory = nullptr,
size_t alignment = 0);
~SnappyWriteBuffer() override;
void finalizeImpl() override { finish(); }
void finalizeImpl() override;
private:
void nextImpl() override;
@ -34,11 +33,11 @@ private:
void finishImpl();
void finish();
void cancelImpl() noexcept override;
WriteBuffer * out;
std::unique_ptr<WriteBuffer> out_holder;
bool finished = false;
String uncompress_buffer;
String compress_buffer;
};
@ -46,4 +45,3 @@ private:
}
#endif

View File

@ -11,22 +11,18 @@ namespace DB
WriteBuffer::~WriteBuffer()
{
// That destructor could be call with finalized=false in case of exceptions
if (count() > 0 && !finalized && !canceled)
if (!finalized && !canceled && !isStackUnwinding())
{
/// It is totally OK to destroy instance without finalization when an exception occurs
/// However it is suspicious to destroy instance without finalization at the green path
if (!std::uncaught_exceptions() && std::current_exception() == nullptr)
{
LoggerPtr log = getLogger("WriteBuffer");
LOG_ERROR(
log,
"WriteBuffer is neither finalized nor canceled when destructor is called. "
"No exceptions in flight are detected. "
"The file might not be written at all or might be truncated. "
"Stack trace: {}",
StackTrace().toString());
chassert(false && "WriteBuffer is not finalized in destructor.");
}
LoggerPtr log = getLogger("WriteBuffer");
LOG_ERROR(
log,
"WriteBuffer is neither finalized nor canceled when destructor is called. "
"No exceptions in flight are detected. "
"The file might not be written at all or might be truncated. exception_level at c-tor {} ar d-tor {}."
"Stack trace: {}",
exception_level, std::uncaught_exceptions(),
StackTrace().toString());
chassert(false && "WriteBuffer is not finalized in destructor.");
}
}
@ -39,4 +35,5 @@ void WriteBuffer::cancel() noexcept
cancelImpl();
canceled = true;
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <algorithm>
#include <exception>
#include <memory>
#include <cassert>
#include <cstring>
@ -29,6 +30,13 @@ namespace ErrorCodes
class WriteBuffer : public BufferBase
{
public:
/// Special exception to throw when the current MemoryWriteBuffer cannot receive data
class CurrentBufferExhausted : public std::exception
{
public:
const char * what() const noexcept override { return "WriteBuffer limit is exhausted"; }
};
using BufferBase::set;
using BufferBase::position;
void set(Position ptr, size_t size) { BufferBase::set(ptr, size, 0); }
@ -52,6 +60,12 @@ public:
{
nextImpl();
}
catch (CurrentBufferExhausted &)
{
pos = working_buffer.begin();
bytes += bytes_in_buffer;
throw;
}
catch (...)
{
/** If the nextImpl() call was unsuccessful, move the cursor to the beginning,
@ -60,6 +74,8 @@ public:
pos = working_buffer.begin();
bytes += bytes_in_buffer;
cancel();
throw;
}
@ -141,6 +157,9 @@ public:
}
}
bool isFinalized() const { return finalized; }
bool isCanceled() const { return canceled; }
void cancel() noexcept;
/// Wait for data to be reliably written. Mainly, call fsync for fd.
@ -160,6 +179,7 @@ protected:
virtual void cancelImpl() noexcept
{
/* no op */
}
bool finalized = false;
@ -178,6 +198,13 @@ private:
{
throw Exception(ErrorCodes::CANNOT_WRITE_AFTER_END_OF_BUFFER, "Cannot write after end of buffer.");
}
bool isStackUnwinding() const
{
return exception_level < std::uncaught_exceptions();
}
int exception_level = std::uncaught_exceptions();
};
@ -201,4 +228,51 @@ private:
}
};
// AutoCancelWriteBuffer cancel the buffer in d-tor when it has not been finalized before d-tor
// AutoCancelWriteBuffer could not be inherited.
// Otherwise cancel method could not call proper cancelImpl because inheritor is destroyed already.
// But the ussage of final inheritance is avoided in faivor to keep the possibility to use std::make_shared.
template<class Base>
class AutoCanceledWriteBuffer final : public Base
{
static_assert(std::derived_from<Base, WriteBuffer>);
public:
using Base::Base;
~AutoCanceledWriteBuffer() override
{
if (!this->finalized && !this->canceled)
this->cancel();
}
};
/// That class is applied only by
// AutoCancelWriteBuffer could not be inherited.
// case 1 - HTTPServerResponse. The external interface HTTPResponse forces that.
// case 3 - WriteBufferFromVector, WriteBufferFromString. It is safe to make them autofinaliziable.
template<class Base>
class AutoFinalizedWriteBuffer final: public Base
{
static_assert(std::derived_from<Base, WriteBuffer>);
public:
using Base::Base;
~AutoFinalizedWriteBuffer() override
{
try
{
if (!this->finalized && !this->canceled)
this->finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
};
}

View File

@ -49,6 +49,7 @@ public:
void cancelImpl() noexcept override
{
Base::cancelImpl();
out->cancel();
}

View File

@ -81,17 +81,8 @@ WriteBufferFromFile::~WriteBufferFromFile()
if (fd < 0)
return;
try
{
if (!canceled)
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
[[maybe_unused]] int err = ::close(fd);
[[maybe_unused]] int err = ::close(fd);
/// Everything except for EBADF should be ignored in dtor, since all of
/// others (EINTR/EIO/ENOSPC/EDQUOT) could be possible during writing to
/// fd, and then write already failed and the error had been reported to
@ -106,7 +97,7 @@ void WriteBufferFromFile::finalizeImpl()
if (fd < 0)
return;
next();
WriteBufferFromFileDescriptor::finalizeImpl();
}

View File

@ -22,6 +22,7 @@ void WriteBufferFromFileDecorator::finalizeImpl()
if (!is_prefinalized)
WriteBufferFromFileDecorator::preFinalize();
WriteBufferFromFileBase::finalizeImpl();
{
SwapHelper swap(*this, *impl);
impl->finalize();
@ -30,8 +31,12 @@ void WriteBufferFromFileDecorator::finalizeImpl()
void WriteBufferFromFileDecorator::cancelImpl() noexcept
{
SwapHelper swap(*this, *impl);
impl->cancel();
WriteBufferFromFileBase::cancelImpl();
{
SwapHelper swap(*this, *impl);
impl->cancel();
}
}
WriteBufferFromFileDecorator::~WriteBufferFromFileDecorator()

View File

@ -113,30 +113,16 @@ WriteBufferFromFileDescriptor::WriteBufferFromFileDescriptor(
{
}
WriteBufferFromFileDescriptor::~WriteBufferFromFileDescriptor()
{
try
{
if (!canceled)
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void WriteBufferFromFileDescriptor::finalizeImpl()
{
if (fd < 0)
{
assert(!offset() && "attempt to write after close");
chassert(!offset(), "attempt to write after close");
return;
}
use_adaptive_buffer_size = false;
next();
WriteBufferFromFileBase::finalizeImpl();
}
void WriteBufferFromFileDescriptor::sync()

View File

@ -30,8 +30,6 @@ public:
fd = fd_;
}
~WriteBufferFromFileDescriptor() override;
int getFD() const
{
return fd;

View File

@ -47,6 +47,8 @@ void WriteBufferFromHTTP::finalizeImpl()
receiveResponse(*session, request, response, false);
/// TODO: Response body is ignored.
WriteBufferFromOStream::finalizeImpl();
}
}

View File

@ -196,17 +196,4 @@ WriteBufferFromPocoSocket::WriteBufferFromPocoSocket(Poco::Net::Socket & socket_
write_event = write_event_;
}
WriteBufferFromPocoSocket::~WriteBufferFromPocoSocket()
{
try
{
if (!canceled)
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}

View File

@ -19,8 +19,6 @@ public:
explicit WriteBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
explicit WriteBufferFromPocoSocket(Poco::Net::Socket & socket_, const ProfileEvents::Event & write_event_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
~WriteBufferFromPocoSocket() override;
void setAsyncCallback(AsyncCallback async_callback_) { async_callback = std::move(async_callback_); }
using WriteBuffer::write;

View File

@ -26,7 +26,8 @@ WriteBufferFromPocoSocketChunked::WriteBufferFromPocoSocketChunked(Poco::Net::So
socket_, write_event_,
std::clamp(buf_size, sizeof(*chunk_size_ptr) + 1, static_cast<size_t>(std::numeric_limits<std::remove_reference_t<decltype(*chunk_size_ptr)>>::max()))),
log(getLogger("Protocol"))
{}
{
}
void WriteBufferFromPocoSocketChunked::enableChunked()
{
@ -108,18 +109,6 @@ void WriteBufferFromPocoSocketChunked::finishChunk()
last_finish_chunk = chunk_size_ptr;
}
WriteBufferFromPocoSocketChunked::~WriteBufferFromPocoSocketChunked()
{
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void WriteBufferFromPocoSocketChunked::nextImpl()
{
if (!chunked)

View File

@ -16,7 +16,6 @@ public:
void enableChunked();
void finishChunk();
~WriteBufferFromPocoSocketChunked() override;
protected:
void nextImpl() override;

View File

@ -209,6 +209,8 @@ void WriteBufferFromS3::finalizeImpl()
LOG_TRACE(limited_log, "finalizeImpl WriteBufferFromS3. {}.", getShortLogDetails());
WriteBufferFromFileBase::finalizeImpl();
if (!is_prefinalized)
preFinalize();
@ -240,6 +242,7 @@ void WriteBufferFromS3::finalizeImpl()
void WriteBufferFromS3::cancelImpl() noexcept
{
WriteBufferFromFileBase::cancelImpl();
tryToAbortMultipartUpload();
}
@ -274,7 +277,7 @@ void WriteBufferFromS3::tryToAbortMultipartUpload() noexcept
}
catch (...)
{
LOG_ERROR(log, "Multipart upload hasn't aborted. {}", getVerboseLogDetails());
LOG_ERROR(log, "Multipart upload hasn't been aborted. {}", getVerboseLogDetails());
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
@ -454,11 +457,11 @@ void WriteBufferFromS3::abortMultipartUpload()
{
if (multipart_upload_id.empty())
{
LOG_WARNING(log, "Nothing to abort. {}", getVerboseLogDetails());
LOG_INFO(log, "Nothing to abort. {}", getVerboseLogDetails());
return;
}
LOG_WARNING(log, "Abort multipart upload. {}", getVerboseLogDetails());
LOG_INFO(log, "Abort multipart upload. {}", getVerboseLogDetails());
S3::AbortMultipartUploadRequest req;
req.SetBucket(bucket);
@ -485,7 +488,7 @@ void WriteBufferFromS3::abortMultipartUpload()
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
}
LOG_WARNING(log, "Multipart upload has aborted successfully. {}", getVerboseLogDetails());
LOG_INFO(log, "Multipart upload has aborted successfully. {}", getVerboseLogDetails());
}
S3::UploadPartRequest WriteBufferFromS3::getUploadRequest(size_t part_number, PartData & data)

View File

@ -2,6 +2,7 @@
#include <string>
#include <IO/WriteBufferFromVector.h>
#include <IO/WriteBuffer.h>
#include <base/StringRef.h>
@ -9,9 +10,9 @@ namespace DB
{
/** Writes the data to a string.
* Note: before using the resulting string, destroy this object.
* Note: before using the resulting string, destroy this object or call finalize.
*/
using WriteBufferFromString = WriteBufferFromVector<std::string>;
using WriteBufferFromString = AutoFinalizedWriteBuffer<WriteBufferFromVectorImpl<std::string>>;
namespace detail
@ -25,12 +26,13 @@ namespace detail
}
/// Creates the string by itself and allows to get it.
class WriteBufferFromOwnString : public detail::StringHolder, public WriteBufferFromString
class WriteBufferFromOwnStringImpl : public detail::StringHolder, public WriteBufferFromVectorImpl<std::string>
{
using Base = WriteBufferFromVectorImpl<std::string>;
public:
WriteBufferFromOwnString() : WriteBufferFromString(value) {}
WriteBufferFromOwnStringImpl() : Base(value) {}
std::string_view stringView() const { return isFinished() ? std::string_view(value) : std::string_view(value.data(), pos - value.data()); }
std::string_view stringView() const { return isFinalized() ? std::string_view(value) : std::string_view(value.data(), pos - value.data()); }
std::string & str()
{
@ -39,4 +41,6 @@ public:
}
};
using WriteBufferFromOwnString = AutoFinalizedWriteBuffer<WriteBufferFromOwnStringImpl>;
}

View File

@ -23,11 +23,11 @@ struct AppendModeTag {};
* The vector should live until this object is destroyed or until the 'finalizeImpl()' method is called.
*/
template <typename VectorType>
class WriteBufferFromVector : public WriteBuffer
class WriteBufferFromVectorImpl : public WriteBuffer
{
public:
using ValueType = typename VectorType::value_type;
explicit WriteBufferFromVector(VectorType & vector_)
explicit WriteBufferFromVectorImpl(VectorType & vector_)
: WriteBuffer(reinterpret_cast<Position>(vector_.data()), vector_.size()), vector(vector_)
{
if (vector.empty())
@ -38,7 +38,7 @@ public:
}
/// Append to vector instead of rewrite.
WriteBufferFromVector(VectorType & vector_, AppendModeTag)
WriteBufferFromVectorImpl(VectorType & vector_, AppendModeTag)
: WriteBuffer(nullptr, 0), vector(vector_)
{
size_t old_size = vector.size();
@ -49,8 +49,6 @@ public:
set(reinterpret_cast<Position>(vector.data() + old_size), (size - old_size) * sizeof(typename VectorType::value_type));
}
bool isFinished() const { return finalized; }
void restart(std::optional<size_t> max_capacity = std::nullopt)
{
if (max_capacity && vector.capacity() > max_capacity)
@ -59,12 +57,7 @@ public:
vector.resize(initial_size);
set(reinterpret_cast<Position>(vector.data()), vector.size());
finalized = false;
}
~WriteBufferFromVector() override
{
if (!canceled)
finalize();
canceled = false;
}
private:
@ -101,4 +94,7 @@ private:
static constexpr size_t size_multiplier = 2;
};
template<typename VectorType>
using WriteBufferFromVector = AutoFinalizedWriteBuffer<WriteBufferFromVectorImpl<VectorType>>;
}

View File

@ -88,11 +88,6 @@ void ZstdDeflatingAppendableWriteBuffer::nextImpl()
}
ZstdDeflatingAppendableWriteBuffer::~ZstdDeflatingAppendableWriteBuffer()
{
finalize();
}
void ZstdDeflatingAppendableWriteBuffer::finalizeImpl()
{
if (first_write)
@ -103,18 +98,9 @@ void ZstdDeflatingAppendableWriteBuffer::finalizeImpl()
}
else
{
try
{
finalizeBefore();
out->finalize();
finalizeAfter();
}
catch (...)
{
/// Do not try to flush next time after exception.
out->position() = out->buffer().begin();
throw;
}
finalizeBefore();
out->finalize();
finalizeAfter();
}
}
@ -165,6 +151,9 @@ void ZstdDeflatingAppendableWriteBuffer::finalizeAfter()
void ZstdDeflatingAppendableWriteBuffer::finalizeZstd()
{
if (!cctx)
return;
try
{
size_t err = ZSTD_freeCCtx(cctx);
@ -179,6 +168,8 @@ void ZstdDeflatingAppendableWriteBuffer::finalizeZstd()
/// since all data already written to the stream.
tryLogCurrentException(__PRETTY_FUNCTION__);
}
cctx = nullptr;
}
void ZstdDeflatingAppendableWriteBuffer::addEmptyBlock()
@ -221,4 +212,11 @@ bool ZstdDeflatingAppendableWriteBuffer::isNeedToAddEmptyBlock()
return false;
}
void ZstdDeflatingAppendableWriteBuffer::cancelImpl() noexcept
{
out->cancel();
/// To free cctx
finalizeZstd();
}
}

View File

@ -38,8 +38,6 @@ public:
char * existing_memory = nullptr,
size_t alignment = 0);
~ZstdDeflatingAppendableWriteBuffer() override;
void sync() override
{
next();
@ -63,6 +61,8 @@ private:
void finalizeAfter();
void finalizeZstd();
void cancelImpl() noexcept override;
/// Read three last bytes from non-empty compressed file and compares them with
/// ZSTD_CORRECT_TERMINATION_LAST_BLOCK.
bool isNeedToAddEmptyBlock();

View File

@ -1,5 +1,6 @@
#include <iostream>
#include <iomanip>
#include <vector>
#include <pcg_random.hpp>
#include <base/types.h>
@ -62,8 +63,7 @@ int main(int argc, char ** argv)
formatted.reserve(n * 21);
{
DB::WriteBufferFromVector wb(formatted);
// DB::CompressedWriteBuffer wb2(wb1);
auto wb = DB::WriteBufferFromVector<std::vector<char>>(formatted);
Stopwatch watch;
UInt64 tsc = rdtsc();
@ -92,7 +92,6 @@ int main(int argc, char ** argv)
{
DB::ReadBuffer rb(formatted.data(), formatted.size(), 0);
// DB::CompressedReadBuffer rb(rb_);
Stopwatch watch;
for (size_t i = 0; i < n; ++i)

View File

@ -58,9 +58,10 @@ TEST(HadoopSnappyDecoder, repeatNeedMoreInput)
std::unique_ptr<ReadBuffer> in = std::make_unique<ReadBufferFromFile>("./test.snappy", 128);
HadoopSnappyReadBuffer read_buffer(std::move(in));
String output;
WriteBufferFromString out(output);
copyData(read_buffer, out);
out.finalize();
{
WriteBufferFromString out(output);
copyData(read_buffer, out);
}
UInt128 hashcode = sipHash128(output.c_str(), output.size());
String hashcode_str = getHexUIntLowercase(hashcode);
ASSERT_EQ(hashcode_str, "673e5b065186cec146789451c2a8f703");

View File

@ -18,6 +18,7 @@ TEST(ReadBufferFromFile, seekBackwards)
WriteBufferFromFile out(tmp_file->path());
for (size_t i = 0; i < N; ++i)
writeIntBinary(i, out);
out.finalize();
}
ReadBufferFromFile in(tmp_file->path(), BUF_SIZE);

View File

@ -385,13 +385,13 @@ AsynchronousInsertQueue::pushQueryWithInlinedData(ASTPtr query, ContextPtr query
/*throw_exception=*/false,
/*exact_limit=*/{});
WriteBufferFromString write_buf(bytes);
copyData(limit_buf, write_buf);
{
WriteBufferFromString write_buf(bytes);
copyData(limit_buf, write_buf);
}
if (!read_buf->eof())
{
write_buf.finalize();
/// Concat read buffer with already extracted from insert
/// query data and with the rest data from insert query.
std::vector<std::unique_ptr<ReadBuffer>> buffers;

View File

@ -602,7 +602,7 @@ void FileSegment::setDownloadFailedUnlocked(const FileSegmentGuard::Lock & lock)
if (cache_writer)
{
cache_writer->finalize();
cache_writer->cancel();
cache_writer.reset();
}
@ -976,15 +976,9 @@ void FileSegment::setDetachedState(const FileSegmentGuard::Lock & lock)
setDownloadState(State::DETACHED, lock);
key_metadata.reset();
queue_iterator = nullptr;
try
{
cache_writer.reset();
remote_file_reader.reset();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (cache_writer)
cache_writer->cancel();
cache_writer.reset();
}
void FileSegment::detach(const FileSegmentGuard::Lock & lock, const LockedKey &)

View File

@ -18,6 +18,10 @@
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Parsers/ParserQuery.h>
#include <IO/WriteHelpers.h>
#include <IO/NullWriteBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <Storages/IStorage.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Poco/Timestamp.h>
@ -484,8 +488,6 @@ bool DDLWorker::tryExecuteQuery(DDLTaskBase & task, const ZooKeeperPtr & zookeep
String query_to_show_in_logs = query_prefix + task.query_for_logging;
ReadBufferFromString istr(query_to_execute);
String dummy_string;
WriteBufferFromString ostr(dummy_string);
std::optional<CurrentThread::QueryScope> query_scope;
try
@ -505,7 +507,8 @@ bool DDLWorker::tryExecuteQuery(DDLTaskBase & task, const ZooKeeperPtr & zookeep
if (!task.is_initial_query)
query_scope.emplace(query_context);
executeQuery(istr, ostr, !task.is_initial_query, query_context, {}, QueryFlags{ .internal = false, .distributed_backup_restore = task.entry.is_backup_restore });
NullWriteBuffer nullwb;
executeQuery(istr, nullwb, !task.is_initial_query, query_context, {}, QueryFlags{ .internal = false, .distributed_backup_restore = task.entry.is_backup_restore });
if (auto txn = query_context->getZooKeeperMetadataTransaction())
{

View File

@ -547,6 +547,8 @@ QueryPipeline InterpreterExplainQuery::executeImpl()
/* async_isnert */ false);
auto io = insert.execute();
printPipeline(io.pipeline.getProcessors(), buf);
// we do not need it anymore, it would be executed
io.pipeline.cancel();
}
else
throw Exception(ErrorCodes::INCORRECT_QUERY, "Only SELECT and INSERT is supported for EXPLAIN PIPELINE query");
@ -617,6 +619,7 @@ QueryPipeline InterpreterExplainQuery::executeImpl()
break;
}
}
buf.finalize();
if (insert_buf)
{
if (single_line)

View File

@ -1,6 +1,7 @@
#include <Interpreters/SystemLog.h>
#include <base/scope_guard.h>
#include <Common/Logger.h>
#include <Common/SystemLogBase.h>
#include <Common/logger_useful.h>
#include <Common/MemoryTrackerBlockerInThread.h>

View File

@ -150,6 +150,13 @@ std::vector<TemporaryFileStream *> TemporaryDataOnDisk::getStreams() const
return res;
}
void TemporaryDataOnDisk::cancel() noexcept
{
std::lock_guard lock(mutex);
for (const auto & stream : streams)
stream->cancel();
}
bool TemporaryDataOnDisk::empty() const
{
std::lock_guard lock(mutex);
@ -206,16 +213,16 @@ struct TemporaryFileStream::OutputWriter
out_buf->finalize();
}
void cancel() noexcept
{
out_compressed_buf.cancel();
out_buf->cancel();
}
~OutputWriter()
{
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (!finalized)
cancel();
}
std::unique_ptr<WriteBuffer> out_buf;
@ -300,6 +307,11 @@ void TemporaryFileStream::flush()
out_writer->flush();
}
void TemporaryFileStream::cancel() noexcept
{
release();
}
TemporaryFileStream::Stat TemporaryFileStream::finishWriting()
{
if (isWriteFinished())
@ -394,7 +406,7 @@ void TemporaryFileStream::release()
if (out_writer)
{
out_writer->finalize();
out_writer->cancel();
out_writer.reset();
}

View File

@ -124,6 +124,7 @@ public:
bool empty() const;
const StatAtomic & getStat() const { return stat; }
void cancel() noexcept;
private:
FileSegmentsHolderPtr createCacheFile(size_t max_file_size);
@ -174,6 +175,7 @@ public:
size_t write(const Block & block);
void flush();
void cancel() noexcept;
Stat finishWriting();
Stat finishWritingAsyncSafe();

View File

@ -72,7 +72,7 @@ TraceCollector::~TraceCollector()
*/
WriteBufferFromFileDescriptor out(TraceSender::pipe.fds_rw[1]);
writeChar(true, out);
out.next();
out.finalize();
}
catch (...)
{

Some files were not shown because too many files have changed in this diff Show More