Implement GRPC protocol.

This commit is contained in:
mnkonkova 2020-09-22 01:12:55 +03:00 committed by Vitaly Baranov
parent 8d96a11d8d
commit 6cd1557d67
15 changed files with 1071 additions and 1 deletions

View File

@ -112,6 +112,8 @@ add_subdirectory (obfuscator)
add_subdirectory (install)
add_subdirectory (git-import)
add_subdirectory (grpc-client)
if (ENABLE_CLICKHOUSE_ODBC_BRIDGE)
add_subdirectory (odbc-bridge)
endif ()

View File

@ -0,0 +1,7 @@
include_directories(${CMAKE_CURRENT_BINARY_DIR})
get_filename_component(rpc_proto "${CMAKE_CURRENT_SOURCE_DIR}/../server/grpc_protos/GrpcConnection.proto" ABSOLUTE)
protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS ${rpc_proto})
PROTOBUF_GENERATE_GRPC_CPP(GRPC_SRCS GRPC_HDRS ${rpc_proto})
add_executable(grpc-client grpc_client.cpp ${PROTO_SRCS} ${PROTO_HDRS} ${GRPC_SRCS} ${GRPC_HDRS})
target_link_libraries(grpc-client PUBLIC grpc++ PUBLIC libprotobuf PUBLIC daemon)

View File

