Merge pull request #26231 from kssenii/interactive-mode-for-clickhouse-local

clickhouse-local interactive mode, merge clickhouse-client and clickhouse-local code
This commit is contained in:
Kseniia Sumarokova 2021-09-30 19:31:34 +03:00 committed by GitHub
commit 3457868f3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 4373 additions and 3349 deletions

View File

@ -1,9 +1,5 @@
set (CLICKHOUSE_CLIENT_SOURCES
Client.cpp
ConnectionParameters.cpp
QueryFuzzer.cpp
Suggest.cpp
TestHint.cpp
TestTags.cpp
)

File diff suppressed because it is too large Load Diff

36
programs/client/Client.h Normal file
View File

@ -0,0 +1,36 @@
#pragma once
#include <Client/ClientBase.h>
namespace DB
{
class Client : public ClientBase
{
public:
Client() = default;
void initialize(Poco::Util::Application & self) override;
int main(const std::vector<String> & /*args*/) override;
protected:
bool executeMultiQuery(const String & all_queries_text) override;
bool processWithFuzzing(const String & full_query) override;
void connect() override;
void processError(const String & query) const override;
String getName() const override { return "client"; }
void printHelpMessage(const OptionsDescription & options_description) override;
void addAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments) override;
void processOptions(const OptionsDescription & options_description, const CommandLineOptions & options,
const std::vector<Arguments> & external_tables_arguments) override;
void processConfig() override;
private:
void printChangedSettings() const;
std::vector<String> loadWarningMessages();
};
}

View File

