Merge pull request #17435 from vitlibar/grpc-protocol-2

Implement GRPC protocol (corrections)
This commit is contained in:
Nikita Mikhaylov 2020-11-27 15:19:50 +03:00 committed by GitHub
commit 0b6f5c75b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 19 additions and 212 deletions

View File

@ -180,8 +180,6 @@ add_subdirectory (obfuscator)
add_subdirectory (install)
add_subdirectory (git-import)
#add_subdirectory (grpc-client)
if (ENABLE_CLICKHOUSE_ODBC_BRIDGE)
add_subdirectory (odbc-bridge)
endif ()

View File

@ -1,7 +0,0 @@
include_directories(${CMAKE_CURRENT_BINARY_DIR})
get_filename_component(rpc_proto "${CMAKE_CURRENT_SOURCE_DIR}/../server/grpc_protos/GrpcConnection.proto" ABSOLUTE)
protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS ${rpc_proto})
PROTOBUF_GENERATE_GRPC_CPP(GRPC_SRCS GRPC_HDRS ${rpc_proto})
add_executable(grpc-client grpc_client.cpp ${PROTO_SRCS} ${PROTO_HDRS} ${GRPC_SRCS} ${GRPC_HDRS})
target_link_libraries(grpc-client PUBLIC grpc++ PUBLIC libprotobuf PUBLIC daemon)

View File