@ -0,0 +1,173 @@
#include <iostream>
#include <memory>
#include <string>
#include <vector>
#include <thread>
#include <stdlib.h>
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
#include <grpc++/security/credentials.h>
#include "GrpcConnection.grpc.pb.h"
class GRPCClient
{
public:
explicit GRPCClient(std::shared_ptr<grpc::Channel> channel)
: stub_(GRPCConnection::GRPC::NewStub(channel))
{}
std::string Query(const GRPCConnection::User& userInfo,
const std::string& query,
std::vector<std::string> insert_data = {})
{
GRPCConnection::QueryRequest request;
grpc::Status status;
GRPCConnection::QueryResponse reply;
grpc::ClientContext context;
auto deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(10000);
context.set_deadline(deadline);
auto user = std::make_unique<GRPCConnection::User>(userInfo);
auto querySettigs = std::make_unique<GRPCConnection::QuerySettings>();
int id = rand();
request.set_allocated_user_info(user.release());
// interactive_delay in miliseconds
request.set_interactive_delay(1000);
querySettigs->set_query(query);
querySettigs->set_format("Values");
querySettigs->set_query_id(std::to_string(id));
querySettigs->set_data_stream((insert_data.size() != 0));
(*querySettigs->mutable_settings())["max_query_size"] ="100";
request.set_allocated_query_info(querySettigs.release());
void* got_tag = (void*)1;
bool ok = false;
std::unique_ptr<grpc::ClientReaderWriter<GRPCConnection::QueryRequest, GRPCConnection::QueryResponse> > reader(stub_->Query(&context));
reader->Write(request);
auto write = [&reply, &reader, &insert_data]()
{
GRPCConnection::QueryRequest request_insert;
for (const auto& data : insert_data)
{
request_insert.set_insert_data(data);
if (reply.exception_occured().empty())
{
reader->Write(request_insert);
}
else
{
break;
}
}
request_insert.set_insert_data("");
if (reply.exception_occured().empty())
{
reader->Write(request_insert);
}
// reader->WritesDone();
};
std::thread write_thread(write);
write_thread.detach();
while (reader->Read(&reply))
{
if (!reply.output().empty())
{
std::cout << "Query Part:\n " << id<< reply.output()<<'\n';
}
else if (reply.progress().read_rows()
|| reply.progress().read_bytes()
|| reply.progress().total_rows_to_read()
|| reply.progress().written_rows()
|| reply.progress().written_bytes())
{
std::cout << "Progress " << id<< ":{\n" << "read_rows: " << reply.progress().read_rows() << '\n'
<< "read_bytes: " << reply.progress().read_bytes() << '\n'
<< "total_rows_to_read: " << reply.progress().total_rows_to_read() << '\n'
<< "written_rows: " << reply.progress().written_rows() << '\n'
<< "written_bytes: " << reply.progress().written_bytes() << '\n';
}
else if (!reply.totals().empty())
{
std::cout << "Totals:\n " << id << " " << reply.totals() <<'\n';
}
else if (!reply.extremes().empty())
{
std::cout << "Extremes:\n " << id << " " << reply.extremes() <<'\n';
}
}
if (status.ok() && reply.exception_occured().empty())
{
return "";
}
else if (status.ok() && !reply.exception_occured().empty())
{
return reply.exception_occured();
}
else
{
return "RPC failed";
}
}
private:
std::unique_ptr<GRPCConnection::GRPC::Stub> stub_;
};
int main(int argc, char** argv)
{
GRPCConnection::User userInfo1;
userInfo1.set_user("default");
userInfo1.set_password("");
userInfo1.set_quota("default");
std::cout << "Try: " << argv[1] << std::endl;
grpc::ChannelArguments ch_args;
ch_args.SetMaxReceiveMessageSize(-1);
GRPCClient client(
grpc::CreateCustomChannel(argv[1], grpc::InsecureChannelCredentials(), ch_args));
{
std::cout << client.Query(userInfo1, "CREATE TABLE t (a UInt8) ENGINE = Memory") << std::endl;
std::cout << client.Query(userInfo1, "CREATE TABLE t (a UInt8) ENGINE = Memory") << std::endl;
std::cout << client.Query(userInfo1, "INSERT INTO t VALUES", {"(1),(2),(3)", "(4),(6),(5)"}) << std::endl;
std::cout << client.Query(userInfo1, "INSERT INTO t_not_defined VALUES", {"(1),(2),(3)", "(4),(6),(5)"}) << std::endl;
std::cout << client.Query(userInfo1, "SELECT a FROM t ORDER BY a") << std::endl;
std::cout << client.Query(userInfo1, "DROP TABLE t") << std::endl;
}
{
std::cout << client.Query(userInfo1, "SELECT count() FROM numbers(1)") << std::endl;
std::cout << client.Query(userInfo1, "SELECT 100") << std::endl;
std::cout << client.Query(userInfo1, "SELECT count() FROM numbers(10000000000)") << std::endl;
std::cout << client.Query(userInfo1, "SELECT count() FROM numbers(100)") << std::endl;
}
{
std::cout << client.Query(userInfo1, "CREATE TABLE arrays_test (s String, arr Array(UInt8)) ENGINE = Memory;") << std::endl;
std::cout << client.Query(userInfo1, "INSERT INTO arrays_test VALUES ('Hello', [1,2]), ('World', [3,4,5]), ('Goodbye', []);") << std::endl;
std::cout << client.Query(userInfo1, "SELECT s FROM arrays_test") << std::endl;
std::cout << client.Query(userInfo1, "DROP TABLE arrays_test") << std::endl;
std::cout << client.Query(userInfo1, "") << std::endl;
}
{//Check null return from pipe
std::cout << client.Query(userInfo1, "CREATE TABLE table2 (x UInt8, y UInt8) ENGINE = Memory;") << std::endl;
std::cout << client.Query(userInfo1, "SELECT x FROM table2") << std::endl;
std::cout << client.Query(userInfo1, "DROP TABLE table2") << std::endl;
}
{//Check Totals
std::cout << client.Query(userInfo1, "CREATE TABLE tabl (x UInt8, y UInt8) ENGINE = Memory;") << std::endl;
std::cout << client.Query(userInfo1, "INSERT INTO tabl VALUES (1, 2), (2, 4), (3, 2), (3, 3), (3, 4);") << std::endl;
std::cout << client.Query(userInfo1, "SELECT sum(x), y FROM tabl GROUP BY y WITH TOTALS") << std::endl;
std::cout << client.Query(userInfo1, "DROP TABLE tabl") << std::endl;
}
return 0;
}

View File

@ -1,6 +1,13 @@
include (${CMAKE_CURRENT_SOURCE_DIR}/grpc_protos/grpc_protos.cmake)
set(CLICKHOUSE_SERVER_SOURCES
MetricsTransmitter.cpp
Server.cpp
${PROTO_SRCS}
${PROTO_HDRS}
${GRPC_SRCS}
${GRPC_HDRS}
${CMAKE_CURRENT_SOURCE_DIR}/GRPCHandler.cpp
)
if (OS_LINUX)
@ -24,6 +31,8 @@ set (CLICKHOUSE_SERVER_LINK
PUBLIC
daemon
grpc++
libprotobuf
)
clickhouse_program_add(server)

View File

