This commit is contained in:
Alexander Tokmakov 2023-02-25 01:18:34 +01:00
parent 7122ebab4d
commit cad1e0b768
12 changed files with 89 additions and 23 deletions

View File

@ -64,7 +64,8 @@ void WriteBufferFromPocoSocket::nextImpl()
}
catch (const Poco::Net::NetException & e)
{
throw NetException(ErrorCodes::NETWORK_ERROR, "{}, while writing to socket ({})", e.displayText(), peer_address.toString());
throw NetException(ErrorCodes::NETWORK_ERROR, "{}, while writing to socket ({} -> {})", e.displayText(),
our_address.toString(), peer_address.toString());
}
catch (const Poco::TimeoutException &)
{
@ -74,18 +75,20 @@ void WriteBufferFromPocoSocket::nextImpl()
}
catch (const Poco::IOException & e)
{
throw NetException(ErrorCodes::NETWORK_ERROR, "{}, while writing to socket ({})", e.displayText(), peer_address.toString());
throw NetException(ErrorCodes::NETWORK_ERROR, "{}, while writing to socket ({} -> {})", e.displayText(),
our_address.toString(), peer_address.toString());
}
if (res < 0)
throw NetException(ErrorCodes::CANNOT_WRITE_TO_SOCKET, "Cannot write to socket ({})", peer_address.toString());
throw NetException(ErrorCodes::CANNOT_WRITE_TO_SOCKET, "Cannot write to socket ({} -> {})",
our_address.toString(), peer_address.toString());
bytes_written += res;
}
}
WriteBufferFromPocoSocket::WriteBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size)
: BufferWithOwnMemory<WriteBuffer>(buf_size), socket(socket_), peer_address(socket.peerAddress())
: BufferWithOwnMemory<WriteBuffer>(buf_size), socket(socket_), peer_address(socket.peerAddress()), our_address(socket.address())
{
}

View File

@ -28,6 +28,7 @@ protected:
* (getpeername will return an error).
*/
Poco::Net::SocketAddress peer_address;
Poco::Net::SocketAddress our_address;
};
}

View File

