Merged clickhouse-server and clickhouse-local into common app.

This commit is contained in:
Vitaliy Lyudvichenko 2016-10-25 15:14:27 +03:00
parent 48a3d25f99
commit d952dd39b0
11 changed files with 331 additions and 240 deletions

View File

@ -34,6 +34,7 @@ ASTPtr parseQuery(
/// Split queries separated by ; on to list of single queries
bool splitMultipartQuery(const std::string & queries, std::vector<std::string> & queries_list);
/// Returns pointer to the end of last sucessfuly parsed query (first), and true if all queries are sucessfuly parsed (second)
std::pair<const char *, bool> splitMultipartQuery(const std::string & queries, std::vector<std::string> & queries_list);
}

View File

@ -20,17 +20,16 @@ endif()
add_executable(clickhouse-client Client.cpp)
add_executable(clickhouse-benchmark Benchmark.cpp)
add_executable(clickhouse-local LocalServer.cpp)
add_custom_target(clickhouse-local DEPENDS clickhouse-server SOURCES clickhouse-local)
target_link_libraries (clickhouse-client dbms ${LINE_EDITING_LIBS} ${BOOST_PROGRAM_OPTIONS_LIB})
target_link_libraries (clickhouse-benchmark dbms ${BOOST_PROGRAM_OPTIONS_LIB})
target_link_libraries (clickhouse-local dbms ${BOOST_PROGRAM_OPTIONS_LIB})
INSTALL(TARGETS clickhouse-client RUNTIME DESTINATION bin COMPONENT clickhouse-client)
INSTALL(FILES config.xml DESTINATION /etc/clickhouse-client COMPONENT clickhouse-client)
INSTALL(TARGETS clickhouse-benchmark RUNTIME DESTINATION bin COMPONENT clickhouse-benchmark)
INSTALL(TARGETS clickhouse-local RUNTIME DESTINATION bin COMPONENT clickhouse-local)
INSTALL(FILES clickhouse-local DESTINATION bin)
IF(TESTS)
add_subdirectory (tests)

View File