@ -1,173 +0,0 @@
#include <iostream>
#include <memory>
#include <string>
#include <vector>
#include <thread>
#include <stdlib.h>
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
#include <grpc++/security/credentials.h>
#include "GrpcConnection.grpc.pb.h"
class GRPCClient
{
public:
explicit GRPCClient(std::shared_ptr<grpc::Channel> channel)
: stub_(GRPCConnection::GRPC::NewStub(channel))
{}
std::string Query(const GRPCConnection::User& userInfo,
const std::string& query,
std::vector<std::string> insert_data = {})
{
GRPCConnection::QueryRequest request;
grpc::Status status;
GRPCConnection::QueryResponse reply;
grpc::ClientContext context;
auto deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(10000);
context.set_deadline(deadline);
auto user = std::make_unique<GRPCConnection::User>(userInfo);
auto querySettigs = std::make_unique<GRPCConnection::QuerySettings>();
int id = rand();
request.set_allocated_user_info(user.release());
// interactive_delay in miliseconds
request.set_interactive_delay(1000);
querySettigs->set_query(query);
querySettigs->set_format("Values");
querySettigs->set_query_id(std::to_string(id));
querySettigs->set_data_stream((insert_data.size() != 0));
(*querySettigs->mutable_settings())["max_query_size"] ="100";
request.set_allocated_query_info(querySettigs.release());
void* got_tag = (void*)1;
bool ok = false;
std::unique_ptr<grpc::ClientReaderWriter<GRPCConnection::QueryRequest, GRPCConnection::QueryResponse> > reader(stub_->Query(&context));
reader->Write(request);
auto write = [&reply, &reader, &insert_data]()
{
GRPCConnection::QueryRequest request_insert;
for (const auto& data : insert_data)
{
request_insert.set_insert_data(data);
if (reply.exception_occured().empty())
{
reader->Write(request_insert);
}
else
{
break;
}
}
request_insert.set_insert_data("");
if (reply.exception_occured().empty())
{
reader->Write(request_insert);
}
// reader->WritesDone();
};
std::thread write_thread(write);
write_thread.detach();
while (reader->Read(&reply))
{
if (!reply.output().empty())
{
std::cout << "Query Part:\n " << id<< reply.output()<<'\n';
}
else if (reply.progress().read_rows()
|| reply.progress().read_bytes()
|| reply.progress().total_rows_to_read()
|| reply.progress().written_rows()
|| reply.progress().written_bytes())
{
std::cout << "Progress " << id<< ":{\n" << "read_rows: " << reply.progress().read_rows() << '\n'
<< "read_bytes: " << reply.progress().read_bytes() << '\n'
<< "total_rows_to_read: " << reply.progress().total_rows_to_read() << '\n'
<< "written_rows: " << reply.progress().written_rows() << '\n'
<< "written_bytes: " << reply.progress().written_bytes() << '\n';
}
else if (!reply.totals().empty())
{
std::cout << "Totals:\n " << id << " " << reply.totals() <<'\n';
}
else if (!reply.extremes().empty())
{
std::cout << "Extremes:\n " << id << " " << reply.extremes() <<'\n';
}
}
if (status.ok() && reply.exception_occured().empty())
{
return "";
}
else if (status.ok() && !reply.exception_occured().empty())
{
return reply.exception_occured();
}
else
{
return "RPC failed";
}
}
private:
std::unique_ptr<GRPCConnection::GRPC::Stub> stub_;
};
int main(int argc, char** argv)
{
GRPCConnection::User userInfo1;
userInfo1.set_user("default");
userInfo1.set_password("");
userInfo1.set_quota("default");
std::cout << "Try: " << argv[1] << std::endl;
grpc::ChannelArguments ch_args;
ch_args.SetMaxReceiveMessageSize(-1);
GRPCClient client(
grpc::CreateCustomChannel(argv[1], grpc::InsecureChannelCredentials(), ch_args));
{
std::cout << client.Query(userInfo1, "CREATE TABLE t (a UInt8) ENGINE = Memory") << std::endl;
std::cout << client.Query(userInfo1, "CREATE TABLE t (a UInt8) ENGINE = Memory") << std::endl;
std::cout << client.Query(userInfo1, "INSERT INTO t VALUES", {"(1),(2),(3)", "(4),(6),(5)"}) << std::endl;
std::cout << client.Query(userInfo1, "INSERT INTO t_not_defined VALUES", {"(1),(2),(3)", "(4),(6),(5)"}) << std::endl;
std::cout << client.Query(userInfo1, "SELECT a FROM t ORDER BY a") << std::endl;
std::cout << client.Query(userInfo1, "DROP TABLE t") << std::endl;
}
{
std::cout << client.Query(userInfo1, "SELECT count() FROM numbers(1)") << std::endl;
std::cout << client.Query(userInfo1, "SELECT 100") << std::endl;
std::cout << client.Query(userInfo1, "SELECT count() FROM numbers(10000000000)") << std::endl;
std::cout << client.Query(userInfo1, "SELECT count() FROM numbers(100)") << std::endl;
}
{
std::cout << client.Query(userInfo1, "CREATE TABLE arrays_test (s String, arr Array(UInt8)) ENGINE = Memory;") << std::endl;
std::cout << client.Query(userInfo1, "INSERT INTO arrays_test VALUES ('Hello', [1,2]), ('World', [3,4,5]), ('Goodbye', []);") << std::endl;
std::cout << client.Query(userInfo1, "SELECT s FROM arrays_test") << std::endl;
std::cout << client.Query(userInfo1, "DROP TABLE arrays_test") << std::endl;
std::cout << client.Query(userInfo1, "") << std::endl;
}
{//Check null return from pipe
std::cout << client.Query(userInfo1, "CREATE TABLE table2 (x UInt8, y UInt8) ENGINE = Memory;") << std::endl;
std::cout << client.Query(userInfo1, "SELECT x FROM table2") << std::endl;
std::cout << client.Query(userInfo1, "DROP TABLE table2") << std::endl;
}
{//Check Totals
std::cout << client.Query(userInfo1, "CREATE TABLE tabl (x UInt8, y UInt8) ENGINE = Memory;") << std::endl;
std::cout << client.Query(userInfo1, "INSERT INTO tabl VALUES (1, 2), (2, 4), (3, 2), (3, 3), (3, 4);") << std::endl;
std::cout << client.Query(userInfo1, "SELECT sum(x), y FROM tabl GROUP BY y WITH TOTALS") << std::endl;
std::cout << client.Query(userInfo1, "DROP TABLE tabl") << std::endl;
}
return 0;
}

View File

