Sessions in HTTP interface: modifications after merge [#CLICKHOUSE-2949].

This commit is contained in:
Alexey Milovidov 2017-06-03 00:01:17 +03:00
parent 8fb0afe74e
commit efc12f2334
3 changed files with 51 additions and 50 deletions

View File

@ -105,7 +105,7 @@ struct ContextShared
String path; /// Path to the data directory, with a slash at the end.
String tmp_path; /// The path to the temporary files that occur when processing the request.
String flags_path; ///
String flags_path; /// Path to the directory with some control flags for server maintenance.
Databases databases; /// List of databases and tables in them.
TableFunctionFactory table_function_factory; /// Table functions.
FormatFactory format_factory; /// Formats.
@ -123,16 +123,19 @@ struct ContextShared
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
BackgroundProcessingPoolPtr background_pool; /// The thread pool for the background work performed by the tables.
ReshardingWorkerPtr resharding_worker;
Macros macros; /// Substitutions extracted from config.
std::unique_ptr<Compiler> compiler; /// Used for dynamic compilation of queries' parts if it necessary.
Macros macros; /// Substitutions from config. Can be used for parameters of ReplicatedMergeTree.
std::unique_ptr<Compiler> compiler; /// Used for dynamic compilation of queries' parts if it necessary.
std::unique_ptr<QueryLog> query_log; /// Used to log queries.
std::shared_ptr<PartLog> part_log; /// Used to log operations with parts
std::shared_ptr<PartLog> part_log; /// Used to log operations with parts
/// Rules for selecting the compression method, depending on the size of the part.
mutable std::unique_ptr<CompressionMethodSelector> compression_method_selector;
std::unique_ptr<MergeTreeSettings> merge_tree_settings; /// Settings of MergeTree* engines.
size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default)
std::unique_ptr<MergeTreeSettings> merge_tree_settings; /// Settings of MergeTree* engines.
size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default)
class SessionKeyHash {
/// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.
class SessionKeyHash
{
public:
size_t operator()(const Context::SessionKey & key) const
{
@ -272,14 +275,14 @@ Context::SessionKey Context::getSessionKey(const String & session_id) const
}
void Context::scheduleClose(const Context::SessionKey & key, std::chrono::steady_clock::duration timeout)
void Context::scheduleCloseSession(const Context::SessionKey & key, std::chrono::steady_clock::duration timeout)
{
const UInt64 close_index = timeout / shared->close_interval + 1;
const auto new_close_cycle = shared->close_cycle + close_index;
if (close_cycle != new_close_cycle)
if (session_close_cycle != new_close_cycle)
{
close_cycle = new_close_cycle;
session_close_cycle = new_close_cycle;
if (shared->close_times.size() < close_index + 1)
shared->close_times.resize(close_index + 1);
shared->close_times[close_index].emplace_back(key);
@ -301,7 +304,7 @@ std::shared_ptr<Context> Context::acquireSession(const String & session_id, std:
auto new_session = std::make_shared<Context>(*global_context);
new_session->scheduleClose(key, timeout);
new_session->scheduleCloseSession(key, timeout);
it = shared->sessions.insert(std::make_pair(key, std::move(new_session))).first;
}
@ -312,9 +315,9 @@ std::shared_ptr<Context> Context::acquireSession(const String & session_id, std:
const auto & session = it->second;
if (session->used)
if (session->session_is_used)
throw Exception("Session is locked by a concurrent client.", ErrorCodes::SESSION_IS_LOCKED);
session->used = true;
session->session_is_used = true;
session->client_info = client_info;
@ -326,9 +329,8 @@ void Context::releaseSession(const String & session_id, std::chrono::steady_cloc
{
auto lock = getLock();
used = false;
scheduleClose(getSessionKey(session_id), timeout);
session_is_used = false;
scheduleCloseSession(getSessionKey(session_id), timeout);
}
@ -343,7 +345,7 @@ std::chrono::steady_clock::duration Context::closeSessions() const
const auto current_cycle = shared->close_cycle;
++(shared->close_cycle);
++shared->close_cycle;
shared->close_cycle_time = now + shared->close_interval;
if (shared->close_times.empty())
@ -355,10 +357,10 @@ std::chrono::steady_clock::duration Context::closeSessions() const
{
const auto session = shared->sessions.find(key);
if (session != shared->sessions.end() && session->second->close_cycle <= current_cycle)
if (session != shared->sessions.end() && session->second->session_close_cycle <= current_cycle)
{
if (session->second->used)
session->second->scheduleClose(key, std::chrono::seconds(0));
if (session->second->session_is_used)
session->second->scheduleCloseSession(key, std::chrono::seconds(0));
else
shared->sessions.erase(session);
}

View File

@ -89,21 +89,21 @@ private:
ClientInfo client_info;
std::shared_ptr<QuotaForIntervals> quota; /// Current quota. By default - empty quota, that have no limits.
std::shared_ptr<QuotaForIntervals> quota; /// Current quota. By default - empty quota, that have no limits.
String current_database;
Settings settings; /// Setting for query execution.
using ProgressCallback = std::function<void(const Progress & progress)>;
ProgressCallback progress_callback; /// Callback for tracking progress of query execution.
ProgressCallback progress_callback; /// Callback for tracking progress of query execution.
ProcessListElement * process_list_elem = nullptr; /// For tracking total resource usage for query.
String default_format; /// Format, used when server formats data by itself and if query does not have FORMAT specification.
String default_format; /// Format, used when server formats data by itself and if query does not have FORMAT specification.
/// Thus, used in HTTP interface. If not specified - then some globally default format is used.
Tables external_tables; /// Temporary tables.
Context * session_context = nullptr; /// Session context or nullptr. Could be equal to this.
Context * global_context = nullptr; /// Global context or nullptr. Could be equal to this.
Tables external_tables; /// Temporary tables.
Context * session_context = nullptr; /// Session context or nullptr. Could be equal to this.
Context * global_context = nullptr; /// Global context or nullptr. Could be equal to this.
UInt64 close_cycle = 0;
bool used = false;
UInt64 session_close_cycle = 0;
bool session_is_used = false;
using DatabasePtr = std::shared_ptr<IDatabase>;
using Databases = std::map<String, std::shared_ptr<IDatabase>>;
@ -222,12 +222,11 @@ public:
const Databases getDatabases() const;
Databases getDatabases();
using SessionKey = std::pair<String, String>;
std::shared_ptr<Context> acquireSession(const String & session_id, std::chrono::steady_clock::duration timeout, bool session_check) const;
void releaseSession(const String & session_id, std::chrono::steady_clock::duration timeout);
std::chrono::steady_clock::duration closeSessions() const;
/// Close sessions, that has been expired. Returns how long to wait for next session to be expired, if no new sessions will be added.
std::chrono::steady_clock::duration closeSessions() const;
/// For methods below you may need to acquire a lock by yourself.
std::unique_lock<Poco::Mutex> getLock() const;
@ -324,6 +323,9 @@ public:
String getDefaultProfileName() const;
void setDefaultProfileName(const String & name);
/// User name and session identifier. Named sessions are local to users.
using SessionKey = std::pair<String, String>;
private:
/** Проверить, имеет ли текущий клиент доступ к заданной базе данных.
* Если доступ запрещён, кинуть исключение.
@ -337,7 +339,9 @@ private:
StoragePtr getTableImpl(const String & database_name, const String & table_name, Exception * exception) const;
SessionKey getSessionKey(const String & session_id) const;
void scheduleClose(const SessionKey & key, std::chrono::steady_clock::duration timeout);
/// Session will be closed after specified timeout.
void scheduleCloseSession(const SessionKey & key, std::chrono::steady_clock::duration timeout);
};

View File

@ -80,6 +80,7 @@ namespace ErrorCodes
extern const int INVALID_SESSION_TIMEOUT;
}
static Poco::Net::HTTPResponse::HTTPStatus exceptionCodeToHTTPStatus(int exception_code)
{
using namespace Poco::Net;
@ -127,30 +128,21 @@ static Poco::Net::HTTPResponse::HTTPStatus exceptionCodeToHTTPStatus(int excepti
static std::chrono::steady_clock::duration parseSessionTimeout(const HTMLForm & params)
{
int session_timeout;
const auto & config = Poco::Util::Application::instance().config();
unsigned session_timeout = config.getInt("default_session_timeout", 60);
if (params.has("session_timeout"))
{
auto max_session_timeout = Poco::Util::Application::instance().config().getInt("max_session_timeout", 3600);
auto session_timeout_str = params.get("session_timeout");
unsigned max_session_timeout = config.getUInt("max_session_timeout", 3600);
std::string session_timeout_str = params.get("session_timeout");
try
{
session_timeout = std::stoi(session_timeout_str);
}
catch (...)
{
session_timeout = -1;
}
session_timeout = parse<unsigned>(session_timeout_str);
if (session_timeout < 0 || max_session_timeout < session_timeout)
throw Exception("Invalid session timeout '" + session_timeout_str + "', valid values are integers from 0 to " + std::to_string(max_session_timeout),
if (session_timeout > max_session_timeout)
throw Exception("Session timeout '" + session_timeout_str + "' is larger than max_session_timeout: " + toString(max_session_timeout)
+ ". Maximum session timeout could be modified in configuration file.",
ErrorCodes::INVALID_SESSION_TIMEOUT);
}
else
{
session_timeout = Poco::Util::Application::instance().config().getInt("default_session_timeout", 60);
}
return std::chrono::seconds(session_timeout);
}
@ -196,6 +188,7 @@ HTTPHandler::HTTPHandler(Server & server_)
{
}
void HTTPHandler::processQuery(
Poco::Net::HTTPServerRequest & request,
HTMLForm & params,
@ -213,7 +206,6 @@ void HTTPHandler::processQuery(
if (!query_param.empty())
query_param += '\n';
/// User name and password can be passed using query parameters or using HTTP Basic auth (both methods are insecure).
/// The user and password can be passed by headers (similar to X-Auth-*), which is used by load balancers to pass authentication information
std::string user = request.get("X-ClickHouse-User", params.get("user", "default"));
@ -236,6 +228,9 @@ void HTTPHandler::processQuery(
context.setUser(user, password, request.clientAddress(), quota_key);
context.setCurrentQueryId(query_id);
/// The user could specify session identifier and session timeout.
/// It allows to modify settings, create temporary tables and reuse them in subsequent requests.
std::shared_ptr<Context> session;
String session_id;
std::chrono::steady_clock::duration session_timeout;