@ -1,227 +0,0 @@
#include <Poco/Util/Application.h>
#include <Poco/Util/ServerApplication.h>
#include <Poco/Util/XMLConfiguration.h>
#include <DB/Databases/DatabaseOrdinary.h>
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/Interpreters/executeQuery.h>
#include <DB/Common/Macros.h>
#include <DB/Common/ConfigProcessor.h>
#include <DB/IO/ReadBufferFromString.h>
#include <DB/IO/WriteBufferFromFileDescriptor.h>
#include <DB/Parsers/parseQuery.h>
#include <common/ErrorHandlers.h>
namespace DB
{
/// Lightweight Application for clickhouse-local
/// No networking, no extra configs and working directories, no pid-file, no dictionaries, no logging
/// Quiet mode by default
class LocalServer : public Poco::Util::ServerApplication
{
public:
LocalServer() {}
void initialize(Poco::Util::Application & self) override
{
Poco::Util::ServerApplication::initialize(self);
}
void defineOptions(Poco::Util::OptionSet& _options) override
{
Poco::Util::ServerApplication::defineOptions (_options);
_options.addOption(
Poco::Util::Option ("config-file", "C", "load configuration from a given file")
.required (false)
.repeatable (false)
.argument ("<file>")
.binding("config-file")
);
_options.addOption(
Poco::Util::Option ("log-file", "L", "use given log file")
.required (false)
.repeatable (false)
.argument ("<file>")
.binding("logger.log")
);
_options.addOption(
Poco::Util::Option ("errorlog-file", "E", "use given log file for errors only")
.required (false)
.repeatable (false)
.argument ("<file>")
.binding("logger.errorlog")
);
_options.addOption(
Poco::Util::Option ("query", "Q", "[Local mode] queries to execute")
.required (false)
.repeatable (false)
.argument ("<string>", true)
.binding("query")
);
}
int main(const std::vector<std::string> & args) override
{
if (!config().has("query")) /// Nothing to process
return Application::EXIT_OK;
context = std::make_unique<Context>();
context->setGlobalContext(*context);
/// Skip path, temp path, flag's path installation
/// We will terminate process on error
static KillingErrorHandler error_handler;
Poco::ErrorHandler::set(&error_handler);
/// Don't initilaize DateLUT
/// Maybe useless
if (config().has("macros"))
context->setMacros(Macros(config(), "macros"));
/// Skip networking
setupUsers();
/// Limit on total number of coucurrently executed queries.
/// Threre are no need for concurrent threads, override max_concurrent_queries.
context->getProcessList().setMaxSize(0);
/// Size of cache for uncompressed blocks. Zero means disabled.
size_t uncompressed_cache_size = parse<size_t>(config().getString("uncompressed_cache_size", "0"));
if (uncompressed_cache_size)
context->setUncompressedCache(uncompressed_cache_size);
/// Size of cache for marks (index of MergeTree family of tables). It is necessary.
/// Specify default value if mark_cache_size explicitly!
size_t mark_cache_size = parse<size_t>(config().getString("mark_cache_size", "5368709120"));
if (mark_cache_size)
context->setMarkCache(mark_cache_size);
/// Load global settings from default profile.
context->setSetting("profile", config().getString("default_profile", "default"));
/// Init dummy default DB
const std::string default_database = "default";
context->addDatabase(default_database, std::make_shared<DatabaseMemory>(default_database));
context->setCurrentDatabase(default_database);
processQueries();
return Application::EXIT_OK;
}
void processQueries()
{
Logger * log = &logger();
String queries_str = config().getString("query");
LOG_DEBUG(log, "Executing queries: '" << queries_str << "'");
std::vector<String> queries;
splitMultipartQuery(queries_str, queries);
context->setUser("default", "", Poco::Net::IPAddress{}, 0, "");
for (const auto & query : queries)
{
LOG_DEBUG(log, "Executing query: '" << query << "'");
try
{
ReadBufferFromString read_buf(query);
WriteBufferFromFileDescriptor write_buf(STDOUT_FILENO);
BlockInputStreamPtr plan;
LOG_DEBUG(log, "executing query: " << query);
executeQuery(read_buf, write_buf, *context, plan, nullptr);
}
catch (...)
{
tryLogCurrentException(log, "An error ocurred while executing query");
throw;
}
}
}
void setupUsers()
{
ConfigurationPtr users_config;
if (config().has("users_config") || config().has("config-file") || Poco::File("config.xml").exists())
{
auto users_config_path = config().getString("users_config", config().getString("config-file", "config.xml"));
users_config = ConfigProcessor().loadConfig(users_config_path);
}
else
{
std::stringstream default_user_stream;
default_user_stream <<
"<yandex>\n"
" <profiles>\n"
" <default>\n"
" <max_memory_usage>10000000000</max_memory_usage>\n"
" <use_uncompressed_cache>0</use_uncompressed_cache>\n"
" <load_balancing>random</load_balancing>\n"
" </default>\n"
" <readonly>\n"
" <readonly>1</readonly>\n"
" </readonly>\n"
" </profiles>\n"
"\n"
" <users>\n"
" <default>\n"
" <password></password>\n"
" <networks>\n"
" <ip>::/0</ip>\n"
" </networks>\n"
" <profile>default</profile>\n"
" <quota>default</quota>\n"
" </default>\n"
" </users>\n"
"\n"
" <quotas>\n"
" <default>\n"
" <interval>\n"
" <duration>3600</duration>\n"
" <queries>0</queries>\n"
" <errors>0</errors>\n"
" <result_rows>0</result_rows>\n"
" <read_rows>0</read_rows>\n"
" <execution_time>0</execution_time>\n"
" </interval>\n"
" </default>\n"
" </quotas>\n"
"</yandex>\n";
Poco::XML::InputSource default_user_source(default_user_stream);
users_config = ConfigurationPtr(new Poco::Util::XMLConfiguration(&default_user_source));
}
if (users_config)
context->setUsersConfig(users_config);
else
throw Exception("Can't load config for users");
}
protected:
std::unique_ptr<Context> context;
};
}
#include <common/ApplicationServerExt.h>
YANDEX_APP_SERVER_MAIN(DB::LocalServer)

