This commit is contained in:
Nikita Mikhaylov 2021-03-23 20:58:29 +03:00
parent 0be3fa178b
commit 2549468c14
20 changed files with 86 additions and 215 deletions

View File

@ -562,15 +562,6 @@ private:
connect();
if (config().has("next_task"))
{
std::cout << "has next task" << std::endl;
auto next_task = config().getString("next_task", "12345");
std::cout << "got next task " << next_task << std::endl;
sendNextTaskRequest(next_task);
std::cout << "sended " << std::endl;
}
/// Initialize DateLUT here to avoid counting time spent here as query execution time.
const auto local_tz = DateLUT::instance().getTimeZone();
if (!context->getSettingsRef().use_client_time_zone)
@ -1708,13 +1699,6 @@ private:
}
void sendNextTaskRequest(std::string id)
{
connection->sendNextTaskRequest(id);
}
/// Process the query that doesn't require transferring data blocks to the server.
void processOrdinaryQuery()
{
@ -2645,7 +2629,6 @@ public:
("opentelemetry-traceparent", po::value<std::string>(), "OpenTelemetry traceparent header as described by W3C Trace Context recommendation")
("opentelemetry-tracestate", po::value<std::string>(), "OpenTelemetry tracestate header as described by W3C Trace Context recommendation")
("history_file", po::value<std::string>(), "path to history file")
("next_task", po::value<std::string>(), "request new task from server")
;
Settings cmd_settings;
@ -2809,8 +2792,6 @@ public:
config().setBool("highlight", options["highlight"].as<bool>());
if (options.count("history_file"))
config().setString("history_file", options["history_file"].as<std::string>());
if (options.count("next_task"))
config().setString("next_task", options["next_task"].as<std::string>());
if ((query_fuzzer_runs = options["query-fuzzer-runs"].as<int>()))
{

View File

@ -3,7 +3,6 @@
#include <Core/Settings.h>
#include <Columns/ColumnString.h>
#include <Common/typeid_cast.h>
#include "Core/Protocol.h"
namespace DB
{
@ -34,8 +33,6 @@ void Suggest::load(const ConnectionParameters & connection_parameters, size_t su
connection_parameters.compression,
connection_parameters.security);
std::cerr << "Connection created" << std::endl;
loadImpl(connection, connection_parameters.timeouts, suggestion_limit);
}
catch (const Exception & e)
@ -159,7 +156,6 @@ void Suggest::fetch(Connection & connection, const ConnectionTimeouts & timeouts
Packet packet = connection.receivePacket();
switch (packet.type)
{
case Protocol::Server::NextTaskReply: [[fallthrough]];
case Protocol::Server::Data:
fillWordsFromBlock(packet.block);
continue;

View File

@ -423,7 +423,6 @@ endif ()
if (USE_GRPC)
dbms_target_link_libraries (PUBLIC clickhouse_grpc_protos)
dbms_target_link_libraries (PUBLIC clickhouse_s3_reader_protos)
endif()
if (USE_HDFS)

View File

@ -1,4 +1,3 @@
#include <string>
#include <Poco/Net/NetException.h>
#include <Poco/Net/SocketAddress.h>
#include <Core/Defines.h>
@ -22,7 +21,7 @@
#include <Common/StringUtils/StringUtils.h>
#include <Common/OpenSSLHelpers.h>
#include <Common/randomSeed.h>
#include "Core/Protocol.h"
#include <Core/Protocol.h>
#include <Interpreters/ClientInfo.h>
#include <Compression/CompressionFactory.h>
#include <Processors/Pipe.h>
@ -560,7 +559,6 @@ void Connection::sendIgnoredPartUUIDs(const std::vector<UUID> & uuids)
void Connection::sendNextTaskRequest(const std::string & id)
{
std::cout << "Connection::sendNextTaskRequest" << std::endl;
writeVarUInt(Protocol::Client::NextTaskRequest, *out);
writeStringBinary(id, *out);
out->next();

View File

@ -205,7 +205,7 @@ public:
in->setAsyncCallback(std::move(async_callback));
}
public:
private:
String host;
UInt16 port;

View File

@ -62,6 +62,7 @@ Block RemoteBlockInputStream::readImpl()
if (isCancelledOrThrowIfKilled())
return Block();
std::cout << "RemoteBlockInputStream " << block.rows() << ' ' << block.dumpStructure() << std::endl;
return block;
}

View File

@ -114,7 +114,7 @@ RemoteQueryExecutor::~RemoteQueryExecutor()
/** If we receive a block with slightly different column types, or with excessive columns,
* we will adapt it to expected structure.
*/
static Block adaptBlockStructure(const Block & block, const Block & header)
[[maybe_unused]] static Block adaptBlockStructure(const Block & block, const Block & header)
{
/// Special case when reader doesn't care about result structure. Deprecated and used only in Benchmark, PerformanceTest.
if (!header)
@ -123,6 +123,9 @@ static Block adaptBlockStructure(const Block & block, const Block & header)
Block res;
res.info = block.info;
std::cout << "block " << block.dumpStructure() << std::endl;
std::cout << "header " << header.dumpStructure() << std::endl;
for (const auto & elem : header)
{
ColumnPtr column;
@ -153,7 +156,17 @@ static Block adaptBlockStructure(const Block & block, const Block & header)
column = elem.column->cloneResized(block.rows());
}
else
{
// if (!block.has(elem.name))
// {
// column = elem.type->createColumn();
// }
// else
// {
// column = castColumn(block.getByName(elem.name), elem.type);
// }
column = castColumn(block.getByName(elem.name), elem.type);
}
res.insert({column, elem.type, elem.name});
}
@ -314,7 +327,12 @@ std::optional<Block> RemoteQueryExecutor::processPacket(Packet packet)
case Protocol::Server::Data:
/// If the block is not empty and is not a header block
if (packet.block && (packet.block.rows() > 0))
return adaptBlockStructure(packet.block, header);
{
// return packet.block;
Block anime = adaptBlockStructure(packet.block, header);
std::cout << "RemoteQueryExecutor " << anime.dumpStructure() << std::endl;
return anime;
}
break; /// If the block is empty - we will receive other packets before EndOfStream.
case Protocol::Server::Exception:

View File

@ -477,7 +477,7 @@ DataTypePtr FunctionOverloadResolverAdaptor::getReturnTypeDefaultImplementationF
}
if (null_presence.has_nullable)
{
Block nested_columns = createBlockWithNestedColumns(arguments);
Block nested_columns{createBlockWithNestedColumns(arguments)};
auto return_type = getter(ColumnsWithTypeAndName(nested_columns.begin(), nested_columns.end()));
return makeNullable(return_type);
}

View File

@ -43,9 +43,6 @@ bool ReadBufferFromS3::nextImpl()
initialized = true;
}
if (hasPendingData())
return true;
Stopwatch watch;
auto res = impl->next();
watch.stop();

View File

@ -326,7 +326,6 @@ namespace S3
URI::URI(const Poco::URI & uri_)
{
full = uri_.toString();
/// Case when bucket name represented in domain name of S3 URL.
/// E.g. (https://bucket-name.s3.Region.amazonaws.com/key)
/// https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html#virtual-hosted-style-access
@ -401,11 +400,6 @@ namespace S3
throw Exception("Bucket or key name are invalid in S3 URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);
}
String URI::toString() const
{
return full;
}
}
}

