Merge pull request #10308 from ClickHouse/shared-context-lifetime

Shared context lifetime
This commit is contained in:
alexey-milovidov 2020-04-21 00:12:45 +03:00 committed by GitHub
commit 8c2839d3c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 145 additions and 75 deletions

View File

@ -64,7 +64,8 @@ public:
concurrency(concurrency_), delay(delay_), queue(concurrency), randomize(randomize_),
cumulative(cumulative_), max_iterations(max_iterations_), max_time(max_time_),
json_path(json_path_), confidence(confidence_), query_id(query_id_), settings(settings_),
global_context(Context::createGlobal()), pool(concurrency)
shared_context(Context::createShared()), global_context(Context::createGlobal(shared_context.get())),
pool(concurrency)
{
const auto secure = secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable;
size_t connections_cnt = std::max(ports_.size(), hosts_.size());
@ -149,6 +150,7 @@ private:
size_t confidence;
std::string query_id;
Settings settings;
SharedContextHolder shared_context;
Context global_context;
QueryProcessingStage::Enum query_processing_stage;

View File

@ -138,7 +138,8 @@ private:
bool has_vertical_output_suffix = false; /// Is \G present at the end of the query string?
Context context = Context::createGlobal();
SharedContextHolder shared_context = Context::createShared();
Context context = Context::createGlobal(shared_context.get());
/// Buffer that reads from stdin in batch mode.
ReadBufferFromFileDescriptor std_in {STDIN_FILENO};

View File

@ -94,7 +94,8 @@ void ClusterCopierApp::mainImpl()
<< "path " << process_path << ", "
<< "revision " << ClickHouseRevision::get() << ")");
auto context = std::make_unique<Context>(Context::createGlobal());
SharedContextHolder shared_context = Context::createShared();
auto context = std::make_unique<Context>(Context::createGlobal(shared_context.get()));
context->makeGlobalContext();
SCOPE_EXIT(context->shutdown());

View File

@ -148,7 +148,8 @@ try
return Application::EXIT_OK;
}
context = std::make_unique<Context>(Context::createGlobal());
shared_context = Context::createShared();
context = std::make_unique<Context>(Context::createGlobal(shared_context.get()));
context->makeGlobalContext();
context->setApplicationType(Context::ApplicationType::LOCAL);
tryInitPath();

View File

@ -41,6 +41,7 @@ private:
void setupUsers();
protected:
SharedContextHolder shared_context;
std::unique_ptr<Context> context;
/// Settings specified via command line args

View File

@ -1080,7 +1080,8 @@ try
header.insert(std::move(column));
}
Context context = Context::createGlobal();
SharedContextHolder shared_context = Context::createShared();
Context context = Context::createGlobal(shared_context.get());
context.makeGlobalContext();
ReadBufferFromFileDescriptor file_in(STDIN_FILENO);

View File

@ -120,7 +120,7 @@ void ODBCColumnsInfoHandler::handleRequest(Poco::Net::HTTPServerRequest & reques
SCOPE_EXIT(SQLFreeStmt(hstmt, SQL_DROP));
const auto & context_settings = context->getSettingsRef();
const auto & context_settings = context.getSettingsRef();
/// TODO Why not do SQLColumns instead?
std::string name = schema_name.empty() ? table_name : schema_name + "." + table_name;

View File

@ -14,7 +14,7 @@ namespace DB
class ODBCColumnsInfoHandler : public Poco::Net::HTTPRequestHandler
{
public:
ODBCColumnsInfoHandler(size_t keep_alive_timeout_, std::shared_ptr<Context> context_)
ODBCColumnsInfoHandler(size_t keep_alive_timeout_, Context & context_)
: log(&Poco::Logger::get("ODBCColumnsInfoHandler")), keep_alive_timeout(keep_alive_timeout_), context(context_)
{
}
@ -24,7 +24,7 @@ public:
private:
Poco::Logger * log;
size_t keep_alive_timeout;
std::shared_ptr<Context> context;
Context & context;
};
}
#endif

View File

@ -21,7 +21,7 @@ namespace DB
class HandlerFactory : public Poco::Net::HTTPRequestHandlerFactory
{
public:
HandlerFactory(const std::string & name_, size_t keep_alive_timeout_, std::shared_ptr<Context> context_)
HandlerFactory(const std::string & name_, size_t keep_alive_timeout_, Context & context_)
: log(&Poco::Logger::get(name_)), name(name_), keep_alive_timeout(keep_alive_timeout_), context(context_)
{
pool_map = std::make_shared<ODBCHandler::PoolMap>();
@ -33,7 +33,7 @@ private:
Poco::Logger * log;
std::string name;
size_t keep_alive_timeout;
std::shared_ptr<Context> context;
Context & context;
std::shared_ptr<ODBCHandler::PoolMap> pool_map;
};
}