@ -176,7 +176,7 @@ static void setExceptionStackTrace(QueryLogElement & elem)
/// Log exception (with query info) into text log (not into system table).
static void logException(ContextPtr context, QueryLogElement & elem)
static void logException(ContextPtr context, QueryLogElement & elem, bool log_error = true)
{
String comment;
if (!elem.log_comment.empty())
@ -187,7 +187,7 @@ static void logException(ContextPtr context, QueryLogElement & elem)
PreformattedMessage message;
message.format_string = elem.exception_format_string;
if (elem.stack_trace.empty())
if (elem.stack_trace.empty() || !log_error)
message.text = fmt::format("{} (from {}){} (in query: {})", elem.exception,
context->getClientInfo().current_address.toString(),
comment,
@ -201,7 +201,10 @@ static void logException(ContextPtr context, QueryLogElement & elem)
toOneLineQuery(elem.query),
elem.stack_trace);
if (log_error)
LOG_ERROR(&Poco::Logger::get("executeQuery"), message);
else
LOG_INFO(&Poco::Logger::get("executeQuery"), message);
}
static void onExceptionBeforeStart(
@ -1101,7 +1104,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
quota(quota),
status_info_to_query_log,
implicit_txn_control,
query_span]() mutable
query_span](bool log_error) mutable
{
if (implicit_txn_control)
{
@ -1139,9 +1142,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.query_duration_ms = start_watch.elapsedMilliseconds();
}
if (current_settings.calculate_text_stack_trace)
if (current_settings.calculate_text_stack_trace && log_error)
setExceptionStackTrace(elem);
logException(context, elem);
logException(context, elem, log_error);
/// In case of exception we log internal queries also
if (log_queries && elem.type >= log_queries_min_type && static_cast<Int64>(elem.query_duration_ms) >= log_queries_min_query_duration_ms)

View File

@ -4,6 +4,10 @@
namespace DB
{
namespace ErrorCodes
{
extern const int QUERY_WAS_CANCELLED;
}
void BlockIO::reset()
{
@ -58,7 +62,26 @@ void BlockIO::onFinish()
void BlockIO::onException()
{
if (exception_callback)
exception_callback();
exception_callback(/* log_error */ true);
pipeline.reset();
}
void BlockIO::onCancelOrConnectionLoss()
{
/// Query was not finished gracefully, so we should call exception_callback
/// But we don't have a real exception
if (exception_callback)
{
try
{
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled or a client has unexpectedly dropped the connection");
}
catch (...)
{
exception_callback(/* log_error */ false);
}
}
pipeline.reset();
}

View File

@ -26,13 +26,14 @@ struct BlockIO
/// Callbacks for query logging could be set here.
std::function<void(QueryPipeline &)> finish_callback;
std::function<void()> exception_callback;
std::function<void(bool)> exception_callback;
/// When it is true, don't bother sending any non-empty blocks to the out stream
bool null_format = false;
void onFinish();
void onException();
void onCancelOrConnectionLoss();
/// Set is_all_data_sent in system.processes for this query.
void setAllDataSent() const;

View File

@ -416,17 +416,25 @@ void TCPHandler::runImpl()
after_check_cancelled.restart();
after_send_progress.restart();
auto finish_or_cancel = [this]()
{
if (state.is_cancelled)
state.io.onCancelOrConnectionLoss();
else
state.io.onFinish();
};
if (state.io.pipeline.pushing())
{
/// FIXME: check explicitly that insert query suggests to receive data via native protocol,
state.need_receive_data_for_insert = true;
processInsertQuery();
state.io.onFinish();
finish_or_cancel();
}
else if (state.io.pipeline.pulling())
{
processOrdinaryQueryWithProcessors();
state.io.onFinish();
finish_or_cancel();
}
else if (state.io.pipeline.completed())
{
@ -455,7 +463,7 @@ void TCPHandler::runImpl()
executor.execute();
}
state.io.onFinish();
finish_or_cancel();
std::lock_guard lock(task_callback_mutex);
@ -469,7 +477,7 @@ void TCPHandler::runImpl()
}
else
{
state.io.onFinish();
finish_or_cancel();
}
/// Do it before sending end of stream, to have a chance to show log message in client.

View File

@ -569,6 +569,26 @@ void DistributedSink::onFinish()
}
}
void DistributedSink::onCancel()
{
if (pool && !pool->finished())
{
try
{
pool->wait();
}
catch (...)
{
tryLogCurrentException(storage.log);
}
}
for (auto & shard_jobs : per_shard_jobs)
for (JobReplica & job : shard_jobs.replicas_jobs)
if (job.executor)
job.executor->cancel();
}
IColumn::Selector DistributedSink::createSelector(const Block & source_block) const
{

View File

@ -54,6 +54,8 @@ public:
void onFinish() override;
private:
void onCancel() override;
IColumn::Selector createSelector(const Block & source_block) const;
void writeAsync(const Block & block);

View File

@ -288,7 +288,7 @@ def test_inserts_single_replica_no_internal_replication(started_cluster):
"prefer_localhost_replica": "0",
},
)
assert node2.query("SELECT count(*) FROM single_replicated").strip() == "1"
assert node2.query("SELECT count(*) FROM single_replicated").strip() == "0"
finally:
node2.query("TRUNCATE TABLE single_replicated")

View File