@ -0,0 +1,399 @@
#include "GRPCHandler.h"
#include <ext/scope_guard.h>
#include <common/getFQDNOrHostName.h>
#include <Common/CurrentThread.h>
#include <Common/SettingsChanges.h>
#include <DataStreams/copyData.h>
#include <DataStreams/InputStreamFromASTInsertQuery.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <Interpreters/executeQuery.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ParserQuery.h>
#include <Storages/IStorage.h>
using GRPCConnection::QueryRequest;
using GRPCConnection::QueryResponse;
using GRPCConnection::GRPC;
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_DATABASE;
extern const int NO_DATA_TO_INSERT;
}
std::string ParseGrpcPeer(const grpc::ServerContext& context_)
{
String info = context_.peer();
return info.substr(info.find(":") + 1);
}
void CallDataQuery::respond()
{
try
{
switch (status)
{
case START_QUERY:
{
new CallDataQuery(service, notification_cq, new_call_cq, iServer, log);
status = PARSE_QUERY;
responder.Read(&request, (void*)this);
break;
}
case PARSE_QUERY:
{
ParseQuery();
ParseData();
break;
}
case READ_DATA:
{
ReadData();
break;
}
case PROGRESS:
{
ProgressQuery();
break;
}
case FINISH_QUERY:
{
delete this;
}
}
}
catch (...)
{
io.onException();
tryLogCurrentException(log);
std::string exception_message = getCurrentExceptionMessage(with_stacktrace, true);
int exception_code = getCurrentExceptionCode();
response.set_exception_occured(exception_message);
status = FINISH_QUERY;
responder.WriteAndFinish(response, grpc::WriteOptions(), grpc::Status(), (void*)this);
}
}
void CallDataQuery::ParseQuery()
{
LOG_TRACE(log, "Process query");
Poco::Net::SocketAddress user_adress(ParseGrpcPeer(gRPCcontext));
LOG_TRACE(log, "Request: " << request.query_info().query());
std::string user = request.user_info().user();
std::string password = request.user_info().password();
std::string quota_key = request.user_info().quota();
interactive_delay = request.interactive_delay();
format_output = "Values";
if (user.empty())
{
user = "default";
password = "";
}
if (interactive_delay == 0)
interactive_delay = INT_MAX;
context.setProgressCallback([this] (const Progress & value) { return progress.incrementPiecewiseAtomically(value); });
query_context = context;
query_scope.emplace(*query_context);
query_context->setUser(user, password, user_adress);
query_context->setCurrentQueryId(request.query_info().query_id());
if (!quota_key.empty())
query_context->setQuotaKey(quota_key);
if (!request.query_info().format().empty())
{
format_output = request.query_info().format();
query_context->setDefaultFormat(request.query_info().format());
}
if (!request.query_info().database().empty())
{
if (!DatabaseCatalog::instance().isDatabaseExist(request.query_info().database()))
{
Exception e("Database " + request.query_info().database() + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
}
query_context->setCurrentDatabase(request.query_info().database());
}
SettingsChanges settings_changes;
for (const auto & [key, value] : request.query_info().settings())
{
settings_changes.push_back({key, value});
}
query_context->checkSettingsConstraints(settings_changes);
query_context->applySettingsChanges(settings_changes);
ClientInfo & client_info = query_context->getClientInfo();
client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
client_info.interface = ClientInfo::Interface::GRPC;
client_info.initial_user = client_info.current_user;
client_info.initial_query_id = client_info.current_query_id;
client_info.initial_address = client_info.current_address;
}
void CallDataQuery::ParseData()
{
LOG_TRACE(log, "ParseData");
const char * begin = request.query_info().query().data();
const char * end = begin + request.query_info().query().size();
const Settings & settings = query_context->getSettingsRef();
ParserQuery parser(end, settings.enable_debug_queries);
ASTPtr ast = parseQuery(parser, begin, end, "", settings.max_query_size, settings.max_parser_depth);
auto * insert_query = ast->as<ASTInsertQuery>();
auto query_end = end;
if (insert_query && insert_query->data)
{
query_end = insert_query->data;
}
String query(begin, query_end);
io = executeQuery(query, *query_context, false, QueryProcessingStage::Complete, true, true);
if (io.out)
{
if (!insert_query || !(insert_query->data || request.query_info().data_stream() || !request.insert_data().empty()))
{
Exception e("Logical error: query requires data to insert, but it is not INSERT query", ErrorCodes::NO_DATA_TO_INSERT);
}
format_input = insert_query->format;
if (format_input.empty())
format_input = "Values";
if (format_output.empty())
format_output = format_input;
ConcatReadBuffer::ReadBuffers buffers;
std::shared_ptr<ReadBufferFromMemory> data_in_query;
std::shared_ptr<ReadBufferFromMemory> data_in_insert_data;
if (insert_query->data)
{
data_in_query = std::make_shared<ReadBufferFromMemory>(insert_query->data, insert_query->end - insert_query->data);
buffers.push_back(data_in_query.get());
}
if (!request.insert_data().empty())
{
data_in_insert_data = std::make_shared<ReadBufferFromMemory>(request.insert_data().data(), request.insert_data().size());
buffers.push_back(data_in_insert_data.get());
}
auto input_buffer_contacenated = std::make_unique<ConcatReadBuffer>(buffers);
auto res_stream = query_context->getInputFormat(format_input, *input_buffer_contacenated, io.out->getHeader(), query_context->getSettings().max_insert_block_size);
auto table_id = query_context->resolveStorageID(insert_query->table_id, Context::ResolveOrdinary);
if (query_context->getSettingsRef().input_format_defaults_for_omitted_fields && table_id)
{
StoragePtr storage = DatabaseCatalog::instance().getTable(table_id);
auto column_defaults = storage->getColumns().getDefaults();
if (!column_defaults.empty())
res_stream = std::make_shared<AddingDefaultsBlockInputStream>(res_stream, column_defaults, *query_context);
}
io.out->writePrefix();
while (auto block = res_stream->read())
io.out->write(block);
if (request.query_info().data_stream())
{
status = READ_DATA;
responder.Read(&request, (void*)this);
return;
}
io.out->writeSuffix();
}
ExecuteQuery();
}
void CallDataQuery::ReadData()
{
if (request.insert_data().empty())
{
io.out->writeSuffix();
ExecuteQuery();
}
else
{
const char * begin = request.insert_data().data();
const char * end = begin + request.insert_data().size();
ReadBufferFromMemory data_in(begin, end - begin);
auto res_stream = query_context->getInputFormat(format_input, data_in, io.out->getHeader(), query_context->getSettings().max_insert_block_size);
while (auto block = res_stream->read())
io.out->write(block);
responder.Read(&request, (void*)this);
}
}
void CallDataQuery::ExecuteQuery()
{
LOG_TRACE(log, "Execute Query");
if (io.pipeline.initialized())
{
query_watch.start();
progress_watch.start();
executor = std::make_shared<PullingPipelineExecutor>(io.pipeline);
ProgressQuery();
}
else
{
FinishQuery();
}
}
void CallDataQuery::ProgressQuery()
{
status = PROGRESS;
bool sent = false;
Block block;
while (executor->pull(block, query_watch.elapsedMilliseconds()))
{
if (block)
{
if (!io.null_format)
sent = sendData(block);
break;
}
if (progress_watch.elapsedMilliseconds() >= interactive_delay)
{
progress_watch.restart();
sent = sendProgress();
break;
}
query_watch.restart();
}
if (!sent)
{
SendDetails();
}
}
void CallDataQuery::SendDetails()
{
bool sent = false;
while (!sent)
{
switch (detailsStatus)
{
case SEND_TOTALS:
{
sent = sendTotals(executor->getTotalsBlock());
detailsStatus = SEND_EXTREMES;
break;
}
case SEND_EXTREMES:
{
sent = sendExtremes(executor->getExtremesBlock());
detailsStatus = FINISH;
break;
}
case FINISH:
{
sent = true;
FinishQuery();
}
}
}
}
void CallDataQuery::FinishQuery()
{
io.onFinish();
query_scope->logPeakMemoryUsage();
status = FINISH_QUERY;
out->finalize();
}
bool CallDataQuery::sendData(const Block & block)
{
out->setResponse([](const String& buffer)
{
QueryResponse tmp_response;
tmp_response.set_output(buffer);
return tmp_response;
});
auto my_block_out_stream = query_context->getOutputFormat(format_output, *out, block);
my_block_out_stream->write(block);
my_block_out_stream->flush();
out->next();
return true;
}
bool CallDataQuery::sendProgress()
{
auto grpcProgress = [](const String& buffer)
{
auto in = std::make_unique<ReadBufferFromString>(buffer);
ProgressValues progressValues;
progressValues.read(*in, DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO);
GRPCConnection::Progress progress;
progress.set_read_rows(progressValues.read_rows);
progress.set_read_bytes(progressValues.read_bytes);
progress.set_total_rows_to_read(progressValues.total_rows_to_read);
progress.set_written_rows(progressValues.written_rows);
progress.set_written_bytes(progressValues.written_bytes);
return progress;
};
out->setResponse([&grpcProgress](const String& buffer)
{
QueryResponse tmp_response;
auto progress = std::make_unique<GRPCConnection::Progress>(grpcProgress(buffer));
tmp_response.set_allocated_progress(progress.release());
return tmp_response;
});
auto increment = progress.fetchAndResetPiecewiseAtomically();
increment.write(*out, DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO);
out->next();
return true;
}
bool CallDataQuery::sendTotals(const Block & totals)
{
if (totals)
{
out->setResponse([](const String& buffer)
{
QueryResponse tmp_response;
tmp_response.set_totals(buffer);
return tmp_response;
});
auto my_block_out_stream = query_context->getOutputFormat(format_output, *out, totals);
my_block_out_stream->write(totals);
my_block_out_stream->flush();
out->next();
return true;
}
return false;
}
bool CallDataQuery::sendExtremes(const Block & extremes)
{
if (extremes)
{
out->setResponse([](const String& buffer)
{
QueryResponse tmp_response;
tmp_response.set_extremes(buffer);
return tmp_response;
});
auto my_block_out_stream = query_context->getOutputFormat(format_output, *out, extremes);
my_block_out_stream->write(extremes);
my_block_out_stream->flush();
out->next();
return true;
}
return false;
}
}