@ -135,20 +135,20 @@
<keep_alive_timeout>3</keep_alive_timeout>
<!-- gRPC protocol (see src/Server/grpc_protos/clickhouse_grpc.proto for the API)
<grpc_port>9001</grpc_port>
<grpc_port>9100</grpc_port>
<grpc>
<enable_ssl>true</enable_ssl> -->
<!-- The following two files are used only if enable_ssl=1
<ssl_cert_file>/path/to/ssl_cert_file</ssl_cert_file>
<ssl_key_file>/path/to/ssl_key_file</ssl_key_file> -->
<!-- Whether server will request client for a certificate
<ssl_require_client_auth>true</ssl_require_client_auth> -->
<!-- The following file is used only if ssl_require_client_auth=1
<ssl_ca_cert_file>/path/to/ssl_ca_cert_file</ssl_ca_cert_file> -->
<!-- Default compression algorithm (applied if client doesn't specify another algorithm).
Supported algorithms: none, deflate, gzip, stream_gzip
<compression>gzip</compression> -->

View File

@ -1,13 +0,0 @@
#pragma once
// .h autogenerated by cmake!
#define USE_ICU 1
#define USE_MYSQL 1
#define USE_RDKAFKA 1
#define USE_AMQPCPP 1
#define USE_EMBEDDED_COMPILER 0
#define USE_INTERNAL_LLVM_LIBRARY 0
#define USE_SSL 1
#define USE_OPENCL 0
#define USE_LDAP 1

View File

@ -1,3 +1,3 @@
<yandex>
<grpc_port>9001</grpc_port>
<grpc_port>9100</grpc_port>
</yandex>

View File

@ -29,7 +29,7 @@ import clickhouse_grpc_pb2_grpc
config_dir = os.path.join(SCRIPT_DIR, './configs')
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', main_configs=['configs/grpc_port.xml'])
grpc_port = 9001
grpc_port = 9100
main_channel = None
def create_channel():

View File

@ -1,18 +1,18 @@
<yandex>
<grpc_port>9001</grpc_port>
<grpc_port>9100</grpc_port>
<grpc>
<enable_ssl>true</enable_ssl>
<!-- The following two files are used only if enable_ssl=1 -->
<ssl_cert_file>/etc/clickhouse-server/config.d/server-cert.pem</ssl_cert_file>
<ssl_key_file>/etc/clickhouse-server/config.d/server-key.pem</ssl_key_file>
<!-- Whether server will request client for a certificate -->
<ssl_require_client_auth>true</ssl_require_client_auth>
<!-- The following file is used only if ssl_require_client_auth=1 -->
<ssl_ca_cert_file>/etc/clickhouse-server/config.d/ca-cert.pem</ssl_ca_cert_file>
<!-- Default compression algorithm (applied if client doesn't specify another algorithm).
Supported algorithms: none, deflate, gzip, stream_gzip -->
<compression>gzip</compression>

View File

@ -25,7 +25,7 @@ import clickhouse_grpc_pb2_grpc
# Utilities
node_ip = '10.5.172.77' # It's important for the node to work at this IP because 'server-cert.pem' requires that (see server-ext.cnf).
grpc_port = 9001
grpc_port = 9100
node_ip_with_grpc_port = node_ip + ':' + str(grpc_port)
config_dir = os.path.join(SCRIPT_DIR, './configs')
cluster = ClickHouseCluster(__file__)

View File

@ -10,10 +10,12 @@
# Most of the command line options are the same, for more information type
# ./clickhouse_grpc_client.py --help
import argparse, cmd, os, signal, subprocess, sys, threading, time, uuid, grpc
import grpc # pip3 install grpcio
import grpc_tools # pip3 install grpcio-tools
import argparse, cmd, os, signal, subprocess, sys, threading, time, uuid
default_host = 'localhost'
default_port = 9001
default_port = 9100
default_user_name = 'default'
default_output_format_for_interactive_mode = 'PrettyCompact'
history_filename = '~/.clickhouse_grpc_history'
@ -196,7 +198,7 @@ class ClickHouseGRPCClient(cmd.Cmd):
errors = p.stderr.read().decode().strip('\n').split('\n')
only_warnings = all(('Warning' in error) for error in errors)
if not only_warnings:
error_print(errors.join('\n'))
error_print('\n'.join(errors))
# Import the generated *pb2.py files.
@staticmethod
@ -261,7 +263,7 @@ def main(args):
parser = argparse.ArgumentParser(description='ClickHouse client accessing server through gRPC protocol.', add_help=False)
parser.add_argument('--help', help='Show this help message and exit', action='store_true')
parser.add_argument('--host', '-h', help='The server name, localhost by default. You can use either the name or the IPv4 or IPv6 address.', default='localhost')
parser.add_argument('--port', help='The port to connect to. This port should be enabled on the ClickHouse server (see grpc_port in the config).', default=9001)
parser.add_argument('--port', help='The port to connect to. This port should be enabled on the ClickHouse server (see grpc_port in the config).', default=9100)
parser.add_argument('--user', '-u', dest='user_name', help='The username. Default value: default.', default='default')
parser.add_argument('--password', help='The password. Default value: empty string.', default='')
parser.add_argument('--query', '-q', help='The query to process when using non-interactive mode.', default='')