mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 16:42:05 +00:00
improve temporary tables
This commit is contained in:
parent
c7468d2502
commit
8b3a245a1d
@ -992,14 +992,15 @@ bool TCPHandler::receiveData(bool scalar)
|
||||
auto resolved = query_context->tryResolveStorageID(temporary_id, Context::ResolveExternal);
|
||||
StoragePtr storage;
|
||||
/// If such a table does not exist, create it.
|
||||
if (resolved.empty())
|
||||
if (resolved)
|
||||
storage = DatabaseCatalog::instance().getTable(resolved);
|
||||
else
|
||||
{
|
||||
NamesAndTypesList columns = block.getNamesAndTypesList();
|
||||
storage = StorageMemory::create(temporary_id, ColumnsDescription{columns}, ConstraintsDescription{});
|
||||
storage->startup();
|
||||
query_context->addExternalTable(temporary_id.table_name, storage);
|
||||
} else
|
||||
storage = DatabaseCatalog::instance().getTable(resolved);
|
||||
auto temporary_table = TemporaryTableHolder(*query_context, ColumnsDescription{columns});
|
||||
storage = temporary_table.getTable();
|
||||
query_context->addExternalTable(temporary_id.table_name, std::move(temporary_table));
|
||||
}
|
||||
/// The data will be written directly to the table.
|
||||
state.io.out = storage->write(ASTPtr(), *query_context);
|
||||
}
|
||||
|
@ -170,9 +170,9 @@ void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header,
|
||||
|
||||
/// Create table
|
||||
NamesAndTypesList columns = sample_block.getNamesAndTypesList();
|
||||
StoragePtr storage = StorageMemory::create(StorageID("_external", data->table_name), ColumnsDescription{columns}, ConstraintsDescription{});
|
||||
storage->startup();
|
||||
context.addExternalTable(data->table_name, storage);
|
||||
auto temporary_table = TemporaryTableHolder(context, ColumnsDescription{columns});
|
||||
auto storage = temporary_table.getTable();
|
||||
context.addExternalTable(data->table_name, std::move(temporary_table));
|
||||
BlockOutputStreamPtr output = storage->write(ASTPtr(), context);
|
||||
|
||||
/// Write data
|
||||
|
@ -47,7 +47,7 @@ try
|
||||
chain.finalize();
|
||||
ExpressionActionsPtr expression = chain.getLastActions();
|
||||
|
||||
StoragePtr table = StorageSystemNumbers::create("numbers", false);
|
||||
StoragePtr table = StorageSystemNumbers::create(StorageID("test", "numbers"), false);
|
||||
|
||||
Names column_names;
|
||||
column_names.push_back("number");
|
||||
|
@ -52,7 +52,7 @@ try
|
||||
chain.finalize();
|
||||
ExpressionActionsPtr expression = chain.getLastActions();
|
||||
|
||||
StoragePtr table = StorageSystemNumbers::create("numbers", false);
|
||||
StoragePtr table = StorageSystemNumbers::create(StorageID("test", "numbers"), false);
|
||||
|
||||
Names column_names;
|
||||
column_names.push_back("number");
|
||||
|
@ -53,6 +53,7 @@ std::pair<String, StoragePtr> createTableFromAST(
|
||||
{
|
||||
const auto & table_function = ast_create_query.as_table_function->as<ASTFunction &>();
|
||||
const auto & factory = TableFunctionFactory::instance();
|
||||
//FIXME storage will have wrong database name
|
||||
StoragePtr storage = factory.get(table_function.name, context)->execute(ast_create_query.as_table_function, context, ast_create_query.table);
|
||||
return {ast_create_query.table, storage};
|
||||
}
|
||||
|
@ -274,62 +274,6 @@ void NamedSession::release()
|
||||
parent.releaseSession(*this);
|
||||
}
|
||||
|
||||
struct TemporaryTableHolder : boost::noncopyable
|
||||
{
|
||||
TemporaryTableHolder(const Context & context_,
|
||||
IDatabase & external_tables_,
|
||||
const StoragePtr & table,
|
||||
const ASTPtr & query = {})
|
||||
: context(context_), external_tables(external_tables_)
|
||||
{
|
||||
ASTCreateQuery * create = dynamic_cast<ASTCreateQuery *>(query.get());
|
||||
if (create)
|
||||
{
|
||||
if (create->uuid == UUIDHelpers::Nil)
|
||||
create->uuid = UUIDHelpers::generateV4();
|
||||
id = create->uuid;
|
||||
}
|
||||
else
|
||||
id = UUIDHelpers::generateV4();
|
||||
String global_name = "_data_" + toString(id);
|
||||
external_tables.createTable(context, global_name, table, query ? query->clone() : query);
|
||||
if (create)
|
||||
{
|
||||
create->database = DatabaseCatalog::TEMPORARY_DATABASE;
|
||||
create->table = global_name;
|
||||
create->uuid = id;
|
||||
}
|
||||
}
|
||||
|
||||
TemporaryTableHolder(TemporaryTableHolder && other)
|
||||
: context(other.context), external_tables(other.external_tables), id(other.id)
|
||||
{
|
||||
other.id = UUIDHelpers::Nil;
|
||||
}
|
||||
|
||||
~TemporaryTableHolder()
|
||||
{
|
||||
external_tables.removeTable(context, "_data_" + toString(id));
|
||||
}
|
||||
|
||||
StorageID getGlobalTableID() const
|
||||
{
|
||||
return StorageID{DatabaseCatalog::TEMPORARY_DATABASE, "_data_" + toString(id), id};
|
||||
}
|
||||
|
||||
StoragePtr getTable() const
|
||||
{
|
||||
auto table = external_tables.tryGetTable(context, "_data_" + toString(id));
|
||||
if (!table)
|
||||
throw Exception("Temporary table " + getGlobalTableID().getNameForLogs() + " not found", ErrorCodes::LOGICAL_ERROR);
|
||||
return table;
|
||||
}
|
||||
|
||||
const Context & context;
|
||||
IDatabase & external_tables;
|
||||
UUID id;
|
||||
};
|
||||
|
||||
|
||||
/** Set of known objects (environment), that could be used in query.
|
||||
* Shared (global) part. Order of members (especially, order of destruction) is very important.
|
||||
@ -915,27 +859,12 @@ Tables Context::getExternalTables() const
|
||||
}
|
||||
|
||||
|
||||
void Context::addExternalTable(const String & table_name, const StoragePtr & storage, const ASTPtr & ast)
|
||||
void Context::addExternalTable(const String & table_name, TemporaryTableHolder && temporary_table)
|
||||
{
|
||||
auto external_db = DatabaseCatalog::instance().getDatabaseForTemporaryTables();
|
||||
auto holder = std::make_shared<TemporaryTableHolder>(*this, *external_db, storage, ast);
|
||||
auto lock = getLock();
|
||||
if (external_tables_mapping.end() != external_tables_mapping.find(table_name))
|
||||
throw Exception("Temporary table " + backQuoteIfNeed(table_name) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
|
||||
|
||||
external_tables_mapping.emplace(table_name, std::move(holder));
|
||||
}
|
||||
|
||||
|
||||
void Context::addScalar(const String & name, const Block & block)
|
||||
{
|
||||
scalars[name] = block;
|
||||
}
|
||||
|
||||
|
||||
bool Context::hasScalar(const String & name) const
|
||||
{
|
||||
return scalars.count(name);
|
||||
external_tables_mapping.emplace(table_name, std::make_shared<TemporaryTableHolder>(std::move(temporary_table)));
|
||||
}
|
||||
|
||||
|
||||
@ -954,6 +883,18 @@ bool Context::removeExternalTable(const String & table_name)
|
||||
}
|
||||
|
||||
|
||||
void Context::addScalar(const String & name, const Block & block)
|
||||
{
|
||||
scalars[name] = block;
|
||||
}
|
||||
|
||||
|
||||
bool Context::hasScalar(const String & name) const
|
||||
{
|
||||
return scalars.count(name);
|
||||
}
|
||||
|
||||
|
||||
StoragePtr Context::executeTableFunction(const ASTPtr & table_expression)
|
||||
{
|
||||
/// Slightly suboptimal.
|
||||
|
@ -130,8 +130,6 @@ struct IHostContext
|
||||
|
||||
using IHostContextPtr = std::shared_ptr<IHostContext>;
|
||||
|
||||
struct TemporaryTableHolder;
|
||||
|
||||
/** A set of known objects that can be used in the query.
|
||||
* Consists of a shared part (always common to all sessions and queries)
|
||||
* and copied part (which can be its own for each session or query).
|
||||
@ -165,7 +163,6 @@ private:
|
||||
|
||||
String default_format; /// Format, used when server formats data by itself and if query does not have FORMAT specification.
|
||||
/// Thus, used in HTTP interface. If not specified - then some globally default format is used.
|
||||
using TemporaryTablesMapping = std::map<String, std::shared_ptr<TemporaryTableHolder>>;
|
||||
TemporaryTablesMapping external_tables_mapping;
|
||||
Scalars scalars;
|
||||
|
||||
@ -309,13 +306,14 @@ public:
|
||||
StorageID tryResolveStorageID(StorageID storage_id, StorageNamespace where = StorageNamespace::ResolveAll) const;
|
||||
StorageID resolveStorageIDImpl(StorageID storage_id, StorageNamespace where, std::optional<Exception> * exception) const;
|
||||
|
||||
Tables getExternalTables() const;
|
||||
void addExternalTable(const String & table_name, TemporaryTableHolder && temporary_table);
|
||||
bool removeExternalTable(const String & table_name);
|
||||
|
||||
const Scalars & getScalars() const;
|
||||
const Block & getScalar(const String & name) const;
|
||||
Tables getExternalTables() const;
|
||||
void addExternalTable(const String & table_name, const StoragePtr & storage, const ASTPtr & ast = {});
|
||||
void addScalar(const String & name, const Block & block);
|
||||
bool hasScalar(const String & name) const;
|
||||
bool removeExternalTable(const String & table_name);
|
||||
|
||||
StoragePtr executeTableFunction(const ASTPtr & table_expression);
|
||||
|
||||
|
@ -1,12 +1,12 @@
|
||||
#include <Interpreters/DatabaseCatalog.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/loadMetadata.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Storages/StorageID.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Databases/IDatabase.h>
|
||||
#include <Databases/DatabaseMemory.h>
|
||||
#include <Poco/File.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <Storages/StorageMemory.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -22,6 +22,81 @@ namespace ErrorCodes
|
||||
extern const int DATABASE_ACCESS_DENIED;
|
||||
}
|
||||
|
||||
TemporaryTableHolder::TemporaryTableHolder(const Context & context_,
|
||||
const TemporaryTableHolder::Creator & creator, const ASTPtr & query)
|
||||
: context(context_.getGlobalContext())
|
||||
, temporary_tables(*DatabaseCatalog::instance().getDatabaseForTemporaryTables())
|
||||
{
|
||||
ASTPtr original_create;
|
||||
ASTCreateQuery * create = dynamic_cast<ASTCreateQuery *>(query.get());
|
||||
String global_name;
|
||||
if (query)
|
||||
{
|
||||
original_create = create->clone();
|
||||
if (create->uuid == UUIDHelpers::Nil)
|
||||
create->uuid = UUIDHelpers::generateV4();
|
||||
id = create->uuid;
|
||||
create->table = "_tmp_" + toString(id);
|
||||
global_name = create->table;
|
||||
create->database = DatabaseCatalog::TEMPORARY_DATABASE;
|
||||
}
|
||||
else
|
||||
{
|
||||
id = UUIDHelpers::generateV4();
|
||||
global_name = "_tmp_" + toString(id);
|
||||
}
|
||||
auto table_id = StorageID(DatabaseCatalog::TEMPORARY_DATABASE, global_name, id);
|
||||
auto table = creator(table_id);
|
||||
temporary_tables.createTable(context, global_name, table, original_create);
|
||||
table->startup();
|
||||
}
|
||||
|
||||
|
||||
TemporaryTableHolder::TemporaryTableHolder(const Context & context_, const ColumnsDescription & columns, const ASTPtr & query)
|
||||
: TemporaryTableHolder
|
||||
(
|
||||
context_,
|
||||
[&](const StorageID & table_id)
|
||||
{
|
||||
return StorageMemory::create(table_id, ColumnsDescription{columns}, ConstraintsDescription{});
|
||||
},
|
||||
query
|
||||
)
|
||||
{
|
||||
}
|
||||
|
||||
TemporaryTableHolder::TemporaryTableHolder(TemporaryTableHolder && rhs)
|
||||
: context(rhs.context), temporary_tables(rhs.temporary_tables), id(rhs.id)
|
||||
{
|
||||
rhs.id = UUIDHelpers::Nil;
|
||||
}
|
||||
|
||||
TemporaryTableHolder & TemporaryTableHolder::operator = (TemporaryTableHolder && rhs)
|
||||
{
|
||||
id = rhs.id;
|
||||
rhs.id = UUIDHelpers::Nil;
|
||||
return *this;
|
||||
}
|
||||
|
||||
TemporaryTableHolder::~TemporaryTableHolder()
|
||||
{
|
||||
if (id != UUIDHelpers::Nil)
|
||||
temporary_tables.removeTable(context, "_tmp_" + toString(id));
|
||||
}
|
||||
|
||||
StorageID TemporaryTableHolder::getGlobalTableID() const
|
||||
{
|
||||
return StorageID{DatabaseCatalog::TEMPORARY_DATABASE, "_tmp_" + toString(id), id};
|
||||
}
|
||||
|
||||
StoragePtr TemporaryTableHolder::getTable() const
|
||||
{
|
||||
auto table = temporary_tables.tryGetTable(context, "_tmp_" + toString(id));
|
||||
if (!table)
|
||||
throw Exception("Temporary table " + getGlobalTableID().getNameForLogs() + " not found", ErrorCodes::LOGICAL_ERROR);
|
||||
return table;
|
||||
}
|
||||
|
||||
|
||||
void DatabaseCatalog::loadDatabases()
|
||||
{
|
||||
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
#include <Storages/IStorage_fwd.h>
|
||||
#include <Storages/StorageID.h>
|
||||
#include <Parsers/IAST_fwd.h>
|
||||
#include <Core/UUID.h>
|
||||
#include <Poco/Logger.h>
|
||||
#include <boost/noncopyable.hpp>
|
||||
@ -54,6 +55,32 @@ private:
|
||||
std::unique_lock<std::mutex> table_lock;
|
||||
};
|
||||
|
||||
class ColumnsDescription;
|
||||
|
||||
struct TemporaryTableHolder : boost::noncopyable
|
||||
{
|
||||
typedef std::function<StoragePtr(const StorageID &)> Creator;
|
||||
|
||||
TemporaryTableHolder(const Context & context, const Creator & creator, const ASTPtr & query = {});
|
||||
|
||||
/// Creates temporary table with Engine=Memory
|
||||
TemporaryTableHolder(const Context & context, const ColumnsDescription & columns, const ASTPtr & query = {});
|
||||
|
||||
TemporaryTableHolder(TemporaryTableHolder && rhs);
|
||||
TemporaryTableHolder & operator = (TemporaryTableHolder && rhs);
|
||||
|
||||
~TemporaryTableHolder();
|
||||
|
||||
StorageID getGlobalTableID() const;
|
||||
|
||||
StoragePtr getTable() const;
|
||||
|
||||
const Context & context;
|
||||
IDatabase & temporary_tables;
|
||||
UUID id;
|
||||
};
|
||||
|
||||
using TemporaryTablesMapping = std::map<String, std::shared_ptr<TemporaryTableHolder>>;
|
||||
|
||||
class DatabaseCatalog : boost::noncopyable
|
||||
{
|
||||
|
@ -59,7 +59,6 @@
|
||||
|
||||
#include <Interpreters/ActionsVisitor.h>
|
||||
|
||||
#include <Interpreters/ExternalTablesVisitor.h>
|
||||
#include <Interpreters/GlobalSubqueriesVisitor.h>
|
||||
#include <Interpreters/GetAggregatesVisitor.h>
|
||||
|
||||
@ -261,10 +260,6 @@ void ExpressionAnalyzer::analyzeAggregation()
|
||||
|
||||
void ExpressionAnalyzer::initGlobalSubqueriesAndExternalTables(bool do_global)
|
||||
{
|
||||
/// Adds existing external tables (not subqueries) to the external_tables dictionary.
|
||||
ExternalTablesVisitor::Data tables_data{context, external_tables};
|
||||
ExternalTablesVisitor(tables_data).visit(query);
|
||||
|
||||
if (do_global)
|
||||
{
|
||||
GlobalSubqueriesVisitor::Data subqueries_data(context, subquery_depth, isRemoteStorage(),
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include <Parsers/IAST_fwd.h>
|
||||
#include <Storages/IStorage_fwd.h>
|
||||
#include <Storages/SelectQueryInfo.h>
|
||||
#include <Interpreters/DatabaseCatalog.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -51,7 +52,7 @@ struct ExpressionAnalyzerData
|
||||
bool has_global_subqueries = false;
|
||||
|
||||
/// All new temporary tables obtained by performing the GLOBAL IN/JOIN subqueries.
|
||||
Tables external_tables;
|
||||
TemporaryTablesMapping external_tables;
|
||||
};
|
||||
|
||||
|
||||
@ -247,7 +248,7 @@ public:
|
||||
const PreparedSets & getPreparedSets() const { return prepared_sets; }
|
||||
|
||||
/// Tables that will need to be sent to remote servers for distributed query processing.
|
||||
const Tables & getExternalTables() const { return external_tables; }
|
||||
const TemporaryTablesMapping & getExternalTables() const { return external_tables; }
|
||||
|
||||
ExpressionActionsPtr simpleSelectActions();
|
||||
|
||||
|
@ -1,43 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Parsers/IAST.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Interpreters/InDepthNodeVisitor.h>
|
||||
#include <Interpreters/IdentifierSemantic.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// If node is ASTIdentifier try to extract external_storage.
|
||||
class ExternalTablesMatcher
|
||||
{
|
||||
public:
|
||||
struct Data
|
||||
{
|
||||
const Context & context;
|
||||
Tables & external_tables;
|
||||
};
|
||||
|
||||
static void visit(ASTPtr & ast, Data & data)
|
||||
{
|
||||
if (const auto * t = ast->as<ASTIdentifier>())
|
||||
visit(*t, ast, data);
|
||||
}
|
||||
|
||||
static bool needChildVisit(ASTPtr &, const ASTPtr &) { return true; }
|
||||
|
||||
private:
|
||||
static void visit(const ASTIdentifier & node, ASTPtr &, Data & data)
|
||||
{
|
||||
if (auto opt_name = IdentifierSemantic::getTableName(node))
|
||||
if (auto resolved_id = data.context.tryResolveStorageID(StorageID("", *opt_name), Context::ResolveExternal))
|
||||
data.external_tables[*opt_name] = DatabaseCatalog::instance().getTable(resolved_id);
|
||||
}
|
||||
};
|
||||
|
||||
/// Finds in the query the usage of external tables. Fills in external_tables.
|
||||
using ExternalTablesVisitor = InDepthNodeVisitor<ExternalTablesMatcher, false>;
|
||||
|
||||
}
|
@ -34,12 +34,12 @@ public:
|
||||
size_t subquery_depth;
|
||||
bool is_remote;
|
||||
size_t external_table_id;
|
||||
Tables & external_tables;
|
||||
TemporaryTablesMapping & external_tables;
|
||||
SubqueriesForSets & subqueries_for_sets;
|
||||
bool & has_global_subqueries;
|
||||
|
||||
Data(const Context & context_, size_t subquery_depth_, bool is_remote_,
|
||||
Tables & tables, SubqueriesForSets & subqueries_for_sets_, bool & has_global_subqueries_)
|
||||
TemporaryTablesMapping & tables, SubqueriesForSets & subqueries_for_sets_, bool & has_global_subqueries_)
|
||||
: context(context_),
|
||||
subquery_depth(subquery_depth_),
|
||||
is_remote(is_remote_),
|
||||
@ -99,8 +99,8 @@ public:
|
||||
Block sample = interpreter->getSampleBlock();
|
||||
NamesAndTypesList columns = sample.getNamesAndTypesList();
|
||||
|
||||
StoragePtr external_storage = StorageMemory::create(StorageID("_external", external_table_name), ColumnsDescription{columns}, ConstraintsDescription{});
|
||||
external_storage->startup();
|
||||
auto external_storage_holder = std::make_shared<TemporaryTableHolder>(context, ColumnsDescription{columns});
|
||||
StoragePtr external_storage = external_storage_holder->getTable();
|
||||
|
||||
/** We replace the subquery with the name of the temporary table.
|
||||
* It is in this form, the request will go to the remote server.
|
||||
@ -129,7 +129,7 @@ public:
|
||||
else
|
||||
ast = database_and_table_name;
|
||||
|
||||
external_tables[external_table_name] = external_storage;
|
||||
external_tables[external_table_name] = external_storage_holder;
|
||||
subqueries_for_sets[external_table_name].source = interpreter->execute().in;
|
||||
subqueries_for_sets[external_table_name].table = external_storage;
|
||||
|
||||
|
@ -606,15 +606,23 @@ bool InterpreterCreateQuery::doCreateTable(const ASTCreateQuery & create,
|
||||
throw Exception("Table " + create.database + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
|
||||
}
|
||||
}
|
||||
else if (context.isExternalTableExist(table_name) && create.if_not_exists)
|
||||
else
|
||||
{
|
||||
if (context.isExternalTableExist(table_name) && create.if_not_exists)
|
||||
return false;
|
||||
|
||||
auto temporary_table = TemporaryTableHolder(context, properties.columns, query_ptr);
|
||||
context.getSessionContext().addExternalTable(table_name, std::move(temporary_table));
|
||||
return true;
|
||||
}
|
||||
|
||||
StoragePtr res;
|
||||
/// NOTE: CREATE query may be rewritten by Storage creator or table function
|
||||
if (create.as_table_function)
|
||||
{
|
||||
const auto & table_function = create.as_table_function->as<ASTFunction &>();
|
||||
const auto & factory = TableFunctionFactory::instance();
|
||||
//FIXME storage will have wrong database name
|
||||
res = factory.get(table_function.name, context)->execute(create.as_table_function, context, create.table);
|
||||
}
|
||||
else
|
||||
@ -628,10 +636,7 @@ bool InterpreterCreateQuery::doCreateTable(const ASTCreateQuery & create,
|
||||
false);
|
||||
}
|
||||
|
||||
if (need_add_to_database)
|
||||
database->createTable(context, table_name, res, query_ptr);
|
||||
else
|
||||
context.getSessionContext().addExternalTable(table_name, res, query_ptr);
|
||||
|
||||
/// We must call "startup" and "shutdown" while holding DDLGuard.
|
||||
/// Because otherwise method "shutdown" (from InterpreterDropQuery) can be called before startup
|
||||
|
@ -322,7 +322,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
|
||||
/// Save the new temporary tables in the query context
|
||||
for (const auto & it : query_analyzer->getExternalTables())
|
||||
if (!context->isExternalTableExist(it.first))
|
||||
context->addExternalTable(it.first, it.second);
|
||||
context->addExternalTable(it.first, std::move(*it.second));
|
||||
}
|
||||
|
||||
if (!options.only_analyze || options.modify_inplace)
|
||||
|
@ -102,7 +102,7 @@ int main()
|
||||
DatabaseCatalog::instance().attachDatabase("system", system_database);
|
||||
//context.setCurrentDatabase("system");
|
||||
system_database->attachTable("one", StorageSystemOne::create("one"));
|
||||
system_database->attachTable("numbers", StorageSystemNumbers::create("numbers", false));
|
||||
system_database->attachTable("numbers", StorageSystemNumbers::create(StorageID("system", "numbers"), false));
|
||||
|
||||
size_t success = 0;
|
||||
for (auto & entry : queries)
|
||||
|
@ -140,11 +140,12 @@ BlockInputStreamPtr StorageLiveView::completeQuery(Pipes pipes)
|
||||
auto block_context = std::make_unique<Context>(global_context);
|
||||
block_context->makeQueryContext();
|
||||
|
||||
auto blocks_storage_id = getBlocksStorageID();
|
||||
auto blocks_storage = StorageBlocks::createStorage(blocks_storage_id, getParentStorage()->getColumns(),
|
||||
auto creator = [&](const StorageID & blocks_id_global)
|
||||
{
|
||||
return StorageBlocks::createStorage(blocks_id_global, getParentStorage()->getColumns(),
|
||||
std::move(pipes), QueryProcessingStage::WithMergeableState);
|
||||
|
||||
block_context->addExternalTable(blocks_storage_id.table_name, blocks_storage);
|
||||
};
|
||||
block_context->addExternalTable(getBlocksTableName(), TemporaryTableHolder(global_context, creator));
|
||||
|
||||
InterpreterSelectQuery select(getInnerBlocksQuery(), *block_context, StoragePtr(), SelectQueryOptions(QueryProcessingStage::Complete));
|
||||
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
|
||||
@ -196,8 +197,6 @@ void StorageLiveView::writeIntoLiveView(
|
||||
}
|
||||
}
|
||||
|
||||
auto blocks_storage_id = live_view.getBlocksStorageID();
|
||||
|
||||
if (!is_block_processed)
|
||||
{
|
||||
ASTPtr mergeable_query = live_view.getInnerQuery();
|
||||
@ -208,10 +207,14 @@ void StorageLiveView::writeIntoLiveView(
|
||||
Pipes pipes;
|
||||
pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(block.cloneEmpty(), Chunk(block.getColumns(), block.rows())));
|
||||
|
||||
auto blocks_storage = StorageBlocks::createStorage(blocks_storage_id,
|
||||
live_view.getParentStorage()->getColumns(), std::move(pipes), QueryProcessingStage::FetchColumns);
|
||||
auto creator = [&](const StorageID & blocks_id_global)
|
||||
{
|
||||
return StorageBlocks::createStorage(blocks_id_global, live_view.getParentStorage()->getColumns(),
|
||||
std::move(pipes), QueryProcessingStage::FetchColumns);
|
||||
};
|
||||
TemporaryTableHolder blocks_storage(context, creator);
|
||||
|
||||
InterpreterSelectQuery select_block(mergeable_query, context, blocks_storage,
|
||||
InterpreterSelectQuery select_block(mergeable_query, context, blocks_storage.getTable(),
|
||||
QueryProcessingStage::WithMergeableState);
|
||||
|
||||
auto data_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(
|
||||
|
@ -49,9 +49,9 @@ public:
|
||||
~StorageLiveView() override;
|
||||
String getName() const override { return "LiveView"; }
|
||||
bool isView() const override { return true; }
|
||||
StorageID getBlocksStorageID() const
|
||||
String getBlocksTableName() const
|
||||
{
|
||||
return StorageID("", getStorageID().table_name + "_blocks");
|
||||
return getStorageID().table_name + "_blocks";
|
||||
}
|
||||
StoragePtr getParentStorage() const { return DatabaseCatalog::instance().getTable(select_table_id); }
|
||||
|
||||
|
@ -18,8 +18,8 @@ namespace ErrorCodes
|
||||
extern const int INVALID_USAGE_OF_INPUT;
|
||||
}
|
||||
|
||||
StorageInput::StorageInput(const String & table_name_, const ColumnsDescription & columns_)
|
||||
: IStorage({"", table_name_})
|
||||
StorageInput::StorageInput(const StorageID & table_id, const ColumnsDescription & columns_)
|
||||
: IStorage(table_id)
|
||||
{
|
||||
setColumns(columns_);
|
||||
}
|
||||
|
@ -29,6 +29,6 @@ private:
|
||||
BlockInputStreamPtr input_stream;
|
||||
|
||||
protected:
|
||||
StorageInput(const String & table_name_, const ColumnsDescription & columns_);
|
||||
StorageInput(const StorageID & table_id, const ColumnsDescription & columns_);
|
||||
};
|
||||
}
|
||||
|
@ -115,8 +115,8 @@ private:
|
||||
}
|
||||
|
||||
|
||||
StorageSystemNumbers::StorageSystemNumbers(const std::string & name_, bool multithreaded_, std::optional<UInt64> limit_, UInt64 offset_, bool even_distribution_)
|
||||
: IStorage({"system", name_}), multithreaded(multithreaded_), even_distribution(even_distribution_), limit(limit_), offset(offset_)
|
||||
StorageSystemNumbers::StorageSystemNumbers(const StorageID & table_id, bool multithreaded_, std::optional<UInt64> limit_, UInt64 offset_, bool even_distribution_)
|
||||
: IStorage(table_id), multithreaded(multithreaded_), even_distribution(even_distribution_), limit(limit_), offset(offset_)
|
||||
{
|
||||
setColumns(ColumnsDescription({{"number", std::make_shared<DataTypeUInt64>()}}));
|
||||
}
|
||||
|
@ -48,7 +48,7 @@ private:
|
||||
protected:
|
||||
/// If even_distribution is true, numbers are distributed evenly between streams.
|
||||
/// Otherwise, streams concurrently increment atomic.
|
||||
StorageSystemNumbers(const std::string & name_, bool multithreaded_, std::optional<UInt64> limit_ = std::nullopt, UInt64 offset_ = 0, bool even_distribution_ = true);
|
||||
StorageSystemNumbers(const StorageID & table_id, bool multithreaded_, std::optional<UInt64> limit_ = std::nullopt, UInt64 offset_ = 0, bool even_distribution_ = true);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -51,8 +51,8 @@ namespace DB
|
||||
void attachSystemTablesLocal(IDatabase & system_database)
|
||||
{
|
||||
system_database.attachTable("one", StorageSystemOne::create("one"));
|
||||
system_database.attachTable("numbers", StorageSystemNumbers::create("numbers", false));
|
||||
system_database.attachTable("numbers_mt", StorageSystemNumbers::create("numbers_mt", true));
|
||||
system_database.attachTable("numbers", StorageSystemNumbers::create(StorageID("system", "numbers"), false));
|
||||
system_database.attachTable("numbers_mt", StorageSystemNumbers::create(StorageID("system", "numbers_mt"), true));
|
||||
system_database.attachTable("databases", StorageSystemDatabases::create("databases"));
|
||||
system_database.attachTable("tables", StorageSystemTables::create("tables"));
|
||||
system_database.attachTable("columns", StorageSystemColumns::create("columns"));
|
||||
|
@ -15,7 +15,7 @@ try
|
||||
{
|
||||
using namespace DB;
|
||||
|
||||
StoragePtr table = StorageSystemNumbers::create("numbers", false);
|
||||
StoragePtr table = StorageSystemNumbers::create(StorageID("test", "numbers"), false);
|
||||
|
||||
Names column_names;
|
||||
column_names.push_back("number");
|
||||
|
@ -41,7 +41,7 @@ StoragePtr TableFunctionInput::executeImpl(const ASTPtr & ast_function, const Co
|
||||
|
||||
String structure = evaluateConstantExpressionOrIdentifierAsLiteral(args[0], context)->as<ASTLiteral &>().value.safeGet<String>();
|
||||
auto columns = parseColumnsListFromString(structure, context);
|
||||
StoragePtr storage = StorageInput::create(table_name, columns);
|
||||
StoragePtr storage = StorageInput::create(StorageID(getDatabaseName(), table_name), columns);
|
||||
|
||||
storage->startup();
|
||||
|
||||
|
@ -35,7 +35,7 @@ StoragePtr TableFunctionNumbers<multithreaded>::executeImpl(const ASTPtr & ast_f
|
||||
|
||||
context.checkAccess(AccessType::numbers);
|
||||
|
||||
auto res = StorageSystemNumbers::create(table_name, multithreaded, length, offset, false);
|
||||
auto res = StorageSystemNumbers::create(StorageID(getDatabaseName(), table_name), multithreaded, length, offset, false);
|
||||
res->startup();
|
||||
return res;
|
||||
}
|
||||
|
@ -188,17 +188,20 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
|
||||
secure);
|
||||
}
|
||||
|
||||
auto structure_remote_table = getStructureOfRemoteTable(*cluster, {remote_database, remote_table}, context, remote_table_function_ptr);
|
||||
auto remote_table_id = StorageID::createEmpty();
|
||||
remote_table_id.database_name = remote_database;
|
||||
remote_table_id.table_name = remote_table;
|
||||
auto structure_remote_table = getStructureOfRemoteTable(*cluster, remote_table_id, context, remote_table_function_ptr);
|
||||
|
||||
StoragePtr res = remote_table_function_ptr
|
||||
? StorageDistributed::createWithOwnCluster(
|
||||
StorageID("", table_name),
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
structure_remote_table,
|
||||
remote_table_function_ptr,
|
||||
cluster,
|
||||
context)
|
||||
: StorageDistributed::createWithOwnCluster(
|
||||
StorageID("", table_name),
|
||||
StorageID(getDatabaseName(), table_name),
|
||||
structure_remote_table,
|
||||
remote_database,
|
||||
remote_table,
|
||||
|
@ -23,7 +23,7 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo
|
||||
|
||||
client1.send('DROP TABLE IF EXISTS test.lv')
|
||||
client1.expect(prompt)
|
||||
client1.send(' DROP TABLE IF EXISTS test.mt')
|
||||
client1.send('DROP TABLE IF EXISTS test.mt')
|
||||
client1.expect(prompt)
|
||||
client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()')
|
||||
client1.expect(prompt)
|
||||
|
Loading…
Reference in New Issue
Block a user