View File

@ -0,0 +1,209 @@
#pragma once
#include <atomic>
#include <iostream>
#include <string>
#include <memory>
#include <mutex>
#include <Common/Stopwatch.h>
#include <DataStreams/BlockIO.h>
#include <IO/Progress.h>
#include <IO/WriteBufferFromString.h>
#include "IServer.h"
#include <Poco/RunnableAdapter.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include "GrpcConnection.grpc.pb.h"
#include "WriteBufferFromGRPC.h"
using GRPCConnection::QueryRequest;
using GRPCConnection::QueryResponse;
using GRPCConnection::GRPC;
namespace DB
{
class CommonCallData
{
public:
GRPC::AsyncService* service;
grpc::ServerCompletionQueue* notification_cq;
grpc::ServerCompletionQueue* new_call_cq;
grpc::ServerContext gRPCcontext;
IServer* iServer;
bool with_stacktrace = false;
Poco::Logger * log;
std::unique_ptr<CommonCallData> next_client;
public:
explicit CommonCallData(GRPC::AsyncService* Service_, grpc::ServerCompletionQueue* notification_cq_, grpc::ServerCompletionQueue* new_call_cq_, IServer* iServer_, Poco::Logger * log_)
: service(Service_), notification_cq(notification_cq_), new_call_cq(new_call_cq_), iServer(iServer_), log(log_)
{}
virtual ~CommonCallData()
{}
virtual void respond() = 0;
};
class CallDataQuery : public CommonCallData
{
public:
CallDataQuery(GRPC::AsyncService* Service_, grpc::ServerCompletionQueue* notification_cq_, grpc::ServerCompletionQueue* new_call_cq_, IServer* server_, Poco::Logger * log_)
: CommonCallData(Service_, notification_cq_, new_call_cq_, server_, log_), responder(&gRPCcontext), context(iServer->context())
{
detailsStatus = SEND_TOTALS;
status = START_QUERY;
out = std::make_shared<WriteBufferFromGRPC>(&responder, (void*)this, nullptr);
service->RequestQuery(&gRPCcontext, &responder, new_call_cq, notification_cq, this);
}
void ParseQuery();
void ParseData();
void ReadData();
void ExecuteQuery();
void ProgressQuery();
void FinishQuery();
enum DetailsStatus
{
SEND_TOTALS,
SEND_EXTREMES,
SEND_PROFILEINFO,
FINISH
};
void SendDetails();
bool sendData(const Block & block);
bool sendProgress();
bool sendTotals(const Block & totals);
bool sendExtremes(const Block & block);
enum Status
{
START_QUERY,
PARSE_QUERY,
READ_DATA,
PROGRESS,
FINISH_QUERY
};
virtual void respond() override;
virtual ~CallDataQuery() override
{
query_watch.stop();
progress_watch.stop();
query_context.reset();
query_scope.reset();
}
private:
QueryRequest request;
QueryResponse response;
grpc::ServerAsyncReaderWriter<QueryResponse, QueryRequest> responder;
Stopwatch progress_watch;
Stopwatch query_watch;
Progress progress;
DetailsStatus detailsStatus;
Status status;
BlockIO io;
Context context;
std::shared_ptr<PullingPipelineExecutor> executor;
std::optional<Context> query_context;
std::shared_ptr<WriteBufferFromGRPC> out;
String format_output;
String format_input;
uint64_t interactive_delay;
std::optional<CurrentThread::QueryScope> query_scope;
};
class GRPCServer final : public Poco::Runnable
{
public:
GRPCServer(const GRPCServer &handler) = delete;
GRPCServer(GRPCServer &&handler) = delete;
GRPCServer(std::string server_address_, IServer & server_) : iServer(server_), log(&Poco::Logger::get("GRPCHandler"))
{
grpc::ServerBuilder builder;
builder.AddListeningPort(server_address_, grpc::InsecureServerCredentials());
//keepalive pings default values
builder.RegisterService(&service);
builder.SetMaxReceiveMessageSize(INT_MAX);
notification_cq = builder.AddCompletionQueue();
new_call_cq = builder.AddCompletionQueue();
server = builder.BuildAndStart();
}
void stop()
{
server->Shutdown();
notification_cq->Shutdown();
new_call_cq->Shutdown();
}
virtual void run() override
{
HandleRpcs();
}
void HandleRpcs()
{
new CallDataQuery(&service, notification_cq.get(), new_call_cq.get(), &iServer, log);
// rpc event "read done / write done / close(already connected)" call-back by this completion queue
auto handle_calls_completion = [&]()
{
void* tag;
bool ok;
while (true)
{
GPR_ASSERT(new_call_cq->Next(&tag, &ok));
if (!ok)
{
LOG_WARNING(log, "Client has gone away.");
delete static_cast<CallDataQuery*>(tag);
continue;
}
auto thread = ThreadFromGlobalPool{&CallDataQuery::respond, static_cast<CallDataQuery*>(tag)};
thread.detach();
}
};
// rpc event "new connection / close(waiting for connect)" call-back by this completion queue
auto handle_requests_completion = [&]
{
void* tag;
bool ok;
while (true)
{
GPR_ASSERT(notification_cq->Next(&tag, &ok));
if (!ok)
{
LOG_WARNING(log, "Client has gone away.");
delete static_cast<CallDataQuery*>(tag);
continue;
}
auto thread = ThreadFromGlobalPool{&CallDataQuery::respond, static_cast<CallDataQuery*>(tag)};
thread.detach();
}
};
auto notification_cq_thread = ThreadFromGlobalPool{handle_requests_completion};
auto new_call_cq_thread = ThreadFromGlobalPool{handle_calls_completion};
notification_cq_thread.detach();
new_call_cq_thread.detach();
}
private:
IServer & iServer;
Poco::Logger * log;
std::unique_ptr<grpc::ServerCompletionQueue> notification_cq;
std::unique_ptr<grpc::ServerCompletionQueue> new_call_cq;
GRPC::AsyncService service;
std::unique_ptr<grpc::Server> server;
std::string server_address;
};
}