@ -13,6 +13,8 @@
#include <Interpreters/executeQuery.h>
#include <Interpreters/loadMetadata.h>
#include <Interpreters/DatabaseCatalog.h>
#include <common/getFQDNOrHostName.h>
#include <common/scope_guard_safe.h>
#include <Interpreters/UserDefinedSQLObjectsLoader.h>
#include <Interpreters/Session.h>
#include <Common/Exception.h>
@ -24,15 +26,15 @@
#include <Common/UnicodeBar.h>
#include <Common/config_version.h>
#include <Common/quoteString.h>
#include <loggers/Loggers.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/UseSSL.h>
#include <IO/ReadHelpers.h>
#include <IO/UseSSL.h>
#include <Parsers/parseQuery.h>
#include <Parsers/IAST.h>
#include <common/ErrorHandlers.h>
#include <Common/StatusFile.h>
#include <Functions/registerFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <TableFunctions/registerTableFunctions.h>
@ -49,24 +51,128 @@
namespace fs = std::filesystem;
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int SYNTAX_ERROR;
extern const int CANNOT_LOAD_CONFIG;
extern const int FILE_ALREADY_EXISTS;
}
LocalServer::LocalServer() = default;
LocalServer::~LocalServer()
void LocalServer::processError(const String & query) const
{
if (global_context)
global_context->shutdown(); /// required for properly exception handling
if (ignore_error)
return;
if (is_interactive)
{
if (server_exception)
{
bool print_stack_trace = config().getBool("stacktrace", false);
fmt::print(stderr, "Error on processing query '{}':\n{}\n", query, getExceptionMessage(*server_exception, print_stack_trace, true));
fmt::print(stderr, "\n");
}
if (client_exception)
{
fmt::print(stderr, "Error on processing query '{}':\n{}\n", query, client_exception->message());
fmt::print(stderr, "\n");
}
}
else
{
if (server_exception)
server_exception->rethrow();
if (client_exception)
client_exception->rethrow();
}
}
bool LocalServer::executeMultiQuery(const String & all_queries_text)
{
bool echo_query = echo_queries;
/// Several queries separated by ';'.
/// INSERT data is ended by the end of line, not ';'.
/// An exception is VALUES format where we also support semicolon in
/// addition to end of line.
const char * this_query_begin = all_queries_text.data();
const char * this_query_end;
const char * all_queries_end = all_queries_text.data() + all_queries_text.size();
String full_query; // full_query is the query + inline INSERT data + trailing comments (the latter is our best guess for now).
String query_to_execute;
ASTPtr parsed_query;
std::optional<Exception> current_exception;
while (true)
{
auto stage = analyzeMultiQueryText(this_query_begin, this_query_end, all_queries_end,
query_to_execute, parsed_query, all_queries_text, current_exception);
switch (stage)
{
case MultiQueryProcessingStage::QUERIES_END:
case MultiQueryProcessingStage::PARSING_FAILED:
{
return true;
}
case MultiQueryProcessingStage::CONTINUE_PARSING:
{
continue;
}
case MultiQueryProcessingStage::PARSING_EXCEPTION:
{
this_query_end = find_first_symbols<'\n'>(this_query_end, all_queries_end);
this_query_begin = this_query_end; /// It's expected syntax error, skip the line
current_exception.reset();
continue;
}
case MultiQueryProcessingStage::EXECUTE_QUERY:
{
full_query = all_queries_text.substr(this_query_begin - all_queries_text.data(), this_query_end - this_query_begin);
try
{
processParsedSingleQuery(full_query, query_to_execute, parsed_query, echo_query, false);
}
catch (...)
{
// Surprisingly, this is a client error. A server error would
// have been reported w/o throwing (see onReceiveSeverException()).
client_exception = std::make_unique<Exception>(getCurrentExceptionMessage(true), getCurrentExceptionCode());
have_error = true;
}
// For INSERTs with inline data: use the end of inline data as
// reported by the format parser (it is saved in sendData()).
// This allows us to handle queries like:
// insert into t values (1); select 1
// , where the inline data is delimited by semicolon and not by a
// newline.
auto * insert_ast = parsed_query->as<ASTInsertQuery>();
if (insert_ast && insert_ast->data)
{
this_query_end = insert_ast->end;
adjustQueryEnd(this_query_end, all_queries_end, global_context->getSettingsRef().max_parser_depth);
}
// Report error.
if (have_error)
processError(full_query);
// Stop processing queries if needed.
if (have_error && !ignore_error)
return is_interactive;
this_query_begin = this_query_end;
break;
}
}
}
}
@ -103,11 +209,20 @@ void LocalServer::initialize(Poco::Util::Application & self)
}
}
void LocalServer::applyCmdSettings(ContextMutablePtr context)
static DatabasePtr createMemoryDatabaseIfNotExists(ContextPtr context, const String & database_name)
{
context->applySettingsChanges(cmd_settings.changes());
DatabasePtr system_database = DatabaseCatalog::instance().tryGetDatabase(database_name);
if (!system_database)
{
/// TODO: add attachTableDelayed into DatabaseMemory to speedup loading
system_database = std::make_shared<DatabaseMemory>(database_name, context);
DatabaseCatalog::instance().attachDatabase(database_name, system_database);
}
return system_database;
}
/// If path is specified and not empty, will try to setup server environment and load existing metadata
void LocalServer::tryInitPath()
{
@ -141,7 +256,7 @@ void LocalServer::tryInitPath()
parent_folder = std::filesystem::temp_directory_path();
}
catch (const std::filesystem::filesystem_error& e)
catch (const fs::filesystem_error& e)
{
// tmp folder don't exists? misconfiguration? chroot?
LOG_DEBUG(log, "Can not get temporary folder: {}", e.what());
@ -180,54 +295,143 @@ void LocalServer::tryInitPath()
}
static DatabasePtr createMemoryDatabaseIfNotExists(ContextPtr context, const String & database_name)
void LocalServer::cleanup()
{
DatabasePtr system_database = DatabaseCatalog::instance().tryGetDatabase(database_name);
if (!system_database)
connection.reset();
if (global_context)
{
/// TODO: add attachTableDelayed into DatabaseMemory to speedup loading
system_database = std::make_shared<DatabaseMemory>(database_name, context);
DatabaseCatalog::instance().attachDatabase(database_name, system_database);
global_context->shutdown();
global_context.reset();
}
status.reset();
// Delete the temporary directory if needed.
if (temporary_directory_to_delete)
{
const auto dir = *temporary_directory_to_delete;
temporary_directory_to_delete.reset();
LOG_DEBUG(&logger(), "Removing temporary directory: {}", dir.string());
remove_all(dir);
}
return system_database;
}
std::string LocalServer::getInitialCreateTableQuery()
{
if (!config().has("table-structure"))
return {};
auto table_name = backQuoteIfNeed(config().getString("table-name", "table"));
auto table_structure = config().getString("table-structure");
auto data_format = backQuoteIfNeed(config().getString("table-data-format", "TSV"));
String table_file;
if (!config().has("table-file") || config().getString("table-file") == "-")
{
/// Use Unix tools stdin naming convention
table_file = "stdin";
}
else
{
/// Use regular file
table_file = quoteString(config().getString("table-file"));
}
return fmt::format("CREATE TABLE {} ({}) ENGINE = File({}, {});",
table_name, table_structure, data_format, table_file);
}
static ConfigurationPtr getConfigurationFromXMLString(const char * xml_data)
{
std::stringstream ss{std::string{xml_data}}; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
Poco::XML::InputSource input_source{ss};
return {new Poco::Util::XMLConfiguration{&input_source}};
}
void LocalServer::setupUsers()
{
static const char * minimal_default_user_xml =
"<yandex>"
" <profiles>"
" <default></default>"
" </profiles>"
" <users>"
" <default>"
" <password></password>"
" <networks>"
" <ip>::/0</ip>"
" </networks>"
" <profile>default</profile>"
" <quota>default</quota>"
" </default>"
" </users>"
" <quotas>"
" <default></default>"
" </quotas>"
"</yandex>";
ConfigurationPtr users_config;
if (config().has("users_config") || config().has("config-file") || fs::exists("config.xml"))
{
const auto users_config_path = config().getString("users_config", config().getString("config-file", "config.xml"));
ConfigProcessor config_processor(users_config_path);
const auto loaded_config = config_processor.loadConfig();
config_processor.savePreprocessedConfig(loaded_config, config().getString("path", DBMS_DEFAULT_PATH));
users_config = loaded_config.configuration;
}
else
{
users_config = getConfigurationFromXMLString(minimal_default_user_xml);
}
if (users_config)
global_context->setUsersConfig(users_config);
else
throw Exception("Can't load config for users", ErrorCodes::CANNOT_LOAD_CONFIG);
}
String LocalServer::getQueryTextPrefix()
{
return getInitialCreateTableQuery();
}
void LocalServer::connect()
{
connection_parameters = ConnectionParameters(config());
connection = LocalConnection::createConnection(connection_parameters, global_context, need_render_progress);
}
int LocalServer::main(const std::vector<std::string> & /*args*/)
try
{
Poco::Logger * log = &logger();
ThreadStatus thread_status;
UseSSL use_ssl;
ThreadStatus thread_status;
if (!config().has("query") && !config().has("table-structure") && !config().has("queries-file")) /// Nothing to process
std::cout << std::fixed << std::setprecision(3);
std::cerr << std::fixed << std::setprecision(3);
is_interactive = stdin_is_a_tty && !config().has("query") && !config().has("table-structure") && queries_files.empty();
std::optional<InterruptListener> interrupt_listener;
if (is_interactive)
{
if (config().hasOption("verbose"))
std::cerr << "There are no queries to process." << '\n';
return Application::EXIT_OK;
interrupt_listener.emplace();
}
if (config().has("query") && config().has("queries-file"))
else
{
throw Exception("Specify either `query` or `queries-file` option", ErrorCodes::BAD_ARGUMENTS);
/// We will terminate process on error
static KillingErrorHandler error_handler;
Poco::ErrorHandler::set(&error_handler);
}
shared_context = Context::createShared();
global_context = Context::createGlobal(shared_context.get());
global_context->makeGlobalContext();
global_context->setApplicationType(Context::ApplicationType::LOCAL);
tryInitPath();
std::optional<StatusFile> status;
/// Skip temp path installation
/// We will terminate process on error
static KillingErrorHandler error_handler;
Poco::ErrorHandler::set(&error_handler);
/// Don't initialize DateLUT
registerFunctions();
registerAggregateFunctions();
registerTableFunctions();
@ -236,10 +440,88 @@ try
registerDisks();
registerFormats();
processConfig();
applyCmdSettings(global_context);
connect();
if (is_interactive)
{
clearTerminal();
showClientVersion();
std::cerr << std::endl;
runInteractive();
}
else
{
runNonInteractive();
}
cleanup();
return Application::EXIT_OK;
}
catch (...)
{
try
{
cleanup();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (!ignore_error)
std::cerr << getCurrentExceptionMessage(config().hasOption("stacktrace")) << '\n';
auto code = getCurrentExceptionCode();
/// If exception code isn't zero, we should return non-zero return code anyway.
return code ? code : -1;
}
void LocalServer::processConfig()
{
if (is_interactive)
{
if (config().has("query") && config().has("queries-file"))
throw Exception("Specify either `query` or `queries-file` option", ErrorCodes::BAD_ARGUMENTS);
if (config().has("multiquery"))
is_multiquery = true;
load_suggestions = true;
}
else
{
need_render_progress = config().getBool("progress", false);
echo_queries = config().hasOption("echo") || config().hasOption("verbose");
ignore_error = config().getBool("ignore-error", false);
is_multiquery = true;
}
shared_context = Context::createShared();
global_context = Context::createGlobal(shared_context.get());
global_context->makeGlobalContext();
global_context->setApplicationType(Context::ApplicationType::LOCAL);
tryInitPath();
Poco::Logger * log = &logger();
/// Maybe useless
if (config().has("macros"))
global_context->setMacros(std::make_unique<Macros>(config(), "macros", log));
format = config().getString("output-format", config().getString("format", is_interactive ? "PrettyCompact" : "TSV"));
insert_format = "Values";
/// Setting value from cmd arg overrides one from config
if (global_context->getSettingsRef().max_insert_block_size.changed)
insert_format_max_block_size = global_context->getSettingsRef().max_insert_block_size;
else
insert_format_max_block_size = config().getInt("insert_format_max_block_size", global_context->getSettingsRef().max_insert_block_size);
/// Skip networking
/// Sets external authenticators config (LDAP, Kerberos).
@ -290,7 +572,7 @@ try
String path = global_context->getPath();
/// Lock path directory before read
status.emplace(path + "status", StatusFile::write_full_info);
status.emplace(fs::path(path) / "status", StatusFile::write_full_info);
fs::create_directories(fs::path(path) / "user_defined/");
LOG_DEBUG(log, "Loading user defined objects from {}", path);
@ -301,6 +583,7 @@ try
LOG_DEBUG(log, "Loading metadata from {}", path);
fs::create_directories(fs::path(path) / "data/");
fs::create_directories(fs::path(path) / "metadata/");
loadMetadataSystem(global_context);
attachSystemTablesLocal(*createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA));
@ -308,6 +591,7 @@ try
loadMetadata(global_context);
startupSystemTables();
DatabaseCatalog::instance().loadDatabases();
LOG_DEBUG(log, "Loaded metadata.");
}
else if (!config().has("no-system-tables"))
@ -317,226 +601,17 @@ try
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
}
processQueries();
server_display_name = config().getString("display_name", getFQDNOrHostName());
prompt_by_server_display_name = config().getRawString("prompt_by_server_display_name.default", "{display_name} :) ");
std::map<String, String> prompt_substitutions{{"display_name", server_display_name}};
for (const auto & [key, value] : prompt_substitutions)
boost::replace_all(prompt_by_server_display_name, "{" + key + "}", value);
global_context->shutdown();
global_context.reset();
status.reset();
cleanup();
return Application::EXIT_OK;
}
catch (const Exception & e)
{
try
{
cleanup();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
std::cerr << getCurrentExceptionMessage(config().hasOption("stacktrace")) << '\n';
/// If exception code isn't zero, we should return non-zero return code anyway.
return e.code() ? e.code() : -1;
ClientInfo & client_info = global_context->getClientInfo();
client_info.setInitialQuery();
}
std::string LocalServer::getInitialCreateTableQuery()
{
if (!config().has("table-structure"))
return {};
auto table_name = backQuoteIfNeed(config().getString("table-name", "table"));
auto table_structure = config().getString("table-structure");
auto data_format = backQuoteIfNeed(config().getString("table-data-format", "TSV"));
String table_file;
if (!config().has("table-file") || config().getString("table-file") == "-") /// Use Unix tools stdin naming convention
table_file = "stdin";
else /// Use regular file
table_file = quoteString(config().getString("table-file"));
return
"CREATE TABLE " + table_name +
" (" + table_structure + ") " +
"ENGINE = "
"File(" + data_format + ", " + table_file + ")"
"; ";
}
void LocalServer::processQueries()
{
String initial_create_query = getInitialCreateTableQuery();
String queries_str = initial_create_query;
if (config().has("query"))
queries_str += config().getRawString("query");
else
{
String queries_from_file;
ReadBufferFromFile in(config().getString("queries-file"));
readStringUntilEOF(queries_from_file, in);
queries_str += queries_from_file;
}
const auto & settings = global_context->getSettingsRef();
std::vector<String> queries;
auto parse_res = splitMultipartQuery(queries_str, queries, settings.max_query_size, settings.max_parser_depth);
if (!parse_res.second)
throw Exception("Cannot parse and execute the following part of query: " + String(parse_res.first), ErrorCodes::SYNTAX_ERROR);
/// Authenticate and create a context to execute queries.
Session session{global_context, ClientInfo::Interface::LOCAL};
session.authenticate("default", "", {});
/// Use the same context for all queries.
auto context = session.makeQueryContext();
context->makeSessionContext(); /// initial_create_query requires a session context to be set.
context->setCurrentQueryId("");
applyCmdSettings(context);
/// Use the same query_id (and thread group) for all queries
CurrentThread::QueryScope query_scope_holder(context);
/// Set progress show
need_render_progress = config().getBool("progress", false);
std::function<void()> finalize_progress;
if (need_render_progress)
{
/// Set progress callback, which can be run from multiple threads.
context->setProgressCallback([&](const Progress & value)
{
/// Write progress only if progress was updated
if (progress_indication.updateProgress(value))
progress_indication.writeProgress();
});
/// Set finalizing callback for progress, which is called right before finalizing query output.
finalize_progress = [&]()
{
progress_indication.clearProgressOutput();
};
/// Set callback for file processing progress.
progress_indication.setFileProgressCallback(context);
}
bool echo_queries = config().hasOption("echo") || config().hasOption("verbose");
std::exception_ptr exception;
for (const auto & query : queries)
{
written_first_block = false;
progress_indication.resetProgress();
ReadBufferFromString read_buf(query);
WriteBufferFromFileDescriptor write_buf(STDOUT_FILENO);
if (echo_queries)
{
writeString(query, write_buf);
writeChar('\n', write_buf);
write_buf.next();
}
try
{
executeQuery(read_buf, write_buf, /* allow_into_outfile = */ true, context, {}, {}, finalize_progress);
}
catch (...)
{
if (!config().hasOption("ignore-error"))
throw;
if (!exception)
exception = std::current_exception();
std::cerr << getCurrentExceptionMessage(config().hasOption("stacktrace")) << '\n';
}
}
if (exception)
std::rethrow_exception(exception);
}
static const char * minimal_default_user_xml =
"<yandex>"
" <profiles>"
" <default></default>"
" </profiles>"
" <users>"
" <default>"
" <password></password>"
" <networks>"
" <ip>::/0</ip>"
" </networks>"
" <profile>default</profile>"
" <quota>default</quota>"
" </default>"
" </users>"
" <quotas>"
" <default></default>"
" </quotas>"
"</yandex>";
static ConfigurationPtr getConfigurationFromXMLString(const char * xml_data)
{
std::stringstream ss{std::string{xml_data}}; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
Poco::XML::InputSource input_source{ss};
return {new Poco::Util::XMLConfiguration{&input_source}};
}
void LocalServer::setupUsers()
{
ConfigurationPtr users_config;
if (config().has("users_config") || config().has("config-file") || fs::exists("config.xml"))
{
const auto users_config_path = config().getString("users_config", config().getString("config-file", "config.xml"));
ConfigProcessor config_processor(users_config_path);
const auto loaded_config = config_processor.loadConfig();
config_processor.savePreprocessedConfig(loaded_config, config().getString("path", DBMS_DEFAULT_PATH));
users_config = loaded_config.configuration;
}
else
{
users_config = getConfigurationFromXMLString(minimal_default_user_xml);
}
if (users_config)
global_context->setUsersConfig(users_config);
else
throw Exception("Can't load config for users", ErrorCodes::CANNOT_LOAD_CONFIG);
}
void LocalServer::cleanup()
{
// Delete the temporary directory if needed.
if (temporary_directory_to_delete)
{
const auto dir = *temporary_directory_to_delete;
temporary_directory_to_delete.reset();
LOG_DEBUG(&logger(), "Removing temporary directory: {}", dir.string());
remove_all(dir);
}
}
static void showClientVersion()
{
std::cout << DBMS_NAME << " client version " << VERSION_STRING << VERSION_OFFICIAL << "." << '\n';
}
static std::string getHelpHeader()
{
return
@ -552,6 +627,7 @@ static std::string getHelpHeader()
"Either through corresponding command line parameters --table --structure --input-format and --file.";
}
static std::string getHelpFooter()
{
return
@ -562,119 +638,81 @@ static std::string getHelpFooter()
" BY mem_total DESC FORMAT PrettyCompact\"";
}
void LocalServer::init(int argc, char ** argv)
void LocalServer::printHelpMessage(const OptionsDescription & options_description)
{
namespace po = boost::program_options;
std::cout << getHelpHeader() << "\n";
std::cout << options_description.main_description.value() << "\n";
std::cout << getHelpFooter() << "\n";
}
/// Don't parse options with Poco library, we prefer neat boost::program_options
stopOptionsProcessing();
po::options_description description = createOptionsDescription("Main options", getTerminalWidth());
description.add_options()
("help", "produce help message")
("config-file,c", po::value<std::string>(), "config-file path")
("query,q", po::value<std::string>(), "query")
("queries-file, qf", po::value<std::string>(), "file path with queries to execute")
void LocalServer::addAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments)
{
options_description.main_description->add_options()
("database,d", po::value<std::string>(), "database")
("table,N", po::value<std::string>(), "name of the initial table")
/// If structure argument is omitted then initial query is not generated
("structure,S", po::value<std::string>(), "structure of the initial table (list of column and type names)")
("file,f", po::value<std::string>(), "path to file with data of the initial table (stdin if not specified)")
("input-format", po::value<std::string>(), "input format of the initial table data")
("format,f", po::value<std::string>(), "default output format (clickhouse-client compatibility)")
("output-format", po::value<std::string>(), "default output format")
("stacktrace", "print stack traces of exceptions")
("echo", "print query before execution")
("verbose", "print query and other debugging info")
("logger.console", po::value<bool>()->implicit_value(true), "Log to console")
("logger.log", po::value<std::string>(), "Log file name")
("logger.level", po::value<std::string>(), "Log level")
("ignore-error", "do not stop processing if a query failed")
("no-system-tables", "do not attach system tables (better startup time)")
("version,V", "print version information and exit")
("progress", "print progress of queries execution")
;
cmd_settings.addProgramOptions(description);
/// Parse main commandline options.
po::parsed_options parsed = po::command_line_parser(argc, argv).options(description).run();
po::variables_map options;
cmd_settings.addProgramOptions(options_description.main_description.value());
po::parsed_options parsed = po::command_line_parser(arguments).options(options_description.main_description.value()).run();
po::store(parsed, options);
po::notify(options);
}
if (options.count("version") || options.count("V"))
{
showClientVersion();
exit(0);
}
if (options.empty() || options.count("help"))
{
std::cout << getHelpHeader() << "\n";
std::cout << description << "\n";
std::cout << getHelpFooter() << "\n";
exit(0);
}
void LocalServer::applyCmdSettings(ContextMutablePtr context)
{
context->applySettingsChanges(cmd_settings.changes());
}
/// Save received data into the internal config.
if (options.count("config-file"))
config().setString("config-file", options["config-file"].as<std::string>());
if (options.count("query"))
config().setString("query", options["query"].as<std::string>());
if (options.count("queries-file"))
config().setString("queries-file", options["queries-file"].as<std::string>());
if (options.count("database"))
config().setString("default_database", options["database"].as<std::string>());
void LocalServer::applyCmdOptions(ContextMutablePtr context)
{
context->setDefaultFormat(config().getString("output-format", config().getString("format", is_interactive ? "PrettyCompact" : "TSV")));
applyCmdSettings(context);
}
void LocalServer::processOptions(const OptionsDescription &, const CommandLineOptions & options, const std::vector<Arguments> &)
{
if (options.count("table"))
config().setString("table-name", options["table"].as<std::string>());
if (options.count("file"))
config().setString("table-file", options["file"].as<std::string>());
if (options.count("structure"))
config().setString("table-structure", options["structure"].as<std::string>());
if (options.count("no-system-tables"))
config().setBool("no-system-tables", true);
if (options.count("input-format"))
config().setString("table-data-format", options["input-format"].as<std::string>());
if (options.count("format"))
config().setString("format", options["format"].as<std::string>());
if (options.count("output-format"))
config().setString("output-format", options["output-format"].as<std::string>());
if (options.count("stacktrace"))
config().setBool("stacktrace", true);
if (options.count("progress"))
config().setBool("progress", true);
if (options.count("echo"))
config().setBool("echo", true);
if (options.count("verbose"))
config().setBool("verbose", true);
if (options.count("logger.console"))
config().setBool("logger.console", options["logger.console"].as<bool>());
if (options.count("logger.log"))
config().setString("logger.log", options["logger.log"].as<std::string>());
if (options.count("logger.level"))
config().setString("logger.level", options["logger.level"].as<std::string>());
if (options.count("ignore-error"))
config().setBool("ignore-error", true);
if (options.count("no-system-tables"))
config().setBool("no-system-tables", true);
std::vector<std::string> arguments;
for (int arg_num = 1; arg_num < argc; ++arg_num)
arguments.emplace_back(argv[arg_num]);
argsToConfig(arguments, config(), 100);
}
void LocalServer::applyCmdOptions(ContextMutablePtr context)
{
context->setDefaultFormat(config().getString("output-format", config().getString("format", "TSV")));
applyCmdSettings(context);
}
}
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wmissing-declarations"

