This commit is contained in:
Nikita Mikhaylov 2024-08-26 16:16:03 +00:00
parent 020fbe10e0
commit 259310a8d1
12 changed files with 76 additions and 60 deletions

View File

@ -29,6 +29,7 @@
#include <filesystem>
#include <base/FnTraits.h>
namespace DB
{
namespace ErrorCodes

View File

@ -68,7 +68,6 @@
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <iostream>
#include <filesystem>
#include <limits>
@ -87,6 +86,10 @@
# include <Common/GWPAsan.h>
#endif
namespace fs = std::filesystem;
using namespace std::literals;
namespace DB
{
@ -112,6 +115,22 @@ namespace ErrorCodes
extern const int USER_EXPIRED;
}
}
namespace ProfileEvents
{
extern const Event UserTimeMicroseconds;
extern const Event SystemTimeMicroseconds;
}
namespace
{
constexpr UInt64 THREAD_GROUP_ID = 0;
}
namespace DB
{
ProgressOption toProgressOption(std::string progress)
{
boost::to_upper(progress);
@ -136,22 +155,6 @@ std::istream& operator>> (std::istream & in, ProgressOption & progress)
return in;
}
}
namespace ProfileEvents
{
extern const Event UserTimeMicroseconds;
extern const Event SystemTimeMicroseconds;
}
namespace
{
constexpr UInt64 THREAD_GROUP_ID = 0;
}
namespace DB
{
static void incrementProfileEventsBlock(Block & dst, const Block & src)
{
if (!dst)
@ -252,7 +255,6 @@ static void incrementProfileEventsBlock(Block & dst, const Block & src)
dst.setColumns(std::move(mutable_columns));
}
/// To cancel the query on local format error.
class LocalFormatError : public DB::Exception
{
@ -260,6 +262,7 @@ public:
using Exception::Exception;
};
ClientBase::~ClientBase() = default;
ClientBase::ClientBase(
@ -410,7 +413,6 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
return;
processed_rows += block.rows();
/// Even if all blocks are empty, we still need to initialize the output stream to write empty resultset.
initOutputFormat(block, parsed_query);
@ -693,7 +695,6 @@ void ClientBase::adjustSettings()
global_context->setSettings(settings);
}
void ClientBase::initClientContext()
{
client_context->setClientName(std::string(DEFAULT_CLIENT_NAME));
@ -703,14 +704,12 @@ void ClientBase::initClientContext()
client_context->setQueryParameters(query_parameters);
}
bool ClientBase::isRegularFile(int fd)
{
struct stat file_stat;
return fstat(fd, &file_stat) == 0 && S_ISREG(file_stat.st_mode);
}
void ClientBase::setDefaultFormatsAndCompressionFromConfiguration()
{
if (getClientConfiguration().has("output-format"))
@ -790,7 +789,6 @@ void ClientBase::setDefaultFormatsAndCompressionFromConfiguration()
}
}
void ClientBase::initTTYBuffer(ProgressOption progress)
{
if (tty_buf)
@ -951,7 +949,6 @@ void ClientBase::processTextAsSingleQuery(const String & full_query)
processError(full_query);
}
void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr parsed_query)
{
auto query = query_to_execute;
@ -1471,6 +1468,7 @@ void ClientBase::setInsertionTable(const ASTInsertQuery & insert_query)
}
}
namespace
{
bool isStdinNotEmptyAndValid(ReadBufferFromFileDescriptor & std_in)

View File

@ -30,6 +30,7 @@
namespace po = boost::program_options;
namespace DB
{
@ -95,9 +96,8 @@ public:
virtual ~ClientBase();
bool tryStopQuery() { return query_interrupt_handler.tryStop(); }
void stopQuery() { return query_interrupt_handler.stop(); }
void stopQuery() { query_interrupt_handler.stop(); }
// std::vector<String> getAllRegisteredNames() const override { return cmd_options; }
ASTPtr parseQuery(const char *& pos, const char * end, const Settings & settings, bool allow_multi_statements);
protected:
@ -151,14 +151,13 @@ protected:
};
virtual void updateLoggerLevel(const String &) {}
virtual void printHelpMessage([[maybe_unused]] const OptionsDescription & options_description, [[maybe_unused]] bool verbose) {}
virtual void addOptions([[maybe_unused]] OptionsDescription & options_description) {}
virtual void processOptions([[maybe_unused]] const OptionsDescription & options_description,
[[maybe_unused]] const CommandLineOptions & options,
[[maybe_unused]] const std::vector<Arguments> & external_tables_arguments,
[[maybe_unused]] const std::vector<Arguments> & hosts_and_ports_arguments) {}
virtual void processConfig() {}
virtual void printHelpMessage(const OptionsDescription & options_description, bool verbose) = 0;
virtual void addOptions(OptionsDescription & options_description) = 0;
virtual void processOptions(const OptionsDescription & options_description,
const CommandLineOptions & options,
const std::vector<Arguments> & external_tables_arguments,
const std::vector<Arguments> & hosts_and_ports_arguments) = 0;
virtual void processConfig() = 0;
/// Returns true if query processing was successful.
bool processQueryText(const String & text);
@ -245,8 +244,6 @@ protected:
void initTTYBuffer(ProgressOption progress);
void parseAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments);
/// Should be one of the first, to be destroyed the last,
/// since other members can use them.
SharedContextHolder shared_context; // maybe not initialized
@ -281,7 +278,6 @@ protected:
bool stderr_is_a_tty = false; /// stderr is a terminal.
uint64_t terminal_width = 0;
String format; /// Query results output format.
String pager;
String default_output_format; /// Query results output format.
@ -392,11 +388,6 @@ protected:
std::atomic_bool cancelled = false;
std::atomic_bool cancelled_printed = false;
/// Does log_comment has specified by user?
bool has_log_comment = false;
bool logging_initialized = false;
/// Unpacked descriptors and streams for the ease of use.
int in_fd = STDIN_FILENO;
int out_fd = STDOUT_FILENO;
@ -404,6 +395,7 @@ protected:
std::istream & input_stream;
std::ostream & output_stream;
std::ostream & error_stream;
};
}

