Some review fixes

This commit is contained in:
kssenii 2021-09-04 21:19:01 +03:00
parent 1c20b223cb
commit 8f77855981
9 changed files with 186 additions and 188 deletions

View File

@ -42,6 +42,7 @@
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/UseSSL.h>
#include <DataStreams/NullBlockOutputStream.h>
@ -116,7 +117,7 @@ void Client::processError(const String & query) const
}
void Client::processSingleQuery(const String & query_to_execute, ASTPtr parsed_query)
void Client::executeSignleQuery(const String & query_to_execute, ASTPtr parsed_query)
{
client_exception.reset();
server_exception.reset();
@ -190,7 +191,7 @@ void Client::processSingleQuery(const String & query_to_execute, ASTPtr parsed_q
}
bool Client::processMultiQuery(const String & all_queries_text)
bool Client::executeMultiQuery(const String & all_queries_text)
{
// It makes sense not to base any control flow on this, so that it is
// the same in tests and in normal usage. The only difference is that in
@ -471,91 +472,100 @@ void Client::loadSuggestionData(Suggest & suggest)
}
int Client::mainImpl()
int Client::main(const std::vector<std::string> & /*args*/)
try
{
try
UseSSL use_ssl;
MainThreadStatus::getInstance();
std::cout << std::fixed << std::setprecision(3);
std::cerr << std::fixed << std::setprecision(3);
/// Limit on total memory usage
size_t max_client_memory_usage = config().getInt64("max_memory_usage_in_client", 0 /*default value*/);
if (max_client_memory_usage != 0)
{
MainThreadStatus::getInstance();
total_memory_tracker.setHardLimit(max_client_memory_usage);
total_memory_tracker.setDescription("(total)");
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
}
/// Limit on total memory usage
size_t max_client_memory_usage = config().getInt64("max_memory_usage_in_client", 0 /*default value*/);
registerFormats();
registerFunctions();
registerAggregateFunctions();
if (max_client_memory_usage != 0)
processConfig();
if (is_interactive)
{
clearTerminal();
showClientVersion();
}
connect();
if (is_interactive)
{
/// Load Warnings at the beginning of connection
if (!config().has("no-warnings"))
{
total_memory_tracker.setHardLimit(max_client_memory_usage);
total_memory_tracker.setDescription("(total)");
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
}
registerFormats();
registerFunctions();
registerAggregateFunctions();
processConfig();
connect();
if (is_interactive)
{
/// Load Warnings at the beginning of connection
if (!config().has("no-warnings"))
try
{
try
std::vector<String> messages = loadWarningMessages();
if (!messages.empty())
{
std::vector<String> messages = loadWarningMessages();
if (!messages.empty())
{
std::cout << "Warnings:" << std::endl;
for (const auto & message : messages)
std::cout << " * " << message << std::endl;
std::cout << std::endl;
}
}
catch (...)
{
/// Ignore exception
std::cout << "Warnings:" << std::endl;
for (const auto & message : messages)
std::cout << " * " << message << std::endl;
std::cout << std::endl;
}
}
runInteractive();
catch (...)
{
/// Ignore exception
}
}
else
runInteractive();
}
else
{
connection->setDefaultDatabase(connection_parameters.default_database);
runNonInteractive();
// If exception code isn't zero, we should return non-zero return
// code anyway.
const auto * exception = server_exception ? server_exception.get() : client_exception.get();
if (exception)
{
connection->setDefaultDatabase(connection_parameters.default_database);
runNonInteractive();
// If exception code isn't zero, we should return non-zero return
// code anyway.
const auto * exception = server_exception ? server_exception.get() : client_exception.get();
if (exception)
{
return exception->code() != 0 ? exception->code() : -1;
}
if (have_error)
{
// Shouldn't be set without an exception, but check it just in
// case so that at least we don't lose an error.
return -1;
}
return exception->code() != 0 ? exception->code() : -1;
}
if (have_error)
{
// Shouldn't be set without an exception, but check it just in
// case so that at least we don't lose an error.
return -1;
}
}
catch (const Exception & e)
{
bool print_stack_trace = config().getBool("stacktrace", false) && e.code() != ErrorCodes::NETWORK_ERROR;
std::cerr << getExceptionMessage(e, print_stack_trace, true) << std::endl << std::endl;
/// If exception code isn't zero, we should return non-zero return code anyway.
return e.code() ? e.code() : -1;
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(false) << std::endl;
return getCurrentExceptionCode();
}
return 0;
}
catch (const Exception & e)
{
bool print_stack_trace = config().getBool("stacktrace", false) && e.code() != ErrorCodes::NETWORK_ERROR;
std::cerr << getExceptionMessage(e, print_stack_trace, true) << std::endl << std::endl;
/// If exception code isn't zero, we should return non-zero return code anyway.
return e.code() ? e.code() : -1;
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(false) << std::endl;
return getCurrentExceptionCode();
}
void Client::connect()

View File

@ -11,18 +11,17 @@ class Client : public ClientBase
public:
Client() = default;
void initialize(Poco::Util::Application & self) override;
int main(const std::vector<String> & /*args*/) override;
protected:
void processSingleQuery(const String & query_to_execute, ASTPtr parsed_query) override;
bool processMultiQuery(const String & all_queries_text) override;
void executeSignleQuery(const String & query_to_execute, ASTPtr parsed_query) override;
bool executeMultiQuery(const String & all_queries_text) override;
bool processWithFuzzing(const String & full_query) override;
void connect() override;
void processError(const String & query) const override;
void loadSuggestionData(Suggest & suggest) override;
void connect() override;
int mainImpl() override;
void readArguments(int argc, char ** argv,
Arguments & common_arguments,
std::vector<Arguments> & external_tables_arguments) override;

View File

@ -30,6 +30,7 @@
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <IO/UseSSL.h>
#include <Parsers/parseQuery.h>
#include <Parsers/IAST.h>
#include <common/ErrorHandlers.h>
@ -83,7 +84,7 @@ void LocalServer::processError(const String & query) const
}
void LocalServer::processSingleQuery(const String & query_to_execute, ASTPtr parsed_query)
void LocalServer::executeSignleQuery(const String & query_to_execute, ASTPtr parsed_query)
{
/// To support previous behaviour of clickhouse-local do not reset first exception in case --ignore-error,
/// it needs to be thrown after multiquery is finished (test 00385). But I do not think it is ok to output only
@ -123,7 +124,7 @@ void LocalServer::processSingleQuery(const String & query_to_execute, ASTPtr par
}
bool LocalServer::processMultiQuery(const String & all_queries_text)
bool LocalServer::executeMultiQuery(const String & all_queries_text)
{
bool echo_query = echo_queries;
@ -329,6 +330,13 @@ static void attachSystemTables(ContextPtr context)
void LocalServer::cleanup()
{
connection.reset();
global_context->shutdown();
global_context.reset();
status.reset();
// Delete the temporary directory if needed.
if (temporary_directory_to_delete)
{
@ -438,11 +446,15 @@ void LocalServer::connect()
}
int LocalServer::mainImpl()
int LocalServer::main(const std::vector<std::string> & /*args*/)
try
{
UseSSL use_ssl;
ThreadStatus thread_status;
std::cout << std::fixed << std::setprecision(3);
std::cerr << std::fixed << std::setprecision(3);
/// We will terminate process on error
static KillingErrorHandler error_handler;
Poco::ErrorHandler::set(&error_handler);
@ -463,7 +475,10 @@ try
if (is_interactive)
{
std::cout << std::endl;
clearTerminal();
showClientVersion();
std::cerr << std::endl;
runInteractive();
}
else
@ -476,14 +491,7 @@ try
client_exception->rethrow();
}
connection.reset();
global_context->shutdown();
global_context.reset();
status.reset();
cleanup();
return Application::EXIT_OK;
}
catch (const Exception & e)

View File

@ -25,18 +25,16 @@ class LocalServer : public ClientBase, public Loggers
{
public:
LocalServer() = default;
void initialize(Poco::Util::Application & self) override;
int main(const std::vector<String> & /*args*/) override;
protected:
void processSingleQuery(const String & query_to_execute, ASTPtr parsed_query) override;
bool processMultiQuery(const String & all_queries_text) override;
void processError(const String & query) const override;
void loadSuggestionData(Suggest &) override;
void executeSignleQuery(const String & query_to_execute, ASTPtr parsed_query) override;
bool executeMultiQuery(const String & all_queries_text) override;
void connect() override;
int mainImpl() override;
void processError(const String & query) const override;
void loadSuggestionData(Suggest &) override;
String getQueryTextPrefix() override;
void printHelpMessage(const OptionsDescription & options_description) override;

View File

@ -46,7 +46,6 @@
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/UseSSL.h>
#include <IO/CompressionMethod.h>
#include <DataStreams/NullBlockOutputStream.h>
@ -261,7 +260,10 @@ void ClientBase::initBlockOutputStream(const Block & block, ASTPtr parsed_query)
if (!pager.empty())
{
signal(SIGPIPE, SIG_IGN);
pager_cmd = ShellCommand::execute(pager, true);
ShellCommand::Config config(pager);
config.pipe_stdin_only = true;
pager_cmd = ShellCommand::execute(config);
out_buf = &pager_cmd->in;
}
else
@ -390,10 +392,8 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
}
int retries_left = 10;
for (;;)
while (retries_left)
{
assert(retries_left > 0);
try
{
connection->sendQuery(
@ -425,6 +425,7 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
}
}
}
assert(retries_left > 0);
}
@ -892,7 +893,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
written_first_block = false;
progress_indication.resetProgress();
processSingleQuery(query_to_execute, parsed_query);
executeSignleQuery(query_to_execute, parsed_query);
if (is_interactive)
{
@ -1021,7 +1022,7 @@ bool ClientBase::processQueryText(const String & text)
return true;
}
return processMultiQuery(text);
return executeMultiQuery(text);
}
@ -1167,7 +1168,7 @@ void ClientBase::runNonInteractive()
readStringUntilEOF(queries_from_file, in);
text += queries_from_file;
return processMultiQuery(text);
return executeMultiQuery(text);
};
/// Read all queries into `text`.
@ -1207,7 +1208,7 @@ void ClientBase::runNonInteractive()
}
static void clearTerminal()
void ClientBase::clearTerminal()
{
/// Clear from cursor until end of screen.
/// It is needed if garbage is left in terminal.
@ -1218,29 +1219,12 @@ static void clearTerminal()
}
static void showClientVersion()
void ClientBase::showClientVersion()
{
std::cout << DBMS_NAME << " client version " << VERSION_STRING << VERSION_OFFICIAL << "." << std::endl;
}
int ClientBase::main(const std::vector<std::string> & /*args*/)
{
UseSSL use_ssl;
std::cout << std::fixed << std::setprecision(3);
std::cerr << std::fixed << std::setprecision(3);
if (is_interactive)
{
clearTerminal();
showClientVersion();
}
return mainImpl();
}
void ClientBase::init(int argc, char ** argv)
{
namespace po = boost::program_options;

View File

@ -36,23 +36,20 @@ public:
using Arguments = std::vector<String>;
void init(int argc, char ** argv);
int main(const std::vector<String> & /*args*/) override;
protected:
void runInteractive();
void runNonInteractive();
virtual void connect() = 0;
virtual int mainImpl() = 0;
virtual bool processWithFuzzing(const String &)
{
throw Exception("Query processing with fuzzing is not implemented", ErrorCodes::NOT_IMPLEMENTED);
}
virtual void processSingleQuery(const String & query_to_execute, ASTPtr parsed_query) = 0;
virtual bool processMultiQuery(const String & all_queries_text) = 0;
virtual void executeSignleQuery(const String & query_to_execute, ASTPtr parsed_query) = 0;
virtual bool executeMultiQuery(const String & all_queries_text) = 0;
virtual void connect() = 0;
virtual void processError(const String & query) const = 0;
virtual void loadSuggestionData(Suggest &) = 0;
@ -74,6 +71,9 @@ protected:
/// For non-interactive multi-query mode get queries text prefix.
virtual String getQueryTextPrefix() { return ""; }
void clearTerminal();
void showClientVersion();
using ProgramOptionsDescription = boost::program_options::options_description;
using CommandLineOptions = boost::program_options::variables_map;
@ -134,37 +134,34 @@ protected:
bool ignore_error = false; /// In case of errors, don't print error message, continue to next query. Only applicable for non-interactive mode.
bool print_time_to_stderr = false; /// Output execution time to stderr in batch mode.
String home_path;
std::vector<String> queries_files; /// If not empty, queries will be read from these files
String history_file; /// Path to a file containing command history.
std::vector<String> interleave_queries_files; /// If not empty, run queries from these files before processing every file from 'queries_files'.
bool has_vertical_output_suffix = false; /// Is \G present at the end of the query string?
String prompt_by_server_display_name;
String server_display_name;
ProgressIndication progress_indication;
bool need_render_progress = true;
bool written_first_block = false;
size_t processed_rows = 0; /// How many rows have been read or written.
bool stdin_is_a_tty = false; /// stdin is a terminal.
bool stdout_is_a_tty = false; /// stdout is a terminal.
uint64_t terminal_width = 0;
std::unique_ptr<IServerConnection> connection;
ConnectionParameters connection_parameters;
String format; /// Query results output format.
bool is_default_format = true; /// false, if format is set in the config or command line.
size_t format_max_block_size = 0; /// Max block size for console output.
String insert_format; /// Format of INSERT data that is read from stdin in batch mode.
size_t insert_format_max_block_size = 0; /// Max block size when reading INSERT data.
size_t max_client_network_bandwidth = 0; /// The maximum speed of data exchange over the network for the client in bytes per second.
bool has_vertical_output_suffix = false; /// Is \G present at the end of the query string?
/// We will format query_id in interactive mode in various ways, the default is just to print Query id: ...
std::vector<std::pair<String, String>> query_id_formats;
/// Settings specified via command line args
Settings cmd_settings;
SharedContextHolder shared_context;
ContextMutablePtr global_context;
QueryFuzzer fuzzer;
int query_fuzzer_runs = 0;
/// If the last query resulted in exception. `server_exception` or
/// `client_exception` must be set.
bool have_error = false;
/// Buffer that reads from stdin in batch mode.
ReadBufferFromFileDescriptor std_in{STDIN_FILENO};
/// Console output.
@ -180,17 +177,20 @@ protected:
String server_logs_file;
BlockOutputStreamPtr logs_out_stream;
/// We will format query_id in interactive mode in various ways, the default is just to print Query id: ...
std::vector<std::pair<String, String>> query_id_formats;
String home_path;
String history_file; /// Path to a file containing command history.
/// Dictionary with query parameters for prepared statements.
NameToNameMap query_parameters;
String current_profile;
std::unique_ptr<IServerConnection> connection;
ConnectionParameters connection_parameters;
UInt64 server_revision = 0;
String server_version;
String prompt_by_server_display_name;
String server_display_name;
String format; /// Query results output format.
bool is_default_format = true; /// false, if format is set in the config or command line.
ProgressIndication progress_indication;
bool need_render_progress = true;
bool written_first_block = false;
size_t processed_rows = 0; /// How many rows have been read or written.
/// The last exception that was received from the server. Is used for the
/// return code in batch mode.
@ -198,20 +198,17 @@ protected:
/// Likewise, the last exception that occurred on the client.
std::unique_ptr<Exception> client_exception;
/// If the last query resulted in exception. `server_exception` or
/// `client_exception` must be set.
bool have_error = false;
std::list<ExternalTable> external_tables; /// External tables info.
NameToNameMap query_parameters; /// Dictionary with query parameters for prepared statements.
QueryFuzzer fuzzer;
int query_fuzzer_runs = 0;
QueryProcessingStage::Enum query_processing_stage;
/// External tables info.
std::list<ExternalTable> external_tables;
String current_profile;
size_t format_max_block_size = 0; /// Max block size for console output.
String insert_format; /// Format of INSERT data that is read from stdin in batch mode.
size_t insert_format_max_block_size = 0; /// Max block size when reading INSERT data.
size_t max_client_network_bandwidth = 0; /// The maximum speed of data exchange over the network for the client in bytes per second.
UInt64 server_revision = 0;
String server_version;
};
}