View File

@ -1,13 +1,19 @@
#pragma once
#include <Client/ClientBase.h>
#include <Client/LocalConnection.h>
#include <Common/ProgressIndication.h>
#include <Common/StatusFile.h>
#include <Common/InterruptListener.h>
#include <loggers/Loggers.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <filesystem>
#include <memory>
#include <optional>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <loggers/Loggers.h>
#include <Poco/Util/Application.h>
#include <Common/ProgressIndication.h>
namespace DB
{
@ -15,18 +21,29 @@ namespace DB
/// Lightweight Application for clickhouse-local
/// No networking, no extra configs and working directories, no pid and status files, no dictionaries, no logging.
/// Quiet mode by default
class LocalServer : public Poco::Util::Application, public Loggers
class LocalServer : public ClientBase, public Loggers
{
public:
LocalServer();
LocalServer() = default;
void initialize(Poco::Util::Application & self) override;
int main(const std::vector<std::string> & args) override;
int main(const std::vector<String> & /*args*/) override;
void init(int argc, char ** argv);
protected:
bool executeMultiQuery(const String & all_queries_text) override;
~LocalServer() override;
void connect() override;
void processError(const String & query) const override;
String getName() const override { return "local"; }
String getQueryTextPrefix() override;
void printHelpMessage(const OptionsDescription & options_description) override;
void addAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments) override;
void processOptions(const OptionsDescription & options_description, const CommandLineOptions & options,
const std::vector<Arguments> &) override;
void processConfig() override;
private:
/** Composes CREATE subquery based on passed arguments (--structure --file --table and --input-format)
@ -36,26 +53,13 @@ private:
std::string getInitialCreateTableQuery();
void tryInitPath();
void applyCmdOptions(ContextMutablePtr context);
void applyCmdSettings(ContextMutablePtr context);
void processQueries();
void setupUsers();
void cleanup();
void applyCmdOptions(ContextMutablePtr context);
void applyCmdSettings(ContextMutablePtr context);
protected:
SharedContextHolder shared_context;
ContextMutablePtr global_context;
/// Settings specified via command line args
Settings cmd_settings;
bool need_render_progress = false;
bool written_first_block = false;
ProgressIndication progress_indication;
std::optional<StatusFile> status;
std::optional<std::filesystem::path> temporary_directory_to_delete;
};

1543
src/Client/ClientBase.cpp Normal file

File diff suppressed because it is too large Load Diff

214
src/Client/ClientBase.h Normal file
View File

@ -0,0 +1,214 @@
#pragma once
#include <Common/ProgressIndication.h>
#include <Common/InterruptListener.h>
#include <Common/ShellCommand.h>
#include <Core/ExternalTable.h>
#include <Poco/Util/Application.h>
#include <Interpreters/Context.h>
#include <Client/Suggest.h>
#include <Client/QueryFuzzer.h>
#include <boost/program_options.hpp>
namespace po = boost::program_options;
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
enum MultiQueryProcessingStage
{
QUERIES_END,
PARSING_EXCEPTION,
CONTINUE_PARSING,
EXECUTE_QUERY,
PARSING_FAILED,
};
class ClientBase : public Poco::Util::Application
{
public:
using Arguments = std::vector<String>;
void init(int argc, char ** argv);
protected:
void runInteractive();
void runNonInteractive();
virtual bool processWithFuzzing(const String &)
{
throw Exception("Query processing with fuzzing is not implemented", ErrorCodes::NOT_IMPLEMENTED);
}
virtual bool executeMultiQuery(const String & all_queries_text) = 0;
virtual void connect() = 0;
virtual void processError(const String & query) const = 0;
virtual String getName() const = 0;
void processOrdinaryQuery(const String & query_to_execute, ASTPtr parsed_query);
void processInsertQuery(const String & query_to_execute, ASTPtr parsed_query);
void processTextAsSingleQuery(const String & full_query);
void processParsedSingleQuery(const String & full_query, const String & query_to_execute,
ASTPtr parsed_query, std::optional<bool> echo_query_ = {}, bool report_error = false);
static void adjustQueryEnd(const char *& this_query_end, const char * all_queries_end, int max_parser_depth);
ASTPtr parseQuery(const char *& pos, const char * end, bool allow_multi_statements) const;
MultiQueryProcessingStage analyzeMultiQueryText(
const char *& this_query_begin, const char *& this_query_end, const char * all_queries_end,
String & query_to_execute, ASTPtr & parsed_query, const String & all_queries_text,
std::optional<Exception> & current_exception);
/// For non-interactive multi-query mode get queries text prefix.
virtual String getQueryTextPrefix() { return ""; }
static void clearTerminal();
void showClientVersion();
using ProgramOptionsDescription = boost::program_options::options_description;
using CommandLineOptions = boost::program_options::variables_map;
struct OptionsDescription
{
std::optional<ProgramOptionsDescription> main_description;
std::optional<ProgramOptionsDescription> external_description;
};
virtual void printHelpMessage(const OptionsDescription & options_description) = 0;
virtual void addAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments) = 0;
virtual void processOptions(const OptionsDescription & options_description,
const CommandLineOptions & options,
const std::vector<Arguments> & external_tables_arguments) = 0;
virtual void processConfig() = 0;
private:
bool processQueryText(const String & text);
void receiveResult(ASTPtr parsed_query);
bool receiveAndProcessPacket(ASTPtr parsed_query, bool cancelled);
void receiveLogs(ASTPtr parsed_query);
bool receiveSampleBlock(Block & out, ColumnsDescription & columns_description, ASTPtr parsed_query);
bool receiveEndOfQuery();
void onProgress(const Progress & value);
void onData(Block & block, ASTPtr parsed_query);
void onLogData(Block & block);
void onTotals(Block & block, ASTPtr parsed_query);
void onExtremes(Block & block, ASTPtr parsed_query);
void onReceiveExceptionFromServer(std::unique_ptr<Exception> && e);
void onProfileInfo(const BlockStreamProfileInfo & profile_info);
void onEndOfStream();
void sendData(Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query);
void sendDataFrom(ReadBuffer & buf, Block & sample,
const ColumnsDescription & columns_description, ASTPtr parsed_query);
void sendExternalTables(ASTPtr parsed_query);
void initBlockOutputStream(const Block & block, ASTPtr parsed_query);
void initLogsOutputStream();
inline String prompt() const
{
return boost::replace_all_copy(prompt_by_server_display_name, "{database}", config().getString("database", "default"));
}
void resetOutput();
void outputQueryInfo(bool echo_query_);
void readArguments(int argc, char ** argv, Arguments & common_arguments, std::vector<Arguments> & external_tables_arguments);
protected:
bool is_interactive = false; /// Use either interactive line editing interface or batch mode.
bool is_multiquery = false;
bool echo_queries = false; /// Print queries before execution in batch mode.
bool ignore_error = false; /// In case of errors, don't print error message, continue to next query. Only applicable for non-interactive mode.
bool print_time_to_stderr = false; /// Output execution time to stderr in batch mode.
bool load_suggestions = false;
std::vector<String> queries_files; /// If not empty, queries will be read from these files
std::vector<String> interleave_queries_files; /// If not empty, run queries from these files before processing every file from 'queries_files'.
bool stdin_is_a_tty = false; /// stdin is a terminal.
bool stdout_is_a_tty = false; /// stdout is a terminal.
uint64_t terminal_width = 0;
ServerConnectionPtr connection;
ConnectionParameters connection_parameters;
String format; /// Query results output format.
bool is_default_format = true; /// false, if format is set in the config or command line.
size_t format_max_block_size = 0; /// Max block size for console output.
String insert_format; /// Format of INSERT data that is read from stdin in batch mode.
size_t insert_format_max_block_size = 0; /// Max block size when reading INSERT data.
size_t max_client_network_bandwidth = 0; /// The maximum speed of data exchange over the network for the client in bytes per second.
bool has_vertical_output_suffix = false; /// Is \G present at the end of the query string?
/// We will format query_id in interactive mode in various ways, the default is just to print Query id: ...
std::vector<std::pair<String, String>> query_id_formats;
/// Settings specified via command line args
Settings cmd_settings;
SharedContextHolder shared_context;
ContextMutablePtr global_context;
/// Buffer that reads from stdin in batch mode.
ReadBufferFromFileDescriptor std_in{STDIN_FILENO};
/// Console output.
WriteBufferFromFileDescriptor std_out{STDOUT_FILENO};
std::unique_ptr<ShellCommand> pager_cmd;
/// The user can specify to redirect query output to a file.
std::unique_ptr<WriteBuffer> out_file_buf;
BlockOutputStreamPtr block_out_stream;
/// The user could specify special file for server logs (stderr by default)
std::unique_ptr<WriteBuffer> out_logs_buf;
String server_logs_file;
BlockOutputStreamPtr logs_out_stream;
String home_path;
String history_file; /// Path to a file containing command history.
String current_profile;
UInt64 server_revision = 0;
String server_version;
String prompt_by_server_display_name;
String server_display_name;
ProgressIndication progress_indication;
bool need_render_progress = true;
bool written_first_block = false;
size_t processed_rows = 0; /// How many rows have been read or written.
/// The last exception that was received from the server. Is used for the
/// return code in batch mode.
std::unique_ptr<Exception> server_exception;
/// Likewise, the last exception that occurred on the client.
std::unique_ptr<Exception> client_exception;
/// If the last query resulted in exception. `server_exception` or
/// `client_exception` must be set.
bool have_error = false;
std::list<ExternalTable> external_tables; /// External tables info.
bool send_external_tables = false;
NameToNameMap query_parameters; /// Dictionary with query parameters for prepared statements.
QueryFuzzer fuzzer;
int query_fuzzer_runs = 0;
QueryProcessingStage::Enum query_processing_stage;
};
}

View File

@ -0,0 +1,168 @@
#include "ClientBaseHelpers.h"
#include <common/DateLUT.h>
#include <common/LocalDate.h>
#include <Parsers/Lexer.h>
#include <Common/UTF8Helpers.h>
namespace DB
{
/// Should we celebrate a bit?
bool isNewYearMode()
{
time_t current_time = time(nullptr);
/// It's bad to be intrusive.
if (current_time % 3 != 0)
return false;
LocalDate now(current_time);
return (now.month() == 12 && now.day() >= 20) || (now.month() == 1 && now.day() <= 5);
}
bool isChineseNewYearMode(const String & local_tz)
{
/// Days of Dec. 20 in Chinese calendar starting from year 2019 to year 2105
static constexpr UInt16 chineseNewYearIndicators[]
= {18275, 18659, 19014, 19368, 19752, 20107, 20491, 20845, 21199, 21583, 21937, 22292, 22676, 23030, 23414, 23768, 24122, 24506,
24860, 25215, 25599, 25954, 26308, 26692, 27046, 27430, 27784, 28138, 28522, 28877, 29232, 29616, 29970, 30354, 30708, 31062,
31446, 31800, 32155, 32539, 32894, 33248, 33632, 33986, 34369, 34724, 35078, 35462, 35817, 36171, 36555, 36909, 37293, 37647,
38002, 38386, 38740, 39095, 39479, 39833, 40187, 40571, 40925, 41309, 41664, 42018, 42402, 42757, 43111, 43495, 43849, 44233,
44587, 44942, 45326, 45680, 46035, 46418, 46772, 47126, 47510, 47865, 48249, 48604, 48958, 49342};
/// All time zone names are acquired from https://www.iana.org/time-zones
static constexpr const char * chineseNewYearTimeZoneIndicators[] = {
/// Time zones celebrating Chinese new year.
"Asia/Shanghai",
"Asia/Chongqing",
"Asia/Harbin",
"Asia/Urumqi",
"Asia/Hong_Kong",
"Asia/Chungking",
"Asia/Macao",
"Asia/Macau",
"Asia/Taipei",
"Asia/Singapore",
/// Time zones celebrating Chinese new year but with different festival names. Let's not print the message for now.
// "Asia/Brunei",
// "Asia/Ho_Chi_Minh",
// "Asia/Hovd",
// "Asia/Jakarta",
// "Asia/Jayapura",
// "Asia/Kashgar",
// "Asia/Kuala_Lumpur",
// "Asia/Kuching",
// "Asia/Makassar",
// "Asia/Pontianak",
// "Asia/Pyongyang",
// "Asia/Saigon",
// "Asia/Seoul",
// "Asia/Ujung_Pandang",
// "Asia/Ulaanbaatar",
// "Asia/Ulan_Bator",
};
static constexpr size_t M = sizeof(chineseNewYearTimeZoneIndicators) / sizeof(chineseNewYearTimeZoneIndicators[0]);
time_t current_time = time(nullptr);
if (chineseNewYearTimeZoneIndicators + M
== std::find_if(chineseNewYearTimeZoneIndicators, chineseNewYearTimeZoneIndicators + M, [&local_tz](const char * tz)
{
return tz == local_tz;
}))
return false;
/// It's bad to be intrusive.
if (current_time % 3 != 0)
return false;
auto days = DateLUT::instance().toDayNum(current_time).toUnderType();
for (auto d : chineseNewYearIndicators)
{
/// Let's celebrate until Lantern Festival
if (d <= days && d + 25 >= days)
return true;
else if (d > days)
return false;
}
return false;
}
#if USE_REPLXX
void highlight(const String & query, std::vector<replxx::Replxx::Color> & colors)
{
using namespace replxx;
static const std::unordered_map<TokenType, Replxx::Color> token_to_color
= {{TokenType::Whitespace, Replxx::Color::DEFAULT},
{TokenType::Comment, Replxx::Color::GRAY},
{TokenType::BareWord, Replxx::Color::DEFAULT},
{TokenType::Number, Replxx::Color::GREEN},
{TokenType::StringLiteral, Replxx::Color::CYAN},
{TokenType::QuotedIdentifier, Replxx::Color::MAGENTA},
{TokenType::OpeningRoundBracket, Replxx::Color::BROWN},
{TokenType::ClosingRoundBracket, Replxx::Color::BROWN},
{TokenType::OpeningSquareBracket, Replxx::Color::BROWN},
{TokenType::ClosingSquareBracket, Replxx::Color::BROWN},
{TokenType::DoubleColon, Replxx::Color::BROWN},
{TokenType::OpeningCurlyBrace, Replxx::Color::INTENSE},
{TokenType::ClosingCurlyBrace, Replxx::Color::INTENSE},
{TokenType::Comma, Replxx::Color::INTENSE},
{TokenType::Semicolon, Replxx::Color::INTENSE},
{TokenType::Dot, Replxx::Color::INTENSE},
{TokenType::Asterisk, Replxx::Color::INTENSE},
{TokenType::HereDoc, Replxx::Color::CYAN},
{TokenType::Plus, Replxx::Color::INTENSE},
{TokenType::Minus, Replxx::Color::INTENSE},
{TokenType::Slash, Replxx::Color::INTENSE},
{TokenType::Percent, Replxx::Color::INTENSE},
{TokenType::Arrow, Replxx::Color::INTENSE},
{TokenType::QuestionMark, Replxx::Color::INTENSE},
{TokenType::Colon, Replxx::Color::INTENSE},
{TokenType::Equals, Replxx::Color::INTENSE},
{TokenType::NotEquals, Replxx::Color::INTENSE},
{TokenType::Less, Replxx::Color::INTENSE},
{TokenType::Greater, Replxx::Color::INTENSE},
{TokenType::LessOrEquals, Replxx::Color::INTENSE},
{TokenType::GreaterOrEquals, Replxx::Color::INTENSE},
{TokenType::Concatenation, Replxx::Color::INTENSE},
{TokenType::At, Replxx::Color::INTENSE},
{TokenType::DoubleAt, Replxx::Color::MAGENTA},
{TokenType::EndOfStream, Replxx::Color::DEFAULT},
{TokenType::Error, Replxx::Color::RED},
{TokenType::ErrorMultilineCommentIsNotClosed, Replxx::Color::RED},
{TokenType::ErrorSingleQuoteIsNotClosed, Replxx::Color::RED},
{TokenType::ErrorDoubleQuoteIsNotClosed, Replxx::Color::RED},
{TokenType::ErrorSinglePipeMark, Replxx::Color::RED},
{TokenType::ErrorWrongNumber, Replxx::Color::RED},
{ TokenType::ErrorMaxQuerySizeExceeded, Replxx::Color::RED }};
const Replxx::Color unknown_token_color = Replxx::Color::RED;
Lexer lexer(query.data(), query.data() + query.size());
size_t pos = 0;
for (Token token = lexer.nextToken(); !token.isEnd(); token = lexer.nextToken())
{
size_t utf8_len = UTF8::countCodePoints(reinterpret_cast<const UInt8 *>(token.begin), token.size());
for (size_t code_point_index = 0; code_point_index < utf8_len; ++code_point_index)
{
if (token_to_color.find(token.type) != token_to_color.end())
colors[pos + code_point_index] = token_to_color.at(token.type);
else
colors[pos + code_point_index] = unknown_token_color;
}
pos += utf8_len;
}
}
#endif
}

View File

@ -0,0 +1,22 @@
#pragma once
#include <Core/Types.h>
#if USE_REPLXX
# include <common/ReplxxLineReader.h>
#endif
namespace DB
{
/// Should we celebrate a bit?
bool isNewYearMode();
bool isChineseNewYearMode(const String & local_tz);
#if USE_REPLXX
void highlight(const String & query, std::vector<replxx::Replxx::Color> & colors);
#endif
}

View File

@ -12,6 +12,7 @@
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <Client/Connection.h>
#include <Client/ConnectionParameters.h>
#include <Common/ClickHouseRevision.h>
#include <Common/Exception.h>
#include <Common/NetException.h>
@ -510,7 +511,7 @@ void Connection::sendQuery(
/// Send empty block which means end of data.
if (!with_pending_data)
{
sendData(Block());
sendData(Block(), "", false);
out->next();
}
}
@ -665,7 +666,7 @@ protected:
num_rows += chunk.getNumRows();
auto block = getPort().getHeader().cloneWithColumns(chunk.detachColumns());
connection.sendData(block, table_data.table_name);
connection.sendData(block, table_data.table_name, false);
}
private:
@ -681,7 +682,7 @@ void Connection::sendExternalTablesData(ExternalTablesData & data)
if (data.empty())
{
/// Send empty block, which means end of data transfer.
sendData(Block());
sendData(Block(), "", false);
return;
}
@ -719,11 +720,11 @@ void Connection::sendExternalTablesData(ExternalTablesData & data)
/// If table is empty, send empty block with name.
if (read_rows == 0)
sendData(sink->getPort().getHeader(), elem->table_name);
sendData(sink->getPort().getHeader(), elem->table_name, false);
}
/// Send empty block, which means end of data transfer.
sendData(Block());
sendData(Block(), "", false);
out_bytes = out->count() - out_bytes;
maybe_compressed_out_bytes = maybe_compressed_out->count() - maybe_compressed_out_bytes;
@ -980,4 +981,19 @@ void Connection::throwUnexpectedPacket(UInt64 packet_type, const char * expected
ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
}
ServerConnectionPtr Connection::createConnection(const ConnectionParameters & parameters, ContextPtr)
{
return std::make_unique<Connection>(
parameters.host,
parameters.port,
parameters.default_database,
parameters.user,
parameters.password,
"", /* cluster */
"", /* cluster_secret */
"client",
parameters.compression,
parameters.security);
}
}