View File

@ -67,13 +67,9 @@ struct URI
String key;
String storage_name;
/// Full representation of URI
String full;
bool is_virtual_hosted_style;
explicit URI(const Poco::URI & uri_);
String toString() const;
};
}

View File

@ -1,104 +0,0 @@
#pragma once
#include <unordered_map>
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#if USE_GRPC
#include <Poco/Net/SocketAddress.h>
#include "clickhouse_s3_task_server.grpc.pb.h"
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <vector>
#include <string>
#include <optional>
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using clickhouse::s3_task_server::S3TaskServer;
using clickhouse::s3_task_server::S3TaskRequest;
using clickhouse::s3_task_server::S3TaskReply;
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
class S3Task
{
public:
S3Task() = delete;
explicit S3Task(std::vector<std::string> && paths_)
: paths(std::move(paths_))
{}
std::optional<std::string> getNext() {
static size_t next = 0;
if (next >= paths.size())
return std::nullopt;
const auto result = paths[next];
++next;
return result;
}
private:
std::vector<std::string> paths;
};
// Logic and data behind the server's behavior.
class S3TaskServer final : public S3TaskServer::Service {
Status GetNext(ServerContext* context, const S3TaskRequest* request, S3TaskReply* reply) override {
std::string prefix("Hello");
const auto query_id = request->query_id();
auto it = handlers.find(query_id);
if (it == handlers.end()) {
reply->set_message("");
reply->set_error(ErrorCodes::LOGICAL_ERROR);
return Status::CANCELLED;
}
reply->set_error(0);
reply->set_message(it->second.getNext());
return Status::OK;
}
private:
std::unordered_map<std::string, S3Task> handlers;
};
void RunServer() {
std::string server_address("0.0.0.0:50051");
static S3TaskServer service;
grpc::EnableDefaultHealthCheckService(true);
grpc::reflection::InitProtoReflectionServerBuilderPlugin();
ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *synchronous* service.
builder.RegisterService(&service);
// Finally assemble the server.
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
server->Wait();
}
}
#endif