View File

@ -0,0 +1,2 @@
#!/bin/bash
clickhouse-server --local-mode "$@"

View File

@ -176,7 +176,7 @@ ASTPtr parseQuery(
return parseQueryAndMovePosition(parser, pos, end, description, false);
}
bool splitMultipartQuery(const std::string & queries, std::vector<std::string> & queries_list)
std::pair<const char *, bool> splitMultipartQuery(const std::string & queries, std::vector<std::string> & queries_list)
{
ASTPtr ast;
ParserQuery parser;
@ -192,7 +192,7 @@ bool splitMultipartQuery(const std::string & queries, std::vector<std::string> &
ast = parseQueryAndMovePosition(parser, pos, end, "", true);
if (!ast)
break;
return std::make_pair(begin, false);
ASTInsertQuery * insert = typeid_cast<ASTInsertQuery *>(&*ast);
@ -211,7 +211,7 @@ bool splitMultipartQuery(const std::string & queries, std::vector<std::string> &
++begin;
}
return true;
return std::make_pair(begin, false);
}
}

View File

@ -1,4 +1,5 @@
add_executable(clickhouse-server
main.cpp
Server.cpp
HTTPHandler.cpp
TCPHandler.cpp
@ -6,7 +7,8 @@ add_executable(clickhouse-server
MetricsTransmitter.cpp
ConfigReloader.cpp
StatusFile.cpp
ReplicasStatusHandler.cpp)
ReplicasStatusHandler.cpp
LocalServer.cpp)
# Adding init.d support
INCLUDE(${PROJECT_SOURCE_DIR}/tools/init.d/CMakeLists.init)

View File