View File

@ -4,20 +4,13 @@
#include <Poco/Net/StreamSocket.h>
#include <Common/Throttler.h>
#if !defined(ARCADIA_BUILD)
# include <Common/config.h>
#endif
#include <Core/Block.h>
#include <Client/IServerConnection.h>
#include <Core/Defines.h>
#include <IO/Progress.h>
#include <Core/Protocol.h>
#include <Core/QueryProcessingStage.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/BlockStreamProfileInfo.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <Interpreters/TablesStatus.h>
@ -31,46 +24,15 @@
namespace DB
{
class ClientInfo;
class Pipe;
struct Settings;
/// Struct which represents data we are going to send for external table.
struct ExternalTableData
{
/// Pipe of data form table;
std::unique_ptr<Pipe> pipe;
std::string table_name;
std::function<std::unique_ptr<Pipe>()> creating_pipe_callback;
/// Flag if need to stop reading.
std::atomic_bool is_cancelled = false;
};
using ExternalTableDataPtr = std::unique_ptr<ExternalTableData>;
using ExternalTablesData = std::vector<ExternalTableDataPtr>;
class Connection;
struct ConnectionParameters;
using ConnectionPtr = std::shared_ptr<Connection>;
using Connections = std::vector<ConnectionPtr>;
/// Packet that could be received from server.
struct Packet
{
UInt64 type;
Block block;
std::unique_ptr<Exception> exception;
std::vector<String> multistring_message;
Progress progress;
BlockStreamProfileInfo profile_info;
std::vector<UUID> part_uuids;
Packet() : type(Protocol::Server::Hello) {}
};
/** Connection with database server, to use by client.
* How to use - see Core/Protocol.h
* (Implementation of server end - see Server/TCPHandler.h)
@ -78,7 +40,7 @@ struct Packet
* As 'default_database' empty string could be passed
* - in that case, server will use it's own default database.
*/
class Connection : private boost::noncopyable
class Connection : public IServerConnection
{
friend class MultiplexedConnections;
@ -111,92 +73,82 @@ public:
setDescription();
}
virtual ~Connection() = default;
static ServerConnectionPtr createConnection(const ConnectionParameters & parameters, ContextPtr context);
/// Set throttler of network traffic. One throttler could be used for multiple connections to limit total traffic.
void setThrottler(const ThrottlerPtr & throttler_)
void setThrottler(const ThrottlerPtr & throttler_) override
{
throttler = throttler_;
}
/// Change default database. Changes will take effect on next reconnect.
void setDefaultDatabase(const String & database);
void setDefaultDatabase(const String & database) override;
void getServerVersion(const ConnectionTimeouts & timeouts,
String & name,
UInt64 & version_major,
UInt64 & version_minor,
UInt64 & version_patch,
UInt64 & revision);
UInt64 getServerRevision(const ConnectionTimeouts & timeouts);
UInt64 & revision) override;
const String & getServerTimezone(const ConnectionTimeouts & timeouts);
const String & getServerDisplayName(const ConnectionTimeouts & timeouts);
UInt64 getServerRevision(const ConnectionTimeouts & timeouts) override;
const String & getServerTimezone(const ConnectionTimeouts & timeouts) override;
const String & getServerDisplayName(const ConnectionTimeouts & timeouts) override;
/// For log and exception messages.
const String & getDescription() const;
const String & getDescription() const override;
const String & getHost() const;
UInt16 getPort() const;
const String & getDefaultDatabase() const;
Protocol::Compression getCompression() const { return compression; }
/// If last flag is true, you need to call sendExternalTablesData after.
void sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const String & query_id_ = "",
UInt64 stage = QueryProcessingStage::Complete,
const Settings * settings = nullptr,
const ClientInfo * client_info = nullptr,
bool with_pending_data = false);
const String & query_id_/* = "" */,
UInt64 stage/* = QueryProcessingStage::Complete */,
const Settings * settings/* = nullptr */,
const ClientInfo * client_info/* = nullptr */,
bool with_pending_data/* = false */) override;
void sendCancel();
/// Send block of data; if name is specified, server will write it to external (temporary) table of that name.
void sendData(const Block & block, const String & name = "", bool scalar = false);
/// Send all scalars.
void sendScalarsData(Scalars & data);
/// Send all contents of external (temporary) tables.
void sendExternalTablesData(ExternalTablesData & data);
/// Send parts' uuids to excluded them from query processing
void sendIgnoredPartUUIDs(const std::vector<UUID> & uuids);
void sendCancel() override;
void sendData(const Block & block, const String & name/* = "" */, bool scalar/* = false */) override;
void sendExternalTablesData(ExternalTablesData & data) override;
bool poll(size_t timeout_microseconds/* = 0 */) override;
bool hasReadPendingData() const override;
std::optional<UInt64> checkPacket(size_t timeout_microseconds/* = 0*/) override;
Packet receivePacket() override;
void forceConnected(const ConnectionTimeouts & timeouts) override;
bool isConnected() const override { return connected; }
bool checkConnected() override { return connected && ping(); }
void disconnect() override;
void sendReadTaskResponse(const String &);
/// Send prepared block of data (serialized and, if need, compressed), that will be read from 'input'.
/// You could pass size of serialized/compressed block.
void sendPreparedData(ReadBuffer & input, size_t size, const String & name = "");
/// Check, if has data to read.
bool poll(size_t timeout_microseconds = 0);
/// Check, if has data in read buffer.
bool hasReadPendingData() const;
/// Checks if there is input data in connection and reads packet ID.
std::optional<UInt64> checkPacket(size_t timeout_microseconds = 0);
/// Receive packet from server.
Packet receivePacket();
/// If not connected yet, or if connection is broken - then connect. If cannot connect - throw an exception.
void forceConnected(const ConnectionTimeouts & timeouts);
bool isConnected() const { return connected; }
/// Check if connection is still active with ping request.
bool checkConnected() { return connected && ping(); }
void sendReadTaskResponse(const String &);
/// Send all scalars.
void sendScalarsData(Scalars & data);
/// Send parts' uuids to excluded them from query processing
void sendIgnoredPartUUIDs(const std::vector<UUID> & uuids);
TablesStatusResponse getTablesStatus(const ConnectionTimeouts & timeouts,
const TablesStatusRequest & request);
/** Disconnect.
* This may be used, if connection is left in unsynchronised state
* (when someone continues to wait for something) after an exception.
*/
void disconnect();
size_t outBytesCount() const { return out ? out->count() : 0; }
size_t inBytesCount() const { return in ? in->count() : 0; }
@ -209,7 +161,6 @@ public:
if (in)
in->setAsyncCallback(std::move(async_callback));
}
private:
String host;
UInt16 port;