View File

@ -12,8 +12,8 @@ namespace DB
class IdentifierQuoteHandler : public Poco::Net::HTTPRequestHandler
{
public:
IdentifierQuoteHandler(size_t keep_alive_timeout_, std::shared_ptr<Context> context_)
: log(&Poco::Logger::get("IdentifierQuoteHandler")), keep_alive_timeout(keep_alive_timeout_), context(context_)
IdentifierQuoteHandler(size_t keep_alive_timeout_, Context &)
: log(&Poco::Logger::get("IdentifierQuoteHandler")), keep_alive_timeout(keep_alive_timeout_)
{
}
@ -22,7 +22,6 @@ public:
private:
Poco::Logger * log;
size_t keep_alive_timeout;
std::shared_ptr<Context> context;
};
}
#endif

View File

@ -129,7 +129,7 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout);
try
{
BlockOutputStreamPtr writer = FormatFactory::instance().getOutput(format, out, *sample_block, *context);
BlockOutputStreamPtr writer = FormatFactory::instance().getOutput(format, out, *sample_block, context);
auto pool = getPool(connection_string);
ODBCBlockInputStream inp(pool->get(), query, *sample_block, max_block_size);
copyData(inp, *writer);

View File

@ -24,7 +24,7 @@ public:
ODBCHandler(std::shared_ptr<PoolMap> pool_map_,
size_t keep_alive_timeout_,
std::shared_ptr<Context> context_)
Context & context_)
: log(&Poco::Logger::get("ODBCHandler"))
, pool_map(pool_map_)
, keep_alive_timeout(keep_alive_timeout_)
@ -39,7 +39,7 @@ private:
std::shared_ptr<PoolMap> pool_map;
size_t keep_alive_timeout;
std::shared_ptr<Context> context;
Context & context;
static inline std::mutex mutex;

View File

@ -176,8 +176,9 @@ int ODBCBridge::main(const std::vector<std::string> & /*args*/)
http_params->setTimeout(http_timeout);
http_params->setKeepAliveTimeout(keep_alive_timeout);
context = std::make_shared<Context>(Context::createGlobal());
context->makeGlobalContext();
auto shared_context = Context::createShared();
Context context(Context::createGlobal(shared_context.get()));
context.makeGlobalContext();
if (config().has("query_masking_rules"))
{

View File

@ -35,7 +35,5 @@ private:
size_t keep_alive_timeout;
Poco::Logger * log;
std::shared_ptr<Context> context; /// need for settings only
};
}

View File

@ -231,7 +231,10 @@ int Server::main(const std::vector<std::string> & /*args*/)
/** Context contains all that query execution is dependent:
* settings, available functions, data types, aggregate functions, databases...
*/
global_context = std::make_unique<Context>(Context::createGlobal());
auto shared_context = Context::createShared();
auto global_context = std::make_unique<Context>(Context::createGlobal(shared_context.get()));
global_context_ptr = global_context.get();
global_context->makeGlobalContext();
global_context->setApplicationType(Context::ApplicationType::SERVER);
@ -328,7 +331,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
/** Explicitly destroy Context. It is more convenient than in destructor of Server, because logger is still available.
* At this moment, no one could own shared part of Context.
*/
global_context_ptr = nullptr;
global_context.reset();
shared_context.reset();
LOG_DEBUG(log, "Destroyed global context.");
});

View File

@ -35,7 +35,7 @@ public:
Context & context() const override
{
return *global_context;
return *global_context_ptr;
}
bool isCancelled() const override
@ -56,7 +56,7 @@ protected:
std::string getDefaultCorePath() const override;
private:
std::unique_ptr<Context> global_context;
Context * global_context_ptr = nullptr;
};
}

View File

@ -2,16 +2,24 @@
#include <Interpreters/Context.h>
inline DB::Context createContext()
struct ContextHolder
{
auto context = DB::Context::createGlobal();
context.makeGlobalContext();
context.setPath("./");
return context;
}
DB::SharedContextHolder shared_context;
DB::Context context;
inline const DB::Context & getContext()
ContextHolder()
: shared_context(DB::Context::createShared())
, context(DB::Context::createGlobal(shared_context.get()))
{
}
ContextHolder(ContextHolder &&) = default;
};
inline ContextHolder getContext()
{
static DB::Context global_context = createContext();
return global_context;
ContextHolder holder;
holder.context.makeGlobalContext();
holder.context.setPath("./");
return holder;
}

