#include "LocalServer.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int SYNTAX_ERROR; extern const int CANNOT_LOAD_CONFIG; } LocalServer::LocalServer() = default; LocalServer::~LocalServer() { if (context) context->shutdown(); /// required for properly exception handling } void LocalServer::initialize(Poco::Util::Application & self) { Poco::Util::Application::initialize(self); /// Load config files if exists if (config().has("config-file") || Poco::File("config.xml").exists()) { const auto config_path = config().getString("config-file", "config.xml"); ConfigProcessor config_processor(config_path, false, true); config_processor.setConfigPath(Poco::Path(config_path).makeParent().toString()); auto loaded_config = config_processor.loadConfig(); config_processor.savePreprocessedConfig(loaded_config, loaded_config.configuration->getString("path", ".")); config().add(loaded_config.configuration.duplicate(), PRIO_DEFAULT, false); } if (config().has("logger") || config().has("logger.level") || config().has("logger.log")) { // sensitive data rules are not used here buildLoggers(config(), logger(), self.commandName()); } else { // Turn off server logging to stderr if (!config().has("verbose")) { Poco::Logger::root().setLevel("none"); Poco::Logger::root().setChannel(Poco::AutoPtr(new Poco::NullChannel())); } } } void LocalServer::applyCmdSettings() { context->applySettingsChanges(cmd_settings.changes()); } /// If path is specified and not empty, will try to setup server environment and load existing metadata void LocalServer::tryInitPath() { std::string path; if (config().has("path")) { // User-supplied path. path = config().getString("path"); Poco::trimInPlace(path); if (path.empty()) { throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot work with emtpy storage path that is explicitly specified" " by the --path option. Please check the program options and " correct the --path."); } } else { // Default unique path in the system temporary directory. const auto tmp = std::filesystem::temp_directory_path(); const auto default_path = tmp / fmt::format("clickhouse-local-{}", getpid()); if (exists(default_path)) { // This is a directory that is left by a previous run of // clickhouse-local that had the same pid and did not complete // correctly. Remove it, with an additional sanity check. if (default_path.parent_path() != tmp) { throw Exception(ErrorCodes::LOGICAL_ERROR, "The temporary directory of clickhouse-local '{}' is not" " inside the system temporary directory '{}'. Will not delete" " it", default_path.string(), tmp.string()); } remove_all(default_path); } create_directory(default_path); temporary_directory_to_delete = default_path; path = default_path.string(); } if (path.back() != '/') path += '/'; context->setPath(path); context->setUserFilesPath(""); // user's files are everywhere } static void attachSystemTables(const Context & context) { DatabasePtr system_database = DatabaseCatalog::instance().tryGetDatabase(DatabaseCatalog::SYSTEM_DATABASE); if (!system_database) { /// TODO: add attachTableDelayed into DatabaseMemory to speedup loading system_database = std::make_shared(DatabaseCatalog::SYSTEM_DATABASE, context); DatabaseCatalog::instance().attachDatabase(DatabaseCatalog::SYSTEM_DATABASE, system_database); } attachSystemTablesLocal(*system_database); } int LocalServer::main(const std::vector & /*args*/) try { Poco::Logger * log = &logger(); ThreadStatus thread_status; UseSSL use_ssl; if (!config().has("query") && !config().has("table-structure")) /// Nothing to process { if (config().hasOption("verbose")) std::cerr << "There are no queries to process." << '\n'; return Application::EXIT_OK; } shared_context = Context::createShared(); context = std::make_unique(Context::createGlobal(shared_context.get())); context->makeGlobalContext(); context->setApplicationType(Context::ApplicationType::LOCAL); tryInitPath(); std::optional status; /// Skip temp path installation /// We will terminate process on error static KillingErrorHandler error_handler; Poco::ErrorHandler::set(&error_handler); /// Don't initialize DateLUT registerFunctions(); registerAggregateFunctions(); registerTableFunctions(); registerStorages(); registerDictionaries(); registerDisks(); /// Maybe useless if (config().has("macros")) context->setMacros(std::make_unique(config(), "macros")); /// Skip networking setupUsers(); /// Limit on total number of concurrently executing queries. /// There is no need for concurrent queries, override max_concurrent_queries. context->getProcessList().setMaxSize(0); /// Size of cache for uncompressed blocks. Zero means disabled. size_t uncompressed_cache_size = config().getUInt64("uncompressed_cache_size", 0); if (uncompressed_cache_size) context->setUncompressedCache(uncompressed_cache_size); /// Size of cache for marks (index of MergeTree family of tables). It is necessary. /// Specify default value for mark_cache_size explicitly! size_t mark_cache_size = config().getUInt64("mark_cache_size", 5368709120); if (mark_cache_size) context->setMarkCache(mark_cache_size); /// Load global settings from default_profile and system_profile. context->setDefaultProfiles(config()); /** Init dummy default DB * NOTE: We force using isolated default database to avoid conflicts with default database from server environment * Otherwise, metadata of temporary File(format, EXPLICIT_PATH) tables will pollute metadata/ directory; * if such tables will not be dropped, clickhouse-server will not be able to load them due to security reasons. */ std::string default_database = config().getString("default_database", "_local"); DatabaseCatalog::instance().attachDatabase(default_database, std::make_shared(default_database, *context)); context->setCurrentDatabase(default_database); applyCmdOptions(); if (!context->getPath().empty()) { /// Lock path directory before read status.emplace(context->getPath() + "status"); LOG_DEBUG(log, "Loading metadata from {}", context->getPath()); loadMetadataSystem(*context); attachSystemTables(*context); loadMetadata(*context); DatabaseCatalog::instance().loadDatabases(); LOG_DEBUG(log, "Loaded metadata."); } else { attachSystemTables(*context); } processQueries(); context->shutdown(); context.reset(); cleanup(); return Application::EXIT_OK; } catch (const Exception & e) { try { cleanup(); } catch (...) { tryLogCurrentException(__PRETTY_FUNCTION__); } std::cerr << getCurrentExceptionMessage(config().hasOption("stacktrace")) << '\n'; /// If exception code isn't zero, we should return non-zero return code anyway. return e.code() ? e.code() : -1; } std::string LocalServer::getInitialCreateTableQuery() { if (!config().has("table-structure")) return {}; auto table_name = backQuoteIfNeed(config().getString("table-name", "table")); auto table_structure = config().getString("table-structure"); auto data_format = backQuoteIfNeed(config().getString("table-data-format", "TSV")); String table_file; if (!config().has("table-file") || config().getString("table-file") == "-") /// Use Unix tools stdin naming convention table_file = "stdin"; else /// Use regular file table_file = quoteString(config().getString("table-file")); return "CREATE TABLE " + table_name + " (" + table_structure + ") " + "ENGINE = " "File(" + data_format + ", " + table_file + ")" "; "; } void LocalServer::processQueries() { String initial_create_query = getInitialCreateTableQuery(); String queries_str = initial_create_query + config().getRawString("query"); const auto & settings = context->getSettingsRef(); std::vector queries; auto parse_res = splitMultipartQuery(queries_str, queries, settings.max_query_size, settings.max_parser_depth); if (!parse_res.second) throw Exception("Cannot parse and execute the following part of query: " + String(parse_res.first), ErrorCodes::SYNTAX_ERROR); context->makeSessionContext(); context->makeQueryContext(); context->setUser("default", "", Poco::Net::SocketAddress{}); context->setCurrentQueryId(""); applyCmdSettings(); /// Use the same query_id (and thread group) for all queries CurrentThread::QueryScope query_scope_holder(*context); bool echo_queries = config().hasOption("echo") || config().hasOption("verbose"); std::exception_ptr exception; for (const auto & query : queries) { ReadBufferFromString read_buf(query); WriteBufferFromFileDescriptor write_buf(STDOUT_FILENO); if (echo_queries) { writeString(query, write_buf); writeChar('\n', write_buf); write_buf.next(); } try { executeQuery(read_buf, write_buf, /* allow_into_outfile = */ true, *context, {}); } catch (...) { if (!config().hasOption("ignore-error")) throw; if (!exception) exception = std::current_exception(); std::cerr << getCurrentExceptionMessage(config().hasOption("stacktrace")) << '\n'; } } if (exception) std::rethrow_exception(exception); } static const char * minimal_default_user_xml = "" " " " " " " " " " " " " " " " ::/0" " " " default" " default" " " " " " " " " " " ""; static ConfigurationPtr getConfigurationFromXMLString(const char * xml_data) { std::stringstream ss{std::string{xml_data}}; Poco::XML::InputSource input_source{ss}; return {new Poco::Util::XMLConfiguration{&input_source}}; } void LocalServer::setupUsers() { ConfigurationPtr users_config; if (config().has("users_config") || config().has("config-file") || Poco::File("config.xml").exists()) { const auto users_config_path = config().getString("users_config", config().getString("config-file", "config.xml")); ConfigProcessor config_processor(users_config_path); const auto loaded_config = config_processor.loadConfig(); config_processor.savePreprocessedConfig(loaded_config, config().getString("path", DBMS_DEFAULT_PATH)); users_config = loaded_config.configuration; } else { users_config = getConfigurationFromXMLString(minimal_default_user_xml); } if (users_config) context->setUsersConfig(users_config); else throw Exception("Can't load config for users", ErrorCodes::CANNOT_LOAD_CONFIG); } void LocalServer::cleanup() { // Delete the temporary directory if needed. Just in case, check that it is // in the system temporary directory, not to delete user data if there is a // bug. if (temporary_directory_to_delete) { const auto tmp = std::filesystem::temp_directory_path(); const auto dir = *temporary_directory_to_delete; temporary_directory_to_delete.reset(); if (dir.parent_path() != tmp) { throw Exception(ErrorCodes::LOGICAL_ERROR, "The temporary directory of clickhouse-local '{}' is not inside" " the system temporary directory '{}'. Will not delete it", dir.string(), tmp.string()); } remove_all(dir); } } static void showClientVersion() { std::cout << DBMS_NAME << " client version " << VERSION_STRING << VERSION_OFFICIAL << "." << '\n'; } static std::string getHelpHeader() { return "usage: clickhouse-local [initial table definition] [--query ]\n" "clickhouse-local allows to execute SQL queries on your data files via single command line call." " To do so, initially you need to define your data source and its format." " After you can execute your SQL queries in usual manner.\n" "There are two ways to define initial table keeping your data." " Either just in first query like this:\n" " CREATE TABLE () ENGINE = File(, );\n" "Either through corresponding command line parameters --table --structure --input-format and --file."; } static std::string getHelpFooter() { return "Example printing memory used by each Unix user:\n" "ps aux | tail -n +2 | awk '{ printf(\"%s\\t%s\\n\", $1, $4) }' | " "clickhouse-local -S \"user String, mem Float64\" -q" " \"SELECT user, round(sum(mem), 2) as mem_total FROM table GROUP BY user ORDER" " BY mem_total DESC FORMAT PrettyCompact\""; } void LocalServer::init(int argc, char ** argv) { namespace po = boost::program_options; /// Don't parse options with Poco library, we prefer neat boost::program_options stopOptionsProcessing(); po::options_description description = createOptionsDescription("Main options", getTerminalWidth()); description.add_options() ("help", "produce help message") ("config-file,c", po::value(), "config-file path") ("query,q", po::value(), "query") ("database,d", po::value(), "database") ("table,N", po::value(), "name of the initial table") /// If structure argument is omitted then initial query is not generated ("structure,S", po::value(), "structure of the initial table (list of column and type names)") ("file,f", po::value(), "path to file with data of the initial table (stdin if not specified)") ("input-format", po::value(), "input format of the initial table data") ("format,f", po::value(), "default output format (clickhouse-client compatibility)") ("output-format", po::value(), "default output format") ("stacktrace", "print stack traces of exceptions") ("echo", "print query before execution") ("verbose", "print query and other debugging info") ("logger.log", po::value(), "Log file name") ("logger.level", po::value(), "Log level") ("ignore-error", "do not stop processing if a query failed") ("version,V", "print version information and exit") ; cmd_settings.addProgramOptions(description); /// Parse main commandline options. po::parsed_options parsed = po::command_line_parser(argc, argv).options(description).run(); po::variables_map options; po::store(parsed, options); po::notify(options); if (options.count("version") || options.count("V")) { showClientVersion(); exit(0); } if (options.empty() || options.count("help")) { std::cout << getHelpHeader() << "\n"; std::cout << description << "\n"; std::cout << getHelpFooter() << "\n"; exit(0); } /// Save received data into the internal config. if (options.count("config-file")) config().setString("config-file", options["config-file"].as()); if (options.count("query")) config().setString("query", options["query"].as()); if (options.count("database")) config().setString("default_database", options["database"].as()); if (options.count("table")) config().setString("table-name", options["table"].as()); if (options.count("file")) config().setString("table-file", options["file"].as()); if (options.count("structure")) config().setString("table-structure", options["structure"].as()); if (options.count("input-format")) config().setString("table-data-format", options["input-format"].as()); if (options.count("format")) config().setString("format", options["format"].as()); if (options.count("output-format")) config().setString("output-format", options["output-format"].as()); if (options.count("stacktrace")) config().setBool("stacktrace", true); if (options.count("echo")) config().setBool("echo", true); if (options.count("verbose")) config().setBool("verbose", true); if (options.count("logger.log")) config().setString("logger.log", options["logger.log"].as()); if (options.count("logger.level")) config().setString("logger.level", options["logger.level"].as()); if (options.count("ignore-error")) config().setBool("ignore-error", true); std::vector arguments; for (int arg_num = 1; arg_num < argc; ++arg_num) arguments.emplace_back(argv[arg_num]); argsToConfig(arguments, config(), 100); } void LocalServer::applyCmdOptions() { context->setDefaultFormat(config().getString("output-format", config().getString("format", "TSV"))); applyCmdSettings(); } } #pragma GCC diagnostic ignored "-Wunused-function" #pragma GCC diagnostic ignored "-Wmissing-declarations" int mainEntryClickHouseLocal(int argc, char ** argv) { DB::LocalServer app; try { app.init(argc, argv); return app.run(); } catch (...) { std::cerr << DB::getCurrentExceptionMessage(true) << '\n'; auto code = DB::getCurrentExceptionCode(); return code ? code : 1; } }