diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index fde3daa6c43..308076f9033 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -562,15 +562,6 @@ private: connect(); - if (config().has("next_task")) - { - std::cout << "has next task" << std::endl; - auto next_task = config().getString("next_task", "12345"); - std::cout << "got next task " << next_task << std::endl; - sendNextTaskRequest(next_task); - std::cout << "sended " << std::endl; - } - /// Initialize DateLUT here to avoid counting time spent here as query execution time. const auto local_tz = DateLUT::instance().getTimeZone(); if (!context->getSettingsRef().use_client_time_zone) @@ -1708,13 +1699,6 @@ private: } - - void sendNextTaskRequest(std::string id) - { - connection->sendNextTaskRequest(id); - } - - /// Process the query that doesn't require transferring data blocks to the server. void processOrdinaryQuery() { @@ -2645,7 +2629,6 @@ public: ("opentelemetry-traceparent", po::value(), "OpenTelemetry traceparent header as described by W3C Trace Context recommendation") ("opentelemetry-tracestate", po::value(), "OpenTelemetry tracestate header as described by W3C Trace Context recommendation") ("history_file", po::value(), "path to history file") - ("next_task", po::value(), "request new task from server") ; Settings cmd_settings; @@ -2809,8 +2792,6 @@ public: config().setBool("highlight", options["highlight"].as()); if (options.count("history_file")) config().setString("history_file", options["history_file"].as()); - if (options.count("next_task")) - config().setString("next_task", options["next_task"].as()); if ((query_fuzzer_runs = options["query-fuzzer-runs"].as())) { diff --git a/programs/client/Suggest.cpp b/programs/client/Suggest.cpp index 04f90897dd9..dfa7048349e 100644 --- a/programs/client/Suggest.cpp +++ b/programs/client/Suggest.cpp @@ -3,7 +3,6 @@ #include #include #include -#include "Core/Protocol.h" namespace DB { @@ -34,8 +33,6 @@ void Suggest::load(const ConnectionParameters & connection_parameters, size_t su connection_parameters.compression, connection_parameters.security); - std::cerr << "Connection created" << std::endl; - loadImpl(connection, connection_parameters.timeouts, suggestion_limit); } catch (const Exception & e) @@ -159,7 +156,6 @@ void Suggest::fetch(Connection & connection, const ConnectionTimeouts & timeouts Packet packet = connection.receivePacket(); switch (packet.type) { - case Protocol::Server::NextTaskReply: [[fallthrough]]; case Protocol::Server::Data: fillWordsFromBlock(packet.block); continue; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b93af56ae4a..43f6ae8fea5 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -423,7 +423,6 @@ endif () if (USE_GRPC) dbms_target_link_libraries (PUBLIC clickhouse_grpc_protos) - dbms_target_link_libraries (PUBLIC clickhouse_s3_reader_protos) endif() if (USE_HDFS) diff --git a/src/Client/Connection.cpp b/src/Client/Connection.cpp index 7fcd8332249..018544f969f 100644 --- a/src/Client/Connection.cpp +++ b/src/Client/Connection.cpp @@ -1,4 +1,3 @@ -#include #include #include #include @@ -22,7 +21,7 @@ #include #include #include -#include "Core/Protocol.h" +#include #include #include #include @@ -560,7 +559,6 @@ void Connection::sendIgnoredPartUUIDs(const std::vector & uuids) void Connection::sendNextTaskRequest(const std::string & id) { - std::cout << "Connection::sendNextTaskRequest" << std::endl; writeVarUInt(Protocol::Client::NextTaskRequest, *out); writeStringBinary(id, *out); out->next(); diff --git a/src/Client/Connection.h b/src/Client/Connection.h index e38b0501964..123b10942f1 100644 --- a/src/Client/Connection.h +++ b/src/Client/Connection.h @@ -205,7 +205,7 @@ public: in->setAsyncCallback(std::move(async_callback)); } -public: +private: String host; UInt16 port; diff --git a/src/DataStreams/RemoteBlockInputStream.cpp b/src/DataStreams/RemoteBlockInputStream.cpp index c633600d37f..5ab226acd13 100644 --- a/src/DataStreams/RemoteBlockInputStream.cpp +++ b/src/DataStreams/RemoteBlockInputStream.cpp @@ -62,6 +62,7 @@ Block RemoteBlockInputStream::readImpl() if (isCancelledOrThrowIfKilled()) return Block(); + std::cout << "RemoteBlockInputStream " << block.rows() << ' ' << block.dumpStructure() << std::endl; return block; } diff --git a/src/DataStreams/RemoteQueryExecutor.cpp b/src/DataStreams/RemoteQueryExecutor.cpp index 4aa659854b9..847baf555ee 100644 --- a/src/DataStreams/RemoteQueryExecutor.cpp +++ b/src/DataStreams/RemoteQueryExecutor.cpp @@ -114,7 +114,7 @@ RemoteQueryExecutor::~RemoteQueryExecutor() /** If we receive a block with slightly different column types, or with excessive columns, * we will adapt it to expected structure. */ -static Block adaptBlockStructure(const Block & block, const Block & header) +[[maybe_unused]] static Block adaptBlockStructure(const Block & block, const Block & header) { /// Special case when reader doesn't care about result structure. Deprecated and used only in Benchmark, PerformanceTest. if (!header) @@ -123,6 +123,9 @@ static Block adaptBlockStructure(const Block & block, const Block & header) Block res; res.info = block.info; + std::cout << "block " << block.dumpStructure() << std::endl; + std::cout << "header " << header.dumpStructure() << std::endl; + for (const auto & elem : header) { ColumnPtr column; @@ -153,7 +156,17 @@ static Block adaptBlockStructure(const Block & block, const Block & header) column = elem.column->cloneResized(block.rows()); } else + { + // if (!block.has(elem.name)) + // { + // column = elem.type->createColumn(); + // } + // else + // { + // column = castColumn(block.getByName(elem.name), elem.type); + // } column = castColumn(block.getByName(elem.name), elem.type); + } res.insert({column, elem.type, elem.name}); } @@ -314,7 +327,12 @@ std::optional RemoteQueryExecutor::processPacket(Packet packet) case Protocol::Server::Data: /// If the block is not empty and is not a header block if (packet.block && (packet.block.rows() > 0)) - return adaptBlockStructure(packet.block, header); + { + // return packet.block; + Block anime = adaptBlockStructure(packet.block, header); + std::cout << "RemoteQueryExecutor " << anime.dumpStructure() << std::endl; + return anime; + } break; /// If the block is empty - we will receive other packets before EndOfStream. case Protocol::Server::Exception: diff --git a/src/Functions/IFunction.cpp b/src/Functions/IFunction.cpp index e4a1adb8525..9636573c5f4 100644 --- a/src/Functions/IFunction.cpp +++ b/src/Functions/IFunction.cpp @@ -477,7 +477,7 @@ DataTypePtr FunctionOverloadResolverAdaptor::getReturnTypeDefaultImplementationF } if (null_presence.has_nullable) { - Block nested_columns = createBlockWithNestedColumns(arguments); + Block nested_columns{createBlockWithNestedColumns(arguments)}; auto return_type = getter(ColumnsWithTypeAndName(nested_columns.begin(), nested_columns.end())); return makeNullable(return_type); } diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp index be5497a709b..fd07a7f309a 100644 --- a/src/IO/ReadBufferFromS3.cpp +++ b/src/IO/ReadBufferFromS3.cpp @@ -43,9 +43,6 @@ bool ReadBufferFromS3::nextImpl() initialized = true; } - if (hasPendingData()) - return true; - Stopwatch watch; auto res = impl->next(); watch.stop(); diff --git a/src/IO/S3Common.cpp b/src/IO/S3Common.cpp index 1e498c03a45..e0d0709bbab 100644 --- a/src/IO/S3Common.cpp +++ b/src/IO/S3Common.cpp @@ -326,7 +326,6 @@ namespace S3 URI::URI(const Poco::URI & uri_) { - full = uri_.toString(); /// Case when bucket name represented in domain name of S3 URL. /// E.g. (https://bucket-name.s3.Region.amazonaws.com/key) /// https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html#virtual-hosted-style-access @@ -401,11 +400,6 @@ namespace S3 throw Exception("Bucket or key name are invalid in S3 URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS); } - - String URI::toString() const - { - return full; - } } } diff --git a/src/IO/S3Common.h b/src/IO/S3Common.h index 54493bb4d44..b071daefee1 100644 --- a/src/IO/S3Common.h +++ b/src/IO/S3Common.h @@ -67,13 +67,9 @@ struct URI String key; String storage_name; - /// Full representation of URI - String full; - bool is_virtual_hosted_style; explicit URI(const Poco::URI & uri_); - String toString() const; }; } diff --git a/src/Server/S3TaskServer.h b/src/Server/S3TaskServer.h deleted file mode 100644 index d1345ad8532..00000000000 --- a/src/Server/S3TaskServer.h +++ /dev/null @@ -1,104 +0,0 @@ -#pragma once - -#include -#if !defined(ARCADIA_BUILD) -#include -#endif - -#if USE_GRPC -#include -#include "clickhouse_s3_task_server.grpc.pb.h" - - -#include -#include -#include - -#include -#include -#include - -using grpc::Server; -using grpc::ServerBuilder; -using grpc::ServerContext; -using grpc::Status; -using clickhouse::s3_task_server::S3TaskServer; -using clickhouse::s3_task_server::S3TaskRequest; -using clickhouse::s3_task_server::S3TaskReply; - - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; -} - - -class S3Task -{ -public: - S3Task() = delete; - - explicit S3Task(std::vector && paths_) - : paths(std::move(paths_)) - {} - - std::optional getNext() { - static size_t next = 0; - if (next >= paths.size()) - return std::nullopt; - const auto result = paths[next]; - ++next; - return result; - } -private: - std::vector paths; -}; - - -// Logic and data behind the server's behavior. -class S3TaskServer final : public S3TaskServer::Service { - Status GetNext(ServerContext* context, const S3TaskRequest* request, S3TaskReply* reply) override { - std::string prefix("Hello"); - const auto query_id = request->query_id(); - auto it = handlers.find(query_id); - if (it == handlers.end()) { - reply->set_message(""); - reply->set_error(ErrorCodes::LOGICAL_ERROR); - return Status::CANCELLED; - } - - reply->set_error(0); - reply->set_message(it->second.getNext()); - return Status::OK; - } - - private: - std::unordered_map handlers; -}; - - -void RunServer() { - std::string server_address("0.0.0.0:50051"); - static S3TaskServer service; - - grpc::EnableDefaultHealthCheckService(true); - grpc::reflection::InitProtoReflectionServerBuilderPlugin(); - ServerBuilder builder; - // Listen on the given address without any authentication mechanism. - builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); - // Register "service" as the instance through which we'll communicate with - // clients. In this case it corresponds to an *synchronous* service. - builder.RegisterService(&service); - // Finally assemble the server. - std::unique_ptr server(builder.BuildAndStart()); - std::cout << "Server listening on " << server_address << std::endl; - server->Wait(); -} - -} - - -#endif diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index 0270c015e6c..d6c5aed4fc3 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -760,7 +760,7 @@ void TCPHandler::sendPartUUIDs() void TCPHandler::sendNextTaskReply(String reply) { - LOG_DEBUG(log, "Nexttask for id is {} ", reply); + LOG_TRACE(log, "Nexttask for id is {} ", reply); writeVarUInt(Protocol::Server::NextTaskReply, *out); writeStringBinary(reply, *out); out->next(); @@ -975,13 +975,10 @@ bool TCPHandler::receivePacket() readVarUInt(packet_type, *in); - std::cout << "TCPHander receivePacket" << packet_type << ' ' << Protocol::Client::NextTaskRequest << std::endl; - switch (packet_type) { case Protocol::Client::NextTaskRequest: { - std::cout << "Protocol::Client::NextTaskRequest" << std::endl; auto id = receiveNextTaskRequest(); auto next = TaskSupervisor::instance().getNextTaskForId(id); sendNextTaskReply(next); diff --git a/src/Server/grpc_protos/CMakeLists.txt b/src/Server/grpc_protos/CMakeLists.txt index 22a834f96a1..584cf015a65 100644 --- a/src/Server/grpc_protos/CMakeLists.txt +++ b/src/Server/grpc_protos/CMakeLists.txt @@ -9,4 +9,3 @@ set (CMAKE_CXX_CLANG_TIDY "") add_library(clickhouse_grpc_protos ${clickhouse_grpc_proto_headers} ${clickhouse_grpc_proto_sources}) target_include_directories(clickhouse_grpc_protos SYSTEM PUBLIC ${gRPC_INCLUDE_DIRS} ${Protobuf_INCLUDE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) target_link_libraries (clickhouse_grpc_protos PUBLIC ${gRPC_LIBRARIES}) - diff --git a/src/Storages/CMakeLists.txt b/src/Storages/CMakeLists.txt index 2dbc2013648..deb1c9f6716 100644 --- a/src/Storages/CMakeLists.txt +++ b/src/Storages/CMakeLists.txt @@ -4,7 +4,3 @@ add_subdirectory(System) if(ENABLE_TESTS) add_subdirectory(tests) endif() - -if (USE_GRPC) - add_subdirectory(protos) -endif() diff --git a/src/Storages/StorageS3Distributed.cpp b/src/Storages/StorageS3Distributed.cpp index d520310ad66..f64e6fb3622 100644 --- a/src/Storages/StorageS3Distributed.cpp +++ b/src/Storages/StorageS3Distributed.cpp @@ -32,9 +32,14 @@ #include #include +#include +#include +#include + #include #include +#include #include #include #include @@ -115,10 +120,19 @@ public: Chunk generate() override { - auto chunk = inner->generate(); - if (!chunk && !createOrUpdateInnerSource()) + if (!inner) return {}; - return inner->generate(); + + auto chunk = inner->generate(); + if (!chunk) + { + if (!createOrUpdateInnerSource()) + return {}; + else + chunk = inner->generate(); + } + std::cout << "generate() " << chunk.dumpStructure() << std::endl; + return chunk; } private: @@ -131,7 +145,7 @@ private: initiator_connection->sendNextTaskRequest(initial_query_id); auto packet = initiator_connection->receivePacket(); assert(packet.type = Protocol::Server::NextTaskReply); - LOG_DEBUG(&Poco::Logger::get("StorageS3SequentialSource"), "Got new task {}", packet.next_task); + LOG_TRACE(&Poco::Logger::get("StorageS3SequentialSource"), "Got new task {}", packet.next_task); return packet.next_task; } catch (...) @@ -144,11 +158,13 @@ private: bool createOrUpdateInnerSource() { - auto next_uri = S3::URI(Poco::URI(askAboutNextKey())); - - if (next_uri.uri.empty()) + auto next_string = askAboutNextKey(); + std::cout << "createOrUpdateInnerSource " << next_string << std::endl; + if (next_string.empty()) return false; + auto next_uri = S3::URI(Poco::URI(next_string)); + assert(next_uri.bucket == client_auth.uri.bucket); inner = std::make_unique( @@ -223,7 +239,7 @@ Pipe StorageS3Distributed::read( const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info, const Context & context, - QueryProcessingStage::Enum /*processed_stage*/, + QueryProcessingStage::Enum processed_stage, size_t max_block_size, unsigned /*num_streams*/) { @@ -232,7 +248,6 @@ Pipe StorageS3Distributed::read( { StorageS3::updateClientAndAuthSettings(context, client_auth); - Pipes pipes; bool need_path_column = false; bool need_file_column = false; for (const auto & column : column_names) @@ -243,7 +258,10 @@ Pipe StorageS3Distributed::read( need_file_column = true; } - std::cout << metadata_snapshot->getSampleBlock().dumpStructure() << std::endl; + std::cout << need_file_column << std::boolalpha << need_file_column << std::endl; + std::cout << need_path_column << std::boolalpha << need_path_column << std::endl; + + std::cout << "metadata_snapshot->getSampleBlock().dumpStructure() " << metadata_snapshot->getSampleBlock().dumpStructure() << std::endl; return Pipe(std::make_shared( context.getInitialQueryId(), @@ -265,6 +283,13 @@ Pipe StorageS3Distributed::read( connections.reserve(cluster->getShardCount()); std::cout << "StorageS3Distributed::read" << std::endl; + std::cout << "QueryProcessingStage " << processed_stage << std::endl; + + + Block header = + InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock(); + + const Scalars & scalars = context.hasQueryContext() ? context.getQueryContext().getScalars() : Scalars{}; for (const auto & replicas : cluster->getShardsAddresses()) { /// There will be only one replica, because we consider each replica as a shard @@ -282,9 +307,12 @@ Pipe StorageS3Distributed::read( auto stream = std::make_shared( /*connection=*/*connections.back(), /*query=*/queryToString(query_info.query), - /*header=*/metadata_snapshot->getSampleBlock(), + /*header=*/header, /*context=*/context, - nullptr, Scalars(), Tables(), QueryProcessingStage::WithMergeableState + nullptr, + scalars, + Tables(), + QueryProcessingStage::FetchColumns ); pipes.emplace_back(std::make_shared(std::move(stream))); } diff --git a/src/Storages/StorageTaskManager.h b/src/Storages/StorageTaskManager.h index 87fd32c0359..bb8b7952a4f 100644 --- a/src/Storages/StorageTaskManager.h +++ b/src/Storages/StorageTaskManager.h @@ -38,7 +38,6 @@ public: using NextTaskResolverBasePtr = std::unique_ptr; - class S3NextTaskResolver : public NextTaskResolverBase { public: @@ -73,19 +72,13 @@ private: TasksIterator current; }; - - class TaskSupervisor { public: using QueryId = std::string; - TaskSupervisor() - { - auto nexttask = std::make_unique("12345", std::vector{"anime1", "anime2", "anime3"}); - registerNextTaskResolver(std::move(nexttask)); - } - + TaskSupervisor() = default; + static TaskSupervisor & instance() { static TaskSupervisor task_manager; @@ -105,12 +98,16 @@ public: Task getNextTaskForId(const QueryId & id) { - std::shared_lock lock(rwlock); + std::lock_guard lock(rwlock); auto it = dict.find(id); if (it == dict.end()) - throw Exception(fmt::format("NextTaskResolver is not registered for query {}", id), ErrorCodes::LOGICAL_ERROR); - return it->second->next(); + return ""; + auto answer = it->second->next(); + if (answer.empty()) + dict.erase(it); + return answer; } + private: using ResolverDict = std::unordered_map; ResolverDict dict; diff --git a/src/Storages/protos/CMakeLists.txt b/src/Storages/protos/CMakeLists.txt deleted file mode 100644 index 42df78dd87b..00000000000 --- a/src/Storages/protos/CMakeLists.txt +++ /dev/null @@ -1,11 +0,0 @@ -PROTOBUF_GENERATE_GRPC_CPP(clickhouse_s3_reader_proto_sources clickhouse_s3_reader_proto_headers clickhouse_s3_reader.proto) - -# Ignore warnings while compiling protobuf-generated *.pb.h and *.pb.cpp files. -set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -w") - -# Disable clang-tidy for protobuf-generated *.pb.h and *.pb.cpp files. -set (CMAKE_CXX_CLANG_TIDY "") - -add_library(clickhouse_s3_reader_protos ${clickhouse_s3_reader_proto_headers} ${clickhouse_s3_reader_proto_sources}) -target_include_directories(clickhouse_s3_reader_protos SYSTEM PUBLIC ${gRPC_INCLUDE_DIRS} ${Protobuf_INCLUDE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) -target_link_libraries (clickhouse_s3_reader_protos PUBLIC ${gRPC_LIBRARIES}) diff --git a/src/Storages/protos/clickhouse_s3_reader.proto b/src/Storages/protos/clickhouse_s3_reader.proto deleted file mode 100644 index 18d1102d40b..00000000000 --- a/src/Storages/protos/clickhouse_s3_reader.proto +++ /dev/null @@ -1,18 +0,0 @@ -syntax = "proto3"; - -package clickhouse.s3_reader; - - -service S3TaskServer { - rpc GetNext (S3TaskRequest) returns (S3TaskReply) {} -} - - -message S3TaskRequest { - string query_id = 1; -} - -message S3TaskReply { - string message = 1; - int32 error = 2; -} diff --git a/src/TableFunctions/TableFunctionS3Distributed.cpp b/src/TableFunctions/TableFunctionS3Distributed.cpp index debd89604a8..8717a5aa5bc 100644 --- a/src/TableFunctions/TableFunctionS3Distributed.cpp +++ b/src/TableFunctions/TableFunctionS3Distributed.cpp @@ -1,6 +1,8 @@ #include #include #include "DataStreams/RemoteBlockInputStream.h" +#include "Parsers/ASTFunction.h" +#include "Parsers/IAST_fwd.h" #include "Processors/Sources/SourceFromInputStream.h" #include "Storages/StorageS3Distributed.h" #include "Storages/System/StorageSystemOne.h" @@ -73,7 +75,7 @@ ColumnsDescription TableFunctionS3Distributed::getActualTableStructure(const Con } StoragePtr TableFunctionS3Distributed::executeImpl( - const ASTPtr & /*ast_function*/, const Context & context, + const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const { Poco::URI uri (filename); @@ -89,12 +91,19 @@ StoragePtr TableFunctionS3Distributed::executeImpl( Strings tasks; tasks.reserve(lists.size()); - for (auto & value : lists) { + for (auto & value : lists) + { tasks.emplace_back(client_auth.uri.endpoint + '/' + client_auth.uri.bucket + '/' + value); + std::cout << tasks.back() << std::endl; } std::cout << "query_id " << context.getCurrentQueryId() << std::endl; + std::cout << ast_function->dumpTree() << std::endl; + auto * func = ast_function->as(); + + std::cout << func->arguments->dumpTree() << std::endl; + /// Register resolver, which will give other nodes a task to execute TaskSupervisor::instance().registerNextTaskResolver( std::make_unique(context.getCurrentQueryId(), std::move(tasks))); @@ -109,13 +118,11 @@ StoragePtr TableFunctionS3Distributed::executeImpl( max_connections, getActualTableStructure(context), ConstraintsDescription{}, - const_cast(context), + context, compression_method); storage->startup(); - // std::this_thread::sleep_for(std::chrono::seconds(60)); - return storage; }