mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Merge pull request #65669 from ClickHouse/client-refactoring
Miscellaneous and insignificant changes around Client/ClientBase.
This commit is contained in:
commit
191aad6b53
@ -248,6 +248,10 @@ std::vector<String> Client::loadWarningMessages()
|
||||
}
|
||||
}
|
||||
|
||||
Poco::Util::LayeredConfiguration & Client::getClientConfiguration()
|
||||
{
|
||||
return config();
|
||||
}
|
||||
|
||||
void Client::initialize(Poco::Util::Application & self)
|
||||
{
|
||||
@ -697,9 +701,7 @@ bool Client::processWithFuzzing(const String & full_query)
|
||||
const char * begin = full_query.data();
|
||||
orig_ast = parseQuery(begin, begin + full_query.size(),
|
||||
global_context->getSettingsRef(),
|
||||
/*allow_multi_statements=*/ true,
|
||||
/*is_interactive=*/ is_interactive,
|
||||
/*ignore_error=*/ ignore_error);
|
||||
/*allow_multi_statements=*/ true);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
|
@ -16,6 +16,9 @@ public:
|
||||
int main(const std::vector<String> & /*args*/) override;
|
||||
|
||||
protected:
|
||||
|
||||
Poco::Util::LayeredConfiguration & getClientConfiguration() override;
|
||||
|
||||
bool processWithFuzzing(const String & full_query) override;
|
||||
std::optional<bool> processFuzzingStep(const String & query_to_execute, const ASTPtr & parsed_query);
|
||||
|
||||
|
@ -82,6 +82,11 @@ void applySettingsOverridesForLocal(ContextMutablePtr context)
|
||||
context->setSettings(settings);
|
||||
}
|
||||
|
||||
Poco::Util::LayeredConfiguration & LocalServer::getClientConfiguration()
|
||||
{
|
||||
return config();
|
||||
}
|
||||
|
||||
void LocalServer::processError(const String &) const
|
||||
{
|
||||
if (ignore_error)
|
||||
@ -117,19 +122,19 @@ void LocalServer::initialize(Poco::Util::Application & self)
|
||||
Poco::Util::Application::initialize(self);
|
||||
|
||||
/// Load config files if exists
|
||||
if (config().has("config-file") || fs::exists("config.xml"))
|
||||
if (getClientConfiguration().has("config-file") || fs::exists("config.xml"))
|
||||
{
|
||||
const auto config_path = config().getString("config-file", "config.xml");
|
||||
const auto config_path = getClientConfiguration().getString("config-file", "config.xml");
|
||||
ConfigProcessor config_processor(config_path, false, true);
|
||||
ConfigProcessor::setConfigPath(fs::path(config_path).parent_path());
|
||||
auto loaded_config = config_processor.loadConfig();
|
||||
config().add(loaded_config.configuration.duplicate(), PRIO_DEFAULT, false);
|
||||
getClientConfiguration().add(loaded_config.configuration.duplicate(), PRIO_DEFAULT, false);
|
||||
}
|
||||
|
||||
GlobalThreadPool::initialize(
|
||||
config().getUInt("max_thread_pool_size", 10000),
|
||||
config().getUInt("max_thread_pool_free_size", 1000),
|
||||
config().getUInt("thread_pool_queue_size", 10000)
|
||||
getClientConfiguration().getUInt("max_thread_pool_size", 10000),
|
||||
getClientConfiguration().getUInt("max_thread_pool_free_size", 1000),
|
||||
getClientConfiguration().getUInt("thread_pool_queue_size", 10000)
|
||||
);
|
||||
|
||||
#if USE_AZURE_BLOB_STORAGE
|
||||
@ -141,18 +146,18 @@ void LocalServer::initialize(Poco::Util::Application & self)
|
||||
#endif
|
||||
|
||||
getIOThreadPool().initialize(
|
||||
config().getUInt("max_io_thread_pool_size", 100),
|
||||
config().getUInt("max_io_thread_pool_free_size", 0),
|
||||
config().getUInt("io_thread_pool_queue_size", 10000));
|
||||
getClientConfiguration().getUInt("max_io_thread_pool_size", 100),
|
||||
getClientConfiguration().getUInt("max_io_thread_pool_free_size", 0),
|
||||
getClientConfiguration().getUInt("io_thread_pool_queue_size", 10000));
|
||||
|
||||
|
||||
const size_t active_parts_loading_threads = config().getUInt("max_active_parts_loading_thread_pool_size", 64);
|
||||
const size_t active_parts_loading_threads = getClientConfiguration().getUInt("max_active_parts_loading_thread_pool_size", 64);
|
||||
getActivePartsLoadingThreadPool().initialize(
|
||||
active_parts_loading_threads,
|
||||
0, // We don't need any threads one all the parts will be loaded
|
||||
active_parts_loading_threads);
|
||||
|
||||
const size_t outdated_parts_loading_threads = config().getUInt("max_outdated_parts_loading_thread_pool_size", 32);
|
||||
const size_t outdated_parts_loading_threads = getClientConfiguration().getUInt("max_outdated_parts_loading_thread_pool_size", 32);
|
||||
getOutdatedPartsLoadingThreadPool().initialize(
|
||||
outdated_parts_loading_threads,
|
||||
0, // We don't need any threads one all the parts will be loaded
|
||||
@ -160,7 +165,7 @@ void LocalServer::initialize(Poco::Util::Application & self)
|
||||
|
||||
getOutdatedPartsLoadingThreadPool().setMaxTurboThreads(active_parts_loading_threads);
|
||||
|
||||
const size_t unexpected_parts_loading_threads = config().getUInt("max_unexpected_parts_loading_thread_pool_size", 32);
|
||||
const size_t unexpected_parts_loading_threads = getClientConfiguration().getUInt("max_unexpected_parts_loading_thread_pool_size", 32);
|
||||
getUnexpectedPartsLoadingThreadPool().initialize(
|
||||
unexpected_parts_loading_threads,
|
||||
0, // We don't need any threads one all the parts will be loaded
|
||||
@ -168,7 +173,7 @@ void LocalServer::initialize(Poco::Util::Application & self)
|
||||
|
||||
getUnexpectedPartsLoadingThreadPool().setMaxTurboThreads(active_parts_loading_threads);
|
||||
|
||||
const size_t cleanup_threads = config().getUInt("max_parts_cleaning_thread_pool_size", 128);
|
||||
const size_t cleanup_threads = getClientConfiguration().getUInt("max_parts_cleaning_thread_pool_size", 128);
|
||||
getPartsCleaningThreadPool().initialize(
|
||||
cleanup_threads,
|
||||
0, // We don't need any threads one all the parts will be deleted
|
||||
@ -201,10 +206,10 @@ void LocalServer::tryInitPath()
|
||||
{
|
||||
std::string path;
|
||||
|
||||
if (config().has("path"))
|
||||
if (getClientConfiguration().has("path"))
|
||||
{
|
||||
// User-supplied path.
|
||||
path = config().getString("path");
|
||||
path = getClientConfiguration().getString("path");
|
||||
Poco::trimInPlace(path);
|
||||
|
||||
if (path.empty())
|
||||
@ -263,13 +268,13 @@ void LocalServer::tryInitPath()
|
||||
|
||||
global_context->setUserFilesPath(""); /// user's files are everywhere
|
||||
|
||||
std::string user_scripts_path = config().getString("user_scripts_path", fs::path(path) / "user_scripts/");
|
||||
std::string user_scripts_path = getClientConfiguration().getString("user_scripts_path", fs::path(path) / "user_scripts/");
|
||||
global_context->setUserScriptsPath(user_scripts_path);
|
||||
|
||||
/// top_level_domains_lists
|
||||
const std::string & top_level_domains_path = config().getString("top_level_domains_path", fs::path(path) / "top_level_domains/");
|
||||
const std::string & top_level_domains_path = getClientConfiguration().getString("top_level_domains_path", fs::path(path) / "top_level_domains/");
|
||||
if (!top_level_domains_path.empty())
|
||||
TLDListsHolder::getInstance().parseConfig(fs::path(top_level_domains_path) / "", config());
|
||||
TLDListsHolder::getInstance().parseConfig(fs::path(top_level_domains_path) / "", getClientConfiguration());
|
||||
}
|
||||
|
||||
|
||||
@ -311,14 +316,14 @@ void LocalServer::cleanup()
|
||||
|
||||
std::string LocalServer::getInitialCreateTableQuery()
|
||||
{
|
||||
if (!config().has("table-structure") && !config().has("table-file") && !config().has("table-data-format") && (!isRegularFile(STDIN_FILENO) || queries.empty()))
|
||||
if (!getClientConfiguration().has("table-structure") && !getClientConfiguration().has("table-file") && !getClientConfiguration().has("table-data-format") && (!isRegularFile(STDIN_FILENO) || queries.empty()))
|
||||
return {};
|
||||
|
||||
auto table_name = backQuoteIfNeed(config().getString("table-name", "table"));
|
||||
auto table_structure = config().getString("table-structure", "auto");
|
||||
auto table_name = backQuoteIfNeed(getClientConfiguration().getString("table-name", "table"));
|
||||
auto table_structure = getClientConfiguration().getString("table-structure", "auto");
|
||||
|
||||
String table_file;
|
||||
if (!config().has("table-file") || config().getString("table-file") == "-")
|
||||
if (!getClientConfiguration().has("table-file") || getClientConfiguration().getString("table-file") == "-")
|
||||
{
|
||||
/// Use Unix tools stdin naming convention
|
||||
table_file = "stdin";
|
||||
@ -326,7 +331,7 @@ std::string LocalServer::getInitialCreateTableQuery()
|
||||
else
|
||||
{
|
||||
/// Use regular file
|
||||
auto file_name = config().getString("table-file");
|
||||
auto file_name = getClientConfiguration().getString("table-file");
|
||||
table_file = quoteString(file_name);
|
||||
}
|
||||
|
||||
@ -374,18 +379,18 @@ void LocalServer::setupUsers()
|
||||
|
||||
ConfigurationPtr users_config;
|
||||
auto & access_control = global_context->getAccessControl();
|
||||
access_control.setNoPasswordAllowed(config().getBool("allow_no_password", true));
|
||||
access_control.setPlaintextPasswordAllowed(config().getBool("allow_plaintext_password", true));
|
||||
if (config().has("config-file") || fs::exists("config.xml"))
|
||||
access_control.setNoPasswordAllowed(getClientConfiguration().getBool("allow_no_password", true));
|
||||
access_control.setPlaintextPasswordAllowed(getClientConfiguration().getBool("allow_plaintext_password", true));
|
||||
if (getClientConfiguration().has("config-file") || fs::exists("config.xml"))
|
||||
{
|
||||
String config_path = config().getString("config-file", "");
|
||||
bool has_user_directories = config().has("user_directories");
|
||||
String config_path = getClientConfiguration().getString("config-file", "");
|
||||
bool has_user_directories = getClientConfiguration().has("user_directories");
|
||||
const auto config_dir = fs::path{config_path}.remove_filename().string();
|
||||
String users_config_path = config().getString("users_config", "");
|
||||
String users_config_path = getClientConfiguration().getString("users_config", "");
|
||||
|
||||
if (users_config_path.empty() && has_user_directories)
|
||||
{
|
||||
users_config_path = config().getString("user_directories.users_xml.path");
|
||||
users_config_path = getClientConfiguration().getString("user_directories.users_xml.path");
|
||||
if (fs::path(users_config_path).is_relative() && fs::exists(fs::path(config_dir) / users_config_path))
|
||||
users_config_path = fs::path(config_dir) / users_config_path;
|
||||
}
|
||||
@ -409,10 +414,10 @@ void LocalServer::setupUsers()
|
||||
|
||||
void LocalServer::connect()
|
||||
{
|
||||
connection_parameters = ConnectionParameters(config(), "localhost");
|
||||
connection_parameters = ConnectionParameters(getClientConfiguration(), "localhost");
|
||||
|
||||
ReadBuffer * in;
|
||||
auto table_file = config().getString("table-file", "-");
|
||||
auto table_file = getClientConfiguration().getString("table-file", "-");
|
||||
if (table_file == "-" || table_file == "stdin")
|
||||
{
|
||||
in = &std_in;
|
||||
@ -433,7 +438,7 @@ try
|
||||
UseSSL use_ssl;
|
||||
thread_status.emplace();
|
||||
|
||||
StackTrace::setShowAddresses(config().getBool("show_addresses_in_stack_traces", true));
|
||||
StackTrace::setShowAddresses(getClientConfiguration().getBool("show_addresses_in_stack_traces", true));
|
||||
|
||||
setupSignalHandler();
|
||||
|
||||
@ -448,7 +453,7 @@ try
|
||||
|
||||
if (rlim.rlim_cur < rlim.rlim_max)
|
||||
{
|
||||
rlim.rlim_cur = config().getUInt("max_open_files", static_cast<unsigned>(rlim.rlim_max));
|
||||
rlim.rlim_cur = getClientConfiguration().getUInt("max_open_files", static_cast<unsigned>(rlim.rlim_max));
|
||||
int rc = setrlimit(RLIMIT_NOFILE, &rlim);
|
||||
if (rc != 0)
|
||||
std::cerr << fmt::format("Cannot set max number of file descriptors to {}. Try to specify max_open_files according to your system limits. error: {}", rlim.rlim_cur, errnoToString()) << '\n';
|
||||
@ -456,8 +461,8 @@ try
|
||||
}
|
||||
|
||||
is_interactive = stdin_is_a_tty
|
||||
&& (config().hasOption("interactive")
|
||||
|| (queries.empty() && !config().has("table-structure") && queries_files.empty() && !config().has("table-file")));
|
||||
&& (getClientConfiguration().hasOption("interactive")
|
||||
|| (queries.empty() && !getClientConfiguration().has("table-structure") && queries_files.empty() && !getClientConfiguration().has("table-file")));
|
||||
|
||||
if (!is_interactive)
|
||||
{
|
||||
@ -481,7 +486,7 @@ try
|
||||
|
||||
SCOPE_EXIT({ cleanup(); });
|
||||
|
||||
initTTYBuffer(toProgressOption(config().getString("progress", "default")));
|
||||
initTTYBuffer(toProgressOption(getClientConfiguration().getString("progress", "default")));
|
||||
ASTAlterCommand::setFormatAlterCommandsWithParentheses(true);
|
||||
|
||||
applyCmdSettings(global_context);
|
||||
@ -489,7 +494,7 @@ try
|
||||
/// try to load user defined executable functions, throw on error and die
|
||||
try
|
||||
{
|
||||
global_context->loadOrReloadUserDefinedExecutableFunctions(config());
|
||||
global_context->loadOrReloadUserDefinedExecutableFunctions(getClientConfiguration());
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -530,7 +535,7 @@ try
|
||||
}
|
||||
catch (const DB::Exception & e)
|
||||
{
|
||||
bool need_print_stack_trace = config().getBool("stacktrace", false);
|
||||
bool need_print_stack_trace = getClientConfiguration().getBool("stacktrace", false);
|
||||
std::cerr << getExceptionMessage(e, need_print_stack_trace, true) << std::endl;
|
||||
return e.code() ? e.code() : -1;
|
||||
}
|
||||
@ -542,42 +547,42 @@ catch (...)
|
||||
|
||||
void LocalServer::updateLoggerLevel(const String & logs_level)
|
||||
{
|
||||
config().setString("logger.level", logs_level);
|
||||
updateLevels(config(), logger());
|
||||
getClientConfiguration().setString("logger.level", logs_level);
|
||||
updateLevels(getClientConfiguration(), logger());
|
||||
}
|
||||
|
||||
void LocalServer::processConfig()
|
||||
{
|
||||
if (!queries.empty() && config().has("queries-file"))
|
||||
if (!queries.empty() && getClientConfiguration().has("queries-file"))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Options '--query' and '--queries-file' cannot be specified at the same time");
|
||||
|
||||
if (config().has("multiquery"))
|
||||
if (getClientConfiguration().has("multiquery"))
|
||||
is_multiquery = true;
|
||||
|
||||
pager = config().getString("pager", "");
|
||||
pager = getClientConfiguration().getString("pager", "");
|
||||
|
||||
delayed_interactive = config().has("interactive") && (!queries.empty() || config().has("queries-file"));
|
||||
delayed_interactive = getClientConfiguration().has("interactive") && (!queries.empty() || getClientConfiguration().has("queries-file"));
|
||||
if (!is_interactive || delayed_interactive)
|
||||
{
|
||||
echo_queries = config().hasOption("echo") || config().hasOption("verbose");
|
||||
ignore_error = config().getBool("ignore-error", false);
|
||||
echo_queries = getClientConfiguration().hasOption("echo") || getClientConfiguration().hasOption("verbose");
|
||||
ignore_error = getClientConfiguration().getBool("ignore-error", false);
|
||||
}
|
||||
|
||||
print_stack_trace = config().getBool("stacktrace", false);
|
||||
print_stack_trace = getClientConfiguration().getBool("stacktrace", false);
|
||||
const std::string clickhouse_dialect{"clickhouse"};
|
||||
load_suggestions = (is_interactive || delayed_interactive) && !config().getBool("disable_suggestion", false)
|
||||
&& config().getString("dialect", clickhouse_dialect) == clickhouse_dialect;
|
||||
wait_for_suggestions_to_load = config().getBool("wait_for_suggestions_to_load", false);
|
||||
load_suggestions = (is_interactive || delayed_interactive) && !getClientConfiguration().getBool("disable_suggestion", false)
|
||||
&& getClientConfiguration().getString("dialect", clickhouse_dialect) == clickhouse_dialect;
|
||||
wait_for_suggestions_to_load = getClientConfiguration().getBool("wait_for_suggestions_to_load", false);
|
||||
|
||||
auto logging = (config().has("logger.console")
|
||||
|| config().has("logger.level")
|
||||
|| config().has("log-level")
|
||||
|| config().has("send_logs_level")
|
||||
|| config().has("logger.log"));
|
||||
auto logging = (getClientConfiguration().has("logger.console")
|
||||
|| getClientConfiguration().has("logger.level")
|
||||
|| getClientConfiguration().has("log-level")
|
||||
|| getClientConfiguration().has("send_logs_level")
|
||||
|| getClientConfiguration().has("logger.log"));
|
||||
|
||||
auto level = config().getString("log-level", "trace");
|
||||
auto level = getClientConfiguration().getString("log-level", "trace");
|
||||
|
||||
if (config().has("server_logs_file"))
|
||||
if (getClientConfiguration().has("server_logs_file"))
|
||||
{
|
||||
auto poco_logs_level = Poco::Logger::parseLevel(level);
|
||||
Poco::Logger::root().setLevel(poco_logs_level);
|
||||
@ -587,10 +592,10 @@ void LocalServer::processConfig()
|
||||
}
|
||||
else
|
||||
{
|
||||
config().setString("logger", "logger");
|
||||
getClientConfiguration().setString("logger", "logger");
|
||||
auto log_level_default = logging ? level : "fatal";
|
||||
config().setString("logger.level", config().getString("log-level", config().getString("send_logs_level", log_level_default)));
|
||||
buildLoggers(config(), logger(), "clickhouse-local");
|
||||
getClientConfiguration().setString("logger.level", getClientConfiguration().getString("log-level", getClientConfiguration().getString("send_logs_level", log_level_default)));
|
||||
buildLoggers(getClientConfiguration(), logger(), "clickhouse-local");
|
||||
}
|
||||
|
||||
shared_context = Context::createShared();
|
||||
@ -604,13 +609,13 @@ void LocalServer::processConfig()
|
||||
LoggerRawPtr log = &logger();
|
||||
|
||||
/// Maybe useless
|
||||
if (config().has("macros"))
|
||||
global_context->setMacros(std::make_unique<Macros>(config(), "macros", log));
|
||||
if (getClientConfiguration().has("macros"))
|
||||
global_context->setMacros(std::make_unique<Macros>(getClientConfiguration(), "macros", log));
|
||||
|
||||
setDefaultFormatsAndCompressionFromConfiguration();
|
||||
|
||||
/// Sets external authenticators config (LDAP, Kerberos).
|
||||
global_context->setExternalAuthenticatorsConfig(config());
|
||||
global_context->setExternalAuthenticatorsConfig(getClientConfiguration());
|
||||
|
||||
setupUsers();
|
||||
|
||||
@ -619,12 +624,12 @@ void LocalServer::processConfig()
|
||||
global_context->getProcessList().setMaxSize(0);
|
||||
|
||||
const size_t physical_server_memory = getMemoryAmount();
|
||||
const double cache_size_to_ram_max_ratio = config().getDouble("cache_size_to_ram_max_ratio", 0.5);
|
||||
const double cache_size_to_ram_max_ratio = getClientConfiguration().getDouble("cache_size_to_ram_max_ratio", 0.5);
|
||||
const size_t max_cache_size = static_cast<size_t>(physical_server_memory * cache_size_to_ram_max_ratio);
|
||||
|
||||
String uncompressed_cache_policy = config().getString("uncompressed_cache_policy", DEFAULT_UNCOMPRESSED_CACHE_POLICY);
|
||||
size_t uncompressed_cache_size = config().getUInt64("uncompressed_cache_size", DEFAULT_UNCOMPRESSED_CACHE_MAX_SIZE);
|
||||
double uncompressed_cache_size_ratio = config().getDouble("uncompressed_cache_size_ratio", DEFAULT_UNCOMPRESSED_CACHE_SIZE_RATIO);
|
||||
String uncompressed_cache_policy = getClientConfiguration().getString("uncompressed_cache_policy", DEFAULT_UNCOMPRESSED_CACHE_POLICY);
|
||||
size_t uncompressed_cache_size = getClientConfiguration().getUInt64("uncompressed_cache_size", DEFAULT_UNCOMPRESSED_CACHE_MAX_SIZE);
|
||||
double uncompressed_cache_size_ratio = getClientConfiguration().getDouble("uncompressed_cache_size_ratio", DEFAULT_UNCOMPRESSED_CACHE_SIZE_RATIO);
|
||||
if (uncompressed_cache_size > max_cache_size)
|
||||
{
|
||||
uncompressed_cache_size = max_cache_size;
|
||||
@ -632,9 +637,9 @@ void LocalServer::processConfig()
|
||||
}
|
||||
global_context->setUncompressedCache(uncompressed_cache_policy, uncompressed_cache_size, uncompressed_cache_size_ratio);
|
||||
|
||||
String mark_cache_policy = config().getString("mark_cache_policy", DEFAULT_MARK_CACHE_POLICY);
|
||||
size_t mark_cache_size = config().getUInt64("mark_cache_size", DEFAULT_MARK_CACHE_MAX_SIZE);
|
||||
double mark_cache_size_ratio = config().getDouble("mark_cache_size_ratio", DEFAULT_MARK_CACHE_SIZE_RATIO);
|
||||
String mark_cache_policy = getClientConfiguration().getString("mark_cache_policy", DEFAULT_MARK_CACHE_POLICY);
|
||||
size_t mark_cache_size = getClientConfiguration().getUInt64("mark_cache_size", DEFAULT_MARK_CACHE_MAX_SIZE);
|
||||
double mark_cache_size_ratio = getClientConfiguration().getDouble("mark_cache_size_ratio", DEFAULT_MARK_CACHE_SIZE_RATIO);
|
||||
if (!mark_cache_size)
|
||||
LOG_ERROR(log, "Too low mark cache size will lead to severe performance degradation.");
|
||||
if (mark_cache_size > max_cache_size)
|
||||
@ -644,9 +649,9 @@ void LocalServer::processConfig()
|
||||
}
|
||||
global_context->setMarkCache(mark_cache_policy, mark_cache_size, mark_cache_size_ratio);
|
||||
|
||||
String index_uncompressed_cache_policy = config().getString("index_uncompressed_cache_policy", DEFAULT_INDEX_UNCOMPRESSED_CACHE_POLICY);
|
||||
size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE);
|
||||
double index_uncompressed_cache_size_ratio = config().getDouble("index_uncompressed_cache_size_ratio", DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO);
|
||||
String index_uncompressed_cache_policy = getClientConfiguration().getString("index_uncompressed_cache_policy", DEFAULT_INDEX_UNCOMPRESSED_CACHE_POLICY);
|
||||
size_t index_uncompressed_cache_size = getClientConfiguration().getUInt64("index_uncompressed_cache_size", DEFAULT_INDEX_UNCOMPRESSED_CACHE_MAX_SIZE);
|
||||
double index_uncompressed_cache_size_ratio = getClientConfiguration().getDouble("index_uncompressed_cache_size_ratio", DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO);
|
||||
if (index_uncompressed_cache_size > max_cache_size)
|
||||
{
|
||||
index_uncompressed_cache_size = max_cache_size;
|
||||
@ -654,9 +659,9 @@ void LocalServer::processConfig()
|
||||
}
|
||||
global_context->setIndexUncompressedCache(index_uncompressed_cache_policy, index_uncompressed_cache_size, index_uncompressed_cache_size_ratio);
|
||||
|
||||
String index_mark_cache_policy = config().getString("index_mark_cache_policy", DEFAULT_INDEX_MARK_CACHE_POLICY);
|
||||
size_t index_mark_cache_size = config().getUInt64("index_mark_cache_size", DEFAULT_INDEX_MARK_CACHE_MAX_SIZE);
|
||||
double index_mark_cache_size_ratio = config().getDouble("index_mark_cache_size_ratio", DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO);
|
||||
String index_mark_cache_policy = getClientConfiguration().getString("index_mark_cache_policy", DEFAULT_INDEX_MARK_CACHE_POLICY);
|
||||
size_t index_mark_cache_size = getClientConfiguration().getUInt64("index_mark_cache_size", DEFAULT_INDEX_MARK_CACHE_MAX_SIZE);
|
||||
double index_mark_cache_size_ratio = getClientConfiguration().getDouble("index_mark_cache_size_ratio", DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO);
|
||||
if (index_mark_cache_size > max_cache_size)
|
||||
{
|
||||
index_mark_cache_size = max_cache_size;
|
||||
@ -664,7 +669,7 @@ void LocalServer::processConfig()
|
||||
}
|
||||
global_context->setIndexMarkCache(index_mark_cache_policy, index_mark_cache_size, index_mark_cache_size_ratio);
|
||||
|
||||
size_t mmap_cache_size = config().getUInt64("mmap_cache_size", DEFAULT_MMAP_CACHE_MAX_SIZE);
|
||||
size_t mmap_cache_size = getClientConfiguration().getUInt64("mmap_cache_size", DEFAULT_MMAP_CACHE_MAX_SIZE);
|
||||
if (mmap_cache_size > max_cache_size)
|
||||
{
|
||||
mmap_cache_size = max_cache_size;
|
||||
@ -676,8 +681,8 @@ void LocalServer::processConfig()
|
||||
global_context->setQueryCache(0, 0, 0, 0);
|
||||
|
||||
#if USE_EMBEDDED_COMPILER
|
||||
size_t compiled_expression_cache_max_size_in_bytes = config().getUInt64("compiled_expression_cache_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_SIZE);
|
||||
size_t compiled_expression_cache_max_elements = config().getUInt64("compiled_expression_cache_elements_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_ENTRIES);
|
||||
size_t compiled_expression_cache_max_size_in_bytes = getClientConfiguration().getUInt64("compiled_expression_cache_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_SIZE);
|
||||
size_t compiled_expression_cache_max_elements = getClientConfiguration().getUInt64("compiled_expression_cache_elements_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_ENTRIES);
|
||||
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_max_size_in_bytes, compiled_expression_cache_max_elements);
|
||||
#endif
|
||||
|
||||
@ -689,16 +694,16 @@ void LocalServer::processConfig()
|
||||
applyCmdOptions(global_context);
|
||||
|
||||
/// Load global settings from default_profile and system_profile.
|
||||
global_context->setDefaultProfiles(config());
|
||||
global_context->setDefaultProfiles(getClientConfiguration());
|
||||
|
||||
/// We load temporary database first, because projections need it.
|
||||
DatabaseCatalog::instance().initializeAndLoadTemporaryDatabase();
|
||||
|
||||
std::string default_database = config().getString("default_database", "default");
|
||||
std::string default_database = getClientConfiguration().getString("default_database", "default");
|
||||
DatabaseCatalog::instance().attachDatabase(default_database, createClickHouseLocalDatabaseOverlay(default_database, global_context));
|
||||
global_context->setCurrentDatabase(default_database);
|
||||
|
||||
if (config().has("path"))
|
||||
if (getClientConfiguration().has("path"))
|
||||
{
|
||||
String path = global_context->getPath();
|
||||
fs::create_directories(fs::path(path));
|
||||
@ -713,7 +718,7 @@ void LocalServer::processConfig()
|
||||
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
|
||||
waitLoad(TablesLoaderForegroundPoolId, startup_system_tasks);
|
||||
|
||||
if (!config().has("only-system-tables"))
|
||||
if (!getClientConfiguration().has("only-system-tables"))
|
||||
{
|
||||
DatabaseCatalog::instance().createBackgroundTasks();
|
||||
waitLoad(loadMetadata(global_context));
|
||||
@ -725,15 +730,15 @@ void LocalServer::processConfig()
|
||||
|
||||
LOG_DEBUG(log, "Loaded metadata.");
|
||||
}
|
||||
else if (!config().has("no-system-tables"))
|
||||
else if (!getClientConfiguration().has("no-system-tables"))
|
||||
{
|
||||
attachSystemTablesServer(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE), false);
|
||||
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA));
|
||||
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
|
||||
}
|
||||
|
||||
server_display_name = config().getString("display_name", "");
|
||||
prompt_by_server_display_name = config().getRawString("prompt_by_server_display_name.default", ":) ");
|
||||
server_display_name = getClientConfiguration().getString("display_name", "");
|
||||
prompt_by_server_display_name = getClientConfiguration().getRawString("prompt_by_server_display_name.default", ":) ");
|
||||
|
||||
global_context->setQueryKindInitial();
|
||||
global_context->setQueryKind(query_kind);
|
||||
@ -811,7 +816,7 @@ void LocalServer::applyCmdSettings(ContextMutablePtr context)
|
||||
|
||||
void LocalServer::applyCmdOptions(ContextMutablePtr context)
|
||||
{
|
||||
context->setDefaultFormat(config().getString("output-format", config().getString("format", is_interactive ? "PrettyCompact" : "TSV")));
|
||||
context->setDefaultFormat(getClientConfiguration().getString("output-format", getClientConfiguration().getString("format", is_interactive ? "PrettyCompact" : "TSV")));
|
||||
applyCmdSettings(context);
|
||||
}
|
||||
|
||||
@ -819,33 +824,33 @@ void LocalServer::applyCmdOptions(ContextMutablePtr context)
|
||||
void LocalServer::processOptions(const OptionsDescription &, const CommandLineOptions & options, const std::vector<Arguments> &, const std::vector<Arguments> &)
|
||||
{
|
||||
if (options.count("table"))
|
||||
config().setString("table-name", options["table"].as<std::string>());
|
||||
getClientConfiguration().setString("table-name", options["table"].as<std::string>());
|
||||
if (options.count("file"))
|
||||
config().setString("table-file", options["file"].as<std::string>());
|
||||
getClientConfiguration().setString("table-file", options["file"].as<std::string>());
|
||||
if (options.count("structure"))
|
||||
config().setString("table-structure", options["structure"].as<std::string>());
|
||||
getClientConfiguration().setString("table-structure", options["structure"].as<std::string>());
|
||||
if (options.count("no-system-tables"))
|
||||
config().setBool("no-system-tables", true);
|
||||
getClientConfiguration().setBool("no-system-tables", true);
|
||||
if (options.count("only-system-tables"))
|
||||
config().setBool("only-system-tables", true);
|
||||
getClientConfiguration().setBool("only-system-tables", true);
|
||||
if (options.count("database"))
|
||||
config().setString("default_database", options["database"].as<std::string>());
|
||||
getClientConfiguration().setString("default_database", options["database"].as<std::string>());
|
||||
|
||||
if (options.count("input-format"))
|
||||
config().setString("table-data-format", options["input-format"].as<std::string>());
|
||||
getClientConfiguration().setString("table-data-format", options["input-format"].as<std::string>());
|
||||
if (options.count("output-format"))
|
||||
config().setString("output-format", options["output-format"].as<std::string>());
|
||||
getClientConfiguration().setString("output-format", options["output-format"].as<std::string>());
|
||||
|
||||
if (options.count("logger.console"))
|
||||
config().setBool("logger.console", options["logger.console"].as<bool>());
|
||||
getClientConfiguration().setBool("logger.console", options["logger.console"].as<bool>());
|
||||
if (options.count("logger.log"))
|
||||
config().setString("logger.log", options["logger.log"].as<std::string>());
|
||||
getClientConfiguration().setString("logger.log", options["logger.log"].as<std::string>());
|
||||
if (options.count("logger.level"))
|
||||
config().setString("logger.level", options["logger.level"].as<std::string>());
|
||||
getClientConfiguration().setString("logger.level", options["logger.level"].as<std::string>());
|
||||
if (options.count("send_logs_level"))
|
||||
config().setString("send_logs_level", options["send_logs_level"].as<std::string>());
|
||||
getClientConfiguration().setString("send_logs_level", options["send_logs_level"].as<std::string>());
|
||||
if (options.count("wait_for_suggestions_to_load"))
|
||||
config().setBool("wait_for_suggestions_to_load", true);
|
||||
getClientConfiguration().setBool("wait_for_suggestions_to_load", true);
|
||||
}
|
||||
|
||||
void LocalServer::readArguments(int argc, char ** argv, Arguments & common_arguments, std::vector<Arguments> &, std::vector<Arguments> &)
|
||||
|
@ -30,6 +30,9 @@ public:
|
||||
int main(const std::vector<String> & /*args*/) override;
|
||||
|
||||
protected:
|
||||
|
||||
Poco::Util::LayeredConfiguration & getClientConfiguration() override;
|
||||
|
||||
void connect() override;
|
||||
|
||||
void processError(const String & query) const override;
|
||||
|
@ -302,8 +302,29 @@ public:
|
||||
|
||||
|
||||
ClientBase::~ClientBase() = default;
|
||||
ClientBase::ClientBase() = default;
|
||||
|
||||
ClientBase::ClientBase(
|
||||
int in_fd_,
|
||||
int out_fd_,
|
||||
int err_fd_,
|
||||
std::istream & input_stream_,
|
||||
std::ostream & output_stream_,
|
||||
std::ostream & error_stream_
|
||||
)
|
||||
: std_in(in_fd_)
|
||||
, std_out(out_fd_)
|
||||
, progress_indication(output_stream_, in_fd_, err_fd_)
|
||||
, in_fd(in_fd_)
|
||||
, out_fd(out_fd_)
|
||||
, err_fd(err_fd_)
|
||||
, input_stream(input_stream_)
|
||||
, output_stream(output_stream_)
|
||||
, error_stream(error_stream_)
|
||||
{
|
||||
stdin_is_a_tty = isatty(in_fd);
|
||||
stdout_is_a_tty = isatty(out_fd);
|
||||
stderr_is_a_tty = isatty(err_fd);
|
||||
terminal_width = getTerminalWidth(in_fd, err_fd);
|
||||
}
|
||||
|
||||
void ClientBase::setupSignalHandler()
|
||||
{
|
||||
@ -330,7 +351,7 @@ void ClientBase::setupSignalHandler()
|
||||
}
|
||||
|
||||
|
||||
ASTPtr ClientBase::parseQuery(const char *& pos, const char * end, const Settings & settings, bool allow_multi_statements, bool is_interactive, bool ignore_error)
|
||||
ASTPtr ClientBase::parseQuery(const char *& pos, const char * end, const Settings & settings, bool allow_multi_statements)
|
||||
{
|
||||
std::unique_ptr<IParserBase> parser;
|
||||
ASTPtr res;
|
||||
@ -359,7 +380,7 @@ ASTPtr ClientBase::parseQuery(const char *& pos, const char * end, const Setting
|
||||
|
||||
if (!res)
|
||||
{
|
||||
std::cerr << std::endl << message << std::endl << std::endl;
|
||||
error_stream << std::endl << message << std::endl << std::endl;
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
@ -373,11 +394,11 @@ ASTPtr ClientBase::parseQuery(const char *& pos, const char * end, const Setting
|
||||
|
||||
if (is_interactive)
|
||||
{
|
||||
std::cout << std::endl;
|
||||
WriteBufferFromOStream res_buf(std::cout, 4096);
|
||||
output_stream << std::endl;
|
||||
WriteBufferFromOStream res_buf(output_stream, 4096);
|
||||
formatAST(*res, res_buf);
|
||||
res_buf.finalize();
|
||||
std::cout << std::endl << std::endl;
|
||||
output_stream << std::endl << std::endl;
|
||||
}
|
||||
|
||||
return res;
|
||||
@ -481,7 +502,7 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
|
||||
if (need_render_progress && tty_buf)
|
||||
{
|
||||
if (select_into_file && !select_into_file_and_stdout)
|
||||
std::cerr << "\r";
|
||||
error_stream << "\r";
|
||||
progress_indication.writeProgress(*tty_buf);
|
||||
}
|
||||
}
|
||||
@ -741,17 +762,17 @@ bool ClientBase::isRegularFile(int fd)
|
||||
|
||||
void ClientBase::setDefaultFormatsAndCompressionFromConfiguration()
|
||||
{
|
||||
if (config().has("output-format"))
|
||||
if (getClientConfiguration().has("output-format"))
|
||||
{
|
||||
default_output_format = config().getString("output-format");
|
||||
default_output_format = getClientConfiguration().getString("output-format");
|
||||
is_default_format = false;
|
||||
}
|
||||
else if (config().has("format"))
|
||||
else if (getClientConfiguration().has("format"))
|
||||
{
|
||||
default_output_format = config().getString("format");
|
||||
default_output_format = getClientConfiguration().getString("format");
|
||||
is_default_format = false;
|
||||
}
|
||||
else if (config().has("vertical"))
|
||||
else if (getClientConfiguration().has("vertical"))
|
||||
{
|
||||
default_output_format = "Vertical";
|
||||
is_default_format = false;
|
||||
@ -777,17 +798,17 @@ void ClientBase::setDefaultFormatsAndCompressionFromConfiguration()
|
||||
default_output_format = "TSV";
|
||||
}
|
||||
|
||||
if (config().has("input-format"))
|
||||
if (getClientConfiguration().has("input-format"))
|
||||
{
|
||||
default_input_format = config().getString("input-format");
|
||||
default_input_format = getClientConfiguration().getString("input-format");
|
||||
}
|
||||
else if (config().has("format"))
|
||||
else if (getClientConfiguration().has("format"))
|
||||
{
|
||||
default_input_format = config().getString("format");
|
||||
default_input_format = getClientConfiguration().getString("format");
|
||||
}
|
||||
else if (config().getString("table-file", "-") != "-")
|
||||
else if (getClientConfiguration().getString("table-file", "-") != "-")
|
||||
{
|
||||
auto file_name = config().getString("table-file");
|
||||
auto file_name = getClientConfiguration().getString("table-file");
|
||||
std::optional<String> format_from_file_name = FormatFactory::instance().tryGetFormatFromFileName(file_name);
|
||||
if (format_from_file_name)
|
||||
default_input_format = *format_from_file_name;
|
||||
@ -803,7 +824,7 @@ void ClientBase::setDefaultFormatsAndCompressionFromConfiguration()
|
||||
default_input_format = "TSV";
|
||||
}
|
||||
|
||||
format_max_block_size = config().getUInt64("format_max_block_size",
|
||||
format_max_block_size = getClientConfiguration().getUInt64("format_max_block_size",
|
||||
global_context->getSettingsRef().max_block_size);
|
||||
|
||||
/// Setting value from cmd arg overrides one from config
|
||||
@ -813,7 +834,7 @@ void ClientBase::setDefaultFormatsAndCompressionFromConfiguration()
|
||||
}
|
||||
else
|
||||
{
|
||||
insert_format_max_block_size = config().getUInt64("insert_format_max_block_size",
|
||||
insert_format_max_block_size = getClientConfiguration().getUInt64("insert_format_max_block_size",
|
||||
global_context->getSettingsRef().max_insert_block_size);
|
||||
}
|
||||
}
|
||||
@ -924,9 +945,7 @@ void ClientBase::processTextAsSingleQuery(const String & full_query)
|
||||
const char * begin = full_query.data();
|
||||
auto parsed_query = parseQuery(begin, begin + full_query.size(),
|
||||
global_context->getSettingsRef(),
|
||||
/*allow_multi_statements=*/ false,
|
||||
is_interactive,
|
||||
ignore_error);
|
||||
/*allow_multi_statements=*/ false);
|
||||
|
||||
if (!parsed_query)
|
||||
return;
|
||||
@ -1100,7 +1119,7 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
|
||||
/// has been received yet.
|
||||
if (processed_rows == 0 && e.code() == ErrorCodes::DEADLOCK_AVOIDED && --retries_left)
|
||||
{
|
||||
std::cerr << "Got a transient error from the server, will"
|
||||
error_stream << "Got a transient error from the server, will"
|
||||
<< " retry (" << retries_left << " retries left)";
|
||||
}
|
||||
else
|
||||
@ -1154,7 +1173,7 @@ void ClientBase::receiveResult(ASTPtr parsed_query, Int32 signals_before_stop, b
|
||||
double elapsed = receive_watch.elapsedSeconds();
|
||||
if (break_on_timeout && elapsed > receive_timeout.totalSeconds())
|
||||
{
|
||||
std::cout << "Timeout exceeded while receiving data from server."
|
||||
output_stream << "Timeout exceeded while receiving data from server."
|
||||
<< " Waited for " << static_cast<size_t>(elapsed) << " seconds,"
|
||||
<< " timeout is " << receive_timeout.totalSeconds() << " seconds." << std::endl;
|
||||
|
||||
@ -1189,7 +1208,7 @@ void ClientBase::receiveResult(ASTPtr parsed_query, Int32 signals_before_stop, b
|
||||
|
||||
if (cancelled && is_interactive)
|
||||
{
|
||||
std::cout << "Query was cancelled." << std::endl;
|
||||
output_stream << "Query was cancelled." << std::endl;
|
||||
cancelled_printed = true;
|
||||
}
|
||||
}
|
||||
@ -1308,9 +1327,9 @@ void ClientBase::onEndOfStream()
|
||||
if (is_interactive)
|
||||
{
|
||||
if (cancelled && !cancelled_printed)
|
||||
std::cout << "Query was cancelled." << std::endl;
|
||||
output_stream << "Query was cancelled." << std::endl;
|
||||
else if (!written_first_block)
|
||||
std::cout << "Ok." << std::endl;
|
||||
output_stream << "Ok." << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1863,7 +1882,7 @@ void ClientBase::cancelQuery()
|
||||
progress_indication.clearProgressOutput(*tty_buf);
|
||||
|
||||
if (is_interactive)
|
||||
std::cout << "Cancelling query." << std::endl;
|
||||
output_stream << "Cancelling query." << std::endl;
|
||||
|
||||
cancelled = true;
|
||||
}
|
||||
@ -2026,7 +2045,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
|
||||
{
|
||||
const String & new_database = use_query->getDatabase();
|
||||
/// If the client initiates the reconnection, it takes the settings from the config.
|
||||
config().setString("database", new_database);
|
||||
getClientConfiguration().setString("database", new_database);
|
||||
/// If the connection initiates the reconnection, it uses its variable.
|
||||
connection->setDefaultDatabase(new_database);
|
||||
}
|
||||
@ -2046,21 +2065,21 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
|
||||
|
||||
if (is_interactive)
|
||||
{
|
||||
std::cout << std::endl;
|
||||
output_stream << std::endl;
|
||||
if (!server_exception || processed_rows != 0)
|
||||
std::cout << processed_rows << " row" << (processed_rows == 1 ? "" : "s") << " in set. ";
|
||||
std::cout << "Elapsed: " << progress_indication.elapsedSeconds() << " sec. ";
|
||||
output_stream << processed_rows << " row" << (processed_rows == 1 ? "" : "s") << " in set. ";
|
||||
output_stream << "Elapsed: " << progress_indication.elapsedSeconds() << " sec. ";
|
||||
progress_indication.writeFinalProgress();
|
||||
std::cout << std::endl << std::endl;
|
||||
output_stream << std::endl << std::endl;
|
||||
}
|
||||
else if (print_time_to_stderr)
|
||||
else if (getClientConfiguration().getBool("print-time-to-stderr", false))
|
||||
{
|
||||
std::cerr << progress_indication.elapsedSeconds() << "\n";
|
||||
error_stream << progress_indication.elapsedSeconds() << "\n";
|
||||
}
|
||||
|
||||
if (!is_interactive && print_num_processed_rows)
|
||||
if (!is_interactive && getClientConfiguration().getBool("print-num-processed-rows", false))
|
||||
{
|
||||
std::cout << "Processed rows: " << processed_rows << "\n";
|
||||
output_stream << "Processed rows: " << processed_rows << "\n";
|
||||
}
|
||||
|
||||
if (have_error && report_error)
|
||||
@ -2110,9 +2129,7 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
|
||||
{
|
||||
parsed_query = parseQuery(this_query_end, all_queries_end,
|
||||
global_context->getSettingsRef(),
|
||||
/*allow_multi_statements=*/ true,
|
||||
is_interactive,
|
||||
ignore_error);
|
||||
/*allow_multi_statements=*/ true);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
@ -2428,12 +2445,12 @@ void ClientBase::initQueryIdFormats()
|
||||
return;
|
||||
|
||||
/// Initialize query_id_formats if any
|
||||
if (config().has("query_id_formats"))
|
||||
if (getClientConfiguration().has("query_id_formats"))
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys keys;
|
||||
config().keys("query_id_formats", keys);
|
||||
getClientConfiguration().keys("query_id_formats", keys);
|
||||
for (const auto & name : keys)
|
||||
query_id_formats.emplace_back(name + ":", config().getString("query_id_formats." + name));
|
||||
query_id_formats.emplace_back(name + ":", getClientConfiguration().getString("query_id_formats." + name));
|
||||
}
|
||||
|
||||
if (query_id_formats.empty())
|
||||
@ -2478,9 +2495,9 @@ bool ClientBase::addMergeTreeSettings(ASTCreateQuery & ast_create)
|
||||
|
||||
void ClientBase::runInteractive()
|
||||
{
|
||||
if (config().has("query_id"))
|
||||
if (getClientConfiguration().has("query_id"))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "query_id could be specified only in non-interactive mode");
|
||||
if (print_time_to_stderr)
|
||||
if (getClientConfiguration().getBool("print-time-to-stderr", false))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "time option could be specified only in non-interactive mode");
|
||||
|
||||
initQueryIdFormats();
|
||||
@ -2493,9 +2510,9 @@ void ClientBase::runInteractive()
|
||||
{
|
||||
/// Load suggestion data from the server.
|
||||
if (global_context->getApplicationType() == Context::ApplicationType::CLIENT)
|
||||
suggest->load<Connection>(global_context, connection_parameters, config().getInt("suggestion_limit"), wait_for_suggestions_to_load);
|
||||
suggest->load<Connection>(global_context, connection_parameters, getClientConfiguration().getInt("suggestion_limit"), wait_for_suggestions_to_load);
|
||||
else if (global_context->getApplicationType() == Context::ApplicationType::LOCAL)
|
||||
suggest->load<LocalConnection>(global_context, connection_parameters, config().getInt("suggestion_limit"), wait_for_suggestions_to_load);
|
||||
suggest->load<LocalConnection>(global_context, connection_parameters, getClientConfiguration().getInt("suggestion_limit"), wait_for_suggestions_to_load);
|
||||
}
|
||||
|
||||
if (home_path.empty())
|
||||
@ -2506,8 +2523,8 @@ void ClientBase::runInteractive()
|
||||
}
|
||||
|
||||
/// Load command history if present.
|
||||
if (config().has("history_file"))
|
||||
history_file = config().getString("history_file");
|
||||
if (getClientConfiguration().has("history_file"))
|
||||
history_file = getClientConfiguration().getString("history_file");
|
||||
else
|
||||
{
|
||||
auto * history_file_from_env = getenv("CLICKHOUSE_HISTORY_FILE"); // NOLINT(concurrency-mt-unsafe)
|
||||
@ -2528,7 +2545,7 @@ void ClientBase::runInteractive()
|
||||
{
|
||||
if (e.getErrno() != EEXIST)
|
||||
{
|
||||
std::cerr << getCurrentExceptionMessage(false) << '\n';
|
||||
error_stream << getCurrentExceptionMessage(false) << '\n';
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2539,13 +2556,13 @@ void ClientBase::runInteractive()
|
||||
|
||||
#if USE_REPLXX
|
||||
replxx::Replxx::highlighter_callback_t highlight_callback{};
|
||||
if (config().getBool("highlight", true))
|
||||
if (getClientConfiguration().getBool("highlight", true))
|
||||
highlight_callback = highlight;
|
||||
|
||||
ReplxxLineReader lr(
|
||||
*suggest,
|
||||
history_file,
|
||||
config().has("multiline"),
|
||||
getClientConfiguration().has("multiline"),
|
||||
query_extenders,
|
||||
query_delimiters,
|
||||
word_break_characters,
|
||||
@ -2553,7 +2570,7 @@ void ClientBase::runInteractive()
|
||||
#else
|
||||
LineReader lr(
|
||||
history_file,
|
||||
config().has("multiline"),
|
||||
getClientConfiguration().has("multiline"),
|
||||
query_extenders,
|
||||
query_delimiters,
|
||||
word_break_characters);
|
||||
@ -2633,7 +2650,7 @@ void ClientBase::runInteractive()
|
||||
{
|
||||
// If a separate connection loading suggestions failed to open a new session,
|
||||
// use the main session to receive them.
|
||||
suggest->load(*connection, connection_parameters.timeouts, config().getInt("suggestion_limit"), global_context->getClientInfo());
|
||||
suggest->load(*connection, connection_parameters.timeouts, getClientConfiguration().getInt("suggestion_limit"), global_context->getClientInfo());
|
||||
}
|
||||
|
||||
try
|
||||
@ -2648,7 +2665,7 @@ void ClientBase::runInteractive()
|
||||
break;
|
||||
|
||||
/// We don't need to handle the test hints in the interactive mode.
|
||||
std::cerr << "Exception on client:" << std::endl << getExceptionMessage(e, print_stack_trace, true) << std::endl << std::endl;
|
||||
error_stream << "Exception on client:" << std::endl << getExceptionMessage(e, print_stack_trace, true) << std::endl << std::endl;
|
||||
client_exception.reset(e.clone());
|
||||
}
|
||||
|
||||
@ -2665,11 +2682,11 @@ void ClientBase::runInteractive()
|
||||
while (true);
|
||||
|
||||
if (isNewYearMode())
|
||||
std::cout << "Happy new year." << std::endl;
|
||||
output_stream << "Happy new year." << std::endl;
|
||||
else if (isChineseNewYearMode(local_tz))
|
||||
std::cout << "Happy Chinese new year. 春节快乐!" << std::endl;
|
||||
output_stream << "Happy Chinese new year. 春节快乐!" << std::endl;
|
||||
else
|
||||
std::cout << "Bye." << std::endl;
|
||||
output_stream << "Bye." << std::endl;
|
||||
}
|
||||
|
||||
|
||||
@ -2680,7 +2697,7 @@ bool ClientBase::processMultiQueryFromFile(const String & file_name)
|
||||
ReadBufferFromFile in(file_name);
|
||||
readStringUntilEOF(queries_from_file, in);
|
||||
|
||||
if (!has_log_comment)
|
||||
if (!getClientConfiguration().has("log_comment"))
|
||||
{
|
||||
Settings settings = global_context->getSettings();
|
||||
/// NOTE: cannot use even weakly_canonical() since it fails for /dev/stdin due to resolving of "pipe:[X]"
|
||||
@ -2789,13 +2806,13 @@ void ClientBase::clearTerminal()
|
||||
/// It is needed if garbage is left in terminal.
|
||||
/// Show cursor. It can be left hidden by invocation of previous programs.
|
||||
/// A test for this feature: perl -e 'print "x"x100000'; echo -ne '\033[0;0H\033[?25l'; clickhouse-client
|
||||
std::cout << "\033[0J" "\033[?25h";
|
||||
output_stream << "\033[0J" "\033[?25h";
|
||||
}
|
||||
|
||||
|
||||
void ClientBase::showClientVersion()
|
||||
{
|
||||
std::cout << VERSION_NAME << " " + getName() + " version " << VERSION_STRING << VERSION_OFFICIAL << "." << std::endl;
|
||||
output_stream << VERSION_NAME << " " + getName() + " version " << VERSION_STRING << VERSION_OFFICIAL << "." << std::endl;
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -3080,18 +3097,18 @@ void ClientBase::init(int argc, char ** argv)
|
||||
|
||||
if (options.count("version-clean"))
|
||||
{
|
||||
std::cout << VERSION_STRING;
|
||||
output_stream << VERSION_STRING;
|
||||
exit(0); // NOLINT(concurrency-mt-unsafe)
|
||||
}
|
||||
|
||||
if (options.count("verbose"))
|
||||
config().setBool("verbose", true);
|
||||
getClientConfiguration().setBool("verbose", true);
|
||||
|
||||
/// Output of help message.
|
||||
if (options.count("help")
|
||||
|| (options.count("host") && options["host"].as<std::string>() == "elp")) /// If user writes -help instead of --help.
|
||||
{
|
||||
if (config().getBool("verbose", false))
|
||||
if (getClientConfiguration().getBool("verbose", false))
|
||||
printHelpMessage(options_description, true);
|
||||
else
|
||||
printHelpMessage(options_description_non_verbose, false);
|
||||
@ -3099,72 +3116,75 @@ void ClientBase::init(int argc, char ** argv)
|
||||
}
|
||||
|
||||
/// Common options for clickhouse-client and clickhouse-local.
|
||||
|
||||
/// Output execution time to stderr in batch mode.
|
||||
if (options.count("time"))
|
||||
print_time_to_stderr = true;
|
||||
getClientConfiguration().setBool("print-time-to-stderr", true);
|
||||
if (options.count("query"))
|
||||
queries = options["query"].as<std::vector<std::string>>();
|
||||
if (options.count("query_id"))
|
||||
config().setString("query_id", options["query_id"].as<std::string>());
|
||||
getClientConfiguration().setString("query_id", options["query_id"].as<std::string>());
|
||||
if (options.count("database"))
|
||||
config().setString("database", options["database"].as<std::string>());
|
||||
getClientConfiguration().setString("database", options["database"].as<std::string>());
|
||||
if (options.count("config-file"))
|
||||
config().setString("config-file", options["config-file"].as<std::string>());
|
||||
getClientConfiguration().setString("config-file", options["config-file"].as<std::string>());
|
||||
if (options.count("queries-file"))
|
||||
queries_files = options["queries-file"].as<std::vector<std::string>>();
|
||||
if (options.count("multiline"))
|
||||
config().setBool("multiline", true);
|
||||
getClientConfiguration().setBool("multiline", true);
|
||||
if (options.count("multiquery"))
|
||||
config().setBool("multiquery", true);
|
||||
getClientConfiguration().setBool("multiquery", true);
|
||||
if (options.count("ignore-error"))
|
||||
config().setBool("ignore-error", true);
|
||||
getClientConfiguration().setBool("ignore-error", true);
|
||||
if (options.count("format"))
|
||||
config().setString("format", options["format"].as<std::string>());
|
||||
getClientConfiguration().setString("format", options["format"].as<std::string>());
|
||||
if (options.count("output-format"))
|
||||
config().setString("output-format", options["output-format"].as<std::string>());
|
||||
getClientConfiguration().setString("output-format", options["output-format"].as<std::string>());
|
||||
if (options.count("vertical"))
|
||||
config().setBool("vertical", true);
|
||||
getClientConfiguration().setBool("vertical", true);
|
||||
if (options.count("stacktrace"))
|
||||
config().setBool("stacktrace", true);
|
||||
getClientConfiguration().setBool("stacktrace", true);
|
||||
if (options.count("print-profile-events"))
|
||||
config().setBool("print-profile-events", true);
|
||||
getClientConfiguration().setBool("print-profile-events", true);
|
||||
if (options.count("profile-events-delay-ms"))
|
||||
config().setUInt64("profile-events-delay-ms", options["profile-events-delay-ms"].as<UInt64>());
|
||||
getClientConfiguration().setUInt64("profile-events-delay-ms", options["profile-events-delay-ms"].as<UInt64>());
|
||||
/// Whether to print the number of processed rows at
|
||||
if (options.count("processed-rows"))
|
||||
print_num_processed_rows = true;
|
||||
getClientConfiguration().setBool("print-num-processed-rows", true);
|
||||
if (options.count("progress"))
|
||||
{
|
||||
switch (options["progress"].as<ProgressOption>())
|
||||
{
|
||||
case DEFAULT:
|
||||
config().setString("progress", "default");
|
||||
getClientConfiguration().setString("progress", "default");
|
||||
break;
|
||||
case OFF:
|
||||
config().setString("progress", "off");
|
||||
getClientConfiguration().setString("progress", "off");
|
||||
break;
|
||||
case TTY:
|
||||
config().setString("progress", "tty");
|
||||
getClientConfiguration().setString("progress", "tty");
|
||||
break;
|
||||
case ERR:
|
||||
config().setString("progress", "err");
|
||||
getClientConfiguration().setString("progress", "err");
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (options.count("echo"))
|
||||
config().setBool("echo", true);
|
||||
getClientConfiguration().setBool("echo", true);
|
||||
if (options.count("disable_suggestion"))
|
||||
config().setBool("disable_suggestion", true);
|
||||
getClientConfiguration().setBool("disable_suggestion", true);
|
||||
if (options.count("wait_for_suggestions_to_load"))
|
||||
config().setBool("wait_for_suggestions_to_load", true);
|
||||
getClientConfiguration().setBool("wait_for_suggestions_to_load", true);
|
||||
if (options.count("suggestion_limit"))
|
||||
config().setInt("suggestion_limit", options["suggestion_limit"].as<int>());
|
||||
getClientConfiguration().setInt("suggestion_limit", options["suggestion_limit"].as<int>());
|
||||
if (options.count("highlight"))
|
||||
config().setBool("highlight", options["highlight"].as<bool>());
|
||||
getClientConfiguration().setBool("highlight", options["highlight"].as<bool>());
|
||||
if (options.count("history_file"))
|
||||
config().setString("history_file", options["history_file"].as<std::string>());
|
||||
getClientConfiguration().setString("history_file", options["history_file"].as<std::string>());
|
||||
if (options.count("interactive"))
|
||||
config().setBool("interactive", true);
|
||||
getClientConfiguration().setBool("interactive", true);
|
||||
if (options.count("pager"))
|
||||
config().setString("pager", options["pager"].as<std::string>());
|
||||
getClientConfiguration().setString("pager", options["pager"].as<std::string>());
|
||||
|
||||
if (options.count("log-level"))
|
||||
Poco::Logger::root().setLevel(options["log-level"].as<std::string>());
|
||||
@ -3182,13 +3202,13 @@ void ClientBase::init(int argc, char ** argv)
|
||||
alias_names.reserve(options_description.main_description->options().size());
|
||||
for (const auto& option : options_description.main_description->options())
|
||||
alias_names.insert(option->long_name());
|
||||
argsToConfig(common_arguments, config(), 100, &alias_names);
|
||||
argsToConfig(common_arguments, getClientConfiguration(), 100, &alias_names);
|
||||
}
|
||||
|
||||
clearPasswordFromCommandLine(argc, argv);
|
||||
|
||||
/// Limit on total memory usage
|
||||
std::string max_client_memory_usage = config().getString("max_memory_usage_in_client", "0" /*default value*/);
|
||||
std::string max_client_memory_usage = getClientConfiguration().getString("max_memory_usage_in_client", "0" /*default value*/);
|
||||
if (max_client_memory_usage != "0")
|
||||
{
|
||||
UInt64 max_client_memory_usage_int = parseWithSizeSuffix<UInt64>(max_client_memory_usage.c_str(), max_client_memory_usage.length());
|
||||
@ -3197,8 +3217,6 @@ void ClientBase::init(int argc, char ** argv)
|
||||
total_memory_tracker.setDescription("(total)");
|
||||
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
|
||||
}
|
||||
|
||||
has_log_comment = config().has("log_comment");
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -18,7 +18,6 @@
|
||||
#include <Storages/SelectQueryInfo.h>
|
||||
#include <Storages/MergeTree/MergeTreeSettings.h>
|
||||
|
||||
|
||||
namespace po = boost::program_options;
|
||||
|
||||
|
||||
@ -67,13 +66,22 @@ class ClientBase : public Poco::Util::Application, public IHints<2>
|
||||
public:
|
||||
using Arguments = std::vector<String>;
|
||||
|
||||
ClientBase();
|
||||
explicit ClientBase
|
||||
(
|
||||
int in_fd_ = STDIN_FILENO,
|
||||
int out_fd_ = STDOUT_FILENO,
|
||||
int err_fd_ = STDERR_FILENO,
|
||||
std::istream & input_stream_ = std::cin,
|
||||
std::ostream & output_stream_ = std::cout,
|
||||
std::ostream & error_stream_ = std::cerr
|
||||
);
|
||||
|
||||
~ClientBase() override;
|
||||
|
||||
void init(int argc, char ** argv);
|
||||
|
||||
std::vector<String> getAllRegisteredNames() const override { return cmd_options; }
|
||||
static ASTPtr parseQuery(const char *& pos, const char * end, const Settings & settings, bool allow_multi_statements, bool is_interactive, bool ignore_error);
|
||||
ASTPtr parseQuery(const char *& pos, const char * end, const Settings & settings, bool allow_multi_statements);
|
||||
|
||||
protected:
|
||||
void runInteractive();
|
||||
@ -82,6 +90,9 @@ protected:
|
||||
char * argv0 = nullptr;
|
||||
void runLibFuzzer();
|
||||
|
||||
/// This is the analogue of Poco::Application::config()
|
||||
virtual Poco::Util::LayeredConfiguration & getClientConfiguration() = 0;
|
||||
|
||||
virtual bool processWithFuzzing(const String &)
|
||||
{
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Query processing with fuzzing is not implemented");
|
||||
@ -107,7 +118,7 @@ protected:
|
||||
String & query_to_execute, ASTPtr & parsed_query, const String & all_queries_text,
|
||||
std::unique_ptr<Exception> & current_exception);
|
||||
|
||||
static void clearTerminal();
|
||||
void clearTerminal();
|
||||
void showClientVersion();
|
||||
|
||||
using ProgramOptionsDescription = boost::program_options::options_description;
|
||||
@ -206,7 +217,6 @@ protected:
|
||||
|
||||
bool echo_queries = false; /// Print queries before execution in batch mode.
|
||||
bool ignore_error = false; /// In case of errors, don't print error message, continue to next query. Only applicable for non-interactive mode.
|
||||
bool print_time_to_stderr = false; /// Output execution time to stderr in batch mode.
|
||||
|
||||
std::optional<Suggest> suggest;
|
||||
bool load_suggestions = false;
|
||||
@ -251,9 +261,9 @@ protected:
|
||||
ConnectionParameters connection_parameters;
|
||||
|
||||
/// Buffer that reads from stdin in batch mode.
|
||||
ReadBufferFromFileDescriptor std_in{STDIN_FILENO};
|
||||
ReadBufferFromFileDescriptor std_in;
|
||||
/// Console output.
|
||||
WriteBufferFromFileDescriptor std_out{STDOUT_FILENO};
|
||||
WriteBufferFromFileDescriptor std_out;
|
||||
std::unique_ptr<ShellCommand> pager_cmd;
|
||||
|
||||
/// The user can specify to redirect query output to a file.
|
||||
@ -284,7 +294,6 @@ protected:
|
||||
bool need_render_profile_events = true;
|
||||
bool written_first_block = false;
|
||||
size_t processed_rows = 0; /// How many rows have been read or written.
|
||||
bool print_num_processed_rows = false; /// Whether to print the number of processed rows at
|
||||
|
||||
bool print_stack_trace = false;
|
||||
/// The last exception that was received from the server. Is used for the
|
||||
@ -332,8 +341,14 @@ protected:
|
||||
bool cancelled = false;
|
||||
bool cancelled_printed = false;
|
||||
|
||||
/// Does log_comment has specified by user?
|
||||
bool has_log_comment = false;
|
||||
/// Unpacked descriptors and streams for the ease of use.
|
||||
int in_fd = STDIN_FILENO;
|
||||
int out_fd = STDOUT_FILENO;
|
||||
int err_fd = STDERR_FILENO;
|
||||
std::istream & input_stream;
|
||||
std::ostream & output_stream;
|
||||
std::ostream & error_stream;
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -23,14 +23,6 @@ void trim(String & s)
|
||||
s.erase(std::find_if(s.rbegin(), s.rend(), [](int ch) { return !std::isspace(ch); }).base(), s.end());
|
||||
}
|
||||
|
||||
/// Check if multi-line query is inserted from the paste buffer.
|
||||
/// Allows delaying the start of query execution until the entirety of query is inserted.
|
||||
bool hasInputData()
|
||||
{
|
||||
pollfd fd{STDIN_FILENO, POLLIN, 0};
|
||||
return poll(&fd, 1, 0) == 1;
|
||||
}
|
||||
|
||||
struct NoCaseCompare
|
||||
{
|
||||
bool operator()(const std::string & str1, const std::string & str2)
|
||||
@ -63,6 +55,14 @@ void addNewWords(Words & to, const Words & from, Compare comp)
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Check if multi-line query is inserted from the paste buffer.
|
||||
/// Allows delaying the start of query execution until the entirety of query is inserted.
|
||||
bool LineReader::hasInputData() const
|
||||
{
|
||||
pollfd fd{in_fd, POLLIN, 0};
|
||||
return poll(&fd, 1, 0) == 1;
|
||||
}
|
||||
|
||||
replxx::Replxx::completions_t LineReader::Suggest::getCompletions(const String & prefix, size_t prefix_length, const char * word_break_characters)
|
||||
{
|
||||
std::string_view last_word;
|
||||
@ -131,11 +131,22 @@ void LineReader::Suggest::addWords(Words && new_words) // NOLINT(cppcoreguidelin
|
||||
}
|
||||
}
|
||||
|
||||
LineReader::LineReader(const String & history_file_path_, bool multiline_, Patterns extenders_, Patterns delimiters_)
|
||||
LineReader::LineReader(
|
||||
const String & history_file_path_,
|
||||
bool multiline_,
|
||||
Patterns extenders_,
|
||||
Patterns delimiters_,
|
||||
std::istream & input_stream_,
|
||||
std::ostream & output_stream_,
|
||||
int in_fd_
|
||||
)
|
||||
: history_file_path(history_file_path_)
|
||||
, multiline(multiline_)
|
||||
, extenders(std::move(extenders_))
|
||||
, delimiters(std::move(delimiters_))
|
||||
, input_stream(input_stream_)
|
||||
, output_stream(output_stream_)
|
||||
, in_fd(in_fd_)
|
||||
{
|
||||
/// FIXME: check extender != delimiter
|
||||
}
|
||||
@ -212,9 +223,9 @@ LineReader::InputStatus LineReader::readOneLine(const String & prompt)
|
||||
input.clear();
|
||||
|
||||
{
|
||||
std::cout << prompt;
|
||||
std::getline(std::cin, input);
|
||||
if (!std::cin.good())
|
||||
output_stream << prompt;
|
||||
std::getline(input_stream, input);
|
||||
if (!input_stream.good())
|
||||
return ABORT;
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <iostream>
|
||||
#include <unistd.h>
|
||||
#include <mutex>
|
||||
#include <atomic>
|
||||
#include <vector>
|
||||
@ -37,7 +39,16 @@ public:
|
||||
|
||||
using Patterns = std::vector<const char *>;
|
||||
|
||||
LineReader(const String & history_file_path, bool multiline, Patterns extenders, Patterns delimiters);
|
||||
LineReader(
|
||||
const String & history_file_path,
|
||||
bool multiline,
|
||||
Patterns extenders,
|
||||
Patterns delimiters,
|
||||
std::istream & input_stream_ = std::cin,
|
||||
std::ostream & output_stream_ = std::cout,
|
||||
int in_fd_ = STDIN_FILENO
|
||||
);
|
||||
|
||||
virtual ~LineReader() = default;
|
||||
|
||||
/// Reads the whole line until delimiter (in multiline mode) or until the last line without extender.
|
||||
@ -56,6 +67,8 @@ public:
|
||||
virtual void enableBracketedPaste() {}
|
||||
virtual void disableBracketedPaste() {}
|
||||
|
||||
bool hasInputData() const;
|
||||
|
||||
protected:
|
||||
enum InputStatus
|
||||
{
|
||||
@ -77,6 +90,10 @@ protected:
|
||||
|
||||
virtual InputStatus readOneLine(const String & prompt);
|
||||
virtual void addToHistory(const String &) {}
|
||||
|
||||
std::istream & input_stream;
|
||||
std::ostream & output_stream;
|
||||
int in_fd;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -16,7 +16,10 @@
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Common/ConcurrentBoundedQueue.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
|
||||
#include <Parsers/ParserQuery.h>
|
||||
#include <Parsers/PRQL/ParserPRQLQuery.h>
|
||||
#include <Parsers/Kusto/ParserKQLStatement.h>
|
||||
#include <Parsers/Kusto/parseKQLQuery.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -151,12 +154,26 @@ void LocalConnection::sendQuery(
|
||||
state->block = sample;
|
||||
|
||||
String current_format = "Values";
|
||||
|
||||
const auto & settings = context->getSettingsRef();
|
||||
const char * begin = state->query.data();
|
||||
auto parsed_query = ClientBase::parseQuery(begin, begin + state->query.size(),
|
||||
context->getSettingsRef(),
|
||||
/*allow_multi_statements=*/ false,
|
||||
/*is_interactive=*/ false,
|
||||
/*ignore_error=*/ false);
|
||||
const char * end = begin + state->query.size();
|
||||
const Dialect & dialect = settings.dialect;
|
||||
|
||||
std::unique_ptr<IParserBase> parser;
|
||||
if (dialect == Dialect::kusto)
|
||||
parser = std::make_unique<ParserKQLStatement>(end, settings.allow_settings_after_format_in_insert);
|
||||
else if (dialect == Dialect::prql)
|
||||
parser = std::make_unique<ParserPRQLQuery>(settings.max_query_size, settings.max_parser_depth, settings.max_parser_backtracks);
|
||||
else
|
||||
parser = std::make_unique<ParserQuery>(end, settings.allow_settings_after_format_in_insert);
|
||||
|
||||
ASTPtr parsed_query;
|
||||
if (dialect == Dialect::kusto)
|
||||
parsed_query = parseKQLQueryAndMovePosition(*parser, begin, end, "", /*allow_multi_statements*/false, settings.max_query_size, settings.max_parser_depth, settings.max_parser_backtracks);
|
||||
else
|
||||
parsed_query = parseQueryAndMovePosition(*parser, begin, end, "", /*allow_multi_statements*/false, settings.max_query_size, settings.max_parser_depth, settings.max_parser_backtracks);
|
||||
|
||||
if (const auto * insert = parsed_query->as<ASTInsertQuery>())
|
||||
{
|
||||
if (!insert->format.empty())
|
||||
|
@ -297,8 +297,15 @@ ReplxxLineReader::ReplxxLineReader(
|
||||
Patterns extenders_,
|
||||
Patterns delimiters_,
|
||||
const char word_break_characters_[],
|
||||
replxx::Replxx::highlighter_callback_t highlighter_)
|
||||
: LineReader(history_file_path_, multiline_, std::move(extenders_), std::move(delimiters_)), highlighter(std::move(highlighter_))
|
||||
replxx::Replxx::highlighter_callback_t highlighter_,
|
||||
[[ maybe_unused ]] std::istream & input_stream_,
|
||||
[[ maybe_unused ]] std::ostream & output_stream_,
|
||||
[[ maybe_unused ]] int in_fd_,
|
||||
[[ maybe_unused ]] int out_fd_,
|
||||
[[ maybe_unused ]] int err_fd_
|
||||
)
|
||||
: LineReader(history_file_path_, multiline_, std::move(extenders_), std::move(delimiters_), input_stream_, output_stream_, in_fd_)
|
||||
, highlighter(std::move(highlighter_))
|
||||
, word_break_characters(word_break_characters_)
|
||||
, editor(getEditor())
|
||||
{
|
||||
@ -471,7 +478,7 @@ ReplxxLineReader::ReplxxLineReader(
|
||||
|
||||
ReplxxLineReader::~ReplxxLineReader()
|
||||
{
|
||||
if (close(history_file_fd))
|
||||
if (history_file_fd >= 0 && close(history_file_fd))
|
||||
rx.print("Close of history file failed: %s\n", errnoToString().c_str());
|
||||
}
|
||||
|
||||
@ -496,7 +503,7 @@ void ReplxxLineReader::addToHistory(const String & line)
|
||||
// but replxx::Replxx::history_load() does not
|
||||
// and that is why flock() is added here.
|
||||
bool locked = false;
|
||||
if (flock(history_file_fd, LOCK_EX))
|
||||
if (history_file_fd >= 0 && flock(history_file_fd, LOCK_EX))
|
||||
rx.print("Lock of history file failed: %s\n", errnoToString().c_str());
|
||||
else
|
||||
locked = true;
|
||||
@ -507,7 +514,7 @@ void ReplxxLineReader::addToHistory(const String & line)
|
||||
if (!rx.history_save(history_file_path))
|
||||
rx.print("Saving history failed: %s\n", errnoToString().c_str());
|
||||
|
||||
if (locked && 0 != flock(history_file_fd, LOCK_UN))
|
||||
if (history_file_fd >= 0 && locked && 0 != flock(history_file_fd, LOCK_UN))
|
||||
rx.print("Unlock of history file failed: %s\n", errnoToString().c_str());
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "LineReader.h"
|
||||
#include <Client/LineReader.h>
|
||||
#include <base/strong_typedef.h>
|
||||
#include <replxx.hxx>
|
||||
|
||||
namespace DB
|
||||
@ -9,14 +10,22 @@ namespace DB
|
||||
class ReplxxLineReader : public LineReader
|
||||
{
|
||||
public:
|
||||
ReplxxLineReader(
|
||||
ReplxxLineReader
|
||||
(
|
||||
Suggest & suggest,
|
||||
const String & history_file_path,
|
||||
bool multiline,
|
||||
Patterns extenders_,
|
||||
Patterns delimiters_,
|
||||
const char word_break_characters_[],
|
||||
replxx::Replxx::highlighter_callback_t highlighter_);
|
||||
replxx::Replxx::highlighter_callback_t highlighter_,
|
||||
std::istream & input_stream_ = std::cin,
|
||||
std::ostream & output_stream_ = std::cout,
|
||||
int in_fd_ = STDIN_FILENO,
|
||||
int out_fd_ = STDOUT_FILENO,
|
||||
int err_fd_ = STDERR_FILENO
|
||||
);
|
||||
|
||||
~ReplxxLineReader() override;
|
||||
|
||||
void enableBracketedPaste() override;
|
||||
|
@ -92,19 +92,19 @@ void ProgressIndication::writeFinalProgress()
|
||||
if (progress.read_rows < 1000)
|
||||
return;
|
||||
|
||||
std::cout << "Processed " << formatReadableQuantity(progress.read_rows) << " rows, "
|
||||
output_stream << "Processed " << formatReadableQuantity(progress.read_rows) << " rows, "
|
||||
<< formatReadableSizeWithDecimalSuffix(progress.read_bytes);
|
||||
|
||||
UInt64 elapsed_ns = getElapsedNanoseconds();
|
||||
if (elapsed_ns)
|
||||
std::cout << " (" << formatReadableQuantity(progress.read_rows * 1000000000.0 / elapsed_ns) << " rows/s., "
|
||||
output_stream << " (" << formatReadableQuantity(progress.read_rows * 1000000000.0 / elapsed_ns) << " rows/s., "
|
||||
<< formatReadableSizeWithDecimalSuffix(progress.read_bytes * 1000000000.0 / elapsed_ns) << "/s.)";
|
||||
else
|
||||
std::cout << ". ";
|
||||
output_stream << ". ";
|
||||
|
||||
auto peak_memory_usage = getMemoryUsage().peak;
|
||||
if (peak_memory_usage >= 0)
|
||||
std::cout << "\nPeak memory usage: " << formatReadableSizeWithBinarySuffix(peak_memory_usage) << ".";
|
||||
output_stream << "\nPeak memory usage: " << formatReadableSizeWithBinarySuffix(peak_memory_usage) << ".";
|
||||
}
|
||||
|
||||
void ProgressIndication::writeProgress(WriteBufferFromFileDescriptor & message)
|
||||
@ -125,7 +125,7 @@ void ProgressIndication::writeProgress(WriteBufferFromFileDescriptor & message)
|
||||
|
||||
const char * indicator = indicators[increment % 8];
|
||||
|
||||
size_t terminal_width = getTerminalWidth();
|
||||
size_t terminal_width = getTerminalWidth(in_fd, err_fd);
|
||||
|
||||
if (!written_progress_chars)
|
||||
{
|
||||
|
@ -32,6 +32,19 @@ using HostToTimesMap = std::unordered_map<String, ThreadEventData>;
|
||||
class ProgressIndication
|
||||
{
|
||||
public:
|
||||
|
||||
explicit ProgressIndication
|
||||
(
|
||||
std::ostream & output_stream_ = std::cout,
|
||||
int in_fd_ = STDIN_FILENO,
|
||||
int err_fd_ = STDERR_FILENO
|
||||
)
|
||||
: output_stream(output_stream_),
|
||||
in_fd(in_fd_),
|
||||
err_fd(err_fd_)
|
||||
{
|
||||
}
|
||||
|
||||
/// Write progress bar.
|
||||
void writeProgress(WriteBufferFromFileDescriptor & message);
|
||||
void clearProgressOutput(WriteBufferFromFileDescriptor & message);
|
||||
@ -103,6 +116,10 @@ private:
|
||||
/// - hosts_data/cpu_usage_meter (guarded with profile_events_mutex)
|
||||
mutable std::mutex profile_events_mutex;
|
||||
mutable std::mutex progress_mutex;
|
||||
|
||||
std::ostream & output_stream;
|
||||
int in_fd;
|
||||
int err_fd;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -13,17 +13,17 @@ namespace DB::ErrorCodes
|
||||
extern const int SYSTEM_ERROR;
|
||||
}
|
||||
|
||||
uint16_t getTerminalWidth()
|
||||
uint16_t getTerminalWidth(int in_fd, int err_fd)
|
||||
{
|
||||
struct winsize terminal_size {};
|
||||
if (isatty(STDIN_FILENO))
|
||||
if (isatty(in_fd))
|
||||
{
|
||||
if (ioctl(STDIN_FILENO, TIOCGWINSZ, &terminal_size))
|
||||
if (ioctl(in_fd, TIOCGWINSZ, &terminal_size))
|
||||
throw DB::ErrnoException(DB::ErrorCodes::SYSTEM_ERROR, "Cannot obtain terminal window size (ioctl TIOCGWINSZ)");
|
||||
}
|
||||
else if (isatty(STDERR_FILENO))
|
||||
else if (isatty(err_fd))
|
||||
{
|
||||
if (ioctl(STDERR_FILENO, TIOCGWINSZ, &terminal_size))
|
||||
if (ioctl(err_fd, TIOCGWINSZ, &terminal_size))
|
||||
throw DB::ErrnoException(DB::ErrorCodes::SYSTEM_ERROR, "Cannot obtain terminal window size (ioctl TIOCGWINSZ)");
|
||||
}
|
||||
/// Default - 0.
|
||||
|
@ -1,16 +1,16 @@
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
#include <unistd.h>
|
||||
#include <boost/program_options.hpp>
|
||||
|
||||
|
||||
namespace po = boost::program_options;
|
||||
|
||||
|
||||
uint16_t getTerminalWidth();
|
||||
uint16_t getTerminalWidth(int in_fd = STDIN_FILENO, int err_fd = STDERR_FILENO);
|
||||
|
||||
/** Creates po::options_description with name and an appropriate size for option displaying
|
||||
* when program is called with option --help
|
||||
* */
|
||||
po::options_description createOptionsDescription(const std::string &caption, unsigned short terminal_width); /// NOLINT
|
||||
|
||||
|
@ -322,10 +322,14 @@ std_cerr_cout_excludes=(
|
||||
src/Client/LineReader.cpp
|
||||
src/Client/QueryFuzzer.cpp
|
||||
src/Client/Suggest.cpp
|
||||
src/Client/ClientBase.h
|
||||
src/Client/LineReader.h
|
||||
src/Client/ReplxxLineReader.h
|
||||
src/Bridge/IBridge.cpp
|
||||
src/Daemon/BaseDaemon.cpp
|
||||
src/Loggers/Loggers.cpp
|
||||
src/Common/GWPAsan.cpp
|
||||
src/Common/ProgressIndication.h
|
||||
)
|
||||
sources_with_std_cerr_cout=( $(
|
||||
find $ROOT_PATH/{src,base} -name '*.h' -or -name '*.cpp' | \
|
||||
|
Loading…
Reference in New Issue
Block a user