#pragma once #include "Common/ZooKeeper/ZooKeeperArgs.h" #include #include "Generator.h" #include #include #include #include #include #include #include #include #include #include "Interpreters/Context.h" #include "Stats.h" #include using Ports = std::vector; using Strings = std::vector; struct BenchmarkContext { public: void initializeFromConfig(const Poco::Util::AbstractConfiguration & config); void startup(Coordination::ZooKeeper & zookeeper); void cleanup(Coordination::ZooKeeper & zookeeper); private: struct Node { StringGetter name; std::optional data; std::vector> children; size_t repeat_count = 0; std::shared_ptr clone() const; void createNode(Coordination::ZooKeeper & zookeeper, const std::string & parent_path, const Coordination::ACLs & acls) const; void dumpTree(int level = 0) const; }; static std::shared_ptr parseNode(const std::string & key, const Poco::Util::AbstractConfiguration & config); std::vector> root_nodes; Coordination::ACLs default_acls; }; class Runner { public: Runner( std::optional concurrency_, const std::string & config_path, const std::string & input_request_log_, const std::string & setup_nodes_snapshot_path_, const Strings & hosts_strings_, std::optional max_time_, std::optional delay_, std::optional continue_on_error_, std::optional max_iterations_); void thread(std::vector> zookeepers); void printNumberOfRequestsExecuted(size_t num) { std::cerr << "Requests executed: " << num << ".\n"; } bool tryPushRequestInteractively(Coordination::ZooKeeperRequestPtr && request, DB::InterruptListener & interrupt_listener); void runBenchmark(); ~Runner(); private: struct ConnectionInfo { std::string host; bool secure = false; int32_t session_timeout_ms = Coordination::DEFAULT_SESSION_TIMEOUT_MS; int32_t connection_timeout_ms = Coordination::DEFAULT_CONNECTION_TIMEOUT_MS; int32_t operation_timeout_ms = Coordination::DEFAULT_OPERATION_TIMEOUT_MS; bool use_compression = false; size_t sessions = 1; }; void parseHostsFromConfig(const Poco::Util::AbstractConfiguration & config); void runBenchmarkWithGenerator(); void runBenchmarkFromLog(); void createConnections(); std::vector> refreshConnections(); std::shared_ptr getConnection(const ConnectionInfo & connection_info, size_t connection_info_idx); std::string input_request_log; std::string setup_nodes_snapshot_path; size_t concurrency = 1; std::optional pool; std::optional generator; double max_time = 0; double delay = 1; bool continue_on_error = false; size_t max_iterations = 0; std::atomic requests_executed = 0; std::atomic shutdown = false; std::shared_ptr info; bool print_to_stdout; std::optional file_output; bool output_file_with_timestamp; Stopwatch total_watch; Stopwatch delay_watch; std::mutex mutex; using Queue = ConcurrentBoundedQueue; std::optional queue; std::mutex connection_mutex; ConnectionInfo default_connection_info; std::vector connection_infos; std::vector> connections; std::unordered_map connections_to_info_map; DB::SharedContextHolder shared_context; DB::ContextMutablePtr global_context; BenchmarkContext benchmark_context; };