@ -0,0 +1,222 @@
#include "LocalServer.h"
#include <Poco/Util/XMLConfiguration.h>
#include <DB/Databases/DatabaseOrdinary.h>
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/Interpreters/executeQuery.h>
#include <DB/Common/Macros.h>
#include <DB/Common/ConfigProcessor.h>
#include <DB/IO/ReadBufferFromString.h>
#include <DB/IO/WriteBufferFromFileDescriptor.h>
#include <DB/Parsers/parseQuery.h>
#include <common/ErrorHandlers.h>
namespace DB
{
void LocalServer::initialize(Poco::Util::Application & self)
{
Poco::Util::Application::initialize(self);
/// Load config files if exists
if (config().has("config-file") || Poco::File("config.xml").exists())
{
ConfigurationPtr processed_config = ConfigProcessor(false, true).loadConfig(config().getString("config-file", "config.xml"));
config().add(processed_config.duplicate(), PRIO_DEFAULT, false);
}
}
void LocalServer::defineOptions(Poco::Util::OptionSet& _options)
{
Poco::Util::Application::defineOptions (_options);
_options.addOption(
Poco::Util::Option ("config-file", "C", "load configuration from a given file")
.required (false)
.repeatable (false)
.argument ("<file>")
.binding("config-file")
);
// _options.addOption(
// Poco::Util::Option ("log-file", "L", "use given log file")
// .required (false)
// .repeatable (false)
// .argument ("<file>")
// .binding("logger.log")
// );
//
// _options.addOption(
// Poco::Util::Option ("errorlog-file", "E", "use given log file for errors only")
// .required (false)
// .repeatable (false)
// .argument ("<file>")
// .binding("logger.errorlog")
// );
_options.addOption(
Poco::Util::Option ("query", "Q", "[Local mode] queries to execute")
.required (false)
.repeatable (false)
.argument ("<string>", true)
.binding("query")
);
}
int LocalServer::main(const std::vector<std::string> & args)
{
if (!config().has("query")) /// Nothing to process
return Application::EXIT_OK;
context = std::make_unique<Context>();
context->setGlobalContext(*context);
/// Skip path, temp path, flag's path installation
/// We will terminate process on error
static KillingErrorHandler error_handler;
Poco::ErrorHandler::set(&error_handler);
/// Don't initilaize DateLUT
/// Maybe useless
if (config().has("macros"))
context->setMacros(Macros(config(), "macros"));
/// Skip networking
setupUsers();
/// Limit on total number of coucurrently executed queries.
/// Threre are no need for concurrent threads, override max_concurrent_queries.
context->getProcessList().setMaxSize(0);
/// Size of cache for uncompressed blocks. Zero means disabled.
size_t uncompressed_cache_size = parse<size_t>(config().getString("uncompressed_cache_size", "0"));
if (uncompressed_cache_size)
context->setUncompressedCache(uncompressed_cache_size);
/// Size of cache for marks (index of MergeTree family of tables). It is necessary.
/// Specify default value if mark_cache_size explicitly!
size_t mark_cache_size = parse<size_t>(config().getString("mark_cache_size", "5368709120"));
if (mark_cache_size)
context->setMarkCache(mark_cache_size);
/// Load global settings from default profile.
context->setSetting("profile", config().getString("default_profile", "default"));
/// Init dummy default DB
const std::string default_database = "default";
context->addDatabase(default_database, std::make_shared<DatabaseMemory>(default_database));
context->setCurrentDatabase(default_database);
processQueries();
return Application::EXIT_OK;
}
void LocalServer::processQueries()
{
Logger * log = &logger();
String queries_str = config().getString("query");
//LOG_DEBUG(log, "Executing queries: '" << queries_str << "'");
std::vector<String> queries;
auto parse_res = splitMultipartQuery(queries_str, queries);
context->setUser("default", "", Poco::Net::IPAddress{}, 0, "");
for (const auto & query : queries)
{
try
{
ReadBufferFromString read_buf(query);
WriteBufferFromFileDescriptor write_buf(STDOUT_FILENO);
BlockInputStreamPtr plan;
LOG_INFO(log, "executing query: " << query);
executeQuery(read_buf, write_buf, *context, plan, nullptr);
}
catch (...)
{
tryLogCurrentException(log, "An error ocurred while executing query");
throw;
}
}
if (!parse_res.second)
{
LOG_ERROR(log, "Cannot parse and execute the following part of query: '" << parse_res.first << "'");
}
}
void LocalServer::setupUsers()
{
ConfigurationPtr users_config;
if (config().has("users_config") || config().has("config-file") || Poco::File("config.xml").exists())
{
auto users_config_path = config().getString("users_config", config().getString("config-file", "config.xml"));
users_config = ConfigProcessor().loadConfig(users_config_path);
}
else
{
std::stringstream default_user_stream;
default_user_stream <<
"<yandex>\n"
" <profiles>\n"
" <default>\n"
" <max_memory_usage>10000000000</max_memory_usage>\n"
" <use_uncompressed_cache>0</use_uncompressed_cache>\n"
" <load_balancing>random</load_balancing>\n"
" </default>\n"
" <readonly>\n"
" <readonly>1</readonly>\n"
" </readonly>\n"
" </profiles>\n"
"\n"
" <users>\n"
" <default>\n"
" <password></password>\n"
" <networks>\n"
" <ip>::/0</ip>\n"
" </networks>\n"
" <profile>default</profile>\n"
" <quota>default</quota>\n"
" </default>\n"
" </users>\n"
"\n"
" <quotas>\n"
" <default>\n"
" <interval>\n"
" <duration>3600</duration>\n"
" <queries>0</queries>\n"
" <errors>0</errors>\n"
" <result_rows>0</result_rows>\n"
" <read_rows>0</read_rows>\n"
" <execution_time>0</execution_time>\n"
" </interval>\n"
" </default>\n"
" </quotas>\n"
"</yandex>\n";
Poco::XML::InputSource default_user_source(default_user_stream);
users_config = ConfigurationPtr(new Poco::Util::XMLConfiguration(&default_user_source));
}
if (users_config)
context->setUsersConfig(users_config);
else
throw Exception("Can't load config for users");
}
}
//#include <common/ApplicationServerExt.h>
//YANDEX_APP_MAIN(DB::LocalServer)