View File

@ -64,6 +64,7 @@
#include <Common/ThreadFuzzer.h>
#include <Server/MySQLHandlerFactory.h>
#include <Server/PostgreSQLHandlerFactory.h>
#include "GRPCHandler.h"
#if !defined(ARCADIA_BUILD)
@ -817,7 +818,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
listen_hosts.emplace_back("127.0.0.1");
listen_try = true;
}
std::vector<std::unique_ptr<GRPCServer>> gRPCServers;
auto make_socket_address = [&](const std::string & host, UInt16 port)
{
Poco::Net::SocketAddress socket_address;
@ -1035,6 +1036,13 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_INFO(log, "Listening for PostgreSQL compatibility protocol: " + address.toString());
});
create_server("grpc_port", [&](UInt16 port)
{
Poco::Net::SocketAddress server_address(listen_host, port);
gRPCServers.emplace_back(new GRPCServer(server_address.toString(), *this));
LOG_INFO(log, "Listening for gRPC protocol: " + server_address.toString());
});
/// Prometheus (if defined and not setup yet with http_port)
create_server("prometheus.port", [&](UInt16 port)
{
@ -1057,6 +1065,11 @@ int Server::main(const std::vector<std::string> & /*args*/)
for (auto & server : servers)
server->start();
for (auto & server : gRPCServers)
{
if (server)
server_pool.start(*server);
}
{
String level_str = config().getString("text_log.level", "");
@ -1091,6 +1104,11 @@ int Server::main(const std::vector<std::string> & /*args*/)
server->stop();
current_connections += server->currentConnections();
}
for (auto & server : gRPCServers)
{
if (server)
server->stop();
}
if (current_connections)
LOG_INFO(log, "Closed all listening sockets. Waiting for {} outstanding connections.", current_connections);

View File

@ -0,0 +1,66 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include "GrpcConnection.grpc.pb.h"
#include <grpc++/server.h>
using GRPCConnection::QueryRequest;
using GRPCConnection::QueryResponse;
using GRPCConnection::GRPC;
namespace DB
{
class WriteBufferFromGRPC : public BufferWithOwnMemory<WriteBuffer>
{
protected:
grpc::ServerAsyncReaderWriter<QueryResponse, QueryRequest>* responder;
void* tag;
bool progress = false;
bool finished = false;
std::function<QueryResponse(const String& buffer)> setResposeDetails;
void nextImpl() override
{
progress = true;
String buffer(working_buffer.begin(), working_buffer.begin() + offset());
auto response = setResposeDetails(buffer);
responder->Write(response, tag);
}
public:
WriteBufferFromGRPC(grpc::ServerAsyncReaderWriter<QueryResponse, QueryRequest>* responder_, void* tag_, std::function<QueryResponse(const String& buffer)> setResposeDetails_)
: responder(responder_), tag(tag_), setResposeDetails(setResposeDetails_)
{}
~WriteBufferFromGRPC() override {}
bool onProgress()
{
return progress;
}
bool isFinished()
{
return finished;
}
void setFinish(bool fl)
{
finished = fl;
}
void setResponse(std::function<QueryResponse(const String& buffer)> function)
{
setResposeDetails = function;
}
void finalize() override
{
progress = false;
finished = true;
responder->Finish(grpc::Status(), tag);
}
};
}

View File

@ -0,0 +1,43 @@
syntax = "proto3";
package GRPCConnection;
message User {
string user = 1;
string password = 2;
string quota = 3;
}
message QuerySettings {
string query = 1;
string query_id = 2;
bool data_stream = 4;
string database = 5;
string format = 6;
map<string, string> settings = 7;
}
message QueryRequest {
User user_info = 1;
QuerySettings query_info = 2;
string insert_data = 3;
uint64 interactive_delay = 4;
}
message Progress {
uint64 read_rows = 1;
uint64 read_bytes = 2;
uint64 total_rows_to_read = 3;
uint64 written_rows = 4;
uint64 written_bytes = 5;
}
message QueryResponse {
string output = 1;
string exception_occured = 2;
Progress progress = 3;
string totals = 4;
string extremes = 5;
}
service GRPC {
rpc Query(stream QueryRequest) returns (stream QueryResponse) {}
}

View File

@ -0,0 +1,7 @@
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/etcd_protos)
get_filename_component(rpc_proto "${CMAKE_CURRENT_SOURCE_DIR}/grpc_protos/GrpcConnection.proto" ABSOLUTE)
include_directories(${Protobuf_INCLUDE_DIRS})
include_directories(${CMAKE_CURRENT_BINARY_DIR})
protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS ${rpc_proto})
PROTOBUF_GENERATE_GRPC_CPP(GRPC_SRCS GRPC_HDRS ${rpc_proto})