@ -11,20 +11,25 @@ export TEST_MARK="02434_insert_${CLICKHOUSE_DATABASE}_"
$CLICKHOUSE_CLIENT -q 'select * from numbers(5000000) format TSV' > $DATA_FILE
$CLICKHOUSE_CLIENT -q 'create table dedup_test(A Int64) Engine = MergeTree order by A settings non_replicated_deduplication_window=1000;'
$CLICKHOUSE_CLIENT -q "create table dedup_dist(A Int64) Engine = Distributed('test_cluster_one_shard_two_replicas', currentDatabase(), dedup_test)"
function insert_data
{
SETTINGS="query_id=$ID&max_insert_block_size=110000&min_insert_block_size_rows=110000"
# max_block_size=10000, so external table will contain smaller blocks that will be squashed on insert-select (more chances to catch a bug on query cancellation)
TRASH_SETTINGS="query_id=$ID&input_format_parallel_parsing=0&max_threads=1&max_insert_threads=1&max_insert_block_size=110000&max_block_size=10000&min_insert_block_size_bytes=0&min_insert_block_size_rows=110000&max_insert_block_size=110000"
TYPE=$(( RANDOM % 4 ))
TYPE=$(( RANDOM % 5 ))
if [[ "$TYPE" -eq 0 ]]; then
# client will send 10000-rows blocks, server will squash them into 110000-rows blocks (more chances to catch a bug on query cancellation)
$CLICKHOUSE_CLIENT --max_block_size=10000 --max_insert_block_size=10000 --query_id="$ID" -q 'insert into dedup_test settings max_insert_block_size=110000, min_insert_block_size_rows=110000 format TSV' < $DATA_FILE
$CLICKHOUSE_CLIENT --max_block_size=10000 --max_insert_block_size=10000 --query_id="$ID" \
-q 'insert into dedup_test settings max_insert_block_size=110000, min_insert_block_size_rows=110000 format TSV' < $DATA_FILE
elif [[ "$TYPE" -eq 1 ]]; then
$CLICKHOUSE_CURL -sS -X POST --data-binary @- "$CLICKHOUSE_URL&$SETTINGS&query=insert+into+dedup_test+format+TSV" < $DATA_FILE
$CLICKHOUSE_CLIENT --max_block_size=10000 --max_insert_block_size=10000 --query_id="$ID" --prefer_localhost_replica="$(( RANDOM % 2))" \
-q 'insert into dedup_dist settings max_insert_block_size=110000, min_insert_block_size_rows=110000 format TSV' < $DATA_FILE
elif [[ "$TYPE" -eq 2 ]]; then
$CLICKHOUSE_CURL -sS -X POST --data-binary @- "$CLICKHOUSE_URL&$SETTINGS&query=insert+into+dedup_test+format+TSV" < $DATA_FILE
elif [[ "$TYPE" -eq 3 ]]; then
$CLICKHOUSE_CURL -sS -X POST -H "Transfer-Encoding: chunked" --data-binary @- "$CLICKHOUSE_URL&$SETTINGS&query=insert+into+dedup_test+format+TSV" < $DATA_FILE
else
$CLICKHOUSE_CURL -sS -F 'file=@-' "$CLICKHOUSE_URL&$TRASH_SETTINGS&file_format=TSV&file_types=UInt64" -X POST --form-string 'query=insert into dedup_test select * from file' < $DATA_FILE
@ -73,7 +78,7 @@ export -f thread_insert;
export -f thread_select;
export -f thread_cancel;
TIMEOUT=40 # 10 seconds for each TYPE
TIMEOUT=40
timeout $TIMEOUT bash -c thread_insert &
timeout $TIMEOUT bash -c thread_select &

View File

@ -114,6 +114,6 @@ $CLICKHOUSE_CLIENT --implicit_transaction=1 -q 'select throwIf(count() % 1000000
# So use this query to check that thread_cancel do something
$CLICKHOUSE_CLIENT -q "select count() > 0 from system.text_log where event_date >= yesterday() and query_id like '$TEST_MARK%' and (
message_format_string in ('Unexpected end of file while reading chunk header of HTTP chunked data', 'Unexpected EOF, got {} of {} bytes') or
message like '%Connection reset by peer%')"
message like '%Connection reset by peer%' or message like '%Broken pipe, while writing to socket%')"
$CLICKHOUSE_CLIENT --database_atomic_wait_for_drop_and_detach_synchronously=0 -q "drop table dedup_test"

View File

@ -18,7 +18,7 @@ static DB::MySQLReplication::BinlogEventPtr parseSingleEventBody(
{
DB::MySQLReplication::BinlogEventPtr event;
DB::ReadBufferPtr limit_read_buffer = std::make_shared<DB::LimitReadBuffer>(payload, header.event_size - 19,
/* trow_exception */ false, /* exact_limit */ {});
/* trow_exception */ false, /* exact_limit */ std::nullopt);
DB::ReadBufferPtr event_payload = std::make_shared<DB::MySQLBinlogEventReadBuffer>(*limit_read_buffer, exist_checksum ? 4 : 0);
switch (header.type)