View File

@ -66,7 +66,8 @@ try
//CollapsingSortedBlockInputStream collapsed(inputs, descr, "Sign", 1048576);
CollapsingFinalBlockInputStream collapsed(inputs, descr, "Sign");
Context context = Context::createGlobal();
SharedContextHolder shared_context = Context::createShared();
Context context = Context::createGlobal(shared_context.get());
context.makeGlobalContext();
WriteBufferFromFileDescriptor out_buf(STDERR_FILENO);
BlockOutputStreamPtr output = context.getOutputFormat("TabSeparated", out_buf, block1);

View File

@ -35,7 +35,8 @@ try
ParserSelectQuery parser;
ASTPtr ast = parseQuery(parser, input.data(), input.data() + input.size(), "", 0, 0);
Context context = Context::createGlobal();
SharedContextHolder shared_context = Context::createShared();
Context context = Context::createGlobal(shared_context.get());
context.makeGlobalContext();
NamesAndTypesList source_columns = {{"number", std::make_shared<DataTypeUInt64>()}};

View File

@ -40,7 +40,8 @@ try
formatAST(*ast, std::cerr);
std::cerr << std::endl;
Context context = Context::createGlobal();
SharedContextHolder shared_context = Context::createShared();
Context context = Context::createGlobal(shared_context.get());
context.makeGlobalContext();
NamesAndTypesList source_columns = {{"number", std::make_shared<DataTypeUInt64>()}};

View File