View File

@ -760,7 +760,7 @@ void TCPHandler::sendPartUUIDs()
void TCPHandler::sendNextTaskReply(String reply)
{
LOG_DEBUG(log, "Nexttask for id is {} ", reply);
LOG_TRACE(log, "Nexttask for id is {} ", reply);
writeVarUInt(Protocol::Server::NextTaskReply, *out);
writeStringBinary(reply, *out);
out->next();
@ -975,13 +975,10 @@ bool TCPHandler::receivePacket()
readVarUInt(packet_type, *in);
std::cout << "TCPHander receivePacket" << packet_type << ' ' << Protocol::Client::NextTaskRequest << std::endl;
switch (packet_type)
{
case Protocol::Client::NextTaskRequest:
{
std::cout << "Protocol::Client::NextTaskRequest" << std::endl;
auto id = receiveNextTaskRequest();
auto next = TaskSupervisor::instance().getNextTaskForId(id);
sendNextTaskReply(next);

View File

@ -9,4 +9,3 @@ set (CMAKE_CXX_CLANG_TIDY "")
add_library(clickhouse_grpc_protos ${clickhouse_grpc_proto_headers} ${clickhouse_grpc_proto_sources})
target_include_directories(clickhouse_grpc_protos SYSTEM PUBLIC ${gRPC_INCLUDE_DIRS} ${Protobuf_INCLUDE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
target_link_libraries (clickhouse_grpc_protos PUBLIC ${gRPC_LIBRARIES})

View File

@ -4,7 +4,3 @@ add_subdirectory(System)
if(ENABLE_TESTS)
add_subdirectory(tests)
endif()
if (USE_GRPC)
add_subdirectory(protos)
endif()

View File

@ -32,9 +32,14 @@
#include <Storages/StorageS3.h>
#include <Parsers/queryToString.h>
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Poco/Logger.h>
#include <Poco/Net/TCPServerConnection.h>
#include <ios>
#include <memory>
#include <string>
#include <thread>
@ -115,10 +120,19 @@ public:
Chunk generate() override
{
auto chunk = inner->generate();
if (!chunk && !createOrUpdateInnerSource())
if (!inner)
return {};
return inner->generate();
auto chunk = inner->generate();
if (!chunk)
{
if (!createOrUpdateInnerSource())
return {};
else
chunk = inner->generate();
}
std::cout << "generate() " << chunk.dumpStructure() << std::endl;
return chunk;
}
private:
@ -131,7 +145,7 @@ private:
initiator_connection->sendNextTaskRequest(initial_query_id);
auto packet = initiator_connection->receivePacket();
assert(packet.type = Protocol::Server::NextTaskReply);
LOG_DEBUG(&Poco::Logger::get("StorageS3SequentialSource"), "Got new task {}", packet.next_task);
LOG_TRACE(&Poco::Logger::get("StorageS3SequentialSource"), "Got new task {}", packet.next_task);
return packet.next_task;
}
catch (...)
@ -144,11 +158,13 @@ private:
bool createOrUpdateInnerSource()
{
auto next_uri = S3::URI(Poco::URI(askAboutNextKey()));
if (next_uri.uri.empty())
auto next_string = askAboutNextKey();
std::cout << "createOrUpdateInnerSource " << next_string << std::endl;
if (next_string.empty())
return false;
auto next_uri = S3::URI(Poco::URI(next_string));
assert(next_uri.bucket == client_auth.uri.bucket);
inner = std::make_unique<StorageS3Source>(
@ -223,7 +239,7 @@ Pipe StorageS3Distributed::read(
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned /*num_streams*/)
{
@ -232,7 +248,6 @@ Pipe StorageS3Distributed::read(
{
StorageS3::updateClientAndAuthSettings(context, client_auth);
Pipes pipes;
bool need_path_column = false;
bool need_file_column = false;
for (const auto & column : column_names)
@ -243,7 +258,10 @@ Pipe StorageS3Distributed::read(
need_file_column = true;
}
std::cout << metadata_snapshot->getSampleBlock().dumpStructure() << std::endl;
std::cout << need_file_column << std::boolalpha << need_file_column << std::endl;
std::cout << need_path_column << std::boolalpha << need_path_column << std::endl;
std::cout << "metadata_snapshot->getSampleBlock().dumpStructure() " << metadata_snapshot->getSampleBlock().dumpStructure() << std::endl;
return Pipe(std::make_shared<StorageS3SequentialSource>(
context.getInitialQueryId(),
@ -265,6 +283,13 @@ Pipe StorageS3Distributed::read(
connections.reserve(cluster->getShardCount());
std::cout << "StorageS3Distributed::read" << std::endl;
std::cout << "QueryProcessingStage " << processed_stage << std::endl;
Block header =
InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
const Scalars & scalars = context.hasQueryContext() ? context.getQueryContext().getScalars() : Scalars{};
for (const auto & replicas : cluster->getShardsAddresses()) {
/// There will be only one replica, because we consider each replica as a shard
@ -282,9 +307,12 @@ Pipe StorageS3Distributed::read(
auto stream = std::make_shared<RemoteBlockInputStream>(
/*connection=*/*connections.back(),
/*query=*/queryToString(query_info.query),
/*header=*/metadata_snapshot->getSampleBlock(),
/*header=*/header,
/*context=*/context,
nullptr, Scalars(), Tables(), QueryProcessingStage::WithMergeableState
nullptr,
scalars,
Tables(),
QueryProcessingStage::FetchColumns
);
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::move(stream)));
}

View File

@ -38,7 +38,6 @@ public:
using NextTaskResolverBasePtr = std::unique_ptr<NextTaskResolverBase>;
class S3NextTaskResolver : public NextTaskResolverBase
{
public:
@ -73,19 +72,13 @@ private:
TasksIterator current;
};
class TaskSupervisor
{
public:
using QueryId = std::string;
TaskSupervisor()
{
auto nexttask = std::make_unique<S3NextTaskResolver>("12345", std::vector<Task>{"anime1", "anime2", "anime3"});
registerNextTaskResolver(std::move(nexttask));
}
TaskSupervisor() = default;
static TaskSupervisor & instance()
{
static TaskSupervisor task_manager;
@ -105,12 +98,16 @@ public:
Task getNextTaskForId(const QueryId & id)
{
std::shared_lock lock(rwlock);
std::lock_guard lock(rwlock);
auto it = dict.find(id);
if (it == dict.end())
throw Exception(fmt::format("NextTaskResolver is not registered for query {}", id), ErrorCodes::LOGICAL_ERROR);
return it->second->next();
return "";
auto answer = it->second->next();
if (answer.empty())
dict.erase(it);
return answer;
}
private:
using ResolverDict = std::unordered_map<QueryId, NextTaskResolverBasePtr>;
ResolverDict dict;

View File

@ -1,11 +0,0 @@
PROTOBUF_GENERATE_GRPC_CPP(clickhouse_s3_reader_proto_sources clickhouse_s3_reader_proto_headers clickhouse_s3_reader.proto)
# Ignore warnings while compiling protobuf-generated *.pb.h and *.pb.cpp files.
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -w")
# Disable clang-tidy for protobuf-generated *.pb.h and *.pb.cpp files.
set (CMAKE_CXX_CLANG_TIDY "")
add_library(clickhouse_s3_reader_protos ${clickhouse_s3_reader_proto_headers} ${clickhouse_s3_reader_proto_sources})
target_include_directories(clickhouse_s3_reader_protos SYSTEM PUBLIC ${gRPC_INCLUDE_DIRS} ${Protobuf_INCLUDE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
target_link_libraries (clickhouse_s3_reader_protos PUBLIC ${gRPC_LIBRARIES})

View File

@ -1,18 +0,0 @@
syntax = "proto3";
package clickhouse.s3_reader;
service S3TaskServer {
rpc GetNext (S3TaskRequest) returns (S3TaskReply) {}
}
message S3TaskRequest {
string query_id = 1;
}
message S3TaskReply {
string message = 1;
int32 error = 2;
}

View File

@ -1,6 +1,8 @@
#include <thread>
#include <Common/config.h>
#include "DataStreams/RemoteBlockInputStream.h"
#include "Parsers/ASTFunction.h"
#include "Parsers/IAST_fwd.h"
#include "Processors/Sources/SourceFromInputStream.h"
#include "Storages/StorageS3Distributed.h"
#include "Storages/System/StorageSystemOne.h"
@ -73,7 +75,7 @@ ColumnsDescription TableFunctionS3Distributed::getActualTableStructure(const Con
}
StoragePtr TableFunctionS3Distributed::executeImpl(
const ASTPtr & /*ast_function*/, const Context & context,
const ASTPtr & ast_function, const Context & context,
const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
Poco::URI uri (filename);
@ -89,12 +91,19 @@ StoragePtr TableFunctionS3Distributed::executeImpl(
Strings tasks;
tasks.reserve(lists.size());
for (auto & value : lists) {
for (auto & value : lists)
{
tasks.emplace_back(client_auth.uri.endpoint + '/' + client_auth.uri.bucket + '/' + value);
std::cout << tasks.back() << std::endl;
}
std::cout << "query_id " << context.getCurrentQueryId() << std::endl;
std::cout << ast_function->dumpTree() << std::endl;
auto * func = ast_function->as<ASTFunction>();
std::cout << func->arguments->dumpTree() << std::endl;
/// Register resolver, which will give other nodes a task to execute
TaskSupervisor::instance().registerNextTaskResolver(
std::make_unique<S3NextTaskResolver>(context.getCurrentQueryId(), std::move(tasks)));
@ -109,13 +118,11 @@ StoragePtr TableFunctionS3Distributed::executeImpl(
max_connections,
getActualTableStructure(context),
ConstraintsDescription{},
const_cast<Context &>(context),
context,
compression_method);
storage->startup();
// std::this_thread::sleep_for(std::chrono::seconds(60));
return storage;
}