/// Declaration of `Runner`: an object constructed from benchmark options
/// (concurrency, generator name, config path, host strings, time / iteration
/// limits) that owns a request queue and a set of connections and exposes
/// `runBenchmark()` as its entry point. Only declarations live here; all
/// non-trivial member functions are defined elsewhere.
///
/// NOTE(review): this copy of the header looks damaged. Every template
/// argument list is missing (e.g. `std::vector;`, `std::optional concurrency_`,
/// `std::atomic max_iterations = 0;`, `std::vector> zookeepers`) and most
/// `#include` directives have no target. The code tokens below are left
/// exactly as found; restore the missing `<...>` parts from the upstream file
/// before trying to compile this.
#pragma once
#include "Common/ZooKeeper/ZooKeeperConstants.h"
/// NOTE(review): include target missing.
#include
#include "Generator.h"
/// NOTE(review): the next eleven include targets are missing.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "Stats.h"

/// NOTE(review): element types of both aliases were lost — TODO restore.
using Ports = std::vector;
using Strings = std::vector;

/// Metrics declared here as `extern`; their definitions are not in this header.
namespace CurrentMetrics
{
    extern const Metric LocalThread;
    extern const Metric LocalThreadActive;
}

/// Error code declared here as `extern`; its definition is not in this header.
namespace DB::ErrorCodes
{
    extern const int BAD_ARGUMENTS;
}

class Runner
{
public:
    /// Every option except `generator_name`, `config_path` and `hosts_strings_`
    /// is passed as a `std::optional`. How an empty optional is resolved is
    /// decided in the constructor definition, which is not visible here —
    /// presumably it falls back to the member defaults declared below; confirm.
    Runner(
        std::optional concurrency_,
        const std::string & generator_name,
        const std::string & config_path,
        const Strings & hosts_strings_,
        std::optional max_time_,
        std::optional delay_,
        std::optional continue_on_error_,
        std::optional max_iterations_);

    /// Takes a vector of connections by value. Presumably the body of one
    /// worker thread — the definition is not in this header; confirm.
    void thread(std::vector> zookeepers);

    /// Writes "Requests executed: <num>." followed by a newline to std::cerr.
    void printNumberOfRequestsExecuted(size_t num)
    {
        std::cerr << "Requests executed: " << num << ".\n";
    }

    /// Takes the request by rvalue reference and returns a bool. The meaning
    /// of the return value is defined in the .cpp — presumably whether the
    /// push succeeded before an interrupt or limit was hit; confirm.
    bool tryPushRequestInteractively(Coordination::ZooKeeperRequestPtr && request, DB::InterruptListener & interrupt_listener);

    /// Entry point that runs the benchmark (defined elsewhere).
    void runBenchmark();

    ~Runner();
private:
    /// Reads host settings from `config`. Presumably fills `connection_infos`
    /// — confirm against the definition.
    void parseHostsFromConfig(const Poco::Util::AbstractConfiguration & config);

    /// Defaults to 1.
    size_t concurrency = 1;

    /// NOTE(review): the contained types of both optionals were lost.
    std::optional pool;

    std::optional generator;
    /// Defaults to 0. The unit, and whether 0 means "no limit", cannot be
    /// told from this header — TODO confirm.
    double max_time = 0;
    /// Defaults to 1. The unit is not stated in this header — TODO confirm.
    double delay = 1;
    bool continue_on_error = false;
    /// Atomic counters / flag. NOTE(review): value types were lost.
    std::atomic max_iterations = 0;
    std::atomic requests_executed = 0;
    std::atomic shutdown = false;

    /// NOTE(review): pointee type was lost.
    std::shared_ptr info;

    Stopwatch total_watch;
    Stopwatch delay_watch;

    /// What this mutex protects is not visible from the header — TODO confirm.
    std::mutex mutex;

    /// NOTE(review): the queue's element type was lost.
    using Queue = ConcurrentBoundedQueue;
    std::optional queue;

    /// Per-host connection settings. The three timeouts default to the
    /// `Coordination::DEFAULT_*_TIMEOUT_MS` constants; `secure` defaults to
    /// false and `sessions` to 1.
    struct ConnectionInfo
    {
        std::string host;

        bool secure = false;
        int32_t session_timeout_ms = Coordination::DEFAULT_SESSION_TIMEOUT_MS;
        int32_t connection_timeout_ms = Coordination::DEFAULT_CONNECTION_TIMEOUT_MS;
        int32_t operation_timeout_ms = Coordination::DEFAULT_OPERATION_TIMEOUT_MS;

        size_t sessions = 1;
    };

    /// Presumably guards the connection containers below — confirm in the .cpp.
    std::mutex connection_mutex;
    /// NOTE(review): element / key / value types of these containers were lost.
    std::vector connection_infos;
    std::vector> connections;
    /// Presumably maps an index in `connections` to an index in
    /// `connection_infos` — confirm.
    std::unordered_map connections_to_info_map;

    void createConnections();
    /// Returns a shared pointer to a connection for the given settings
    /// (defined elsewhere).
    std::shared_ptr getConnection(const ConnectionInfo & connection_info);
    /// Returns a vector of connections (defined elsewhere).
    std::vector> refreshConnections();
};