@ -23,7 +23,8 @@ using namespace DB;
int main(int, char **)
try
{
Context context = Context::createGlobal();
SharedContextHolder shared_context = Context::createShared();
Context context = Context::createGlobal(shared_context.get());
context.makeGlobalContext();
Settings settings = context.getSettings();

View File

@ -445,14 +445,28 @@ Context::Context() = default;
Context::Context(const Context &) = default;
Context & Context::operator=(const Context &) = default;
SharedContextHolder::SharedContextHolder(SharedContextHolder &&) noexcept = default;
SharedContextHolder & SharedContextHolder::operator=(SharedContextHolder &&) = default;
SharedContextHolder::SharedContextHolder() = default;
SharedContextHolder::~SharedContextHolder() = default;
SharedContextHolder::SharedContextHolder(std::unique_ptr<ContextShared> shared_context)
: shared(std::move(shared_context)) {}
Context Context::createGlobal()
void SharedContextHolder::reset() { shared.reset(); }
Context Context::createGlobal(ContextShared * shared)
{
Context res;
res.shared = std::make_shared<ContextShared>();
res.shared = shared;
return res;
}
SharedContextHolder Context::createShared()
{
return SharedContextHolder(std::make_unique<ContextShared>());
}
Context::~Context() = default;

View File

@ -131,6 +131,23 @@ struct IHostContext
using IHostContextPtr = std::shared_ptr<IHostContext>;
/// A small class which owns ContextShared.
/// We don't use something like unique_ptr directly to allow ContextShared type to be incomplete.
struct SharedContextHolder
{
~SharedContextHolder();
SharedContextHolder();
SharedContextHolder(std::unique_ptr<ContextShared> shared_context);
SharedContextHolder(SharedContextHolder &&) noexcept;
SharedContextHolder & operator=(SharedContextHolder &&);
ContextShared * get() const { return shared.get(); }
void reset();
private:
std::unique_ptr<ContextShared> shared;
};
/** A set of known objects that can be used in the query.
* Consists of a shared part (always common to all sessions and queries)
* and copied part (which can be its own for each session or query).
@ -140,8 +157,7 @@ using IHostContextPtr = std::shared_ptr<IHostContext>;
class Context
{
private:
using Shared = std::shared_ptr<ContextShared>;
Shared shared;
ContextShared * shared;
ClientInfo client_info;
ExternalTablesInitializer external_tables_initializer_callback;
@ -193,7 +209,8 @@ private:
public:
/// Create initial Context with ContextShared and etc.
static Context createGlobal();
static Context createGlobal(ContextShared * shared);
static SharedContextHolder createShared();
Context(const Context &);
Context & operator=(const Context &);

View File

@ -78,7 +78,8 @@ try
ParserCreateQuery parser;
ASTPtr ast = parseQuery(parser, input.data(), input.data() + input.size(), "", 0, 0);
Context context = Context::createGlobal();
SharedContextHolder shared_context = Context::createShared();
Context context = Context::createGlobal(shared_context.get());
context.makeGlobalContext();
context.setPath("./");

View File

@ -46,7 +46,8 @@ int main(int argc, char ** argv)
formatAST(*ast, std::cerr);
std::cerr << std::endl;
Context context = Context::createGlobal();
SharedContextHolder shared_context = Context::createShared();
Context context = Context::createGlobal(shared_context.get());
context.makeGlobalContext();
NamesAndTypesList columns
{

View File

@ -95,7 +95,8 @@ int main()
}
};
Context context = Context::createGlobal();
SharedContextHolder shared_context = Context::createShared();
Context context = Context::createGlobal(shared_context.get());
context.makeGlobalContext();
auto system_database = std::make_shared<DatabaseMemory>("system");

View File

@ -1156,7 +1156,8 @@ static bool run()
TestResult check(const TestEntry & entry)
{
static DB::Context context = DB::Context::createGlobal();
static DB::SharedContextHolder shared_context = DB::Context::createShared();
static DB::Context context = DB::Context::createGlobal(shared_context.get());
context.makeGlobalContext();
try

View File

@ -30,7 +30,8 @@ try
/// Pre-initialize the `DateLUT` so that the first initialization does not affect the measured execution speed.
DateLUT::instance();
Context context = Context::createGlobal();
SharedContextHolder shared_context = Context::createShared();
Context context = Context::createGlobal(shared_context.get());
context.makeGlobalContext();
context.setPath("./");

View File

@ -68,7 +68,7 @@ using DiskImplementations = testing::Types<DB::DiskMemory, DB::DiskLocal>;
TYPED_TEST_SUITE(StorageLogTest, DiskImplementations);
// Returns data written to table in Values format.
std::string writeData(int rows, DB::StoragePtr & table)
std::string writeData(int rows, DB::StoragePtr & table, DB::Context & context)
{
using namespace DB;
@ -96,23 +96,23 @@ std::string writeData(int rows, DB::StoragePtr & table)
block.insert(column);
}
BlockOutputStreamPtr out = table->write({}, getContext());
BlockOutputStreamPtr out = table->write({}, context);
out->write(block);
return data;
}
// Returns all table data in Values format.
std::string readData(DB::StoragePtr & table)
std::string readData(DB::StoragePtr & table, DB::Context & context)
{
using namespace DB;
Names column_names;
column_names.push_back("a");
QueryProcessingStage::Enum stage = table->getQueryProcessingStage(getContext());
QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);
BlockInputStreamPtr in = std::make_shared<TreeExecutorBlockInputStream>(std::move(table->read(column_names, {}, getContext(), stage, 8192, 1)[0]));
BlockInputStreamPtr in = std::make_shared<TreeExecutorBlockInputStream>(std::move(table->read(column_names, {}, context, stage, 8192, 1)[0]));
Block sample;
{
@ -123,7 +123,7 @@ std::string readData(DB::StoragePtr & table)
std::ostringstream ss;
WriteBufferFromOStream out_buf(ss);
BlockOutputStreamPtr output = FormatFactory::instance().getOutput("Values", out_buf, sample, getContext());
BlockOutputStreamPtr output = FormatFactory::instance().getOutput("Values", out_buf, sample, context);
copyData(*in, *output);
@ -135,15 +135,16 @@ std::string readData(DB::StoragePtr & table)
TYPED_TEST(StorageLogTest, testReadWrite)
{
using namespace DB;
auto context_holder = getContext();
std::string data;
// Write several chunks of data.
data += writeData(10, this->getTable());
data += writeData(10, this->getTable(), context_holder.context);
data += ",";
data += writeData(20, this->getTable());
data += writeData(20, this->getTable(), context_holder.context);
data += ",";
data += writeData(10, this->getTable());
data += writeData(10, this->getTable(), context_holder.context);
ASSERT_EQ(data, readData(this->getTable()));
ASSERT_EQ(data, readData(this->getTable(), context_holder.context));
}

View File

@ -18,7 +18,7 @@ using namespace DB;
/// NOTE How to do better?
struct State
{
Context context = getContext();
Context & context;
NamesAndTypesList columns{
{"column", std::make_shared<DataTypeUInt8>()},
{"apply_id", std::make_shared<DataTypeUInt64>()},
@ -27,7 +27,7 @@ struct State
{"create_time", std::make_shared<DataTypeDateTime>()},
};
State()
State(Context & context_) : context(context_)
{
registerFunctions();
DatabasePtr database = std::make_shared<DatabaseMemory>("test");
@ -38,12 +38,6 @@ struct State
}
};
static State & state()
{
static State res;
return res;
}
static void check(const std::string & query, const std::string & expected, const Context & context, const NamesAndTypesList & columns)
{
@ -60,47 +54,62 @@ static void check(const std::string & query, const std::string & expected, const
TEST(TransformQueryForExternalDatabase, InWithSingleElement)
{
auto context_holder = getContext();
State state(context_holder.context);
check("SELECT column FROM test.table WHERE 1 IN (1)",
R"(SELECT "column" FROM "test"."table" WHERE 1)",
state().context, state().columns);
state.context, state.columns);
check("SELECT column FROM test.table WHERE column IN (1, 2)",
R"(SELECT "column" FROM "test"."table" WHERE "column" IN (1, 2))",
state().context, state().columns);
state.context, state.columns);
check("SELECT column FROM test.table WHERE column NOT IN ('hello', 'world')",
R"(SELECT "column" FROM "test"."table" WHERE "column" NOT IN ('hello', 'world'))",
state().context, state().columns);
state.context, state.columns);
}
TEST(TransformQueryForExternalDatabase, Like)
{
auto context_holder = getContext();
State state(context_holder.context);
check("SELECT column FROM test.table WHERE column LIKE '%hello%'",
R"(SELECT "column" FROM "test"."table" WHERE "column" LIKE '%hello%')",
state().context, state().columns);
state.context, state.columns);
check("SELECT column FROM test.table WHERE column NOT LIKE 'w%rld'",
R"(SELECT "column" FROM "test"."table" WHERE "column" NOT LIKE 'w%rld')",
state().context, state().columns);
state.context, state.columns);
}
TEST(TransformQueryForExternalDatabase, Substring)
{
auto context_holder = getContext();
State state(context_holder.context);
check("SELECT column FROM test.table WHERE left(column, 10) = RIGHT(column, 10) AND SUBSTRING(column FROM 1 FOR 2) = 'Hello'",
R"(SELECT "column" FROM "test"."table")",
state().context, state().columns);
state.context, state.columns);
}
TEST(TransformQueryForExternalDatabase, MultipleAndSubqueries)
{
auto context_holder = getContext();
State state(context_holder.context);
check("SELECT column FROM test.table WHERE 1 = 1 AND toString(column) = '42' AND column = 42 AND left(column, 10) = RIGHT(column, 10) AND column IN (1, 42) AND SUBSTRING(column FROM 1 FOR 2) = 'Hello' AND column != 4",
R"(SELECT "column" FROM "test"."table" WHERE 1 AND ("column" = 42) AND ("column" IN (1, 42)) AND ("column" != 4))",
state().context, state().columns);
state.context, state.columns);
check("SELECT column FROM test.table WHERE toString(column) = '42' AND left(column, 10) = RIGHT(column, 10) AND column = 42",
R"(SELECT "column" FROM "test"."table" WHERE ("column" = 42))",
state().context, state().columns);
state.context, state.columns);
}
TEST(TransformQueryForExternalDatabase, Issue7245)
{
auto context_holder = getContext();
State state(context_holder.context);
check("select apply_id from test.table where apply_type = 2 and create_time > addDays(toDateTime('2019-01-01 01:02:03'),-7) and apply_status in (3,4)",
R"(SELECT "apply_id", "apply_type", "apply_status", "create_time" FROM "test"."table" WHERE ("apply_type" = 2) AND ("create_time" > '2018-12-25 01:02:03') AND ("apply_status" IN (3, 4)))",
state().context, state().columns);
state.context, state.columns);
}

View File

@ -26,7 +26,8 @@ try
names_and_types.emplace_back("a", std::make_shared<DataTypeUInt64>());
names_and_types.emplace_back("b", std::make_shared<DataTypeUInt8>());
auto context = Context::createGlobal();
SharedContextHolder shared_context = Context::createShared();
auto context = Context::createGlobal(shared_context.get());
context.makeGlobalContext();
context.setPath("./");

View File

@ -27,7 +27,8 @@ try
WriteBufferFromOStream out_buf(std::cout);
auto context = Context::createGlobal();
SharedContextHolder shared_context = Context::createShared();
auto context = Context::createGlobal(shared_context.get());
context.makeGlobalContext();
QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);