View File

@ -13,7 +13,7 @@
#include <common/scope_guard.h>
#if !defined(ARCADIA_BUILD)
#include <readpassphrase/readpassphrase.h>
#include <readpassphrase/readpassphrase.h> // Y_IGNORE
#endif

View File

@ -9,11 +9,12 @@ namespace ErrorCodes
{
extern const int UNKNOWN_PACKET_FROM_SERVER;
extern const int UNKNOWN_EXCEPTION;
extern const int NOT_IMPLEMENTED;
}
LocalConnection::LocalConnection(ContextPtr context_)
: WithContext(context_)
, session(getContext(), ClientInfo::Interface::TCP)
, session(getContext(), ClientInfo::Interface::LOCAL)
{
/// Authenticate and create a context to execute queries.
session.authenticate("default", "", Poco::Net::SocketAddress{});
@ -76,6 +77,9 @@ void LocalConnection::sendQuery(
if (state->io.out)
{
/** Made above the rest of the lines, so that in case of `writePrefix` function throws an exception,
* client receive exception before sending data.
*/
state->io.out->writePrefix();
state->block = state->io.out->getHeader();
}
@ -288,10 +292,7 @@ bool LocalConnection::pollImpl()
auto next_read = pullBlock(block);
if (block)
{
if (state->io.null_format)
state->block.emplace();
else
state->block.emplace(block);
state->block.emplace(block);
}
else if (!next_read)
{
@ -358,31 +359,34 @@ Packet LocalConnection::receivePacket()
void LocalConnection::getServerVersion(
const ConnectionTimeouts & /* timeouts */, String & /* name */,
UInt64 & /* version_major */, UInt64 & /* version_minor */,
UInt64 & /* version_patch */, UInt64 & /* revision */) { }
void LocalConnection::setDefaultDatabase(const String & name)
UInt64 & /* version_patch */, UInt64 & /* revision */)
{
default_database = name;
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
}
void LocalConnection::setDefaultDatabase(const String &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
}
UInt64 LocalConnection::getServerRevision(const ConnectionTimeouts &)
{
return server_revision;
}
const String & LocalConnection::getDescription() const
{
return description;
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
}
const String & LocalConnection::getServerTimezone(const ConnectionTimeouts &)
{
return server_timezone;
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
}
const String & LocalConnection::getServerDisplayName(const ConnectionTimeouts &)
{
return server_display_name;
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
}
void LocalConnection::sendExternalTablesData(ExternalTablesData &)
{
/// Do nothing.
}
}

View File

@ -35,7 +35,6 @@ struct LocalQueryState
/// Is request cancelled
bool is_cancelled = false;
/// Is query finished == !has_pending_data
bool is_finished = false;
bool sent_totals = false;
@ -66,11 +65,10 @@ public:
UInt64 & revision) override;
UInt64 getServerRevision(const ConnectionTimeouts & timeouts) override;
const String & getServerTimezone(const ConnectionTimeouts & timeouts) override;
const String & getServerDisplayName(const ConnectionTimeouts & timeouts) override;
const String & getDescription() const override;
const String & getDescription() const override { return description; }
void sendQuery(
const ConnectionTimeouts & timeouts,
@ -85,7 +83,7 @@ public:
void sendData(const Block &, const String &, bool) override;
void sendExternalTablesData(ExternalTablesData &) override {}
void sendExternalTablesData(ExternalTablesData &) override;
bool poll(size_t timeout_microseconds = 0) override;