View File

@ -25,6 +25,7 @@ public:
{
TCP = 1,
HTTP = 2,
GRPC = 3,
};
enum class HTTPMethod : uint8_t

View File

@ -0,0 +1,20 @@
<?xml version="1.0"?>
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
</logger>
<tcp_port>9000</tcp_port>
<grpc_port>9001</grpc_port>
<listen_host>127.0.0.1</listen_host>
<max_concurrent_queries>500</max_concurrent_queries>
<mark_cache_size>5368709120</mark_cache_size>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
</yandex>

View File

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
</default>
</profiles>
<users>
<default>
<password>123</password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<quotas>
<default>
</default>
</quotas>
</yandex>

View File

@ -0,0 +1 @@
../../../../programs/server/grpc_protos/GrpcConnection.proto

View File

@ -0,0 +1,92 @@
# coding: utf-8
# proto file should be the same, as in server GRPC
import os
import pytest
import subprocess
import grpc
from docker.models.containers import Container
from helpers.cluster import ClickHouseCluster
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
proto_dir = os.path.join(SCRIPT_DIR, './protos')
try:
subprocess.check_call(
'python -m grpc_tools.protoc -I{proto_path} --python_out=. --grpc_python_out=. \
{proto_path}/GrpcConnection.proto'.format(proto_path=proto_dir), shell=True)
except subprocess.CalledProcessError, e:
print "Please, copy proto file, can be programs/server/grpc_proto/GrpcConnection.proto"
assert False
finally:
import GrpcConnection_pb2
import GrpcConnection_pb2_grpc
config_dir = os.path.join(SCRIPT_DIR, './configs')
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', config_dir=config_dir, env_variables={'UBSAN_OPTIONS': 'print_stacktrace=1'})
server_port = 9001
@pytest.fixture(scope="module")
def server_address():
cluster.start()
try:
yield cluster.get_instance_ip('node')
finally:
cluster.shutdown()
def Query(server_address_and_port, query, mode="output", insert_data=[]):
output = []
totals = []
data_stream = (len(insert_data) != 0)
with grpc.insecure_channel(server_address_and_port) as channel:
stub = GrpcConnection_pb2_grpc.GRPCStub(channel)
def write_query():
user_info = GrpcConnection_pb2.User(user="default", password='123', quota='default')
query_info = GrpcConnection_pb2.QuerySettings(query=query, query_id='123', data_stream=data_stream, format='TabSeparated')
yield GrpcConnection_pb2.QueryRequest(user_info=user_info, query_info=query_info)
if data_stream:
for data in insert_data:
yield GrpcConnection_pb2.QueryRequest(insert_data=data)
yield GrpcConnection_pb2.QueryRequest(insert_data="")
for response in stub.Query(write_query(), 10.0):
output += response.output.split()
totals += response.totals.split()
if mode == "output":
return output
elif mode == "totals":
return totals
def test_ordinary_query(server_address):
server_address_and_port = server_address + ':' + str(server_port)
assert Query(server_address_and_port, "SELECT 1") == [u'1']
assert Query(server_address_and_port, "SELECT count() FROM numbers(100)") == [u'100']
def test_query_insert(server_address):
server_address_and_port = server_address + ':' + str(server_port)
assert Query(server_address_and_port, "CREATE TABLE t (a UInt8) ENGINE = Memory") == []
assert Query(server_address_and_port, "INSERT INTO t VALUES (1),(2),(3)") == []
assert Query(server_address_and_port, "INSERT INTO t FORMAT TabSeparated 4\n5\n6\n") == []
assert Query(server_address_and_port, "INSERT INTO t FORMAT TabSeparated 10\n11\n12\n") == []
assert Query(server_address_and_port, "SELECT a FROM t ORDER BY a") == [u'1', u'2', u'3', u'4', u'5', u'6', u'10', u'11', u'12']
assert Query(server_address_and_port, "DROP TABLE t") == []
def test_handle_mistakes(server_address):
server_address_and_port = server_address + ':' + str(server_port)
assert Query(server_address_and_port, "") == []
assert Query(server_address_and_port, "CREATE TABLE t (a UInt8) ENGINE = Memory") == []
assert Query(server_address_and_port, "CREATE TABLE t (a UInt8) ENGINE = Memory") == []
def test_totals(server_address):
server_address_and_port = server_address + ':' + str(server_port)
assert Query(server_address_and_port, "") == []
assert Query(server_address_and_port, "CREATE TABLE tabl (x UInt8, y UInt8) ENGINE = Memory;") == []
assert Query(server_address_and_port, "INSERT INTO tabl VALUES (1, 2), (2, 4), (3, 2), (3, 3), (3, 4);") == []
assert Query(server_address_and_port, "SELECT sum(x), y FROM tabl GROUP BY y WITH TOTALS") == [u'4', u'2', u'3', u'3', u'5', u'4']
assert Query(server_address_and_port, "SELECT sum(x), y FROM tabl GROUP BY y WITH TOTALS", mode="totals") == [u'12', u'0']
def test_query_insert(server_address):
server_address_and_port = server_address + ':' + str(server_port)
assert Query(server_address_and_port, "CREATE TABLE t (a UInt8) ENGINE = Memory") == []
assert Query(server_address_and_port, "INSERT INTO t VALUES", insert_data=["(1),(2),(3)", "(5),(4),(6)", "(8),(7),(9)"]) == []
assert Query(server_address_and_port, "SELECT a FROM t ORDER BY a") == [u'1', u'2', u'3', u'4', u'5', u'6', u'7', u'8', u'9']
assert Query(server_address_and_port, "DROP TABLE t") == []