work with review comments

This commit is contained in:
Sema Checherinda 2024-11-14 16:57:50 +01:00
parent 5b3b9fad2f
commit 21a39e6b50
8 changed files with 4 additions and 16 deletions

View File

@ -449,12 +449,10 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
}
catch (const NetException &)
{
//std::cerr << "write throw NetException exception: " << getCurrentExceptionMessage(true) << '\n';
throw;
}
catch (const ErrnoException &)
{
//std::cerr << "write throw ErrnoException exception: " << getCurrentExceptionMessage(true) << '\n';
throw;
}
catch (const Exception &)
@ -555,7 +553,6 @@ try
ShellCommand::Config config(pager);
config.pipe_stdin_only = true;
//std::cerr << "create pager cmd " << config.command << std::endl;
pager_cmd = ShellCommand::execute(config);
out_buf = &pager_cmd->in;
}

View File

@ -892,7 +892,6 @@ void Connection::sendQuery(
void Connection::sendCancel()
{
LOG_DEBUG(log_wrapper.get(), "send cancel from {}", StackTrace().toString());
/// If we already disconnected.
if (!out)
return;

View File

@ -3,7 +3,7 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/Combinators/AggregateFunctionCombinatorFactory.h>
#include <Columns/ColumnString.h>
#include "Common/Exception.h"
#include <Common/Exception.h>
#include <Common/typeid_cast.h>
#include <Common/Macros.h>
#include "Core/Protocol.h"

View File

@ -39,8 +39,6 @@ try
if (argc >= 5)
port = parse<uint16_t>(argv[4]);
// WriteBufferFromFileDescriptor out(STDERR_FILENO);
std::atomic_bool cancel{false};
std::vector<std::thread> threads(num_threads);
for (auto & thread : threads)

View File

@ -87,12 +87,10 @@ StatusFile::StatusFile(std::string path_, FillFunction fill_)
try
{
fill(out);
/// Finalize here to avoid throwing exceptions in destructor.
out.finalize();
}
catch (...)
{
/// Finalize in case of exception to avoid throwing exceptions in destructor
out.cancel();
throw;
}

View File

@ -1,4 +1,3 @@
#include "Common/logger_useful.h"
#include <Common/ConcurrentBoundedQueue.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <QueryPipeline/RemoteQueryExecutorReadContext.h>

View File

@ -154,7 +154,7 @@ namespace DB::ErrorCodes
// We have to distinguish the case when query is killed by `KILL QUERY` statement
// and when it is killed by `Protocol::Client::Cancel` packet.
// When query is illed by `KILL QUERY` statement we have to end the execution
// When query is killed by `KILL QUERY` statement we have to end the execution
// and send the exception to the actual client which initiated the TCP connection.
// When query is killed by `Protocol::Client::Cancel` packet we just stop execution,

View File

@ -119,8 +119,6 @@ struct QueryState
/// Timeouts setter for current query
std::unique_ptr<TimeoutSetter> timeout_setter;
std::mutex mutex;
void finalizeOut(std::shared_ptr<WriteBufferFromPocoSocketChunked> & raw_out) const
{
if (maybe_compressed_out && maybe_compressed_out.get() != raw_out.get())
@ -241,8 +239,8 @@ private:
std::optional<UInt64> nonce;
String cluster;
/// `callback_mutex` protects using `out` (WriteBuffer) and `in` (ReadBuffer) inside callback.
/// So it is used for method sendData(), sendProgress(), sendLogs() along with all callbacks which interact with that buffers
/// `callback_mutex` protects using `out` (WriteBuffer), `in` (ReadBuffer) and other members concurrent inside callbacks.
/// All the methods which are run inside callbacks are marked with TSA_REQUIRES.
std::mutex callback_mutex;
/// Last block input parameters are saved to be able to receive unexpected data packet sent after exception.
@ -272,7 +270,6 @@ private:
String receiveReadTaskResponse(QueryState & state) TSA_REQUIRES(callback_mutex);
std::optional<ParallelReadResponse> receivePartitionMergeTreeReadTaskResponse(QueryState & state) TSA_REQUIRES(callback_mutex);
//bool receivePacket(std::optional<QueryState> & state);
void processCancel(QueryState & state, bool throw_exception = true) TSA_REQUIRES(callback_mutex);
void processQuery(std::optional<QueryState> & state);
void processIgnoredPartUUIDs();