Merge pull request #46681 from ClickHouse/fix_insert_cancellation_in_native_protocol

Fix queries cancellation when a client dies
Alexander Tokmakov 2023-03-03 16:27:20 +03:00 committed by GitHub
commit cadaf06829
41 changed files with 468 additions and 60 deletions

View File

@ -1360,7 +1360,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
throw;
}
if (have_data_in_stdin)
if (have_data_in_stdin && !cancelled)
sendDataFromStdin(sample, columns_description_for_query, parsed_query);
}
else if (parsed_insert_query->data)
@ -1370,7 +1370,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
try
{
sendDataFrom(data_in, sample, columns_description_for_query, parsed_query, have_data_in_stdin);
if (have_data_in_stdin)
if (have_data_in_stdin && !cancelled)
sendDataFromStdin(sample, columns_description_for_query, parsed_query);
}
catch (Exception & e)

View File

@ -51,7 +51,7 @@ StatusFile::StatusFile(std::string path_, FillFunction fill_)
std::string contents;
{
ReadBufferFromFile in(path, 1024);
LimitReadBuffer limit_in(in, 1024, false);
LimitReadBuffer limit_in(in, 1024, /* throw_exception */ false, /* exact_limit */ {});
readStringUntilEOF(contents, limit_in);
}

View File

@ -34,7 +34,7 @@ ExternalTableDataPtr BaseExternalTable::getData(ContextPtr context)
{
initReadBuffer();
initSampleBlock();
auto input = context->getInputFormat(format, *read_buffer, sample_block, DEFAULT_BLOCK_SIZE);
auto input = context->getInputFormat(format, *read_buffer, sample_block, context->getSettingsRef().get("max_block_size").get<UInt64>());
auto data = std::make_unique<ExternalTableData>();
data->pipe = std::make_unique<QueryPipelineBuilder>();
@ -135,7 +135,9 @@ void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header,
if (settings.http_max_multipart_form_data_size)
read_buffer = std::make_unique<LimitReadBuffer>(
stream, settings.http_max_multipart_form_data_size,
true, "the maximum size of multipart/form-data. This limit can be tuned by 'http_max_multipart_form_data_size' setting");
/* throw_exception */ true, /* exact_limit */ std::optional<size_t>(),
"the maximum size of multipart/form-data. "
"This limit can be tuned by 'http_max_multipart_form_data_size' setting");
else
read_buffer = wrapReadBufferReference(stream);

View File

@ -33,13 +33,13 @@ void IMySQLReadPacket::readPayloadWithUnpacked(ReadBuffer & in)
void LimitedReadPacket::readPayload(ReadBuffer &in, uint8_t &sequence_id)
{
LimitReadBuffer limited(in, 10000, true, "too long MySQL packet.");
LimitReadBuffer limited(in, 10000, /* throw_exception */ true, /* exact_limit */ {}, "too long MySQL packet.");
IMySQLReadPacket::readPayload(limited, sequence_id);
}
void LimitedReadPacket::readPayloadWithUnpacked(ReadBuffer & in)
{
LimitReadBuffer limited(in, 10000, true, "too long MySQL packet.");
LimitReadBuffer limited(in, 10000, /* throw_exception */ true, /* exact_limit */ {}, "too long MySQL packet.");
IMySQLReadPacket::readPayloadWithUnpacked(limited);
}

View File

@ -9,6 +9,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LIMIT_EXCEEDED;
extern const int CANNOT_READ_ALL_DATA;
}
@ -21,14 +22,22 @@ bool LimitReadBuffer::nextImpl()
if (bytes >= limit)
{
if (exact_limit && bytes == *exact_limit)
return false;
if (exact_limit && bytes != *exact_limit)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected data, got {} bytes, expected {}", bytes, *exact_limit);
if (throw_exception)
throw Exception(ErrorCodes::LIMIT_EXCEEDED, "Limit for LimitReadBuffer exceeded: {}", exception_message);
else
return false;
return false;
}
if (!in->next())
{
if (exact_limit && bytes != *exact_limit)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected EOF, got {} of {} bytes", bytes, *exact_limit);
/// Clearing the buffer with existing data.
set(in->position(), 0);
return false;
@ -43,12 +52,14 @@ bool LimitReadBuffer::nextImpl()
}
LimitReadBuffer::LimitReadBuffer(ReadBuffer * in_, bool owns, UInt64 limit_, bool throw_exception_, std::string exception_message_)
LimitReadBuffer::LimitReadBuffer(ReadBuffer * in_, bool owns, UInt64 limit_, bool throw_exception_,
std::optional<size_t> exact_limit_, std::string exception_message_)
: ReadBuffer(in_ ? in_->position() : nullptr, 0)
, in(in_)
, owns_in(owns)
, limit(limit_)
, throw_exception(throw_exception_)
, exact_limit(exact_limit_)
, exception_message(std::move(exception_message_))
{
assert(in);
@ -61,14 +72,16 @@ LimitReadBuffer::LimitReadBuffer(ReadBuffer * in_, bool owns, UInt64 limit_, boo
}
LimitReadBuffer::LimitReadBuffer(ReadBuffer & in_, UInt64 limit_, bool throw_exception_, std::string exception_message_)
: LimitReadBuffer(&in_, false, limit_, throw_exception_, exception_message_)
LimitReadBuffer::LimitReadBuffer(ReadBuffer & in_, UInt64 limit_, bool throw_exception_,
std::optional<size_t> exact_limit_, std::string exception_message_)
: LimitReadBuffer(&in_, false, limit_, throw_exception_, exact_limit_, exception_message_)
{
}
LimitReadBuffer::LimitReadBuffer(std::unique_ptr<ReadBuffer> in_, UInt64 limit_, bool throw_exception_, std::string exception_message_)
: LimitReadBuffer(in_.release(), true, limit_, throw_exception_, exception_message_)
LimitReadBuffer::LimitReadBuffer(std::unique_ptr<ReadBuffer> in_, UInt64 limit_, bool throw_exception_,
std::optional<size_t> exact_limit_, std::string exception_message_)
: LimitReadBuffer(in_.release(), true, limit_, throw_exception_, exact_limit_, exception_message_)
{
}

View File

@ -13,8 +13,10 @@ namespace DB
class LimitReadBuffer : public ReadBuffer
{
public:
LimitReadBuffer(ReadBuffer & in_, UInt64 limit_, bool throw_exception_, std::string exception_message_ = {});
LimitReadBuffer(std::unique_ptr<ReadBuffer> in_, UInt64 limit_, bool throw_exception_, std::string exception_message_ = {});
LimitReadBuffer(ReadBuffer & in_, UInt64 limit_, bool throw_exception_,
std::optional<size_t> exact_limit_, std::string exception_message_ = {});
LimitReadBuffer(std::unique_ptr<ReadBuffer> in_, UInt64 limit_, bool throw_exception_, std::optional<size_t> exact_limit_,
std::string exception_message_ = {});
~LimitReadBuffer() override;
private:
@ -23,9 +25,10 @@ private:
UInt64 limit;
bool throw_exception;
std::optional<size_t> exact_limit;
std::string exception_message;
LimitReadBuffer(ReadBuffer * in_, bool owns, UInt64 limit_, bool throw_exception_, std::string exception_message_);
LimitReadBuffer(ReadBuffer * in_, bool owns, UInt64 limit_, bool throw_exception_, std::optional<size_t> exact_limit_, std::string exception_message_);
bool nextImpl() override;
};
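For illustration only (this block is not part of the commit's diff): a minimal sketch of how the two limit modes of the updated constructor might be used. Everything except the LimitReadBuffer signature shown above is assumed for the example.

#include <string>
#include <IO/LimitReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>

void limitReadBufferSketch()
{
    std::string payload = "hello world";

    /// Soft limit (the pre-existing mode): read at most 5 bytes, then report EOF without throwing.
    DB::ReadBufferFromString in1(payload);
    DB::LimitReadBuffer soft(in1, 5, /* throw_exception */ false, /* exact_limit */ {});
    std::string first_five;
    DB::readStringUntilEOF(first_five, soft);   /// "hello"

    /// Exact limit (the new mode): the source must provide exactly N bytes;
    /// a shorter stream makes nextImpl() throw CANNOT_READ_ALL_DATA.
    DB::ReadBufferFromString in2(payload);
    DB::LimitReadBuffer exact(in2, payload.size(), /* throw_exception */ true,
                              /* exact_limit */ payload.size());
    std::string whole;
    DB::readStringUntilEOF(whole, exact);   /// succeeds: exactly payload.size() bytes arrived
}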

View File

@ -64,7 +64,8 @@ void WriteBufferFromPocoSocket::nextImpl()
}
catch (const Poco::Net::NetException & e)
{
throw NetException(ErrorCodes::NETWORK_ERROR, "{}, while writing to socket ({})", e.displayText(), peer_address.toString());
throw NetException(ErrorCodes::NETWORK_ERROR, "{}, while writing to socket ({} -> {})", e.displayText(),
our_address.toString(), peer_address.toString());
}
catch (const Poco::TimeoutException &)
{
@ -74,18 +75,20 @@ void WriteBufferFromPocoSocket::nextImpl()
}
catch (const Poco::IOException & e)
{
throw NetException(ErrorCodes::NETWORK_ERROR, "{}, while writing to socket ({})", e.displayText(), peer_address.toString());
throw NetException(ErrorCodes::NETWORK_ERROR, "{}, while writing to socket ({} -> {})", e.displayText(),
our_address.toString(), peer_address.toString());
}
if (res < 0)
throw NetException(ErrorCodes::CANNOT_WRITE_TO_SOCKET, "Cannot write to socket ({})", peer_address.toString());
throw NetException(ErrorCodes::CANNOT_WRITE_TO_SOCKET, "Cannot write to socket ({} -> {})",
our_address.toString(), peer_address.toString());
bytes_written += res;
}
}
WriteBufferFromPocoSocket::WriteBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size)
: BufferWithOwnMemory<WriteBuffer>(buf_size), socket(socket_), peer_address(socket.peerAddress())
: BufferWithOwnMemory<WriteBuffer>(buf_size), socket(socket_), peer_address(socket.peerAddress()), our_address(socket.address())
{
}

View File

@ -28,6 +28,7 @@ protected:
* (getpeername will return an error).
*/
Poco::Net::SocketAddress peer_address;
Poco::Net::SocketAddress our_address;
};
}

View File

@ -24,13 +24,13 @@ int main(int argc, char ** argv)
writeCString("--- first ---\n", out);
{
LimitReadBuffer limit_in(in, limit, false);
LimitReadBuffer limit_in(in, limit, /* throw_exception */ false, /* exact_limit */ {});
copyData(limit_in, out);
}
writeCString("\n--- second ---\n", out);
{
LimitReadBuffer limit_in(in, limit, false);
LimitReadBuffer limit_in(in, limit, /* throw_exception */ false, /* exact_limit */ {});
copyData(limit_in, out);
}

View File

@ -27,7 +27,7 @@ try
ReadBuffer in(src.data(), src.size(), 0);
LimitReadBuffer limit_in(in, 1, false);
LimitReadBuffer limit_in(in, 1, /* throw_exception */ false, /* exact_limit */ {});
{
WriteBufferFromString out(dst);
@ -55,7 +55,7 @@ try
char x;
readChar(x, in);
LimitReadBuffer limit_in(in, 1, false);
LimitReadBuffer limit_in(in, 1, /* throw_exception */ false, /* exact_limit */ {});
copyData(limit_in, out);
@ -85,7 +85,7 @@ try
ReadBuffer in(src.data(), src.size(), 0);
{
LimitReadBuffer limit_in(in, 1, false);
LimitReadBuffer limit_in(in, 1, /* throw_exception */ false, /* exact_limit */ {});
char x;
readChar(x, limit_in);

View File

@ -210,7 +210,7 @@ AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
/// to avoid buffering of huge amount of data in memory.
auto read_buf = getReadBufferFromASTInsertQuery(query);
LimitReadBuffer limit_buf(*read_buf, settings.async_insert_max_data_size, false);
LimitReadBuffer limit_buf(*read_buf, settings.async_insert_max_data_size, /* throw_exception */ false, /* exact_limit */ {});
WriteBufferFromString write_buf(bytes);
copyData(limit_buf, write_buf);

View File

@ -140,6 +140,23 @@ public:
scheduleCloseSession(session, lock);
}
void closeSession(const UUID & user_id, const String & session_id)
{
std::unique_lock lock(mutex);
Key key{user_id, session_id};
auto it = sessions.find(key);
if (it == sessions.end())
{
LOG_INFO(log, "Session {} not found for user {}, probably it's already closed", session_id, user_id);
return;
}
if (!it->second.unique())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot close session {} with refcount {}", session_id, it->second.use_count());
sessions.erase(it);
}
private:
class SessionKeyHash
{
@ -408,7 +425,7 @@ ContextMutablePtr Session::makeSessionContext(const String & session_name_, std:
std::shared_ptr<NamedSessionData> new_named_session;
bool new_named_session_created = false;
std::tie(new_named_session, new_named_session_created)
= NamedSessionsStorage::instance().acquireSession(global_context, user_id.value_or(UUID{}), session_name_, timeout_, session_check_);
= NamedSessionsStorage::instance().acquireSession(global_context, *user_id, session_name_, timeout_, session_check_);
auto new_session_context = new_named_session->context;
new_session_context->makeSessionContext();
@ -533,5 +550,18 @@ void Session::releaseSessionID()
named_session = nullptr;
}
void Session::closeSession(const String & session_id)
{
if (!user_id) /// User was not authenticated
return;
/// named_session may not be set due to an early exception
if (!named_session)
return;
releaseSessionID();
NamedSessionsStorage::instance().closeSession(*user_id, session_id);
}
}

View File

@ -77,6 +77,9 @@ public:
/// Releases the currently used session ID so it becomes available for reuse by another session.
void releaseSessionID();
/// Closes and removes session
void closeSession(const String & session_id);
private:
std::shared_ptr<SessionLog> getSessionLog() const;
ContextMutablePtr makeQueryContextImpl(const ClientInfo * client_info_to_copy, ClientInfo * client_info_to_move) const;

View File

@ -176,7 +176,7 @@ static void setExceptionStackTrace(QueryLogElement & elem)
/// Log exception (with query info) into text log (not into system table).
static void logException(ContextPtr context, QueryLogElement & elem)
static void logException(ContextPtr context, QueryLogElement & elem, bool log_error = true)
{
String comment;
if (!elem.log_comment.empty())
@ -187,7 +187,7 @@ static void logException(ContextPtr context, QueryLogElement & elem)
PreformattedMessage message;
message.format_string = elem.exception_format_string;
if (elem.stack_trace.empty())
if (elem.stack_trace.empty() || !log_error)
message.text = fmt::format("{} (from {}){} (in query: {})", elem.exception,
context->getClientInfo().current_address.toString(),
comment,
@ -201,7 +201,10 @@ static void logException(ContextPtr context, QueryLogElement & elem)
toOneLineQuery(elem.query),
elem.stack_trace);
LOG_ERROR(&Poco::Logger::get("executeQuery"), message);
if (log_error)
LOG_ERROR(&Poco::Logger::get("executeQuery"), message);
else
LOG_INFO(&Poco::Logger::get("executeQuery"), message);
}
static void onExceptionBeforeStart(
@ -1101,7 +1104,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
quota(quota),
status_info_to_query_log,
implicit_txn_control,
query_span]() mutable
query_span](bool log_error) mutable
{
if (implicit_txn_control)
{
@ -1139,9 +1142,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.query_duration_ms = start_watch.elapsedMilliseconds();
}
if (current_settings.calculate_text_stack_trace)
if (current_settings.calculate_text_stack_trace && log_error)
setExceptionStackTrace(elem);
logException(context, elem);
logException(context, elem, log_error);
/// In case of exception we log internal queries also
if (log_queries && elem.type >= log_queries_min_type && static_cast<Int64>(elem.query_duration_ms) >= log_queries_min_query_duration_ms)
@ -1262,7 +1265,7 @@ void executeQuery(
/// If not - copy enough data into 'parse_buf'.
WriteBufferFromVector<PODArray<char>> out(parse_buf);
LimitReadBuffer limit(istr, max_query_size + 1, false);
LimitReadBuffer limit(istr, max_query_size + 1, /* throw_exception */ false, /* exact_limit */ {});
copyData(limit, out);
out.finalize();

View File

@ -139,9 +139,11 @@ PushingAsyncPipelineExecutor::PushingAsyncPipelineExecutor(QueryPipeline & pipel
PushingAsyncPipelineExecutor::~PushingAsyncPipelineExecutor()
{
/// It must be finalized explicitly. Otherwise we cancel it assuming it's due to an exception.
chassert(finished || std::uncaught_exceptions() || std::current_exception());
try
{
finish();
cancel();
}
catch (...)
{

View File

@ -63,9 +63,11 @@ PushingPipelineExecutor::PushingPipelineExecutor(QueryPipeline & pipeline_) : pi
PushingPipelineExecutor::~PushingPipelineExecutor()
{
/// It must be finalized explicitly. Otherwise we cancel it assuming it's due to an exception.
chassert(finished || std::uncaught_exceptions() || std::current_exception());
try
{
finish();
cancel();
}
catch (...)
{

View File

@ -4,6 +4,10 @@
namespace DB
{
namespace ErrorCodes
{
extern const int QUERY_WAS_CANCELLED;
}
void BlockIO::reset()
{
@ -58,7 +62,26 @@ void BlockIO::onFinish()
void BlockIO::onException()
{
if (exception_callback)
exception_callback();
exception_callback(/* log_error */ true);
pipeline.reset();
}
void BlockIO::onCancelOrConnectionLoss()
{
/// Query was not finished gracefully, so we should call exception_callback
/// But we don't have a real exception
if (exception_callback)
{
try
{
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled or a client has unexpectedly dropped the connection");
}
catch (...)
{
exception_callback(/* log_error */ false);
}
}
pipeline.reset();
}

View File

@ -26,13 +26,14 @@ struct BlockIO
/// Callbacks for query logging could be set here.
std::function<void(QueryPipeline &)> finish_callback;
std::function<void()> exception_callback;
std::function<void(bool)> exception_callback;
/// When it is true, don't bother sending any non-empty blocks to the out stream
bool null_format = false;
void onFinish();
void onException();
void onCancelOrConnectionLoss();
/// Set is_all_data_sent in system.processes for this query.
void setAllDataSent() const;

View File

@ -984,7 +984,10 @@ namespace
executor.push(block);
}
executor.finish();
if (isQueryCancelled())
executor.cancel();
else
executor.finish();
}
void Call::initializePipeline(const Block & header)

View File

@ -20,6 +20,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_READ_ALL_DATA;
}
namespace
{
@ -229,6 +234,11 @@ void HTMLForm::readMultipart(ReadBuffer & in_, PartHandler & handler)
if (!in.skipToNextBoundary())
break;
}
/// It's important to check, because we could get "fake" EOF and incomplete request if a client suddenly died in the middle.
if (!in.isActualEOF())
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected EOF, "
"did not find the last boundary while parsing a multipart HTTP request");
}
@ -244,7 +254,8 @@ bool HTMLForm::MultipartReadBuffer::skipToNextBoundary()
if (in.eof())
return false;
assert(boundary_hit);
chassert(boundary_hit);
chassert(!found_last_boundary);
boundary_hit = false;
@ -255,7 +266,8 @@ bool HTMLForm::MultipartReadBuffer::skipToNextBoundary()
{
set(in.position(), 0);
next(); /// We need to restrict our buffer to size of next available line.
return !startsWith(line, boundary + "--");
found_last_boundary = startsWith(line, boundary + "--");
return !found_last_boundary;
}
}

View File

@ -108,10 +108,13 @@ public:
/// Returns false if last boundary found.
bool skipToNextBoundary();
bool isActualEOF() const { return found_last_boundary; }
private:
PeekableReadBuffer in;
const std::string boundary;
bool boundary_hit = true;
bool found_last_boundary = false;
std::string readLine(bool append_crlf);

View File

@ -12,6 +12,8 @@
#include <Poco/Net/HTTPStream.h>
#include <Poco/Net/NetException.h>
#include <Common/logger_useful.h>
#if USE_SSL
#include <Poco/Net/SecureStreamSocketImpl.h>
#include <Poco/Net/SSLException.h>
@ -44,12 +46,28 @@ HTTPServerRequest::HTTPServerRequest(HTTPContextPtr context, HTTPServerResponse
readRequest(*in); /// Try parse according to RFC7230
/// If a client crashes, most systems will gracefully terminate the connection with FIN just like it's done on close().
/// So we will get 0 from recv(...) and will not be able to understand that something went wrong (well, we probably
/// will get RST later on attempt to write to the socket that closed on the other side, but it will happen when the query is finished).
/// If we are extremely unlucky and data format is TSV, for example, then we may stop parsing exactly between rows
/// and decide that it's EOF (but it is not). It may break deduplication, because clients cannot control it
/// and retry with exactly the same (incomplete) set of rows.
/// That's why we have to check body size if it's provided.
if (getChunkedTransferEncoding())
stream = std::make_unique<HTTPChunkedReadBuffer>(std::move(in), context->getMaxChunkSize());
else if (hasContentLength())
stream = std::make_unique<LimitReadBuffer>(std::move(in), getContentLength(), false);
{
size_t content_length = getContentLength();
stream = std::make_unique<LimitReadBuffer>(std::move(in), content_length,
/* throw_exception */ true, /* exact_limit */ content_length);
}
else if (getMethod() != HTTPRequest::HTTP_GET && getMethod() != HTTPRequest::HTTP_HEAD && getMethod() != HTTPRequest::HTTP_DELETE)
{
stream = std::move(in);
if (!startsWith(getContentType(), "multipart/form-data"))
LOG_WARNING(&Poco::Logger::get("HTTPServerRequest"), "Got an HTTP request with no content length "
"and no chunked/multipart encoding, it may be impossible to distinguish graceful EOF from abnormal connection loss");
}
else
/// We have to distinguish empty buffer and nullptr.
stream = std::make_unique<EmptyReadBuffer>();
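To make the effect concrete, a hedged sketch of the server-side read for a request that carries a Content-Length header (the helper below is hypothetical; only LimitReadBuffer and its exact_limit parameter come from this commit). If the client dies before the whole body arrives, the read now fails loudly instead of returning a silently truncated body.

/// Hypothetical body reader.
std::string readWholeBody(DB::ReadBuffer & socket_in, size_t content_length)
{
    DB::LimitReadBuffer body(socket_in, content_length,
                             /* throw_exception */ true, /* exact_limit */ content_length);
    std::string data;
    DB::readStringUntilEOF(data, body);
    /// If the connection was dropped early, the call above throws CANNOT_READ_ALL_DATA
    /// ("Unexpected EOF, got {} of {} bytes"), which the new tests below grep for in text_log.
    return data;
}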

View File

@ -24,6 +24,7 @@
#include <Common/logger_useful.h>
#include <Common/SettingsChanges.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/scope_guard_safe.h>
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
#include <Parsers/ASTSetQuery.h>
@ -678,7 +679,7 @@ void HTTPHandler::processQuery(
std::unique_ptr<ReadBuffer> in;
static const NameSet reserved_param_names{"compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace",
"buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check", "client_protocol_version"};
"buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check", "client_protocol_version", "close_session"};
Names reserved_param_suffixes;
@ -957,6 +958,14 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
/// In case of exception, send stack trace to client.
bool with_stacktrace = false;
/// Close http session (if any) after processing the request
bool close_session = false;
String session_id;
SCOPE_EXIT_SAFE({
if (close_session && !session_id.empty())
session->closeSession(session_id);
});
OpenTelemetry::TracingContextHolderPtr thread_trace_context;
SCOPE_EXIT({
@ -1006,6 +1015,9 @@ void HTTPHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
HTMLForm params(default_settings, request);
with_stacktrace = params.getParsed<bool>("stacktrace", false);
close_session = params.getParsed<bool>("close_session", false);
if (close_session)
session_id = params.get("session_id");
/// FIXME: maybe this check is already unnecessary.
/// Workaround. Poco does not detect 411 Length Required case.

View File

@ -155,7 +155,7 @@ void MySQLHandler::run()
payload.readStrict(command);
// For commands which are executed without MemoryTracker.
LimitReadBuffer limited_payload(payload, 10000, true, "too long MySQL packet.");
LimitReadBuffer limited_payload(payload, 10000, /* throw_exception */ true, /* exact_limit */ {}, "too long MySQL packet.");
LOG_DEBUG(log, "Received command: {}. Connection id: {}.",
static_cast<int>(static_cast<unsigned char>(command)), connection_id);

View File

@ -109,6 +109,7 @@ namespace ErrorCodes
extern const int UNEXPECTED_PACKET_FROM_CLIENT;
extern const int UNKNOWN_PROTOCOL;
extern const int AUTHENTICATION_FAILED;
extern const int QUERY_WAS_CANCELLED;
}
TCPHandler::TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool parse_proxy_protocol_, std::string server_display_name_)
@ -415,17 +416,25 @@ void TCPHandler::runImpl()
after_check_cancelled.restart();
after_send_progress.restart();
auto finish_or_cancel = [this]()
{
if (state.is_cancelled)
state.io.onCancelOrConnectionLoss();
else
state.io.onFinish();
};
if (state.io.pipeline.pushing())
{
/// FIXME: check explicitly that insert query suggests to receive data via native protocol,
state.need_receive_data_for_insert = true;
processInsertQuery();
state.io.onFinish();
finish_or_cancel();
}
else if (state.io.pipeline.pulling())
{
processOrdinaryQueryWithProcessors();
state.io.onFinish();
finish_or_cancel();
}
else if (state.io.pipeline.completed())
{
@ -454,7 +463,7 @@ void TCPHandler::runImpl()
executor.execute();
}
state.io.onFinish();
finish_or_cancel();
std::lock_guard lock(task_callback_mutex);
@ -468,7 +477,7 @@ void TCPHandler::runImpl()
}
else
{
state.io.onFinish();
finish_or_cancel();
}
/// Do it before sending end of stream, to have a chance to show log message in client.
@ -660,6 +669,7 @@ bool TCPHandler::readDataNext()
{
LOG_INFO(log, "Client has dropped the connection, cancel the query.");
state.is_connection_closed = true;
state.is_cancelled = true;
break;
}
@ -703,6 +713,9 @@ void TCPHandler::readData()
while (readDataNext())
;
if (state.is_cancelled)
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
}
@ -713,6 +726,9 @@ void TCPHandler::skipData()
while (readDataNext())
;
if (state.is_cancelled)
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
}
@ -749,7 +765,10 @@ void TCPHandler::processInsertQuery()
while (readDataNext())
executor.push(std::move(state.block_for_insert));
executor.finish();
if (state.is_cancelled)
executor.cancel();
else
executor.finish();
};
if (num_threads > 1)
@ -1048,7 +1067,7 @@ bool TCPHandler::receiveProxyHeader()
/// Only PROXYv1 is supported.
/// Validation of protocol is not fully performed.
LimitReadBuffer limit_in(*in, 107, true); /// Maximum length from the specs.
LimitReadBuffer limit_in(*in, 107, /* throw_exception */ true, /* exact_limit */ {}); /// Maximum length from the specs.
assertString("PROXY ", limit_in);
@ -1316,6 +1335,9 @@ bool TCPHandler::receivePacket()
std::this_thread::sleep_for(ms);
}
LOG_INFO(log, "Received 'Cancel' packet from the client, canceling the query");
state.is_cancelled = true;
return false;
}
@ -1357,6 +1379,7 @@ String TCPHandler::receiveReadTaskResponseAssumeLocked()
{
if (packet_type == Protocol::Client::Cancel)
{
LOG_INFO(log, "Received 'Cancel' packet from the client, canceling the read task");
state.is_cancelled = true;
/// For testing connection collector.
if (unlikely(sleep_in_receive_cancel.totalMilliseconds()))
@ -1390,6 +1413,7 @@ std::optional<ParallelReadResponse> TCPHandler::receivePartitionMergeTreeReadTas
{
if (packet_type == Protocol::Client::Cancel)
{
LOG_INFO(log, "Received 'Cancel' packet from the client, canceling the MergeTree read task");
state.is_cancelled = true;
/// For testing connection collector.
if (unlikely(sleep_in_receive_cancel.totalMilliseconds()))

View File

@ -569,6 +569,26 @@ void DistributedSink::onFinish()
}
}
void DistributedSink::onCancel()
{
if (pool && !pool->finished())
{
try
{
pool->wait();
}
catch (...)
{
tryLogCurrentException(storage.log);
}
}
for (auto & shard_jobs : per_shard_jobs)
for (JobReplica & job : shard_jobs.replicas_jobs)
if (job.executor)
job.executor->cancel();
}
IColumn::Selector DistributedSink::createSelector(const Block & source_block) const
{

View File

@ -54,6 +54,8 @@ public:
void onFinish() override;
private:
void onCancel() override;
IColumn::Selector createSelector(const Block & source_block) const;
void writeAsync(const Block & block);

View File

@ -118,7 +118,7 @@ private:
if (limited_by_file_size)
{
limited.emplace(*plain, file_size - offset, false);
limited.emplace(*plain, file_size - offset, /* throw_exception */ false, /* exact_limit */ std::optional<size_t>());
compressed.emplace(*limited);
}
else

View File

@ -796,7 +796,7 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr
auto code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZNODEEXISTS)
{
LOG_WARNING(log, "It looks like the table {} was created by another server at the same moment, will retry", zookeeper_path);
LOG_INFO(log, "It looks like the table {} was created by another server at the same moment, will retry", zookeeper_path);
continue;
}
else if (code != Coordination::Error::ZOK)
@ -876,7 +876,7 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada
case Coordination::Error::ZNODEEXISTS:
throw Exception(ErrorCodes::REPLICA_ALREADY_EXISTS, "Replica {} already exists", replica_path);
case Coordination::Error::ZBADVERSION:
LOG_ERROR(log, "Retrying createReplica(), because some other replicas were created at the same time");
LOG_INFO(log, "Retrying createReplica(), because some other replicas were created at the same time");
break;
case Coordination::Error::ZNONODE:
throw Exception(ErrorCodes::ALL_REPLICAS_LOST, "Table {} was suddenly removed", zookeeper_path);

View File

@ -76,8 +76,9 @@ def trim_for_log(s):
if not s:
return s
lines = s.splitlines()
if len(lines) > 100:
return "\n".join(lines[:50] + ["#" * 100] + lines[-50:])
if len(lines) > 10000:
separator = "-" * 40 + str(len(lines) - 10000) + " lines are hidden" + "-" * 40
return "\n".join(lines[:5000] + [] + [separator] + [] + lines[-5000:])
else:
return "\n".join(lines)

View File

@ -288,7 +288,7 @@ def test_inserts_single_replica_no_internal_replication(started_cluster):
"prefer_localhost_replica": "0",
},
)
assert node2.query("SELECT count(*) FROM single_replicated").strip() == "1"
assert node2.query("SELECT count(*) FROM single_replicated").strip() == "0"
finally:
node2.query("TRUNCATE TABLE single_replicated")

View File

@ -2,6 +2,7 @@
# Tags: long, no-replicated-database, no-ordinary-database
# shellcheck disable=SC2015
# shellcheck disable=SC2119
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -1,6 +1,7 @@
#!/usr/bin/env bash
# Tags: long, no-parallel, no-ordinary-database
# Test is too heavy, avoid parallel run in Flaky Check
# shellcheck disable=SC2119
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -2,6 +2,7 @@
# Tags: long, no-ordinary-database
# shellcheck disable=SC2015
# shellcheck disable=SC2119
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -49,7 +49,7 @@ insert_client_opts=(
timeout 250s $CLICKHOUSE_CLIENT "${client_opts[@]}" "${insert_client_opts[@]}" -q "insert into function remote('127.2', currentDatabase(), in_02232) select * from numbers(1e6)"
# Kill underlying query of remote() to make KILL faster
timeout 30s $CLICKHOUSE_CLIENT "${client_opts[@]}" -q "KILL QUERY WHERE Settings['log_comment'] = '$CLICKHOUSE_LOG_COMMENT' SYNC" --format Null
timeout 60s $CLICKHOUSE_CLIENT "${client_opts[@]}" -q "KILL QUERY WHERE Settings['log_comment'] = '$CLICKHOUSE_LOG_COMMENT' SYNC" --format Null
echo $?
$CLICKHOUSE_CLIENT "${client_opts[@]}" -nm -q "

View File

@ -0,0 +1,3 @@
5000000
5000000
1

View File

@ -0,0 +1,98 @@
#!/usr/bin/env bash
# Tags: no-random-settings
# shellcheck disable=SC2009
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
export DATA_FILE="$CLICKHOUSE_TMP/deduptest.tsv"
export TEST_MARK="02434_insert_${CLICKHOUSE_DATABASE}_"
$CLICKHOUSE_CLIENT -q 'select * from numbers(5000000) format TSV' > $DATA_FILE
$CLICKHOUSE_CLIENT -q 'create table dedup_test(A Int64) Engine = MergeTree order by A settings non_replicated_deduplication_window=1000;'
$CLICKHOUSE_CLIENT -q "create table dedup_dist(A Int64) Engine = Distributed('test_cluster_one_shard_two_replicas', currentDatabase(), dedup_test)"
function insert_data
{
SETTINGS="query_id=$ID&max_insert_block_size=110000&min_insert_block_size_rows=110000"
# max_block_size=10000, so external table will contain smaller blocks that will be squashed on insert-select (more chances to catch a bug on query cancellation)
TRASH_SETTINGS="query_id=$ID&input_format_parallel_parsing=0&max_threads=1&max_insert_threads=1&max_insert_block_size=110000&max_block_size=10000&min_insert_block_size_bytes=0&min_insert_block_size_rows=110000&max_insert_block_size=110000"
TYPE=$(( RANDOM % 5 ))
if [[ "$TYPE" -eq 0 ]]; then
# client will send 10000-row blocks, server will squash them into 110000-row blocks (more chances to catch a bug on query cancellation)
$CLICKHOUSE_CLIENT --max_block_size=10000 --max_insert_block_size=10000 --query_id="$ID" \
-q 'insert into dedup_test settings max_insert_block_size=110000, min_insert_block_size_rows=110000 format TSV' < $DATA_FILE
elif [[ "$TYPE" -eq 1 ]]; then
$CLICKHOUSE_CLIENT --max_block_size=10000 --max_insert_block_size=10000 --query_id="$ID" --prefer_localhost_replica="$(( RANDOM % 2))" \
-q 'insert into dedup_dist settings max_insert_block_size=110000, min_insert_block_size_rows=110000 format TSV' < $DATA_FILE
elif [[ "$TYPE" -eq 2 ]]; then
$CLICKHOUSE_CURL -sS -X POST --data-binary @- "$CLICKHOUSE_URL&$SETTINGS&query=insert+into+dedup_test+format+TSV" < $DATA_FILE
elif [[ "$TYPE" -eq 3 ]]; then
$CLICKHOUSE_CURL -sS -X POST -H "Transfer-Encoding: chunked" --data-binary @- "$CLICKHOUSE_URL&$SETTINGS&query=insert+into+dedup_test+format+TSV" < $DATA_FILE
else
$CLICKHOUSE_CURL -sS -F 'file=@-' "$CLICKHOUSE_URL&$TRASH_SETTINGS&file_format=TSV&file_types=UInt64" -X POST --form-string 'query=insert into dedup_test select * from file' < $DATA_FILE
fi
}
export -f insert_data
ID="02434_insert_init_${CLICKHOUSE_DATABASE}_$RANDOM"
insert_data
$CLICKHOUSE_CLIENT -q "system flush distributed dedup_dist"
$CLICKHOUSE_CLIENT -q 'select count() from dedup_test'
function thread_insert
{
# suppress "Killed" messages from bash
while true; do
export ID="$TEST_MARK$RANDOM"
bash -c insert_data 2>&1| grep -Fav "Killed"
done
}
function thread_select
{
while true; do
$CLICKHOUSE_CLIENT -q "with (select count() from dedup_test) as c select throwIf(c != 5000000, 'Expected 5000000 rows, got ' || toString(c)) format Null"
sleep 0.$RANDOM;
done
}
function thread_cancel
{
while true; do
SIGNAL="INT"
if (( RANDOM % 2 )); then
SIGNAL="KILL"
fi
PID=$(grep -Fa "$TEST_MARK" /proc/*/cmdline | grep -Fav grep | grep -Eoa "/proc/[0-9]*/cmdline:" | grep -Eo "[0-9]*" | head -1)
if [ ! -z "$PID" ]; then kill -s "$SIGNAL" "$PID"; fi
sleep 0.$RANDOM;
sleep 0.$RANDOM;
sleep 0.$RANDOM;
done
}
export -f thread_insert;
export -f thread_select;
export -f thread_cancel;
TIMEOUT=40
timeout $TIMEOUT bash -c thread_insert &
timeout $TIMEOUT bash -c thread_select &
timeout $TIMEOUT bash -c thread_cancel 2> /dev/null &
wait
$CLICKHOUSE_CLIENT -q 'select count() from dedup_test'
$CLICKHOUSE_CLIENT -q 'system flush logs'
# Ensure that thread_cancel actually did something
$CLICKHOUSE_CLIENT -q "select count() > 0 from system.text_log where event_date >= yesterday() and query_id like '$TEST_MARK%' and (
message_format_string in ('Unexpected end of file while reading chunk header of HTTP chunked data', 'Unexpected EOF, got {} of {} bytes',
'Query was cancelled or a client has unexpectedly dropped the connection') or
message like '%Connection reset by peer%' or message like '%Broken pipe, while writing to socket%')"

View File

@ -0,0 +1,3 @@
1000000
0
1

View File

@ -0,0 +1,117 @@
#!/usr/bin/env bash
# Tags: no-random-settings, no-ordinary-database
# shellcheck disable=SC2009
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
export DATA_FILE="$CLICKHOUSE_TMP/deduptest.tsv"
export TEST_MARK="02435_insert_${CLICKHOUSE_DATABASE}_"
export SESSION="02435_session_${CLICKHOUSE_DATABASE}"
$CLICKHOUSE_CLIENT -q 'select * from numbers(1000000) format TSV' > $DATA_FILE
$CLICKHOUSE_CLIENT -q 'create table dedup_test(A Int64) Engine = MergeTree order by sin(A) partition by intDiv(A, 100000)'
function insert_data
{
IMPLICIT=$(( RANDOM % 2 ))
SESSION_ID="${SESSION}_$RANDOM.$RANDOM.$RANDOM"
TXN_SETTINGS="session_id=$SESSION_ID&throw_on_unsupported_query_inside_transaction=0&implicit_transaction=$IMPLICIT"
BEGIN=""
COMMIT=""
SETTINGS="query_id=$ID&$TXN_SETTINGS&max_insert_block_size=110000&min_insert_block_size_rows=110000"
if [[ "$IMPLICIT" -eq 0 ]]; then
$CLICKHOUSE_CURL -sS -d 'begin transaction' "$CLICKHOUSE_URL&$TXN_SETTINGS"
SETTINGS="$SETTINGS&session_check=1"
BEGIN="begin transaction;"
COMMIT=$(echo -ne "\n\ncommit")
fi
# max_block_size=10000, so external table will contain smaller blocks that will be squashed on insert-select (more chances to catch a bug on query cancellation)
TRASH_SETTINGS="$SETTINGS&input_format_parallel_parsing=0&max_threads=1&max_insert_threads=1&max_block_size=10000&min_insert_block_size_bytes=0"
TYPE=$(( RANDOM % 6 ))
if [[ "$TYPE" -eq 0 ]]; then
$CLICKHOUSE_CURL -sS -X POST --data-binary @- "$CLICKHOUSE_URL&$SETTINGS&query=insert+into+dedup_test+format+TSV" < $DATA_FILE
elif [[ "$TYPE" -eq 1 ]]; then
$CLICKHOUSE_CURL -sS -X POST -H "Transfer-Encoding: chunked" --data-binary @- "$CLICKHOUSE_URL&$SETTINGS&query=insert+into+dedup_test+format+TSV" < $DATA_FILE
elif [[ "$TYPE" -eq 2 ]]; then
$CLICKHOUSE_CURL -sS -F 'file=@-' "$CLICKHOUSE_URL&$TRASH_SETTINGS&file_format=TSV&file_types=UInt64" -X POST --form-string 'query=insert into dedup_test select * from file' < $DATA_FILE
else
# client will send 1000-row blocks, server will squash them into 110000-row blocks (more chances to catch a bug on query cancellation)
$CLICKHOUSE_CLIENT --stacktrace --query_id="$ID" --throw_on_unsupported_query_inside_transaction=0 --implicit_transaction="$IMPLICIT" \
--max_block_size=1000 --max_insert_block_size=1000 --multiquery -q \
"${BEGIN}insert into dedup_test settings max_insert_block_size=110000, min_insert_block_size_rows=110000 format TSV$COMMIT" < $DATA_FILE \
| grep -Fv "Transaction is not in RUNNING state"
fi
if [[ "$IMPLICIT" -eq 0 ]]; then
$CLICKHOUSE_CURL -sS -d 'commit' "$CLICKHOUSE_URL&$TXN_SETTINGS&close_session=1"
fi
}
export -f insert_data
ID="02435_insert_init_${CLICKHOUSE_DATABASE}_$RANDOM"
insert_data
$CLICKHOUSE_CLIENT -q 'select count() from dedup_test'
function thread_insert
{
# suppress "Killed" messages from bash
while true; do
export ID="$TEST_MARK$RANDOM"
bash -c insert_data 2>&1| grep -Fav "Killed" | grep -Fav "SESSION_IS_LOCKED" | grep -Fav "SESSION_NOT_FOUND"
done
}
function thread_select
{
while true; do
$CLICKHOUSE_CLIENT --implicit_transaction=1 -q "with (select count() from dedup_test) as c select throwIf(c % 1000000 != 0, 'Expected 1000000 * N rows, got ' || toString(c)) format Null"
sleep 0.$RANDOM;
done
}
function thread_cancel
{
while true; do
SIGNAL="INT"
if (( RANDOM % 2 )); then
SIGNAL="KILL"
fi
PID=$(grep -Fa "$TEST_MARK" /proc/*/cmdline | grep -Fav grep | grep -Eoa "/proc/[0-9]*/cmdline:" | grep -Eo "[0-9]*" | head -1)
if [ ! -z "$PID" ]; then kill -s "$SIGNAL" "$PID"; fi
sleep 0.$RANDOM;
done
}
export -f thread_insert;
export -f thread_select;
export -f thread_cancel;
TIMEOUT=20
timeout $TIMEOUT bash -c thread_insert &
timeout $TIMEOUT bash -c thread_select &
timeout $TIMEOUT bash -c thread_cancel 2> /dev/null &
wait
$CLICKHOUSE_CLIENT -q 'system flush logs'
ID="02435_insert_last_${CLICKHOUSE_DATABASE}_$RANDOM"
insert_data
$CLICKHOUSE_CLIENT --implicit_transaction=1 -q 'select throwIf(count() % 1000000 != 0 or count() = 0) from dedup_test' \
|| $CLICKHOUSE_CLIENT -q "select name, rows, active, visible, creation_tid, creation_csn from system.parts where database=currentDatabase();"
# Ensure that thread_cancel actually did something
$CLICKHOUSE_CLIENT -q "select count() > 0 from system.text_log where event_date >= yesterday() and query_id like '$TEST_MARK%' and (
message_format_string in ('Unexpected end of file while reading chunk header of HTTP chunked data', 'Unexpected EOF, got {} of {} bytes',
'Query was cancelled or a client has unexpectedly dropped the connection') or
message like '%Connection reset by peer%' or message like '%Broken pipe, while writing to socket%')"
wait_for_queries_to_finish 30
$CLICKHOUSE_CLIENT --database_atomic_wait_for_drop_and_detach_synchronously=0 -q "drop table dedup_test"

View File

@ -1,4 +1,5 @@
#!/usr/bin/env bash
# shellcheck disable=SC2120
# Don't check for ODR violation, since we may test shared build with ASAN
export ASAN_OPTIONS=detect_odr_violation=0
@ -136,12 +137,13 @@ function clickhouse_client_removed_host_parameter()
function wait_for_queries_to_finish()
{
local max_tries="${1:-20}"
# Wait for all queries to finish (query may still be running if thread is killed by timeout)
num_tries=0
while [[ $($CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes WHERE current_database=currentDatabase() AND query NOT LIKE '%system.processes%'") -ne 0 ]]; do
sleep 0.5;
num_tries=$((num_tries+1))
if [ $num_tries -eq 20 ]; then
if [ $num_tries -eq $max_tries ]; then
$CLICKHOUSE_CLIENT -q "SELECT * FROM system.processes WHERE current_database=currentDatabase() AND query NOT LIKE '%system.processes%' FORMAT Vertical"
break
fi

View File

@ -17,7 +17,8 @@ static DB::MySQLReplication::BinlogEventPtr parseSingleEventBody(
std::shared_ptr<DB::MySQLReplication::TableMapEvent> & last_table_map_event, bool exist_checksum)
{
DB::MySQLReplication::BinlogEventPtr event;
DB::ReadBufferPtr limit_read_buffer = std::make_shared<DB::LimitReadBuffer>(payload, header.event_size - 19, false);
DB::ReadBufferPtr limit_read_buffer = std::make_shared<DB::LimitReadBuffer>(payload, header.event_size - 19,
/* throw_exception */ false, /* exact_limit */ std::nullopt);
DB::ReadBufferPtr event_payload = std::make_shared<DB::MySQLBinlogEventReadBuffer>(*limit_read_buffer, exist_checksum ? 4 : 0);
switch (header.type)