#include "LocalServer.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace fs = std::filesystem; namespace DB { namespace ErrorCodes { extern const int BAD_ARGUMENTS; extern const int CANNOT_LOAD_CONFIG; extern const int FILE_ALREADY_EXISTS; extern const int QUERY_WAS_CANCELLED; extern const int INVALID_USAGE_OF_INPUT; } void LocalServer::processError(const String & query) const { if (!is_interactive) return; /// For non-interactive mode process exception only when all queries were executed. if (server_exception) { fmt::print(stderr, "Error on processing query '{}':\n{}\n", query, server_exception->message()); fmt::print(stderr, "\n"); } if (client_exception) { fmt::print(stderr, "Error on processing query '{}':\n{}\n", query, client_exception->message()); fmt::print(stderr, "\n"); } } void LocalServer::processSingleQuery(const String & query_to_execute, ASTPtr parsed_query) { cancelled = false; /// To support previous behaviour of clickhouse-local do not reset first exception in case --ignore-error, /// it needs to be thrown after multiquery is finished (test 00385). But I do not think it is ok to output only /// first exception or whether we need to even rethrow it because there is --ignore-error. if (!ignore_error) server_exception.reset(); auto process_error = [&]() { if (!ignore_error) throw; server_exception = std::make_unique(getCurrentExceptionMessage(true), getCurrentExceptionCode()); have_error = true; }; try { const auto * insert = parsed_query->as(); ASTPtr input_function; if (insert && insert->select) insert->tryFindInputFunction(input_function); /// INSERT query for which data transfer is needed (not an INSERT SELECT or input()) is processed separately. if (insert && (!insert->select || input_function) && !insert->watch) { if (input_function && insert->format.empty()) throw Exception("FORMAT must be specified for function input()", ErrorCodes::INVALID_USAGE_OF_INPUT); processInsertQuery(query_to_execute, parsed_query); } else processOrdinaryQuery(query_to_execute, parsed_query); } catch (const Exception & e) { if (is_interactive && e.code() == ErrorCodes::QUERY_WAS_CANCELLED) std::cout << "Query was cancelled." << std::endl; else process_error(); } catch (...) { process_error(); } } bool LocalServer::processMultiQuery(const String & all_queries_text) { bool echo_query = echo_queries; /// Several queries separated by ';'. /// INSERT data is ended by the end of line, not ';'. /// An exception is VALUES format where we also support semicolon in /// addition to end of line. const char * this_query_begin = all_queries_text.data(); const char * this_query_end; const char * all_queries_end = all_queries_text.data() + all_queries_text.size(); String full_query; // full_query is the query + inline INSERT data + trailing comments (the latter is our best guess for now). String query_to_execute; ASTPtr parsed_query; std::optional current_exception; while (true) { auto stage = analyzeMultiQueryText(this_query_begin, this_query_end, all_queries_end, query_to_execute, parsed_query, all_queries_text, current_exception); switch (stage) { case MultiQueryProcessingStage::QUERIES_END: case MultiQueryProcessingStage::PARSING_FAILED: { return true; } case MultiQueryProcessingStage::CONTINUE_PARSING: { continue; } case MultiQueryProcessingStage::PARSING_EXCEPTION: { this_query_end = find_first_symbols<'\n'>(this_query_end, all_queries_end); this_query_begin = this_query_end; /// It's expected syntax error, skip the line current_exception.reset(); continue; } case MultiQueryProcessingStage::EXECUTE_QUERY: { full_query = all_queries_text.substr(this_query_begin - all_queries_text.data(), this_query_end - this_query_begin); try { processParsedSingleQuery(full_query, query_to_execute, parsed_query, echo_query, false); } catch (...) { // Surprisingly, this is a client error. A server error would // have been reported w/o throwing (see onReceiveSeverException()). client_exception = std::make_unique(getCurrentExceptionMessage(true), getCurrentExceptionCode()); have_error = true; } // For INSERTs with inline data: use the end of inline data as // reported by the format parser (it is saved in sendData()). // This allows us to handle queries like: // insert into t values (1); select 1 // , where the inline data is delimited by semicolon and not by a // newline. auto * insert_ast = parsed_query->as(); if (insert_ast && insert_ast->data) { this_query_end = insert_ast->end; adjustQueryEnd(this_query_end, all_queries_end, global_context->getSettingsRef().max_parser_depth); } // Report error. if (have_error) processError(full_query); // Stop processing queries if needed. if (have_error && !ignore_error) return is_interactive; this_query_begin = this_query_end; break; } } } } void LocalServer::initialize(Poco::Util::Application & self) { Poco::Util::Application::initialize(self); /// Load config files if exists if (config().has("config-file") || fs::exists("config.xml")) { const auto config_path = config().getString("config-file", "config.xml"); ConfigProcessor config_processor(config_path, false, true); config_processor.setConfigPath(fs::path(config_path).parent_path()); auto loaded_config = config_processor.loadConfig(); config_processor.savePreprocessedConfig(loaded_config, loaded_config.configuration->getString("path", ".")); config().add(loaded_config.configuration.duplicate(), PRIO_DEFAULT, false); } if (config().has("logger.console") || config().has("logger.level") || config().has("logger.log")) { // force enable logging config().setString("logger", "logger"); // sensitive data rules are not used here buildLoggers(config(), logger(), "clickhouse-local"); } else { // Turn off server logging to stderr if (!config().has("verbose")) { Poco::Logger::root().setLevel("none"); Poco::Logger::root().setChannel(Poco::AutoPtr(new Poco::NullChannel())); } } } /// If path is specified and not empty, will try to setup server environment and load existing metadata void LocalServer::tryInitPath() { std::string path; if (config().has("path")) { // User-supplied path. path = config().getString("path"); Poco::trimInPlace(path); if (path.empty()) { throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot work with empty storage path that is explicitly specified" " by the --path option. Please check the program options and" " correct the --path."); } } else { // The path is not provided explicitly - use a unique path in the system temporary directory // (or in the current dir if temporary don't exist) Poco::Logger * log = &logger(); std::filesystem::path parent_folder; std::filesystem::path default_path; try { // try to guess a tmp folder name, and check if it's a directory (throw exception otherwise) parent_folder = std::filesystem::temp_directory_path(); } catch (const fs::filesystem_error& e) { // tmp folder don't exists? misconfiguration? chroot? LOG_DEBUG(log, "Can not get temporary folder: {}", e.what()); parent_folder = std::filesystem::current_path(); std::filesystem::is_directory(parent_folder); // that will throw an exception if it's not a directory LOG_DEBUG(log, "Will create working directory inside current directory: {}", parent_folder.string()); } /// we can have another clickhouse-local running simultaneously, even with the same PID (for ex. - several dockers mounting the same folder) /// or it can be some leftovers from other clickhouse-local runs /// as we can't accurately distinguish those situations we don't touch any existent folders /// we just try to pick some free name for our working folder default_path = parent_folder / fmt::format("clickhouse-local-{}-{}-{}", getpid(), time(nullptr), randomSeed()); if (exists(default_path)) throw Exception(ErrorCodes::FILE_ALREADY_EXISTS, "Unsuccessful attempt to create working directory: {} exist!", default_path.string()); create_directory(default_path); temporary_directory_to_delete = default_path; path = default_path.string(); LOG_DEBUG(log, "Working directory created: {}", path); } if (path.back() != '/') path += '/'; global_context->setPath(path); global_context->setTemporaryStorage(path + "tmp"); global_context->setFlagsPath(path + "flags"); global_context->setUserFilesPath(""); // user's files are everywhere } static void attachSystemTables(ContextPtr context) { DatabasePtr system_database = DatabaseCatalog::instance().tryGetDatabase(DatabaseCatalog::SYSTEM_DATABASE); if (!system_database) { /// TODO: add attachTableDelayed into DatabaseMemory to speedup loading system_database = std::make_shared(DatabaseCatalog::SYSTEM_DATABASE, context); DatabaseCatalog::instance().attachDatabase(DatabaseCatalog::SYSTEM_DATABASE, system_database); } attachSystemTablesLocal(*system_database); } void LocalServer::cleanup() { // Delete the temporary directory if needed. if (temporary_directory_to_delete) { const auto dir = *temporary_directory_to_delete; temporary_directory_to_delete.reset(); LOG_DEBUG(&logger(), "Removing temporary directory: {}", dir.string()); remove_all(dir); } } std::string LocalServer::getInitialCreateTableQuery() { if (!config().has("table-structure")) return {}; auto table_name = backQuoteIfNeed(config().getString("table-name", "table")); auto table_structure = config().getString("table-structure"); auto data_format = backQuoteIfNeed(config().getString("table-data-format", "TSV")); String table_file; if (!config().has("table-file") || config().getString("table-file") == "-") { /// Use Unix tools stdin naming convention table_file = "stdin"; } else { /// Use regular file table_file = quoteString(config().getString("table-file")); } return fmt::format("CREATE TABLE {} ({}) ENGINE = File({}, {});", table_name, table_structure, data_format, table_file); } void LocalServer::loadSuggestionData(Suggest & suggest) { if (is_interactive && !config().getBool("disable_suggestion", false)) suggest.load(global_context); } static ConfigurationPtr getConfigurationFromXMLString(const char * xml_data) { std::stringstream ss{std::string{xml_data}}; // STYLE_CHECK_ALLOW_STD_STRING_STREAM Poco::XML::InputSource input_source{ss}; return {new Poco::Util::XMLConfiguration{&input_source}}; } void LocalServer::setupUsers() { static const char * minimal_default_user_xml = "" " " " " " " " " " " " " " " " ::/0" " " " default" " default" " " " " " " " " " " ""; ConfigurationPtr users_config; if (config().has("users_config") || config().has("config-file") || fs::exists("config.xml")) { const auto users_config_path = config().getString("users_config", config().getString("config-file", "config.xml")); ConfigProcessor config_processor(users_config_path); const auto loaded_config = config_processor.loadConfig(); config_processor.savePreprocessedConfig(loaded_config, config().getString("path", DBMS_DEFAULT_PATH)); users_config = loaded_config.configuration; } else { users_config = getConfigurationFromXMLString(minimal_default_user_xml); } if (users_config) global_context->setUsersConfig(users_config); else throw Exception("Can't load config for users", ErrorCodes::CANNOT_LOAD_CONFIG); } String LocalServer::getQueryTextPrefix() { return getInitialCreateTableQuery(); } int LocalServer::mainImpl() try { ThreadStatus thread_status; /// We will terminate process on error static KillingErrorHandler error_handler; Poco::ErrorHandler::set(&error_handler); /// Don't initialize DateLUT registerFunctions(); registerAggregateFunctions(); registerTableFunctions(); registerStorages(); registerDictionaries(); registerDisks(); registerFormats(); processConfig(); applyCmdSettings(global_context); connection_parameters = ConnectionParameters(config()); connection = std::make_unique(global_context); /// Use the same query_id (and thread group) for all queries connect(); if (is_interactive) { std::cout << std::endl; runInteractive(); } else { runNonInteractive(); if (server_exception) server_exception->rethrow(); } connection.reset(); global_context->shutdown(); global_context.reset(); status.reset(); cleanup(); return Application::EXIT_OK; } catch (const Exception & e) { try { cleanup(); } catch (...) { tryLogCurrentException(__PRETTY_FUNCTION__); } if (!ignore_error) std::cerr << getCurrentExceptionMessage(config().hasOption("stacktrace")) << '\n'; /// If exception code isn't zero, we should return non-zero return code anyway. return e.code() ? e.code() : -1; } void LocalServer::processConfig() { if (stdin_is_a_tty && !config().has("query") && !config().has("table-structure") && queries_files.empty()) { if (config().has("query") && config().has("queries-file")) throw Exception("Specify either `query` or `queries-file` option", ErrorCodes::BAD_ARGUMENTS); is_interactive = true; if (config().has("multiquery")) is_multiquery = true; } else { need_render_progress = config().getBool("progress", false); echo_queries = config().hasOption("echo") || config().hasOption("verbose"); ignore_error = config().getBool("ignore-error", false); is_multiquery = true; } shared_context = Context::createShared(); global_context = Context::createGlobal(shared_context.get()); global_context->makeGlobalContext(); global_context->setApplicationType(Context::ApplicationType::LOCAL); tryInitPath(); Poco::Logger * log = &logger(); /// Maybe useless if (config().has("macros")) global_context->setMacros(std::make_unique(config(), "macros", log)); format = config().getString("output-format", config().getString("format", is_interactive ? "PrettyCompact" : "TSV")); insert_format = "Values"; /// Setting value from cmd arg overrides one from config if (global_context->getSettingsRef().max_insert_block_size.changed) insert_format_max_block_size = global_context->getSettingsRef().max_insert_block_size; else insert_format_max_block_size = config().getInt("insert_format_max_block_size", global_context->getSettingsRef().max_insert_block_size); /// Skip networking /// Sets external authenticators config (LDAP, Kerberos). global_context->setExternalAuthenticatorsConfig(config()); setupUsers(); /// Limit on total number of concurrently executing queries. /// There is no need for concurrent queries, override max_concurrent_queries. global_context->getProcessList().setMaxSize(0); /// Size of cache for uncompressed blocks. Zero means disabled. size_t uncompressed_cache_size = config().getUInt64("uncompressed_cache_size", 0); if (uncompressed_cache_size) global_context->setUncompressedCache(uncompressed_cache_size); /// Size of cache for marks (index of MergeTree family of tables). It is necessary. /// Specify default value for mark_cache_size explicitly! size_t mark_cache_size = config().getUInt64("mark_cache_size", 5368709120); if (mark_cache_size) global_context->setMarkCache(mark_cache_size); /// A cache for mmapped files. size_t mmap_cache_size = config().getUInt64("mmap_cache_size", 1000); /// The choice of default is arbitrary. if (mmap_cache_size) global_context->setMMappedFileCache(mmap_cache_size); /// Load global settings from default_profile and system_profile. global_context->setDefaultProfiles(config()); /** Init dummy default DB * NOTE: We force using isolated default database to avoid conflicts with default database from server environment * Otherwise, metadata of temporary File(format, EXPLICIT_PATH) tables will pollute metadata/ directory; * if such tables will not be dropped, clickhouse-server will not be able to load them due to security reasons. */ std::string default_database = config().getString("default_database", "_local"); DatabaseCatalog::instance().attachDatabase(default_database, std::make_shared(default_database, global_context)); global_context->setCurrentDatabase(default_database); applyCmdOptions(global_context); if (config().has("path")) { String path = global_context->getPath(); /// Lock path directory before read status.emplace(fs::path(path) / "status", StatusFile::write_full_info); LOG_DEBUG(log, "Loading metadata from {}", path); fs::create_directories(fs::path(path) / "data/"); fs::create_directories(fs::path(path) / "metadata/"); loadMetadataSystem(global_context); attachSystemTables(global_context); loadMetadata(global_context); DatabaseCatalog::instance().loadDatabases(); LOG_DEBUG(log, "Loaded metadata."); } else if (!config().has("no-system-tables")) { attachSystemTables(global_context); } server_display_name = config().getString("display_name", getFQDNOrHostName()); prompt_by_server_display_name = config().getRawString("prompt_by_server_display_name.default", "{display_name} :) "); std::map prompt_substitutions{{"display_name", server_display_name}}; for (const auto & [key, value] : prompt_substitutions) boost::replace_all(prompt_by_server_display_name, "{" + key + "}", value); ClientInfo & client_info = global_context->getClientInfo(); client_info.setInitialQuery(); } static std::string getHelpHeader() { return "usage: clickhouse-local [initial table definition] [--query ]\n" "clickhouse-local allows to execute SQL queries on your data files via single command line call." " To do so, initially you need to define your data source and its format." " After you can execute your SQL queries in usual manner.\n" "There are two ways to define initial table keeping your data." " Either just in first query like this:\n" " CREATE TABLE () ENGINE = File(, );\n" "Either through corresponding command line parameters --table --structure --input-format and --file."; } static std::string getHelpFooter() { return "Example printing memory used by each Unix user:\n" "ps aux | tail -n +2 | awk '{ printf(\"%s\\t%s\\n\", $1, $4) }' | " "clickhouse-local -S \"user String, mem Float64\" -q" " \"SELECT user, round(sum(mem), 2) as mem_total FROM table GROUP BY user ORDER" " BY mem_total DESC FORMAT PrettyCompact\""; } void LocalServer::printHelpMessage(const OptionsDescription & options_description) { std::cout << getHelpHeader() << "\n"; std::cout << options_description.main_description.value() << "\n"; std::cout << getHelpFooter() << "\n"; } void LocalServer::addAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments) { options_description.main_description->add_options() ("database,d", po::value(), "database") ("table,N", po::value(), "name of the initial table") /// If structure argument is omitted then initial query is not generated ("structure,S", po::value(), "structure of the initial table (list of column and type names)") ("file,f", po::value(), "path to file with data of the initial table (stdin if not specified)") ("input-format", po::value(), "input format of the initial table data") ("output-format", po::value(), "default output format") ("logger.console", po::value()->implicit_value(true), "Log to console") ("logger.log", po::value(), "Log file name") ("logger.level", po::value(), "Log level") ("no-system-tables", "do not attach system tables (better startup time)") ; cmd_settings.addProgramOptions(options_description.main_description.value()); po::parsed_options parsed = po::command_line_parser(arguments).options(options_description.main_description.value()).run(); po::store(parsed, options); } void LocalServer::applyCmdSettings(ContextMutablePtr context) { context->applySettingsChanges(cmd_settings.changes()); } void LocalServer::applyCmdOptions(ContextMutablePtr context) { context->setDefaultFormat(config().getString("output-format", config().getString("format", is_interactive ? "PrettyCompact" : "TSV"))); applyCmdSettings(context); } void LocalServer::readArguments(int argc, char ** argv, Arguments & arguments, std::vector &) { for (int arg_num = 1; arg_num < argc; ++arg_num) arguments.emplace_back(argv[arg_num]); } void LocalServer::processOptions(const OptionsDescription &, const CommandLineOptions & options, const std::vector &) { if (options.count("table")) config().setString("table-name", options["table"].as()); if (options.count("file")) config().setString("table-file", options["file"].as()); if (options.count("structure")) config().setString("table-structure", options["structure"].as()); if (options.count("no-system-tables")) config().setBool("no-system-tables", true); if (options.count("input-format")) config().setString("table-data-format", options["input-format"].as()); if (options.count("output-format")) config().setString("output-format", options["output-format"].as()); if (options.count("logger.console")) config().setBool("logger.console", options["logger.console"].as()); if (options.count("logger.log")) config().setString("logger.log", options["logger.log"].as()); if (options.count("logger.level")) config().setString("logger.level", options["logger.level"].as()); } } #pragma GCC diagnostic ignored "-Wunused-function" #pragma GCC diagnostic ignored "-Wmissing-declarations" int mainEntryClickHouseLocal(int argc, char ** argv) { DB::LocalServer app; try { app.init(argc, argv); return app.run(); } catch (...) { std::cerr << DB::getCurrentExceptionMessage(true) << '\n'; auto code = DB::getCurrentExceptionCode(); return code ? code : 1; } }