This commit is contained in:
Nikita Mikhaylov 2021-03-13 02:47:20 +03:00
parent 97611faad0
commit 081ea84a41
6 changed files with 138 additions and 11 deletions

View File

@ -423,6 +423,7 @@ endif ()
if (USE_GRPC)
dbms_target_link_libraries (PUBLIC clickhouse_grpc_protos)
dbms_target_link_libraries (PUBLIC clickhouse_s3_task_server_protos)
endif()
if (USE_HDFS)

104
src/Server/S3TaskServer.h Normal file
View File

@ -0,0 +1,104 @@
#pragma once
#include <unordered_map>
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#if USE_GRPC
#include <Poco/Net/SocketAddress.h>
#include "clickhouse_s3_task_server.grpc.pb.h"
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <vector>
#include <string>
#include <optional>
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using clickhouse::s3_task_server::S3TaskServer;
using clickhouse::s3_task_server::S3TaskRequest;
using clickhouse::s3_task_server::S3TaskReply;
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class S3Task
{
public:
S3Task() = delete;
explicit S3Task(std::vector<std::string> && paths_)
: paths(std::move(paths_))
{}
std::optional<std::string> getNext() {
static size_t next = 0;
if (next >= paths.size())
return std::nullopt;
const auto result = paths[next];
++next;
return result;
}
private:
std::vector<std::string> paths;
};
// Logic and data behind the server's behavior.
class S3TaskServer final : public S3TaskServer::Service {
Status GetNext(ServerContext* context, const S3TaskRequest* request, S3TaskReply* reply) override {
std::string prefix("Hello");
const auto query_id = request->query_id();
auto it = handlers.find(query_id);
if (it == handlers.end()) {
reply->set_message("");
reply->set_error(ErrorCodes::LOGICAL_ERROR);
return Status::CANCELLED;
}
reply->set_error(0);
reply->set_message(it->second.getNext());
return Status::OK;
}
private:
std::unordered_map<std::string, S3Task> handlers;
};
void RunServer() {
std::string server_address("0.0.0.0:50051");
static S3TaskServer service;
grpc::EnableDefaultHealthCheckService(true);
grpc::reflection::InitProtoReflectionServerBuilderPlugin();
ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *synchronous* service.
builder.RegisterService(&service);
// Finally assemble the server.
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
server->Wait();
}
}
#endif

View File

@ -1,4 +1,5 @@
PROTOBUF_GENERATE_GRPC_CPP(clickhouse_grpc_proto_sources clickhouse_grpc_proto_headers clickhouse_grpc.proto)
PROTOBUF_GENERATE_GRPC_CPP(clickhouse_s3_task_server_sources clickhouse_s3_task_server_headers clickhouse_s3_task_server.proto)
# Ignore warnings while compiling protobuf-generated *.pb.h and *.pb.cpp files.
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -w")
@ -9,3 +10,8 @@ set (CMAKE_CXX_CLANG_TIDY "")
add_library(clickhouse_grpc_protos ${clickhouse_grpc_proto_headers} ${clickhouse_grpc_proto_sources})
target_include_directories(clickhouse_grpc_protos SYSTEM PUBLIC ${gRPC_INCLUDE_DIRS} ${Protobuf_INCLUDE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
target_link_libraries (clickhouse_grpc_protos PUBLIC ${gRPC_LIBRARIES})
add_library(clickhouse_s3_task_server_protos ${clickhouse_s3_task_server_headers} ${clickhouse_s3_task_server_sources})
target_include_directories(clickhouse_s3_task_server_protos SYSTEM PUBLIC ${gRPC_INCLUDE_DIRS} ${Protobuf_INCLUDE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
target_link_libraries (clickhouse_s3_task_server_protos PUBLIC ${gRPC_LIBRARIES})

View File

@ -0,0 +1,20 @@
syntax = "proto3";
package clickhouse.s3_task_server;
service S3TaskServer {
rpc GetNext (S3TaskRequest) returns (S3TaskReply) {}
}
message S3TaskRequest {
string query_id = 1;
}
message S3TaskReply {
string message = 1;
int32 error = 2;
}

View File

@ -452,7 +452,8 @@ StorageDistributed::StorageDistributed(
const DistributedSettings & distributed_settings_,
bool attach,
ClusterPtr owned_cluster_)
: StorageDistributed(id_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_, storage_policy_name_, relative_data_path_, distributed_settings_, attach, std::move(owned_cluster_))
: StorageDistributed(id_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_,
storage_policy_name_, relative_data_path_, distributed_settings_, attach, std::move(owned_cluster_))
{
remote_table_function_ptr = std::move(remote_table_function_ptr_);
}
@ -473,20 +474,15 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
ClusterPtr optimized_cluster = getOptimizedCluster(local_context, metadata_snapshot, query_info.query);
if (optimized_cluster)
{
LOG_DEBUG(
log,
"Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): {}",
makeFormattedListOfShards(optimized_cluster));
LOG_DEBUG(log, "Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): {}",
makeFormattedListOfShards(optimized_cluster));
cluster = optimized_cluster;
query_info.optimized_cluster = cluster;
}
else
{
LOG_DEBUG(
log,
"Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - the query will be sent to all shards of the "
"cluster{}",
has_sharding_key ? "" : " (no sharding key)");
LOG_DEBUG(log, "Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - the query will be sent to all shards of the cluster{}",
has_sharding_key ? "" : " (no sharding key)");
}
}

View File

@ -18,7 +18,7 @@ namespace DB
class TableFunctionRemote : public ITableFunction
{
public:
TableFunctionRemote(const std::string & name_, bool secure_ = false);
explicit TableFunctionRemote(const std::string & name_, bool secure_ = false);
std::string getName() const override { return name; }