View File

@ -13,9 +13,10 @@
#include <common/scope_guard.h>
#if !defined(ARCADIA_BUILD)
#include <readpassphrase.h> // Y_IGNORE
#include <readpassphrase/readpassphrase.h> // Y_IGNORE
#endif
namespace DB
{

View File

@ -0,0 +1,124 @@
#pragma once
#include <Common/Throttler.h>
#include <Core/Types.h>
#include <Core/QueryProcessingStage.h>
#include <Core/Block.h>
#include <Core/Protocol.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/BlockStreamProfileInfo.h>
#include <Processors/Pipe.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/Progress.h>
#include <boost/noncopyable.hpp>
namespace DB
{
class ClientInfo;
/// Packet that could be received from server.
struct Packet
{
UInt64 type;
Block block;
std::unique_ptr<Exception> exception;
std::vector<String> multistring_message;
Progress progress;
BlockStreamProfileInfo profile_info;
std::vector<UUID> part_uuids;
Packet() : type(Protocol::Server::Hello) {}
};
/// Struct which represents data we are going to send for external table.
struct ExternalTableData
{
/// Pipe of data form table;
std::unique_ptr<Pipe> pipe;
std::string table_name;
std::function<std::unique_ptr<Pipe>()> creating_pipe_callback;
/// Flag if need to stop reading.
std::atomic_bool is_cancelled = false;
};
using ExternalTableDataPtr = std::unique_ptr<ExternalTableData>;
using ExternalTablesData = std::vector<ExternalTableDataPtr>;
class IServerConnection : boost::noncopyable
{
public:
virtual ~IServerConnection() = default;
virtual void setDefaultDatabase(const String & database) = 0;
virtual void getServerVersion(
const ConnectionTimeouts & timeouts, String & name,
UInt64 & version_major, UInt64 & version_minor,
UInt64 & version_patch, UInt64 & revision) = 0;
virtual UInt64 getServerRevision(const ConnectionTimeouts & timeouts) = 0;
virtual const String & getServerTimezone(const ConnectionTimeouts & timeouts) = 0;
virtual const String & getServerDisplayName(const ConnectionTimeouts & timeouts) = 0;
virtual const String & getDescription() const = 0;
/// If last flag is true, you need to call sendExternalTablesData after.
virtual void sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const String & query_id_,
UInt64 stage,
const Settings * settings,
const ClientInfo * client_info,
bool with_pending_data) = 0;
virtual void sendCancel() = 0;
/// Send block of data; if name is specified, server will write it to external (temporary) table of that name.
virtual void sendData(const Block & block, const String & name, bool scalar) = 0;
/// Send all contents of external (temporary) tables.
virtual void sendExternalTablesData(ExternalTablesData & data) = 0;
/// Check, if has data to read.
virtual bool poll(size_t timeout_microseconds) = 0;
/// Check, if has data in read buffer.
virtual bool hasReadPendingData() const = 0;
/// Checks if there is input data in connection and reads packet ID.
virtual std::optional<UInt64> checkPacket(size_t timeout_microseconds) = 0;
/// Receive packet from server.
virtual Packet receivePacket() = 0;
/// If not connected yet, or if connection is broken - then connect. If cannot connect - throw an exception.
virtual void forceConnected(const ConnectionTimeouts & timeouts) = 0;
virtual bool isConnected() const = 0;
/// Check if connection is still active with ping request.
virtual bool checkConnected() = 0;
/** Disconnect.
* This may be used, if connection is left in unsynchronised state
* (when someone continues to wait for something) after an exception.
*/
virtual void disconnect() = 0;
/// Set throttler of network traffic. One throttler could be used for multiple connections to limit total traffic.
virtual void setThrottler(const ThrottlerPtr & throttler_) = 0;
};
using ServerConnectionPtr = std::unique_ptr<IServerConnection>;
}

