Simplify the code: get rid of WriteBufferFromGRPC.

This commit is contained in:
Vitaly Baranov 2020-10-21 17:35:38 +03:00
parent c2edd9f8ce
commit ba723d6d75
2 changed files with 78 additions and 154 deletions

View File

@ -15,7 +15,6 @@
#include <Parsers/ParserQuery.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Server/IServer.h>
#include <Server/WriteBufferFromGRPC.h>
#include <Storages/IStorage.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/server.h>
@ -57,10 +56,10 @@ namespace
reader_writer.Read(&query_info_, tag);
}
/*void write(const GRPCResult & result)
void write(const GRPCResult & result)
{
reader_writer.Write(result, tag);
}*/
}
void writeAndFinish(const GRPCResult & result, const grpc::Status & status)
{
@ -73,11 +72,6 @@ namespace
return Poco::Net::SocketAddress{peer.substr(peer.find(':') + 1)};
}
grpc::ServerAsyncReaderWriter<GRPCResult, GRPCQueryInfo> & getReaderWriter()
{
return reader_writer;
}
private:
grpc::ServerContext grpc_context;
grpc::ServerAsyncReaderWriter<GRPCResult, GRPCQueryInfo> reader_writer{&grpc_context};
@ -108,10 +102,11 @@ namespace
void onException(const Exception & exception);
void close();
void sendOutput(const Block & block);
void sendProgress();
void sendTotals(const Block & totals);
void sendExtremes(const Block & extremes);
void addOutputToResult(const Block & block);
void addProgressToResult();
void addTotalsToResult(const Block & totals);
void addExtremesToResult(const Block & extremes);
void sendResult();
void sendException(const Exception & exception);
std::unique_ptr<Responder> responder;
@ -129,12 +124,14 @@ namespace
bool send_exception_with_stacktrace = true;
BlockIO io;
std::shared_ptr<WriteBufferFromGRPC> out;
Progress progress;
GRPCQueryInfo query_info; /// We reuse the same messages multiple times.
GRPCResult result;
bool finalize = false;
bool responder_finished = false;
ThreadFromGlobalPool call_thread;
std::condition_variable signal;
std::atomic<size_t> num_syncs_pending = 0;
@ -145,7 +142,6 @@ namespace
: responder(std::move(responder_)), iserver(iserver_), log(log_)
{
responder->setTag(this);
out = std::make_shared<WriteBufferFromGRPC>(&responder->getReaderWriter(), this, nullptr);
}
Call::~Call()
@ -399,26 +395,30 @@ namespace
async_in.readPrefix();
while (true)
{
Block block;
if (async_in.poll(interactive_delay / 1000))
{
const auto block = async_in.read();
block = async_in.read();
if (!block)
break;
if (!io.null_format)
sendOutput(block);
}
if (block && !io.null_format)
addOutputToResult(block);
if (after_send_progress.elapsedMicroseconds() >= interactive_delay)
{
sendProgress();
addProgressToResult();
after_send_progress.restart();
}
if (!result.output().empty() || result.has_progress())
sendResult();
}
async_in.readSuffix();
sendTotals(io.in->getTotals());
sendExtremes(io.in->getExtremes());
addTotalsToResult(io.in->getTotals());
addExtremesToResult(io.in->getExtremes());
}
void Call::generateOutputWithProcessors()
@ -435,33 +435,37 @@ namespace
if (block)
{
if (!io.null_format)
sendOutput(block);
addOutputToResult(block);
}
if (after_send_progress.elapsedMicroseconds() >= interactive_delay)
{
sendProgress();
addProgressToResult();
after_send_progress.restart();
}
if (!result.output().empty() || result.has_progress())
sendResult();
}
sendTotals(executor->getTotalsBlock());
sendExtremes(executor->getExtremesBlock());
addTotalsToResult(executor->getTotalsBlock());
addExtremesToResult(executor->getExtremesBlock());
}
void Call::finishQuery()
{
finalize = true;
io.onFinish();
query_scope->logPeakMemoryUsage();
out->finalize();
waitForSync();
sendResult();
close();
}
void Call::onException(const Exception & exception)
{
io.onException();
if (responder)
if (responder && !responder_finished)
{
try
{
@ -480,89 +484,65 @@ namespace
{
responder.reset();
io = {};
out.reset();
query_scope.reset();
query_context.reset();
}
void Call::sendOutput(const Block & block)
void Call::addOutputToResult(const Block & block)
{
out->setResponse([](const String & buffer)
{
GRPCResult tmp_response;
tmp_response.set_output(buffer);
return tmp_response;
});
auto my_block_out_stream = query_context->getOutputFormat(output_format, *out, block);
my_block_out_stream->write(block);
my_block_out_stream->flush();
out->next();
waitForSync();
WriteBufferFromString buf{*result.mutable_output()};
auto stream = query_context->getOutputFormat(output_format, buf, block);
stream->write(block);
}
void Call::sendProgress()
void Call::addProgressToResult()
{
auto grpc_progress = [](const String & buffer)
{
auto in = std::make_unique<ReadBufferFromString>(buffer);
ProgressValues progress_values;
progress_values.read(*in, DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO);
GRPCProgress tmp_progress;
tmp_progress.set_read_rows(progress_values.read_rows);
tmp_progress.set_read_bytes(progress_values.read_bytes);
tmp_progress.set_total_rows_to_read(progress_values.total_rows_to_read);
tmp_progress.set_written_rows(progress_values.written_rows);
tmp_progress.set_written_bytes(progress_values.written_bytes);
return tmp_progress;
};
out->setResponse([&grpc_progress](const String & buffer)
{
GRPCResult tmp_response;
auto tmp_progress = std::make_unique<GRPCProgress>(grpc_progress(buffer));
tmp_response.set_allocated_progress(tmp_progress.release());
return tmp_response;
});
auto increment = progress.fetchAndResetPiecewiseAtomically();
increment.write(*out, DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO);
out->next();
waitForSync();
auto & grpc_progress = *result.mutable_progress();
auto values = progress.fetchAndResetPiecewiseAtomically();
grpc_progress.set_read_rows(values.read_rows);
grpc_progress.set_read_bytes(values.read_bytes);
grpc_progress.set_total_rows_to_read(values.total_rows_to_read);
grpc_progress.set_written_rows(values.written_rows);
grpc_progress.set_written_bytes(values.written_bytes);
}
void Call::sendTotals(const Block & totals)
void Call::addTotalsToResult(const Block & totals)
{
if (totals)
if (!totals)
return;
WriteBufferFromString buf{*result.mutable_totals()};
auto stream = query_context->getOutputFormat(output_format, buf, totals);
stream->write(totals);
}
void Call::addExtremesToResult(const Block & extremes)
{
if (!extremes)
return;
WriteBufferFromString buf{*result.mutable_extremes()};
auto stream = query_context->getOutputFormat(output_format, buf, extremes);
stream->write(extremes);
}
void Call::sendResult()
{
/// gRPC doesn't allow to write anything to a finished responder.
if (responder_finished)
return;
bool send_final_message = finalize || result.has_exception();
if (send_final_message)
{
out->setResponse([](const String & buffer)
{
GRPCResult tmp_response;
tmp_response.set_totals(buffer);
return tmp_response;
});
auto my_block_out_stream = query_context->getOutputFormat(output_format, *out, totals);
my_block_out_stream->write(totals);
my_block_out_stream->flush();
out->next();
waitForSync();
responder_finished = true;
responder->writeAndFinish(result, {});
}
}
else
responder->write(result);
void Call::sendExtremes(const Block & extremes)
{
if (extremes)
{
out->setResponse([](const String & buffer)
{
GRPCResult tmp_response;
tmp_response.set_extremes(buffer);
return tmp_response;
});
auto my_block_out_stream = query_context->getOutputFormat(output_format, *out, extremes);
my_block_out_stream->write(extremes);
my_block_out_stream->flush();
out->next();
waitForSync();
}
waitForSync();
result.Clear();
}
void Call::sendException(const Exception & exception)
@ -570,8 +550,7 @@ namespace
auto & grpc_exception = *result.mutable_exception();
grpc_exception.set_code(exception.code());
grpc_exception.set_message(getExceptionMessage(exception, send_exception_with_stacktrace, true));
responder->writeAndFinish(result, {});
waitForSync();
sendResult();
}
}

View File

@ -1,55 +0,0 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <common/types.h>
#include <grpc++/server.h>
#include "clickhouse_grpc.grpc.pb.h"
namespace DB
{
class WriteBufferFromGRPC : public BufferWithOwnMemory<WriteBuffer>
{
public:
using GRPCQueryInfo = clickhouse::grpc::QueryInfo;
using GRPCResult = clickhouse::grpc::Result;
WriteBufferFromGRPC(
grpc::ServerAsyncReaderWriter<GRPCResult, GRPCQueryInfo> * responder_,
void * tag_,
std::function<GRPCResult(const String & buffer)> set_response_details_)
: responder(responder_), tag(tag_), set_response_details(set_response_details_)
{
}
~WriteBufferFromGRPC() override {}
bool onProgress() { return progress; }
bool isFinished() { return finished; }
void setFinish(bool fl) { finished = fl; }
void setResponse(std::function<GRPCResult(const String & buffer)> function) { set_response_details = function; }
void finalize() override
{
progress = false;
finished = true;
responder->Finish(grpc::Status(), tag);
}
protected:
grpc::ServerAsyncReaderWriter<GRPCResult, GRPCQueryInfo> * responder;
void * tag;
bool progress = false;
bool finished = false;
std::function<GRPCResult(const String & buffer)> set_response_details;
void nextImpl() override
{
progress = true;
String buffer(working_buffer.begin(), working_buffer.begin() + offset());
auto response = set_response_details(buffer);
responder->Write(response, tag);
}
};
}