View File

@ -0,0 +1,35 @@
#include <Poco/Util/Application.h>
#include <memory>
namespace DB
{
class Context;
/// Lightweight Application for clickhouse-local
/// No networking, no extra configs and working directories, no pid and status files, no dictionaries, no logging.
/// Quiet mode by default
class LocalServer : public Poco::Util::Application
{
public:
LocalServer() = default;
void initialize(Poco::Util::Application & self) override;
void defineOptions(Poco::Util::OptionSet& _options) override;
int main(const std::vector<std::string> & args) override;
~LocalServer() = default;
protected:
void processQueries();
void setupUsers();
std::unique_ptr<Context> context;
};
}

View File

@ -6,7 +6,6 @@
#include <Poco/Util/XMLConfiguration.h>
#include <Poco/DirectoryIterator.h>
#include <common/ApplicationServerExt.h>
#include <common/ErrorHandlers.h>
#include <ext/scope_guard.hpp>
@ -654,5 +653,3 @@ void Server::attachSystemTables(const std::string & path, bool has_zookeeper) co
}
}
YANDEX_APP_SERVER_MAIN(DB::Server);

28
dbms/src/Server/main.cpp Normal file
View File

@ -0,0 +1,28 @@
#include "Server.h"
#include "LocalServer.h"
#include <common/ApplicationServerExt.h>
/// Universal executable for various clickhouse application
YANDEX_APP_SERVER_MAIN_FUNC(DB::Server, main_clickhouse_server);
YANDEX_APP_MAIN_FUNC(DB::LocalServer, main_clickhouse_local);
int main (int argc_, char * argv_[])
{
if (argc_ > 1 && !strcmp(argv_[1], "--local-mode"))
{
/// Cut first argument
int argc = argc_ - 1;
std::vector<char *> argv(argc);
argv[0] = argv_[0];
for (int i_arg = 2; i_arg < argc_; ++i_arg)
argv[i_arg - 1] = argv_[i_arg];
main_clickhouse_local(argc, argv.data());
}
else
{
main_clickhouse_server(argc_, argv_);
}
}

View File

@ -4,9 +4,9 @@
#include <Poco/TextEncoding.h>
#include <Poco/Util/ServerApplication.h>
#define YANDEX_APP_SERVER_MAIN(AppServerClassName) \
#define YANDEX_APP_SERVER_MAIN_FUNC(AppServerClassName, main_func) \
int \
main (int _argc, char* _argv[]) \
main_func (int _argc, char* _argv[]) \
{ \
AppServerClassName app; \
try \
@ -30,3 +30,35 @@
} \
return Poco::Util::Application::EXIT_CONFIG; \
}
#define YANDEX_APP_MAIN_FUNC(AppClassName, main_func) \
int \
main_func (int _argc, char* _argv[]) \
{ \
AppClassName app; \
try \
{ \
app.init(_argc, _argv); \
return app.run(); \
} \
catch (const Poco::Exception& _ex) \
{ \
std::cerr << "POCO ERROR: " << _ex.displayText() << std::endl; \
app.logger().log (_ex); \
} \
catch (const std::exception& _ex) \
{ \
std::cerr << "STD ERROR: " << _ex.what() << std::endl; \
app.logger().error (Poco::Logger::format ("Got exception: $0", _ex.what ())); \
} \
catch (...) \
{ \
std::cerr << "UNKNOWN ERROR" << std::endl; \
app.logger().error ("Unknown exception"); \
} \
return Poco::Util::Application::EXIT_CONFIG; \
}
#define YANDEX_APP_SERVER_MAIN(AppServerClassName) YANDEX_APP_SERVER_MAIN_FUNC(AppServerClassName, main)
#define YANDEX_APP_MAIN(AppClassName) YANDEX_APP_MAIN_FUNC(AppClassName, main)