View File

@ -0,0 +1,402 @@
#include "LocalConnection.h"
#include <Interpreters/executeQuery.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/Executors/PushingAsyncPipelineExecutor.h>
#include <Storages/IStorage.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_PACKET_FROM_SERVER;
extern const int UNKNOWN_EXCEPTION;
extern const int NOT_IMPLEMENTED;
}
LocalConnection::LocalConnection(ContextPtr context_, bool send_progress_)
: WithContext(context_)
, session(getContext(), ClientInfo::Interface::LOCAL)
, send_progress(send_progress_)
{
/// Authenticate and create a context to execute queries.
session.authenticate("default", "", Poco::Net::SocketAddress{});
session.makeSessionContext();
if (!CurrentThread::isInitialized())
thread_status.emplace();
}
LocalConnection::~LocalConnection()
{
try
{
state.reset();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
bool LocalConnection::hasReadPendingData() const
{
return !state->is_finished;
}
std::optional<UInt64> LocalConnection::checkPacket(size_t)
{
return next_packet_type;
}
void LocalConnection::updateProgress(const Progress & value)
{
state->progress.incrementPiecewiseAtomically(value);
}
void LocalConnection::sendQuery(
const ConnectionTimeouts &,
const String & query_,
const String & query_id_,
UInt64,
const Settings *,
const ClientInfo *,
bool)
{
query_context = session.makeQueryContext();
query_context->setCurrentQueryId(query_id_);
if (send_progress)
query_context->setProgressCallback([this] (const Progress & value) { return this->updateProgress(value); });
CurrentThread::QueryScope query_scope_holder(query_context);
state.reset();
state.emplace();
state->query_id = query_id_;
state->query = query_;
if (send_progress)
state->after_send_progress.restart();
next_packet_type.reset();
try
{
state->io = executeQuery(state->query, query_context, false, state->stage);
if (state->io.pipeline.pushing())
{
size_t num_threads = state->io.pipeline.getNumThreads();
if (num_threads > 1)
{
state->pushing_async_executor = std::make_unique<PushingAsyncPipelineExecutor>(state->io.pipeline);
state->pushing_async_executor->start();
state->block = state->pushing_async_executor->getHeader();
}
else
{
state->pushing_executor = std::make_unique<PushingPipelineExecutor>(state->io.pipeline);
state->pushing_executor->start();
state->block = state->pushing_executor->getHeader();
}
}
else if (state->io.pipeline.pulling())
{
state->block = state->io.pipeline.getHeader();
state->executor = std::make_unique<PullingAsyncPipelineExecutor>(state->io.pipeline);
}
else if (state->io.pipeline.completed())
{
CompletedPipelineExecutor executor(state->io.pipeline);
executor.execute();
}
if (state->block)
next_packet_type = Protocol::Server::Data;
}
catch (const Exception & e)
{
state->io.onException();
state->exception.emplace(e);
}
catch (const std::exception & e)
{
state->io.onException();
state->exception.emplace(Exception::CreateFromSTDTag{}, e);
}
catch (...)
{
state->io.onException();
state->exception.emplace("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
}
}
void LocalConnection::sendData(const Block & block, const String &, bool)
{
if (!block)
return;
if (state->pushing_async_executor)
{
state->pushing_async_executor->push(std::move(block));
}
else if (state->pushing_executor)
{
state->pushing_executor->push(std::move(block));
}
}
void LocalConnection::sendCancel()
{
if (state->executor)
state->executor->cancel();
}
bool LocalConnection::pullBlock(Block & block)
{
if (state->executor)
return state->executor->pull(block, query_context->getSettingsRef().interactive_delay / 1000);
return false;
}
void LocalConnection::finishQuery()
{
next_packet_type = Protocol::Server::EndOfStream;
if (!state)
return;
if (state->executor)
{
state->executor.reset();
}
else if (state->pushing_async_executor)
{
state->pushing_async_executor->finish();
}
else if (state->pushing_executor)
{
state->pushing_executor->finish();
}
state->io.onFinish();
state.reset();
}
bool LocalConnection::poll(size_t)
{
if (!state)
return false;
/// Wait for next poll to collect current packet.
if (next_packet_type)
return true;
if (send_progress && (state->after_send_progress.elapsedMicroseconds() >= query_context->getSettingsRef().interactive_delay))
{
state->after_send_progress.restart();
next_packet_type = Protocol::Server::Progress;
return true;
}
if (!state->is_finished)
{
try
{
pollImpl();
}
catch (const Exception & e)
{
state->io.onException();
state->exception.emplace(e);
}
catch (const std::exception & e)
{
state->io.onException();
state->exception.emplace(Exception::CreateFromSTDTag{}, e);
}
catch (...)
{
state->io.onException();
state->exception.emplace("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
}
}
if (state->exception)
{
next_packet_type = Protocol::Server::Exception;
return true;
}
if (state->is_finished && !state->sent_totals)
{
state->sent_totals = true;
Block totals;
if (state->executor)
totals = state->executor->getTotalsBlock();
if (totals)
{
next_packet_type = Protocol::Server::Totals;
state->block.emplace(totals);
return true;
}
}
if (state->is_finished && !state->sent_extremes)
{
state->sent_extremes = true;
Block extremes;
if (state->executor)
extremes = state->executor->getExtremesBlock();
if (extremes)
{
next_packet_type = Protocol::Server::Extremes;
state->block.emplace(extremes);
return true;
}
}
if (state->is_finished && send_progress && !state->sent_progress)
{
state->sent_progress = true;
next_packet_type = Protocol::Server::Progress;
return true;
}
if (state->is_finished)
{
finishQuery();
return true;
}
if (state->block && state->block.value())
{
next_packet_type = Protocol::Server::Data;
return true;
}
return false;
}
bool LocalConnection::pollImpl()
{
Block block;
auto next_read = pullBlock(block);
if (block)
{
state->block.emplace(block);
}
else if (!next_read)
{
state->is_finished = true;
}
return true;
}
Packet LocalConnection::receivePacket()
{
Packet packet;
if (!state)
{
packet.type = Protocol::Server::EndOfStream;
return packet;
}
if (!next_packet_type)
poll(0);
if (!next_packet_type)
{
packet.type = Protocol::Server::EndOfStream;
return packet;
}
packet.type = next_packet_type.value();
switch (next_packet_type.value())
{
case Protocol::Server::Totals: [[fallthrough]];
case Protocol::Server::Extremes: [[fallthrough]];
case Protocol::Server::Log: [[fallthrough]];
case Protocol::Server::Data:
{
if (state->block && state->block.value())
{
packet.block = std::move(state->block.value());
state->block.reset();
}
break;
}
case Protocol::Server::Exception:
{
packet.exception = std::make_unique<Exception>(*state->exception);
break;
}
case Protocol::Server::Progress:
{
packet.progress = std::move(state->progress);
state->progress.reset();
break;
}
case Protocol::Server::EndOfStream:
{
break;
}
default:
throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_SERVER,
"Unknown packet {} for {}", toString(packet.type), getDescription());
}
next_packet_type.reset();
return packet;
}
void LocalConnection::getServerVersion(
const ConnectionTimeouts & /* timeouts */, String & /* name */,
UInt64 & /* version_major */, UInt64 & /* version_minor */,
UInt64 & /* version_patch */, UInt64 & /* revision */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
}
void LocalConnection::setDefaultDatabase(const String &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
}
UInt64 LocalConnection::getServerRevision(const ConnectionTimeouts &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
}
const String & LocalConnection::getServerTimezone(const ConnectionTimeouts &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
}
const String & LocalConnection::getServerDisplayName(const ConnectionTimeouts &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
}
void LocalConnection::sendExternalTablesData(ExternalTablesData &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
}
ServerConnectionPtr LocalConnection::createConnection(const ConnectionParameters &, ContextPtr current_context, bool send_progress)
{
return std::make_unique<LocalConnection>(current_context, send_progress);
}
}

View File

@ -0,0 +1,138 @@
#pragma once
#include "Connection.h"
#include <Interpreters/Context.h>
#include <DataStreams/BlockIO.h>
#include <IO/TimeoutSetter.h>
#include <Interpreters/Session.h>
namespace DB
{
class PullingAsyncPipelineExecutor;
class PushingAsyncPipelineExecutor;
class PushingPipelineExecutor;
/// State of query processing.
struct LocalQueryState
{
/// Identifier of the query.
String query_id;
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete;
/// Query text.
String query;
/// Streams of blocks, that are processing the query.
BlockIO io;
/// Current stream to pull blocks from.
std::unique_ptr<PullingAsyncPipelineExecutor> executor;
std::unique_ptr<PushingPipelineExecutor> pushing_executor;
std::unique_ptr<PushingAsyncPipelineExecutor> pushing_async_executor;
std::optional<Exception> exception;
/// Current block to be sent next.
std::optional<Block> block;
/// Is request cancelled
bool is_cancelled = false;
bool is_finished = false;
bool sent_totals = false;
bool sent_extremes = false;
bool sent_progress = false;
/// To output progress, the difference after the previous sending of progress.
Progress progress;
/// Time after the last check to stop the request and send the progress.
Stopwatch after_send_progress;
};
class LocalConnection : public IServerConnection, WithContext
{
public:
explicit LocalConnection(ContextPtr context_, bool send_progress_ = false);
~LocalConnection() override;
static ServerConnectionPtr createConnection(const ConnectionParameters & connection_parameters, ContextPtr current_context, bool send_progress = false);
void setDefaultDatabase(const String & database) override;
void getServerVersion(const ConnectionTimeouts & timeouts,
String & name,
UInt64 & version_major,
UInt64 & version_minor,
UInt64 & version_patch,
UInt64 & revision) override;
UInt64 getServerRevision(const ConnectionTimeouts & timeouts) override;
const String & getServerTimezone(const ConnectionTimeouts & timeouts) override;
const String & getServerDisplayName(const ConnectionTimeouts & timeouts) override;
const String & getDescription() const override { return description; }
void sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const String & query_id_/* = "" */,
UInt64 stage/* = QueryProcessingStage::Complete */,
const Settings * settings/* = nullptr */,
const ClientInfo * client_info/* = nullptr */,
bool with_pending_data/* = false */) override;
void sendCancel() override;
void sendData(const Block & block, const String & name/* = "" */, bool scalar/* = false */) override;
void sendExternalTablesData(ExternalTablesData &) override;
bool poll(size_t timeout_microseconds/* = 0 */) override;
bool hasReadPendingData() const override;
std::optional<UInt64> checkPacket(size_t timeout_microseconds/* = 0*/) override;
Packet receivePacket() override;
void forceConnected(const ConnectionTimeouts &) override {}
bool isConnected() const override { return true; }
bool checkConnected() override { return true; }
void disconnect() override {}
void setThrottler(const ThrottlerPtr &) override {}
private:
void initBlockInput();
void processOrdinaryQuery();
void processOrdinaryQueryWithProcessors();
void updateState();
bool pullBlock(Block & block);
void finishQuery();
void updateProgress(const Progress & value);
bool pollImpl();
ContextMutablePtr query_context;
Session session;
bool send_progress;
String description = "clickhouse-local";
std::optional<LocalQueryState> state;
std::optional<ThreadStatus> thread_status;
/// Last "server" packet.
std::optional<UInt64> next_packet_type;
};
}

View File

@ -1,10 +1,20 @@
#include "Suggest.h"
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include <Core/Settings.h>
#include <Columns/ColumnString.h>
#include <Common/typeid_cast.h>
#include <IO/WriteBufferFromString.h>
#include <Common/Macros.h>
#include <IO/Operators.h>
#include <Functions/FunctionFactory.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Formats/FormatFactory.h>
#include <Storages/StorageFactory.h>
#include <DataTypes/DataTypeFactory.h>
#include <Interpreters/Context.h>
#include <Client/Connection.h>
#include <Client/LocalConnection.h>
namespace DB
@ -16,31 +26,90 @@ namespace ErrorCodes
extern const int DEADLOCK_AVOIDED;
}
void Suggest::load(const ConnectionParameters & connection_parameters, size_t suggestion_limit)
Suggest::Suggest()
{
loading_thread = std::thread([connection_parameters, suggestion_limit, this]
/// Keywords may be not up to date with ClickHouse parser.
words = {"CREATE", "DATABASE", "IF", "NOT", "EXISTS", "TEMPORARY", "TABLE", "ON", "CLUSTER", "DEFAULT",
"MATERIALIZED", "ALIAS", "ENGINE", "AS", "VIEW", "POPULATE", "SETTINGS", "ATTACH", "DETACH", "DROP",
"RENAME", "TO", "ALTER", "ADD", "MODIFY", "CLEAR", "COLUMN", "AFTER", "COPY", "PROJECT",
"PRIMARY", "KEY", "CHECK", "PARTITION", "PART", "FREEZE", "FETCH", "FROM", "SHOW", "INTO",
"OUTFILE", "FORMAT", "TABLES", "DATABASES", "LIKE", "PROCESSLIST", "CASE", "WHEN", "THEN", "ELSE",
"END", "DESCRIBE", "DESC", "USE", "SET", "OPTIMIZE", "FINAL", "DEDUPLICATE", "INSERT", "VALUES",
"SELECT", "DISTINCT", "SAMPLE", "ARRAY", "JOIN", "GLOBAL", "LOCAL", "ANY", "ALL", "INNER",
"LEFT", "RIGHT", "FULL", "OUTER", "CROSS", "USING", "PREWHERE", "WHERE", "GROUP", "BY",
"WITH", "TOTALS", "HAVING", "ORDER", "COLLATE", "LIMIT", "UNION", "AND", "OR", "ASC",
"IN", "KILL", "QUERY", "SYNC", "ASYNC", "TEST", "BETWEEN", "TRUNCATE", "USER", "ROLE",
"PROFILE", "QUOTA", "POLICY", "ROW", "GRANT", "REVOKE", "OPTION", "ADMIN", "EXCEPT", "REPLACE",
"IDENTIFIED", "HOST", "NAME", "READONLY", "WRITABLE", "PERMISSIVE", "FOR", "RESTRICTIVE", "RANDOMIZED",
"INTERVAL", "LIMITS", "ONLY", "TRACKING", "IP", "REGEXP", "ILIKE"};
}
static String getLoadSuggestionQuery(Int32 suggestion_limit, bool basic_suggestion)
{
/// NOTE: Once you will update the completion list,
/// do not forget to update 01676_clickhouse_client_autocomplete.sh
WriteBufferFromOwnString query;
query << "SELECT DISTINCT arrayJoin(extractAll(name, '[\\\\w_]{2,}')) AS res FROM ("
"SELECT name FROM system.functions"
" UNION ALL "
"SELECT name FROM system.table_engines"
" UNION ALL "
"SELECT name FROM system.formats"
" UNION ALL "
"SELECT name FROM system.table_functions"
" UNION ALL "
"SELECT name FROM system.data_type_families"
" UNION ALL "
"SELECT name FROM system.merge_tree_settings"
" UNION ALL "
"SELECT name FROM system.settings"
" UNION ALL ";
if (!basic_suggestion)
{
query << "SELECT cluster FROM system.clusters"
" UNION ALL "
"SELECT macro FROM system.macros"
" UNION ALL "
"SELECT policy_name FROM system.storage_policies"
" UNION ALL ";
}
query << "SELECT concat(func.name, comb.name) FROM system.functions AS func CROSS JOIN system.aggregate_function_combinators AS comb WHERE is_aggregate";
/// The user may disable loading of databases, tables, columns by setting suggestion_limit to zero.
if (suggestion_limit > 0)
{
String limit_str = toString(suggestion_limit);
query << " UNION ALL "
"SELECT name FROM system.databases LIMIT " << limit_str
<< " UNION ALL "
"SELECT DISTINCT name FROM system.tables LIMIT " << limit_str
<< " UNION ALL ";
if (!basic_suggestion)
{
query << "SELECT DISTINCT name FROM system.dictionaries LIMIT " << limit_str
<< " UNION ALL ";
}
query << "SELECT DISTINCT name FROM system.columns LIMIT " << limit_str;
}
query << ") WHERE notEmpty(res)";
return query.str();
}
template <typename ConnectionType>
void Suggest::load(ContextPtr context, const ConnectionParameters & connection_parameters, Int32 suggestion_limit)
{
loading_thread = std::thread([context=Context::createCopy(context), connection_parameters, suggestion_limit, this]
{
for (size_t retry = 0; retry < 10; ++retry)
{
try
{
Connection connection(
connection_parameters.host,
connection_parameters.port,
connection_parameters.default_database,
connection_parameters.user,
connection_parameters.password,
"" /* cluster */,
"" /* cluster_secret */,
"client",
connection_parameters.compression,
connection_parameters.security);
loadImpl(connection, connection_parameters.timeouts, suggestion_limit);
auto connection = ConnectionType::createConnection(connection_parameters, context);
fetch(*connection, connection_parameters.timeouts, getLoadSuggestionQuery(suggestion_limit, std::is_same_v<ConnectionType, LocalConnection>));
}
catch (const Exception & e)
{
/// Retry when the server said "Client should retry".
if (e.code() == ErrorCodes::DEADLOCK_AVOIDED)
continue;
@ -70,76 +139,9 @@ void Suggest::load(const ConnectionParameters & connection_parameters, size_t su
});
}
Suggest::Suggest()
void Suggest::fetch(IServerConnection & connection, const ConnectionTimeouts & timeouts, const std::string & query)
{
/// Keywords may be not up to date with ClickHouse parser.
words = {"CREATE", "DATABASE", "IF", "NOT", "EXISTS", "TEMPORARY", "TABLE", "ON", "CLUSTER", "DEFAULT",
"MATERIALIZED", "ALIAS", "ENGINE", "AS", "VIEW", "POPULATE", "SETTINGS", "ATTACH", "DETACH", "DROP",
"RENAME", "TO", "ALTER", "ADD", "MODIFY", "CLEAR", "COLUMN", "AFTER", "COPY", "PROJECT",
"PRIMARY", "KEY", "CHECK", "PARTITION", "PART", "FREEZE", "FETCH", "FROM", "SHOW", "INTO",
"OUTFILE", "FORMAT", "TABLES", "DATABASES", "LIKE", "PROCESSLIST", "CASE", "WHEN", "THEN", "ELSE",
"END", "DESCRIBE", "DESC", "USE", "SET", "OPTIMIZE", "FINAL", "DEDUPLICATE", "INSERT", "VALUES",
"SELECT", "DISTINCT", "SAMPLE", "ARRAY", "JOIN", "GLOBAL", "LOCAL", "ANY", "ALL", "INNER",
"LEFT", "RIGHT", "FULL", "OUTER", "CROSS", "USING", "PREWHERE", "WHERE", "GROUP", "BY",
"WITH", "TOTALS", "HAVING", "ORDER", "COLLATE", "LIMIT", "UNION", "AND", "OR", "ASC",
"IN", "KILL", "QUERY", "SYNC", "ASYNC", "TEST", "BETWEEN", "TRUNCATE", "USER", "ROLE",
"PROFILE", "QUOTA", "POLICY", "ROW", "GRANT", "REVOKE", "OPTION", "ADMIN", "EXCEPT", "REPLACE",
"IDENTIFIED", "HOST", "NAME", "READONLY", "WRITABLE", "PERMISSIVE", "FOR", "RESTRICTIVE", "RANDOMIZED",
"INTERVAL", "LIMITS", "ONLY", "TRACKING", "IP", "REGEXP", "ILIKE"};
}
void Suggest::loadImpl(Connection & connection, const ConnectionTimeouts & timeouts, size_t suggestion_limit)
{
/// NOTE: Once you will update the completion list,
/// do not forget to update 01676_clickhouse_client_autocomplete.sh
WriteBufferFromOwnString query;
query << "SELECT DISTINCT arrayJoin(extractAll(name, '[\\\\w_]{2,}')) AS res FROM ("
"SELECT name FROM system.functions"
" UNION ALL "
"SELECT name FROM system.table_engines"
" UNION ALL "
"SELECT name FROM system.formats"
" UNION ALL "
"SELECT name FROM system.table_functions"
" UNION ALL "
"SELECT name FROM system.data_type_families"
" UNION ALL "
"SELECT name FROM system.merge_tree_settings"
" UNION ALL "
"SELECT name FROM system.settings"
" UNION ALL "
"SELECT cluster FROM system.clusters"
" UNION ALL "
"SELECT macro FROM system.macros"
" UNION ALL "
"SELECT policy_name FROM system.storage_policies"
" UNION ALL "
"SELECT concat(func.name, comb.name) FROM system.functions AS func CROSS JOIN system.aggregate_function_combinators AS comb WHERE is_aggregate";
/// The user may disable loading of databases, tables, columns by setting suggestion_limit to zero.
if (suggestion_limit > 0)
{
String limit_str = toString(suggestion_limit);
query <<
" UNION ALL "
"SELECT name FROM system.databases LIMIT " << limit_str
<< " UNION ALL "
"SELECT DISTINCT name FROM system.tables LIMIT " << limit_str
<< " UNION ALL "
"SELECT DISTINCT name FROM system.dictionaries LIMIT " << limit_str
<< " UNION ALL "
"SELECT DISTINCT name FROM system.columns LIMIT " << limit_str;
}
query << ") WHERE notEmpty(res)";
fetch(connection, timeouts, query.str());
}
void Suggest::fetch(Connection & connection, const ConnectionTimeouts & timeouts, const std::string & query)
{
connection.sendQuery(timeouts, query, "" /* query_id */, QueryProcessingStage::Complete);
connection.sendQuery(timeouts, query, "" /* query_id */, QueryProcessingStage::Complete, nullptr, nullptr, false);
while (true)
{
@ -190,4 +192,9 @@ void Suggest::fillWordsFromBlock(const Block & block)
words.emplace_back(column.getDataAt(i).toString());
}
template
void Suggest::load<Connection>(ContextPtr context, const ConnectionParameters & connection_parameters, Int32 suggestion_limit);
template
void Suggest::load<LocalConnection>(ContextPtr context, const ConnectionParameters & connection_parameters, Int32 suggestion_limit);
}

View File

@ -3,6 +3,8 @@
#include "ConnectionParameters.h"
#include <Client/Connection.h>
#include <Client/IServerConnection.h>
#include <Client/LocalConnection.h>
#include <IO/ConnectionTimeouts.h>
#include <common/LineReader.h>
#include <thread>
@ -11,29 +13,27 @@
namespace DB
{
namespace ErrorCodes
{
}
class Suggest : public LineReader::Suggest, boost::noncopyable
{
public:
Suggest();
~Suggest()
{
if (loading_thread.joinable())
loading_thread.join();
}
void load(const ConnectionParameters & connection_parameters, size_t suggestion_limit);
/// Load suggestions for clickhouse-client.
template <typename ConnectionType>
void load(ContextPtr context, const ConnectionParameters & connection_parameters, Int32 suggestion_limit);
/// Older server versions cannot execute the query above.
static constexpr int MIN_SERVER_REVISION = 54406;
private:
void fetch(IServerConnection & connection, const ConnectionTimeouts & timeouts, const std::string & query);
void loadImpl(Connection & connection, const ConnectionTimeouts & timeouts, size_t suggestion_limit);
void fetch(Connection & connection, const ConnectionTimeouts & timeouts, const std::string & query);
void fillWordsFromBlock(const Block & block);
/// Words are fetched asynchronously.

View File

@ -37,7 +37,7 @@ RemoteInserter::RemoteInserter(
/** Send query and receive "header", that describes table structure.
* Header is needed to know, what structure is required for blocks to be passed to 'write' method.
*/
connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, &settings_, &modified_client_info);
connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, &settings_, &modified_client_info, false);
while (true)
{
@ -75,12 +75,12 @@ void RemoteInserter::write(Block block)
{
try
{
connection.sendData(block);
connection.sendData(block, /* name */"", /* scalar */false);
}
catch (const NetException &)
{
/// Try to get more detailed exception from server
auto packet_type = connection.checkPacket();
auto packet_type = connection.checkPacket(/* timeout_microseconds */0);
if (packet_type && *packet_type == Protocol::Server::Exception)
{
Packet packet = connection.receivePacket();
@ -102,7 +102,7 @@ void RemoteInserter::writePrepared(ReadBuffer & buf, size_t size)
void RemoteInserter::onFinish()
{
/// Empty block means end of data.
connection.sendData(Block());
connection.sendData(Block(), /* name */"", /* scalar */false);
/// Wait for EndOfStream or Exception packet, skip Log packets.
while (true)

View File

@ -981,9 +981,8 @@ void executeQuery(
WriteBuffer & ostr,
bool allow_into_outfile,
ContextMutablePtr context,
std::function<void(const String &, const String &, const String &, const String &)> set_result_details,
const std::optional<FormatSettings> & output_format_settings,
std::function<void()> before_finalize_callback)
SetResultDetailsFunc set_result_details,
const std::optional<FormatSettings> & output_format_settings)
{
PODArray<char> parse_buf;
const char * begin;
@ -1079,8 +1078,6 @@ void executeQuery(
out->onProgress(progress);
});
out->setBeforeFinalizeCallback(before_finalize_callback);
if (set_result_details)
set_result_details(
context->getClientInfo().current_query_id, out->getContentType(), format_name, DateLUT::instance().getTimeZone());

View File

@ -11,15 +11,16 @@ namespace DB
class ReadBuffer;
class WriteBuffer;
using SetResultDetailsFunc = std::function<void(const String &, const String &, const String &, const String &)>;
/// Parse and execute a query.
void executeQuery(
ReadBuffer & istr, /// Where to read query from (and data for INSERT, if present).
WriteBuffer & ostr, /// Where to write query output to.
bool allow_into_outfile, /// If true and the query contains INTO OUTFILE section, redirect output to that file.
ContextMutablePtr context, /// DB, tables, data types, storage engines, functions, aggregate functions...
std::function<void(const String &, const String &, const String &, const String &)> set_result_details, /// If a non-empty callback is passed, it will be called with the query id, the content-type, the format, and the timezone.
const std::optional<FormatSettings> & output_format_settings = std::nullopt, /// Format settings for output format, will be calculated from the context if not set.
std::function<void()> before_finalize_callback = {} /// Will be set in output format to be called before finalize.
SetResultDetailsFunc set_result_details, /// If a non-empty callback is passed, it will be called with the query id, the content-type, the format, and the timezone.
const std::optional<FormatSettings> & output_format_settings = std::nullopt /// Format settings for output format, will be calculated from the context if not set.
);

View File

@ -76,9 +76,6 @@ void IOutputFormat::work()
if (rows_before_limit_counter && rows_before_limit_counter->hasAppliedLimit())
setRowsBeforeLimit(rows_before_limit_counter->get());
if (before_finalize_callback)
before_finalize_callback();
finalize();
finalized = true;
return;

View File

@ -67,9 +67,6 @@ public:
/// Passed value are delta, that must be summarized.
virtual void onProgress(const Progress & /*progress*/) {}
/// Set callback, which will be called before call to finalize().
void setBeforeFinalizeCallback(std::function<void()> callback) { before_finalize_callback = callback; }
/// Content-Type to set when sending HTTP response.
virtual std::string getContentType() const { return "text/plain; charset=UTF-8"; }
@ -96,7 +93,5 @@ private:
size_t result_bytes = 0;
bool prefix_written = false;
std::function<void()> before_finalize_callback;
};
}

View File

@ -51,8 +51,8 @@ ${CLICKHOUSE_LOCAL} --max_rows_in_distinct=33 -q "SELECT name, value FROM system
${CLICKHOUSE_LOCAL} -q "SET max_rows_in_distinct=33; SELECT name, value FROM system.settings WHERE name = 'max_rows_in_distinct'"
${CLICKHOUSE_LOCAL} --max_bytes_before_external_group_by=1 --max_block_size=10 -q "SELECT sum(ignore(*)) FROM (SELECT number, count() FROM numbers(1000) GROUP BY number)"
echo
# Check exta options
(${CLICKHOUSE_LOCAL} --ignore-error --echo -q "SELECT nothing_to_do();SELECT 42;" 2>/dev/null && echo "Wrong RC") || true
# Check exta options, we expect zero exit code and no stderr output
(${CLICKHOUSE_LOCAL} --ignore-error --echo -q "SELECT nothing_to_do();SELECT 42;" 2>/dev/null || echo "Wrong RC")
echo
${CLICKHOUSE_LOCAL} -q "CREATE TABLE sophisticated_default
(

View File

@ -89,7 +89,7 @@ idx10 ['This','is','a','test']
23
24
=== Try load data from datapage_v2.snappy.parquet
Code: 33. DB::ParsingEx---tion: Error while reading Parquet data: IOError: Not yet implemented: Unsupported encoding.: While executing ParquetBlockInputFormat: (in query: INSERT INTO parquet_load FORMAT Parquet): data for INSERT was parsed from stdin. (CANNOT_READ_ALL_DATA)
Code: 33. DB::ParsingEx---tion: Error while reading Parquet data: IOError: Not yet implemented: Unsupported encoding.: While executing ParquetBlockInputFormat: data for INSERT was parsed from stdin: (in query: INSERT INTO parquet_load FORMAT Parquet). (CANNOT_READ_ALL_DATA)
=== Try load data from datatype-date32.parquet
1925-01-01

View File

@ -7,4 +7,5 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# The following command will execute:
# CREATE TABLE table (key UInt32) ENGINE = File(TSV, stdin);
# INSERT INTO `table` SELECT key FROM input('key UInt32') FORMAT TSV
${CLICKHOUSE_LOCAL} -S 'key UInt32' -q "INSERT INTO \`table\` SELECT key FROM input('key UInt32') FORMAT TSV" < /dev/null
${CLICKHOUSE_LOCAL} -S 'key UInt32' -q "INSERT INTO \`table\` SELECT key FROM input('key UInt32') FORMAT TSV" < /dev/null 2>&1 \
| grep -q "No data to insert" || echo "Fail"

View File

@ -21,5 +21,5 @@ ccccccccc aaaaaaaaa bbbbbbbbb
ccccccccc aaaaaaaaa bbbbbbbbb
695071 0
:0
:70
:233
:79

View File

@ -7,7 +7,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# NOTE: database = $CLICKHOUSE_DATABASE is superfluous
function test_completion_word()
function test_completion_word_client()
{
local w=$1 && shift
@ -55,7 +55,7 @@ EOF
# last 3 bytes will be completed,
# so take this in mind when you will update the list.
compwords_positive=(
client_compwords_positive=(
# system.functions
concatAssumeInjective
# system.table_engines
@ -89,8 +89,73 @@ compwords_positive=(
# system.dictionaries
# FIXME: none
)
for w in "${compwords_positive[@]}"; do
test_completion_word "$w" || echo "[FAIL] $w (positive)"
function test_completion_word_local()
{
local w=$1 && shift
local w_len=${#w}
local compword_begin=${w:0:$((w_len-3))}
local compword_end=${w:$((w_len-3))}
# NOTE: here and below you should escape variables of the expect.
timeout 60s expect << EOF
log_user 0
set timeout 3
match_max 100000
# A default timeout action is to do nothing, change it to fail
expect_after {
timeout {
exit 1
}
}
spawn bash -c "$CLICKHOUSE_LOCAL"
expect ":) "
# Make a query
send -- "SET $compword_begin"
expect "SET $compword_begin"
# Wait for suggestions to load, they are loaded in background
set is_done 0
while {\$is_done == 0} {
send -- "\\t"
expect {
"$compword_begin$compword_end" {
set is_done 1
}
default {
sleep 1
}
}
}
send -- "\\3\\4"
expect eof
EOF
}
local_compwords_positive=(
# system.functions
concatAssumeInjective
# system.table_engines
ReplacingMergeTree
# system.formats
JSONEachRow
# system.table_functions
clusterAllReplicas
# system.data_type_families
SimpleAggregateFunction
)
for w in "${client_compwords_positive[@]}"; do
test_completion_word_client "$w" || echo "[FAIL] $w (positive)"
done
for w in "${local_compwords_positive[@]}"; do
test_completion_word_local "$w" || echo "[FAIL] $w (positive)"
done
exit 0

View File

@ -212,6 +212,9 @@ std_cerr_cout_excludes=(
src/Interpreters/Context.cpp
# IProcessor::dump()
src/Processors/IProcessor.cpp
src/Client/ClientBase.cpp
src/Client/QueryFuzzer.cpp
src/Client/Suggest.cpp
)
sources_with_std_cerr_cout=( $(
find $ROOT_PATH/src -name '*.h' -or -name '*.cpp' | \