View File

@ -33,12 +33,6 @@ struct ConnectionParameters
using Database = StrongTypedef<String, struct DatabaseTag>;
using Host = StrongTypedef<String, struct HostTag>;
// // We don't take database from config, as it can be changed after query execution
// ConnectionParameters(const Poco::Util::AbstractConfiguration & config, const std::string & database, std::string host);
// ConnectionParameters(
// const Poco::Util::AbstractConfiguration & config, const std::string & database, std::string host, std::optional<UInt16> port
// );
ConnectionParameters() = default;
ConnectionParameters(const Poco::Util::AbstractConfiguration & config_, const Host & host_, const Database & database_);
ConnectionParameters(const Poco::Util::AbstractConfiguration & config_, const Host & host_, const Database & database_, std::optional<UInt16> port_);

View File

@ -1225,6 +1225,7 @@ public:
{
SERVER, /// The program is run as clickhouse-server daemon (default behavior)
CLIENT, /// clickhouse-client
EMBEDDED_CLIENT,/// clickhouse-client being run over SSH tunnel
LOCAL, /// clickhouse-local
KEEPER, /// clickhouse-keeper (also daemon)
DISKS, /// clickhouse-disks

View File

@ -1,4 +1,4 @@
#include "ClientEmbedded.h"
#include <Server/ClientEmbedded/ClientEmbedded.h>
#include <base/getFQDNOrHostName.h>
#include <Interpreters/Session.h>
@ -96,6 +96,9 @@ int ClientEmbedded::run(const NameToNameMap & envVars, const String & first_quer
{
try
{
client_context = session->sessionContext();
initClientContext();
setThreadName("LocalServerPty");
query_processing_stage = QueryProcessingStage::Enum::Complete;
@ -125,7 +128,7 @@ try
default_database = getEnvOption<String>(envVars, "database", "");
format = getEnvOption<String>(envVars, "output-format", getEnvOption<String>(envVars, "format", is_interactive ? "PrettyCompact" : "TSV"));
default_output_format = getEnvOption<String>(envVars, "output-format", getEnvOption<String>(envVars, "format", is_interactive ? "PrettyCompact" : "TSV"));
// TODO: Fix
// insert_format = "Values";
insert_format_max_block_size = getEnvOption<size_t>(envVars, "insert_format_max_block_size",

View File

@ -57,6 +57,14 @@ protected:
String getName() const override { return "embedded"; }
void printHelpMessage(const OptionsDescription &, bool) override {}
void addOptions(OptionsDescription &) override {}
void processOptions(const OptionsDescription &,
const CommandLineOptions &,
const std::vector<Arguments> &,
const std::vector<Arguments> &) override {}
void processConfig() override {}
private:
void cleanup();

View File

@ -1,6 +1,6 @@
#include <Server/ClientEmbedded/ClientEmbeddedRunner.h>
#include <Common/Exception.h>
#include "Common/logger_useful.h"
#include <Common/logger_useful.h>
namespace DB
{

View File

@ -48,6 +48,5 @@ private:
ThreadFromGlobalPool client_thread;
std::unique_ptr<Session> db_session;
Poco::Logger * log;
// LocalServerPty client;
};
}

View File

@ -54,11 +54,17 @@ void SSHEvent::removeSession(SSHSession & session)
int SSHEvent::poll(int timeout)
{
int rc = ssh_event_dopoll(event.get(), timeout);
if (rc != SSH_OK)
throw DB::Exception(DB::ErrorCodes::SSH_EXCEPTION, "Error on polling on ssh event");
while (true)
{
int rc = ssh_event_dopoll(event.get(), timeout);
return rc;
if (rc == SSH_AGAIN)
continue;
if (rc != SSH_OK)
throw DB::Exception(DB::ErrorCodes::SSH_EXCEPTION, "Error on polling on ssh event: {}", rc);
return rc;
}
}
int SSHEvent::poll()

View File

@ -27,7 +27,20 @@ SSHSession::SSHSession() : session(ssh_new())
SSHSession::~SSHSession()
{
ssh_free(session);
if (session)
ssh_free(session);
}
SSHSession::SSHSession(SSHSession && rhs) noexcept
{
*this = std::move(rhs);
}
SSHSession & SSHSession::operator=(SSHSession && rhs) noexcept
{
this->session = rhs.session;
rhs.session = nullptr;
return *this;
}
SSHSession::SessionPtr SSHSession::getInternalPtr() const

View File

@ -18,10 +18,11 @@ class SSHSession
public:
SSHSession();
~SSHSession();
SSHSession(SSHSession &&) noexcept;
SSHSession & operator=(SSHSession &&) noexcept;
SSHSession(const SSHSession &) = delete;
SSHSession & operator=(const SSHSession &) = delete;
SSHSession(SSHSession &&) noexcept = default;
SSHSession & operator=(SSHSession &&) noexcept = default;
using SessionPtr = ssh_session_struct *;
/// Get raw pointer from libssh to be able to pass it to other objects
@ -51,7 +52,7 @@ public:
bool hasFinished();
private:
SessionPtr session;
SessionPtr session = nullptr;
};
}