add feature

commit 237de4e775 (parent d0e1f02208)
36 docs/en/operations/system-tables/detached_tables.md (new file)
@@ -0,0 +1,36 @@
---
slug: /en/operations/system-tables/detached_tables
---
# detached_tables

Contains information about each detached table.

Columns:

- `database` ([String](../../sql-reference/data-types/string.md)) — The name of the database the table is in.

- `name` ([String](../../sql-reference/data-types/string.md)) — Table name.

- `uuid` ([UUID](../../sql-reference/data-types/uuid.md)) — Table uuid (Atomic database).

- `is_permanently` ([UInt8](../../sql-reference/data-types/int-uint.md)) — Flag that indicates the table was detached PERMANENTLY.

- `metadata_path` ([String](../../sql-reference/data-types/string.md)) — Path to the table metadata in the file system.

**Example**

```sql
SELECT * FROM system.detached_tables FORMAT Vertical;
```

```text
Row 1:
──────
database:       base
name:           t1
uuid:           81b1c20a-b7c6-4116-a2ce-7583fb6b6736
metadata_path:  /var/lib/clickhouse/store/461/461cf698-fd0b-406d-8c01-5d8fd5748a91/t1.sql
is_permanently: 1
```
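For orientation, here is a minimal walkthrough of how a row ends up in this table. The database and table names are illustrative only (they assume a database `base` already exists) and are not part of this commit:

```sql
-- Create a throwaway table, then detach it permanently.
CREATE TABLE base.t1 (n UInt64) ENGINE = MergeTree ORDER BY n;
DETACH TABLE base.t1 PERMANENTLY;

-- The detached table is now visible in the new system table.
SELECT database, name, is_permanently
FROM system.detached_tables
WHERE database = 'base';
```

A plain `DETACH TABLE` produces a row with `is_permanently = 0`; `ATTACH TABLE` removes the row again.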
@ -37,8 +37,10 @@ namespace ErrorCodes
|
||||
class AtomicDatabaseTablesSnapshotIterator final : public DatabaseTablesSnapshotIterator
|
||||
{
|
||||
public:
|
||||
explicit AtomicDatabaseTablesSnapshotIterator(DatabaseTablesSnapshotIterator && base)
|
||||
: DatabaseTablesSnapshotIterator(std::move(base)) {}
|
||||
explicit AtomicDatabaseTablesSnapshotIterator(DatabaseTablesSnapshotIterator && base) noexcept
|
||||
: DatabaseTablesSnapshotIterator(std::move(base))
|
||||
{
|
||||
}
|
||||
UUID uuid() const override { return table()->getStorageID().uuid; }
|
||||
};
|
||||
|
||||
@ -101,6 +103,8 @@ void DatabaseAtomic::attachTable(ContextPtr /* context_ */, const String & name,
|
||||
auto table_id = table->getStorageID();
|
||||
assertDetachedTableNotInUse(table_id.uuid);
|
||||
DatabaseOrdinary::attachTableUnlocked(name, table);
|
||||
detached_tables.erase(table_id.uuid);
|
||||
|
||||
table_name_to_path.emplace(std::make_pair(name, relative_table_path));
|
||||
}
|
||||
|
||||
@ -108,11 +112,11 @@ StoragePtr DatabaseAtomic::detachTable(ContextPtr /* context */, const String &
|
||||
{
|
||||
DetachedTables not_in_use;
|
||||
std::lock_guard lock(mutex);
|
||||
auto table = DatabaseOrdinary::detachTableUnlocked(name);
|
||||
auto detached_table = DatabaseOrdinary::detachTableUnlocked(name);
|
||||
table_name_to_path.erase(name);
|
||||
detached_tables.emplace(table->getStorageID().uuid, table);
|
||||
detached_tables.emplace(detached_table->getStorageID().uuid, detached_table);
|
||||
not_in_use = cleanupDetachedTables();
|
||||
return table;
|
||||
return detached_table;
|
||||
}
|
||||
|
||||
void DatabaseAtomic::dropTable(ContextPtr local_context, const String & table_name, bool sync)
|
||||
@ -433,6 +437,12 @@ DatabaseAtomic::getTablesIterator(ContextPtr local_context, const IDatabase::Fil
|
||||
return std::make_unique<AtomicDatabaseTablesSnapshotIterator>(std::move(typeid_cast<DatabaseTablesSnapshotIterator &>(*base_iter)));
|
||||
}
|
||||
|
||||
DatabaseDetachedTablesSnapshotIteratorPtr DatabaseAtomic::getDetachedTablesIterator(
|
||||
ContextPtr local_context, const IDatabase::FilterByNameFunction & filter_by_table_name, const bool skip_not_loaded) const
|
||||
{
|
||||
return DatabaseOrdinary::getDetachedTablesIterator(local_context, filter_by_table_name, skip_not_loaded);
|
||||
}
|
||||
|
||||
UUID DatabaseAtomic::tryGetTableUUID(const String & table_name) const
|
||||
{
|
||||
if (auto table = tryGetTable(table_name, getContext()))
|
||||
|
@ -1,7 +1,8 @@
|
||||
#pragma once
|
||||
|
||||
#include <Databases/DatabasesCommon.h>
|
||||
#include <Databases/DatabaseOrdinary.h>
|
||||
#include <Databases/DatabasesCommon.h>
|
||||
#include <Storages/IStorage_fwd.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -48,6 +49,9 @@ public:
|
||||
|
||||
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
|
||||
|
||||
DatabaseDetachedTablesSnapshotIteratorPtr
|
||||
getDetachedTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
|
||||
|
||||
void beforeLoadingMetadata(ContextMutablePtr context, LoadingStrictnessLevel mode) override;
|
||||
|
||||
LoadTaskPtr startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode) override;
|
||||
@ -81,6 +85,7 @@ protected:
|
||||
|
||||
//TODO store path in DatabaseWithOwnTables::tables
|
||||
using NameToPathMap = std::unordered_map<String, String>;
|
||||
|
||||
NameToPathMap table_name_to_path TSA_GUARDED_BY(mutex);
|
||||
|
||||
DetachedTables detached_tables TSA_GUARDED_BY(mutex);
|
||||
|
@ -307,6 +307,9 @@ void DatabaseOnDisk::detachTablePermanently(ContextPtr query_context, const Stri
|
||||
try
|
||||
{
|
||||
FS::createFile(detached_permanently_flag);
|
||||
|
||||
std::lock_guard lock(mutex);
|
||||
snapshot_detached_tables.at(table_name).is_permanently = true;
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
|
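In the hunk above, the snapshot entry is marked permanent immediately after the on-disk "detached permanently" flag file is created, which is what distinguishes the two forms of DETACH in the new system table. A hedged illustration (table names are hypothetical):

```sql
-- Survives only until ATTACH or a server restart.
DETACH TABLE base.t1;

-- Writes the flag file; the row keeps is_permanently = 1.
DETACH TABLE base.t2 PERMANENTLY;

SELECT name, is_permanently FROM system.detached_tables WHERE database = 'base';
```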
@ -187,7 +187,7 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
|
||||
size_t prev_tables_count = metadata.parsed_tables.size();
|
||||
size_t prev_total_dictionaries = metadata.total_dictionaries;
|
||||
|
||||
auto process_metadata = [&metadata, is_startup, this](const String & file_name)
|
||||
auto process_metadata = [&metadata, is_startup, local_context, this](const String & file_name) mutable
|
||||
{
|
||||
fs::path path(getMetadataPath());
|
||||
fs::path file_path(file_name);
|
||||
@ -195,7 +195,7 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
|
||||
|
||||
try
|
||||
{
|
||||
auto ast = parseQueryFromMetadata(log, getContext(), full_path.string(), /*throw_on_error*/ true, /*remove_empty*/ false);
|
||||
auto ast = parseQueryFromMetadata(log, local_context, full_path.string(), /*throw_on_error*/ true, /*remove_empty*/ false);
|
||||
if (ast)
|
||||
{
|
||||
FunctionNameNormalizer::visit(ast.get());
|
||||
@ -226,6 +226,33 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
|
||||
const std::string table_name = unescapeForFileName(file_name.substr(0, file_name.size() - 4));
|
||||
permanently_detached_tables.push_back(table_name);
|
||||
LOG_DEBUG(log, "Skipping permanently detached table {}.", backQuote(table_name));
|
||||
|
||||
// @TODO refactoring
|
||||
auto parsed_table_metadata = ParsedTableMetadata{full_path.string(), ast};
|
||||
const auto & query = parsed_table_metadata.ast->as<const ASTCreateQuery &>();
|
||||
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
auto [detached_table_name, table] = createTableFromAST(
|
||||
query,
|
||||
database_name,
|
||||
getTableDataPath(query),
|
||||
std::const_pointer_cast<Context>(local_context),
|
||||
LoadingStrictnessLevel::CREATE);
|
||||
|
||||
const auto storage_id = table->getStorageID();
|
||||
|
||||
SnapshotDetachedTable snapshot_detached_table;
|
||||
snapshot_detached_table.detabase = storage_id.getDatabaseName();
|
||||
snapshot_detached_table.table = detached_table_name;
|
||||
snapshot_detached_table.uuid = storage_id.uuid;
|
||||
snapshot_detached_table.is_permanently = true;
|
||||
snapshot_detached_table.metadata_path = getObjectMetadataPath(snapshot_detached_table.table);
|
||||
|
||||
|
||||
snapshot_detached_tables.emplace(detached_table_name, std::move(snapshot_detached_table));
|
||||
|
||||
LOG_TRACE(log, "Add detached table {} to system.detached_tables", detached_table_name);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -487,6 +514,12 @@ DatabaseTablesIteratorPtr DatabaseOrdinary::getTablesIterator(ContextPtr local_c
|
||||
return DatabaseWithOwnTablesBase::getTablesIterator(local_context, filter_by_table_name, skip_not_loaded);
|
||||
}
|
||||
|
||||
DatabaseDetachedTablesSnapshotIteratorPtr DatabaseOrdinary::getDetachedTablesIterator(
|
||||
ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const
|
||||
{
|
||||
return DatabaseWithOwnTablesBase::getDetachedTablesIterator(local_context, filter_by_table_name, skip_not_loaded);
|
||||
}
|
||||
|
||||
Strings DatabaseOrdinary::getAllTableNames(ContextPtr) const
|
||||
{
|
||||
std::set<String> unique_names;
|
||||
|
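Because `loadTablesMetadata` now registers permanently detached tables while a database is being loaded, they show up in `system.detached_tables` immediately after a server restart, even though no DETACH ran during the current uptime. A quick check (what it returns depends on what was detached before the restart):

```sql
-- Right after a restart:
SELECT database, name, uuid, metadata_path
FROM system.detached_tables
WHERE is_permanently;
```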
@ -57,6 +57,9 @@ public:
|
||||
LoadTaskPtr startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode) override;
|
||||
|
||||
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
|
||||
DatabaseDetachedTablesSnapshotIteratorPtr getDetachedTablesIterator(
|
||||
ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
|
||||
|
||||
Strings getAllTableNames(ContextPtr context) const override;
|
||||
|
||||
void alterTable(
|
||||
|
@ -2,12 +2,9 @@
|
||||
|
||||
#include <Backups/BackupEntriesCollector.h>
|
||||
#include <Backups/RestorerFromBackup.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <Interpreters/InterpreterCreateQuery.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/DatabaseCatalog.h>
|
||||
#include <Interpreters/InterpreterCreateQuery.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Parsers/ASTSelectWithUnionQuery.h>
|
||||
#include <Parsers/ParserCreateQuery.h>
|
||||
@ -16,6 +13,10 @@
|
||||
#include <Storages/StorageFactory.h>
|
||||
#include <Storages/Utils.h>
|
||||
#include <TableFunctions/TableFunctionFactory.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -237,6 +238,35 @@ DatabaseTablesIteratorPtr DatabaseWithOwnTablesBase::getTablesIterator(ContextPt
|
||||
return std::make_unique<DatabaseTablesSnapshotIterator>(std::move(filtered_tables), database_name);
|
||||
}
|
||||
|
||||
DatabaseDetachedTablesSnapshotIteratorPtr DatabaseWithOwnTablesBase::getDetachedTablesIterator(
|
||||
ContextPtr, const FilterByNameFunction & filter_by_table_name, bool /* skip_not_loaded */) const
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
if (!filter_by_table_name)
|
||||
return std::make_unique<DatabaseDetachedTablesSnapshotIterator>(snapshot_detached_tables);
|
||||
|
||||
SnapshotDetachedTables filtered_tables;
|
||||
for (const auto & [table_name, storage] : tables)
|
||||
if (filter_by_table_name(table_name))
|
||||
{
|
||||
SnapshotDetachedTable snapshot_detached_table;
|
||||
snapshot_detached_table.detabase = storage->getStorageID().getDatabaseName();
|
||||
snapshot_detached_table.table = table_name;
|
||||
if (storage->getStorageID().hasUUID())
|
||||
{
|
||||
snapshot_detached_table.uuid = storage->getStorageID().uuid;
|
||||
}
|
||||
|
||||
snapshot_detached_table.is_permanently = false;
|
||||
snapshot_detached_table.metadata_path = getObjectMetadataPath(snapshot_detached_table.table);
|
||||
|
||||
filtered_tables.emplace(table_name, std::move(snapshot_detached_table));
|
||||
}
|
||||
|
||||
|
||||
return std::make_unique<DatabaseDetachedTablesSnapshotIterator>(std::move(filtered_tables));
|
||||
}
|
||||
|
||||
bool DatabaseWithOwnTablesBase::empty() const
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
@ -258,6 +288,19 @@ StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_n
|
||||
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist",
|
||||
backQuote(database_name), backQuote(table_name));
|
||||
res = it->second;
|
||||
|
||||
SnapshotDetachedTable snapshot_detached_table;
|
||||
snapshot_detached_table.detabase = it->second->getStorageID().getDatabaseName();
|
||||
snapshot_detached_table.table = it->first;
|
||||
if (it->second->getStorageID().hasUUID())
|
||||
{
|
||||
snapshot_detached_table.uuid = it->second->getStorageID().uuid;
|
||||
}
|
||||
snapshot_detached_table.is_permanently = false;
|
||||
snapshot_detached_table.metadata_path = getObjectMetadataPath(snapshot_detached_table.table);
|
||||
|
||||
snapshot_detached_tables.emplace(it->first, std::move(snapshot_detached_table));
|
||||
|
||||
tables.erase(it);
|
||||
res->is_detached = true;
|
||||
CurrentMetrics::sub(getAttachedCounterForStorage(res), 1);
|
||||
@ -298,6 +341,8 @@ void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, c
|
||||
throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS, "Table {} already exists.", table_id.getFullTableName());
|
||||
}
|
||||
|
||||
snapshot_detached_tables.erase(table_name);
|
||||
|
||||
/// It is important to reset is_detached here since in case of RENAME in
|
||||
/// non-Atomic database the is_detached is set to true before RENAME.
|
||||
table->is_detached = false;
|
||||
@ -333,6 +378,7 @@ void DatabaseWithOwnTablesBase::shutdown()
|
||||
|
||||
std::lock_guard lock(mutex);
|
||||
tables.clear();
|
||||
snapshot_detached_tables.clear();
|
||||
}
|
||||
|
||||
DatabaseWithOwnTablesBase::~DatabaseWithOwnTablesBase()
|
||||
|
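Since `attachTableUnlocked` erases the snapshot entry that `detachTableUnlocked` created, the DETACH/ATTACH round trip is symmetric from the user's point of view. A sketch with an illustrative table name:

```sql
DETACH TABLE base.t1;
SELECT count() FROM system.detached_tables WHERE database = 'base' AND name = 't1'; -- 1

ATTACH TABLE base.t1;
SELECT count() FROM system.detached_tables WHERE database = 'base' AND name = 't1'; -- 0
```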
@ -37,6 +37,9 @@ public:
|
||||
|
||||
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
|
||||
|
||||
DatabaseDetachedTablesSnapshotIteratorPtr
|
||||
getDetachedTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
|
||||
|
||||
std::vector<std::pair<ASTPtr, StoragePtr>> getTablesForBackup(const FilterByNameFunction & filter, const ContextPtr & local_context) const override;
|
||||
void createTableRestoredFromBackup(const ASTPtr & create_table_query, ContextMutablePtr local_context, std::shared_ptr<IRestoreCoordination> restore_coordination, UInt64 timeout_ms) override;
|
||||
|
||||
@ -46,12 +49,13 @@ public:
|
||||
|
||||
protected:
|
||||
Tables tables TSA_GUARDED_BY(mutex);
|
||||
SnapshotDetachedTables snapshot_detached_tables TSA_GUARDED_BY(mutex);
|
||||
LoggerPtr log;
|
||||
|
||||
DatabaseWithOwnTablesBase(const String & name_, const String & logger, ContextPtr context);
|
||||
|
||||
void attachTableUnlocked(const String & table_name, const StoragePtr & table) TSA_REQUIRES(mutex);
|
||||
StoragePtr detachTableUnlocked(const String & table_name) TSA_REQUIRES(mutex);
|
||||
StoragePtr detachTableUnlocked(const String & table_name) TSA_REQUIRES(mutex);
|
||||
StoragePtr getTableUnlocked(const String & table_name) const TSA_REQUIRES(mutex);
|
||||
StoragePtr tryGetTableNoWait(const String & table_name) const;
|
||||
};
|
||||
|
@ -5,20 +5,22 @@
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <Interpreters/executeQuery.h>
|
||||
#include <Parsers/IAST_fwd.h>
|
||||
#include <QueryPipeline/BlockIO.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Storages/IStorage_fwd.h>
|
||||
#include <base/types.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/AsyncLoader.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/PoolId.h>
|
||||
#include <Common/ThreadPool_fwd.h>
|
||||
#include <QueryPipeline/BlockIO.h>
|
||||
|
||||
#include <ctime>
|
||||
#include <functional>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <stdexcept>
|
||||
#include <vector>
|
||||
#include <map>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -110,6 +112,55 @@ public:
|
||||
|
||||
using DatabaseTablesIteratorPtr = std::unique_ptr<IDatabaseTablesIterator>;
|
||||
|
||||
struct SnapshotDetachedTable final
|
||||
{
|
||||
String detabase;
|
||||
String table;
|
||||
UUID uuid = UUIDHelpers::Nil;
|
||||
String metadata_path;
|
||||
bool is_permanently{false};
|
||||
};
|
||||
|
||||
class DatabaseDetachedTablesSnapshotIterator
|
||||
{
|
||||
private:
|
||||
SnapshotDetachedTables snapshot;
|
||||
SnapshotDetachedTables::iterator it;
|
||||
|
||||
protected:
|
||||
DatabaseDetachedTablesSnapshotIterator(DatabaseDetachedTablesSnapshotIterator && other) noexcept
|
||||
{
|
||||
size_t idx = std::distance(other.snapshot.begin(), other.it);
|
||||
std::swap(snapshot, other.snapshot);
|
||||
other.it = other.snapshot.end();
|
||||
it = snapshot.begin();
|
||||
std::advance(it, idx);
|
||||
}
|
||||
|
||||
public:
|
||||
explicit DatabaseDetachedTablesSnapshotIterator(const SnapshotDetachedTables & tables_) : snapshot(tables_), it(snapshot.begin()) { }
|
||||
|
||||
explicit DatabaseDetachedTablesSnapshotIterator(SnapshotDetachedTables && tables_) : snapshot(std::move(tables_)), it(snapshot.begin())
|
||||
{
|
||||
}
|
||||
|
||||
void next() { ++it; }
|
||||
|
||||
bool isValid() const { return it != snapshot.end(); }
|
||||
|
||||
String database() const { return it->second.detabase; }
|
||||
|
||||
String table() const { return it->second.table; }
|
||||
|
||||
UUID uuid() const { return it->second.uuid; }
|
||||
|
||||
String metadataPath() const { return it->second.metadata_path; }
|
||||
|
||||
bool isPermanently() const { return it->second.is_permanently; }
|
||||
};
|
||||
|
||||
using DatabaseDetachedTablesSnapshotIteratorPtr = std::unique_ptr<DatabaseDetachedTablesSnapshotIterator>;
|
||||
|
||||
|
||||
/** Database engine.
|
||||
* It is responsible for:
|
||||
@ -232,6 +283,12 @@ public:
|
||||
/// Wait for all tables to be loaded and started up. If `skip_not_loaded` is true, then not yet loaded or not yet started up (at the moment of iterator creation) tables are excluded.
|
||||
virtual DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name = {}, bool skip_not_loaded = false) const = 0; /// NOLINT
|
||||
|
||||
virtual DatabaseDetachedTablesSnapshotIteratorPtr getDetachedTablesIterator(
|
||||
ContextPtr /*context*/, const FilterByNameFunction & /*filter_by_table_name = {}*/, bool /*skip_not_loaded = false*/) const
|
||||
{
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "There is no get detached tables for Database{}", getEngineName());
|
||||
}
|
||||
|
||||
/// Returns list of table names.
|
||||
virtual Strings getAllTableNames(ContextPtr context) const
|
||||
{
|
||||
|
@ -1,5 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <Core/Types_fwd.h>
|
||||
#include <base/types.h>
|
||||
|
||||
#include <map>
|
||||
@ -9,9 +10,10 @@ namespace DB
|
||||
{
|
||||
|
||||
class IStorage;
|
||||
struct SnapshotDetachedTable;
|
||||
|
||||
using ConstStoragePtr = std::shared_ptr<const IStorage>;
|
||||
using StoragePtr = std::shared_ptr<IStorage>;
|
||||
using Tables = std::map<String, StoragePtr>;
|
||||
|
||||
using SnapshotDetachedTables = std::map<String, SnapshotDetachedTable>;
|
||||
}
|
||||
|
138 src/Storages/System/ReadFromSystemTables.cpp (new file)
@ -0,0 +1,138 @@
|
||||
#include "ReadFromSystemTables.h"
|
||||
|
||||
#include <Core/Block.h>
|
||||
#include <Core/ColumnWithTypeAndName.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <Interpreters/DatabaseCatalog.h>
|
||||
#include <QueryPipeline/QueryPipelineBuilder.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Storages/System/TablesBlockSource.h>
|
||||
#include <Storages/VirtualColumnUtils.h>
|
||||
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
ColumnPtr getFilteredDatabases(const ActionsDAG::Node * predicate, ContextPtr context)
|
||||
{
|
||||
MutableColumnPtr column = ColumnString::create();
|
||||
|
||||
const auto databases = DatabaseCatalog::instance().getDatabases();
|
||||
for (const auto & database_name : databases | boost::adaptors::map_keys)
|
||||
{
|
||||
if (database_name == DatabaseCatalog::TEMPORARY_DATABASE)
|
||||
continue; /// We don't want to show the internal database for temporary tables in system.tables
|
||||
|
||||
column->insert(database_name);
|
||||
}
|
||||
|
||||
Block block{ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "database")};
|
||||
VirtualColumnUtils::filterBlockWithPredicate(predicate, block, context);
|
||||
return block.getByPosition(0).column;
|
||||
}
|
||||
|
||||
ColumnPtr getFilteredTables(
|
||||
const ActionsDAG::Node * predicate, const ColumnPtr & filtered_databases_column, ContextPtr context, const bool need_detached_tables)
|
||||
{
|
||||
Block sample{
|
||||
ColumnWithTypeAndName(nullptr, std::make_shared<DataTypeString>(), "name"),
|
||||
ColumnWithTypeAndName(nullptr, std::make_shared<DataTypeString>(), "engine")};
|
||||
|
||||
MutableColumnPtr database_column = ColumnString::create();
|
||||
MutableColumnPtr engine_column;
|
||||
|
||||
auto dag = VirtualColumnUtils::splitFilterDagForAllowedInputs(predicate, &sample);
|
||||
if (dag)
|
||||
{
|
||||
bool filter_by_engine = false;
|
||||
for (const auto * input : dag->getInputs())
|
||||
if (input->result_name == "engine")
|
||||
filter_by_engine = true;
|
||||
|
||||
if (filter_by_engine)
|
||||
engine_column = ColumnString::create();
|
||||
}
|
||||
|
||||
for (size_t database_idx = 0; database_idx < filtered_databases_column->size(); ++database_idx)
|
||||
{
|
||||
const auto & database_name = filtered_databases_column->getDataAt(database_idx).toString();
|
||||
DatabasePtr database = DatabaseCatalog::instance().tryGetDatabase(database_name);
|
||||
if (!database)
|
||||
continue;
|
||||
|
||||
if (need_detached_tables)
|
||||
{
|
||||
auto table_it = database->getDetachedTablesIterator(context, {}, false);
|
||||
for (; table_it->isValid(); table_it->next())
|
||||
{
|
||||
database_column->insert(table_it->table());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
auto table_it = database->getTablesIterator(context);
|
||||
for (; table_it->isValid(); table_it->next())
|
||||
{
|
||||
database_column->insert(table_it->name());
|
||||
if (engine_column)
|
||||
engine_column->insert(table_it->table()->getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Block block{ColumnWithTypeAndName(std::move(database_column), std::make_shared<DataTypeString>(), "name")};
|
||||
if (engine_column)
|
||||
block.insert(ColumnWithTypeAndName(std::move(engine_column), std::make_shared<DataTypeString>(), "engine"));
|
||||
|
||||
if (dag)
|
||||
VirtualColumnUtils::filterBlockWithDAG(dag, block, context);
|
||||
|
||||
return block.getByPosition(0).column;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
ReadFromSystemTables::ReadFromSystemTables(
|
||||
const Names & column_names_,
|
||||
const SelectQueryInfo & query_info_,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
const ContextPtr & context_,
|
||||
Block sample_block,
|
||||
std::vector<UInt8> columns_mask_,
|
||||
size_t max_block_size_,
|
||||
const bool need_detached_tables_)
|
||||
: SourceStepWithFilter(DataStream{.header = std::move(sample_block)}, column_names_, query_info_, storage_snapshot_, context_)
|
||||
, columns_mask(std::move(columns_mask_))
|
||||
, max_block_size(max_block_size_)
|
||||
, need_detached_tables(need_detached_tables_)
|
||||
{
|
||||
}
|
||||
|
||||
void ReadFromSystemTables::applyFilters(ActionDAGNodes added_filter_nodes)
|
||||
{
|
||||
SourceStepWithFilter::applyFilters(std::move(added_filter_nodes));
|
||||
|
||||
const ActionsDAG::Node * predicate = nullptr;
|
||||
if (filter_actions_dag)
|
||||
predicate = filter_actions_dag->getOutputs().at(0);
|
||||
|
||||
filtered_databases_column = getFilteredDatabases(predicate, context);
|
||||
filtered_tables_column = getFilteredTables(predicate, filtered_databases_column, context, need_detached_tables);
|
||||
}
|
||||
|
||||
void ReadFromSystemTables::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
|
||||
{
|
||||
if (need_detached_tables)
|
||||
{
|
||||
pipeline.init(createPipe<DetachedTablesBlockSource>());
|
||||
}
|
||||
else
|
||||
{
|
||||
pipeline.init(createPipe<TablesBlockSource>());
|
||||
}
|
||||
}
|
||||
}
|
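The database predicate is applied in `applyFilters` before any per-database iterator is created, so a query that names a database only enumerates that database, whether the step serves `system.tables` or `system.detached_tables`. For example (database name illustrative):

```sql
-- Served by ReadFromSystemTables with need_detached_tables = false.
SELECT name FROM system.tables WHERE database = 'base' AND engine = 'MergeTree';

-- Served by the same step with need_detached_tables = true.
SELECT name FROM system.detached_tables WHERE database = 'base';
```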
47 src/Storages/System/ReadFromSystemTables.h (new file)
@@ -0,0 +1,47 @@
#pragma once

#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <QueryPipeline/Pipe.h>

namespace DB
{

class ReadFromSystemTables : public SourceStepWithFilter
{
public:
    std::string getName() const override { return "ReadFromSystemTables"; }
    void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;

    ReadFromSystemTables(
        const Names & column_names_,
        const SelectQueryInfo & query_info_,
        const StorageSnapshotPtr & storage_snapshot_,
        const ContextPtr & context_,
        Block sample_block,
        std::vector<UInt8> columns_mask_,
        size_t max_block_size_,
        bool need_detached_tables);

    void applyFilters(ActionDAGNodes added_filter_nodes) override;

private:
    std::vector<UInt8> columns_mask;
    size_t max_block_size;
    const bool need_detached_tables;

    ColumnPtr filtered_databases_column;
    ColumnPtr filtered_tables_column;

    template <class T>
    Pipe createPipe()
    {
        return Pipe(std::make_shared<T>(
            std::move(columns_mask),
            getOutputStream().header,
            max_block_size,
            std::move(filtered_databases_column),
            std::move(filtered_tables_column),
            context));
    }
};
}
56 src/Storages/System/StorageSystemDetachedTables.cpp (new file)
@ -0,0 +1,56 @@
|
||||
#include "StorageSystemDetachedTables.h"
|
||||
|
||||
#include <Core/NamesAndTypes.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypeUUID.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <Processors/QueryPlan/QueryPlan.h>
|
||||
#include <Storages/ColumnsDescription.h>
|
||||
#include <Storages/ProjectionsDescription.h>
|
||||
#include <Storages/StorageInMemoryMetadata.h>
|
||||
#include <Storages/System/ReadFromSystemTables.h>
|
||||
#include <Storages/System/TablesBlockSource.h>
|
||||
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
StorageSystemDetachedTables::StorageSystemDetachedTables(const StorageID & table_id_) : IStorage(table_id_)
|
||||
{
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
|
||||
auto description = ColumnsDescription{
|
||||
ColumnDescription{"database", std::make_shared<DataTypeString>(), "The name of the database the table is in."},
|
||||
ColumnDescription{"name", std::make_shared<DataTypeString>(), "Table name."},
|
||||
ColumnDescription{"uuid", std::make_shared<DataTypeUUID>(), "Table uuid (Atomic database)."},
|
||||
ColumnDescription{"metadata_path", std::make_shared<DataTypeString>(), "Path to the table metadata in the file system."},
|
||||
ColumnDescription{"is_permanently", std::make_shared<DataTypeUInt8>(), "Table was detached permanently."},
|
||||
};
|
||||
|
||||
storage_metadata.setColumns(std::move(description));
|
||||
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
}
|
||||
|
||||
void StorageSystemDetachedTables::read(
|
||||
QueryPlan & query_plan,
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
const size_t max_block_size,
|
||||
size_t /*num_streams*/)
|
||||
{
|
||||
storage_snapshot->check(column_names);
|
||||
auto sample_block = storage_snapshot->metadata->getSampleBlock();
|
||||
|
||||
auto [columns_mask, res_block] = getQueriedColumnsMaskAndHeader(sample_block, column_names);
|
||||
|
||||
auto reading = std::make_unique<ReadFromSystemTables>(
|
||||
column_names, query_info, storage_snapshot, context, std::move(res_block), std::move(columns_mask), max_block_size, true);
|
||||
|
||||
query_plan.addStep(std::move(reading));
|
||||
}
|
||||
}
|
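The five columns declared above are exactly what the table exposes, so a structural sanity check is simply:

```sql
DESCRIBE TABLE system.detached_tables;
```

This should list `database`, `name`, `uuid`, `metadata_path` and `is_permanently` with the types declared in the `ColumnsDescription`.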
32 src/Storages/System/StorageSystemDetachedTables.h (new file)
@@ -0,0 +1,32 @@
#pragma once

#include <Storages/IStorage.h>


namespace DB
{

class Context;

/** Implements the system table `detached_tables`, which allows you to get information about detached tables.
  */
class StorageSystemDetachedTables final : public IStorage
{
public:
    explicit StorageSystemDetachedTables(const StorageID & table_id_);

    std::string getName() const override { return "SystemDetachedTables"; }

    void read(
        QueryPlan & query_plan,
        const Names & column_names,
        const StorageSnapshotPtr & storage_snapshot,
        SelectQueryInfo & /*query_info*/,
        ContextPtr context,
        QueryProcessingStage::Enum processed_stage,
        size_t max_block_size,
        size_t num_streams) override;

    bool isSystemStorage() const override { return true; }
};
}
@ -1,29 +1,31 @@
|
||||
#include "StorageSystemTables.h"
|
||||
|
||||
#include <Access/ContextAccess.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <DataTypes/DataTypeDateTime.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <Storages/System/StorageSystemTables.h>
|
||||
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <Storages/SelectQueryInfo.h>
|
||||
#include <Storages/VirtualColumnUtils.h>
|
||||
#include <Access/ContextAccess.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypeUUID.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <Disks/IStoragePolicy.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/DatabaseCatalog.h>
|
||||
#include <Interpreters/formatWithPossiblyHidingSecrets.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Parsers/ASTSelectWithUnionQuery.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/StringUtils.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <Disks/IStoragePolicy.h>
|
||||
#include <Processors/ISource.h>
|
||||
#include <Processors/QueryPlan/QueryPlan.h>
|
||||
#include <Processors/QueryPlan/SourceStepWithFilter.h>
|
||||
#include <QueryPipeline/Pipe.h>
|
||||
#include <QueryPipeline/QueryPipelineBuilder.h>
|
||||
#include <DataTypes/DataTypeUUID.h>
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <Storages/SelectQueryInfo.h>
|
||||
#include <Storages/System/ReadFromSystemTables.h>
|
||||
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
|
||||
#include <Storages/VirtualColumnUtils.h>
|
||||
#include <Common/StringUtils.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
|
||||
@ -32,8 +34,7 @@ namespace DB
|
||||
{
|
||||
|
||||
|
||||
StorageSystemTables::StorageSystemTables(const StorageID & table_id_)
|
||||
: IStorage(table_id_)
|
||||
StorageSystemTables::StorageSystemTables(const StorageID & table_id_) : IStorage(table_id_)
|
||||
{
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
|
||||
@ -46,9 +47,13 @@ StorageSystemTables::StorageSystemTables(const StorageID & table_id_)
|
||||
{"data_paths", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "Paths to the table data in the file systems."},
|
||||
{"metadata_path", std::make_shared<DataTypeString>(), "Path to the table metadata in the file system."},
|
||||
{"metadata_modification_time", std::make_shared<DataTypeDateTime>(), "Time of latest modification of the table metadata."},
|
||||
{"metadata_version", std::make_shared<DataTypeInt32>(), "Metadata version for ReplicatedMergeTree table, 0 for non ReplicatedMergeTree table."},
|
||||
{"metadata_version",
|
||||
std::make_shared<DataTypeInt32>(),
|
||||
"Metadata version for ReplicatedMergeTree table, 0 for non ReplicatedMergeTree table."},
|
||||
{"dependencies_database", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "Database dependencies."},
|
||||
{"dependencies_table", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "Table dependencies (materialized views the current table)."},
|
||||
{"dependencies_table",
|
||||
std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()),
|
||||
"Table dependencies (materialized views the current table)."},
|
||||
{"create_table_query", std::make_shared<DataTypeString>(), "The query that was used to create the table."},
|
||||
{"engine_full", std::make_shared<DataTypeString>(), "Parameters of the table engine."},
|
||||
{"as_select", std::make_shared<DataTypeString>(), "SELECT query for view."},
|
||||
@ -57,676 +62,54 @@ StorageSystemTables::StorageSystemTables(const StorageID & table_id_)
|
||||
{"primary_key", std::make_shared<DataTypeString>(), "The primary key expression specified in the table."},
|
||||
{"sampling_key", std::make_shared<DataTypeString>(), "The sampling key expression specified in the table."},
|
||||
{"storage_policy", std::make_shared<DataTypeString>(), "The storage policy."},
|
||||
{"total_rows", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()),
|
||||
"Total number of rows, if it is possible to quickly determine exact number of rows in the table, otherwise NULL (including underlying Buffer table)."
|
||||
},
|
||||
{"total_bytes", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()),
|
||||
"Total number of bytes, if it is possible to quickly determine exact number "
|
||||
"of bytes for the table on storage, otherwise NULL (does not includes any underlying storage). "
|
||||
"If the table stores data on disk, returns used space on disk (i.e. compressed). "
|
||||
"If the table stores data in memory, returns approximated number of used bytes in memory."
|
||||
},
|
||||
{"total_bytes_uncompressed", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()),
|
||||
"Total number of uncompressed bytes, if it's possible to quickly determine the exact number "
|
||||
"of bytes from the part checksums for the table on storage, otherwise NULL (does not take underlying storage (if any) into account)."
|
||||
},
|
||||
{"total_rows",
|
||||
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()),
|
||||
"Total number of rows, if it is possible to quickly determine exact number of rows in the table, otherwise NULL (including "
|
||||
"underlying Buffer table)."},
|
||||
{"total_bytes",
|
||||
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()),
|
||||
"Total number of bytes, if it is possible to quickly determine exact number "
|
||||
"of bytes for the table on storage, otherwise NULL (does not includes any underlying storage). "
|
||||
"If the table stores data on disk, returns used space on disk (i.e. compressed). "
|
||||
"If the table stores data in memory, returns approximated number of used bytes in memory."},
|
||||
{"total_bytes_uncompressed",
|
||||
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()),
|
||||
"Total number of uncompressed bytes, if it's possible to quickly determine the exact number "
|
||||
"of bytes from the part checksums for the table on storage, otherwise NULL (does not take underlying storage (if any) into "
|
||||
"account)."},
|
||||
{"parts", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()), "The total number of parts in this table."},
|
||||
{"active_parts", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()), "The number of active parts in this table."},
|
||||
{"total_marks", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()), "The total number of marks in all parts in this table."},
|
||||
{"lifetime_rows", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()),
|
||||
"Total number of rows INSERTed since server start (only for Buffer tables)."
|
||||
},
|
||||
{"lifetime_bytes", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()),
|
||||
"Total number of bytes INSERTed since server start (only for Buffer tables)."
|
||||
},
|
||||
{"active_parts",
|
||||
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()),
|
||||
"The number of active parts in this table."},
|
||||
{"total_marks",
|
||||
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()),
|
||||
"The total number of marks in all parts in this table."},
|
||||
{"lifetime_rows",
|
||||
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()),
|
||||
"Total number of rows INSERTed since server start (only for Buffer tables)."},
|
||||
{"lifetime_bytes",
|
||||
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()),
|
||||
"Total number of bytes INSERTed since server start (only for Buffer tables)."},
|
||||
{"comment", std::make_shared<DataTypeString>(), "The comment for the table."},
|
||||
{"has_own_data", std::make_shared<DataTypeUInt8>(),
|
||||
"Flag that indicates whether the table itself stores some data on disk or only accesses some other source."
|
||||
},
|
||||
{"loading_dependencies_database", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()),
|
||||
"Database loading dependencies (list of objects which should be loaded before the current object)."
|
||||
},
|
||||
{"loading_dependencies_table", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()),
|
||||
"Table loading dependencies (list of objects which should be loaded before the current object)."
|
||||
},
|
||||
{"loading_dependent_database", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()),
|
||||
"Dependent loading database."
|
||||
},
|
||||
{"loading_dependent_table", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()),
|
||||
"Dependent loading table."
|
||||
},
|
||||
{"has_own_data",
|
||||
std::make_shared<DataTypeUInt8>(),
|
||||
"Flag that indicates whether the table itself stores some data on disk or only accesses some other source."},
|
||||
{"loading_dependencies_database",
|
||||
std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()),
|
||||
"Database loading dependencies (list of objects which should be loaded before the current object)."},
|
||||
{"loading_dependencies_table",
|
||||
std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()),
|
||||
"Table loading dependencies (list of objects which should be loaded before the current object)."},
|
||||
{"loading_dependent_database", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "Dependent loading database."},
|
||||
{"loading_dependent_table", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "Dependent loading table."},
|
||||
};
|
||||
|
||||
description.setAliases({
|
||||
{"table", std::make_shared<DataTypeString>(), "name"}
|
||||
});
|
||||
description.setAliases({{"table", std::make_shared<DataTypeString>(), "name"}});
|
||||
|
||||
storage_metadata.setColumns(std::move(description));
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
}
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
ColumnPtr getFilteredDatabases(const ActionsDAG::Node * predicate, ContextPtr context)
|
||||
{
|
||||
MutableColumnPtr column = ColumnString::create();
|
||||
|
||||
const auto databases = DatabaseCatalog::instance().getDatabases();
|
||||
for (const auto & database_name : databases | boost::adaptors::map_keys)
|
||||
{
|
||||
if (database_name == DatabaseCatalog::TEMPORARY_DATABASE)
|
||||
continue; /// We don't want to show the internal database for temporary tables in system.tables
|
||||
|
||||
column->insert(database_name);
|
||||
}
|
||||
|
||||
Block block { ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "database") };
|
||||
VirtualColumnUtils::filterBlockWithPredicate(predicate, block, context);
|
||||
return block.getByPosition(0).column;
|
||||
}
|
||||
|
||||
ColumnPtr getFilteredTables(const ActionsDAG::Node * predicate, const ColumnPtr & filtered_databases_column, ContextPtr context)
|
||||
{
|
||||
Block sample {
|
||||
ColumnWithTypeAndName(nullptr, std::make_shared<DataTypeString>(), "name"),
|
||||
ColumnWithTypeAndName(nullptr, std::make_shared<DataTypeString>(), "engine")
|
||||
};
|
||||
|
||||
MutableColumnPtr database_column = ColumnString::create();
|
||||
MutableColumnPtr engine_column;
|
||||
|
||||
auto dag = VirtualColumnUtils::splitFilterDagForAllowedInputs(predicate, &sample);
|
||||
if (dag)
|
||||
{
|
||||
bool filter_by_engine = false;
|
||||
for (const auto * input : dag->getInputs())
|
||||
if (input->result_name == "engine")
|
||||
filter_by_engine = true;
|
||||
|
||||
if (filter_by_engine)
|
||||
engine_column = ColumnString::create();
|
||||
}
|
||||
|
||||
for (size_t database_idx = 0; database_idx < filtered_databases_column->size(); ++database_idx)
|
||||
{
|
||||
const auto & database_name = filtered_databases_column->getDataAt(database_idx).toString();
|
||||
DatabasePtr database = DatabaseCatalog::instance().tryGetDatabase(database_name);
|
||||
if (!database)
|
||||
continue;
|
||||
|
||||
for (auto table_it = database->getTablesIterator(context); table_it->isValid(); table_it->next())
|
||||
{
|
||||
database_column->insert(table_it->name());
|
||||
if (engine_column)
|
||||
engine_column->insert(table_it->table()->getName());
|
||||
}
|
||||
}
|
||||
|
||||
Block block {ColumnWithTypeAndName(std::move(database_column), std::make_shared<DataTypeString>(), "name")};
|
||||
if (engine_column)
|
||||
block.insert(ColumnWithTypeAndName(std::move(engine_column), std::make_shared<DataTypeString>(), "engine"));
|
||||
|
||||
if (dag)
|
||||
VirtualColumnUtils::filterBlockWithDAG(dag, block, context);
|
||||
|
||||
return block.getByPosition(0).column;
|
||||
}
|
||||
|
||||
/// Avoid heavy operation on tables if we only queried columns that we can get without table object.
|
||||
/// Otherwise it will require table initialization for Lazy database.
|
||||
bool needTable(const DatabasePtr & database, const Block & header)
|
||||
{
|
||||
if (database->getEngineName() != "Lazy")
|
||||
return true;
|
||||
|
||||
static const std::set<std::string> columns_without_table = { "database", "name", "uuid", "metadata_modification_time" };
|
||||
for (const auto & column : header.getColumnsWithTypeAndName())
|
||||
{
|
||||
if (columns_without_table.find(column.name) == columns_without_table.end())
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
class TablesBlockSource : public ISource
|
||||
{
|
||||
public:
|
||||
TablesBlockSource(
|
||||
std::vector<UInt8> columns_mask_,
|
||||
Block header,
|
||||
UInt64 max_block_size_,
|
||||
ColumnPtr databases_,
|
||||
ColumnPtr tables_,
|
||||
ContextPtr context_)
|
||||
: ISource(std::move(header))
|
||||
, columns_mask(std::move(columns_mask_))
|
||||
, max_block_size(max_block_size_)
|
||||
, databases(std::move(databases_))
|
||||
, context(Context::createCopy(context_))
|
||||
{
|
||||
size_t size = tables_->size();
|
||||
tables.reserve(size);
|
||||
for (size_t idx = 0; idx < size; ++idx)
|
||||
tables.insert(tables_->getDataAt(idx).toString());
|
||||
}
|
||||
|
||||
String getName() const override { return "Tables"; }
|
||||
|
||||
protected:
|
||||
Chunk generate() override
|
||||
{
|
||||
if (done)
|
||||
return {};
|
||||
|
||||
MutableColumns res_columns = getPort().getHeader().cloneEmptyColumns();
|
||||
|
||||
const auto access = context->getAccess();
|
||||
const bool need_to_check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES);
|
||||
|
||||
size_t rows_count = 0;
|
||||
while (rows_count < max_block_size)
|
||||
{
|
||||
if (tables_it && !tables_it->isValid())
|
||||
++database_idx;
|
||||
|
||||
while (database_idx < databases->size() && (!tables_it || !tables_it->isValid()))
|
||||
{
|
||||
database_name = databases->getDataAt(database_idx).toString();
|
||||
database = DatabaseCatalog::instance().tryGetDatabase(database_name);
|
||||
|
||||
if (!database)
|
||||
{
|
||||
/// Database was deleted just now or the user has no access.
|
||||
++database_idx;
|
||||
continue;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
/// This is for temporary tables. They are output in single block regardless to max_block_size.
|
||||
if (database_idx >= databases->size())
|
||||
{
|
||||
if (context->hasSessionContext())
|
||||
{
|
||||
Tables external_tables = context->getSessionContext()->getExternalTables();
|
||||
|
||||
for (auto & table : external_tables)
|
||||
{
|
||||
size_t src_index = 0;
|
||||
size_t res_index = 0;
|
||||
|
||||
// database
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
// name
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(table.first);
|
||||
|
||||
// uuid
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(table.second->getStorageID().uuid);
|
||||
|
||||
// engine
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(table.second->getName());
|
||||
|
||||
// is_temporary
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(1u);
|
||||
|
||||
// data_paths
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
// metadata_path
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
// metadata_modification_time
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
// metadata_version
|
||||
// Temporary tables does not support replication
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
// dependencies_database
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
// dependencies_table
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
// create_table_query
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
auto temp_db = DatabaseCatalog::instance().getDatabaseForTemporaryTables();
|
||||
ASTPtr ast = temp_db ? temp_db->tryGetCreateTableQuery(table.second->getStorageID().getTableName(), context) : nullptr;
|
||||
res_columns[res_index++]->insert(ast ? format({context, *ast}) : "");
|
||||
}
|
||||
|
||||
// engine_full
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(table.second->getName());
|
||||
|
||||
const auto & settings = context->getSettingsRef();
|
||||
while (src_index < columns_mask.size())
|
||||
{
|
||||
// total_rows
|
||||
if (src_index == 19 && columns_mask[src_index])
|
||||
{
|
||||
if (auto total_rows = table.second->totalRows(settings))
|
||||
res_columns[res_index++]->insert(*total_rows);
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
// total_bytes
|
||||
else if (src_index == 20 && columns_mask[src_index])
|
||||
{
|
||||
if (auto total_bytes = table.second->totalBytes(settings))
|
||||
res_columns[res_index++]->insert(*total_bytes);
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
/// Fill the rest columns with defaults
|
||||
else if (columns_mask[src_index])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
src_index++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
UInt64 num_rows = res_columns.at(0)->size();
|
||||
done = true;
|
||||
return Chunk(std::move(res_columns), num_rows);
|
||||
}
|
||||
|
||||
const bool need_to_check_access_for_tables = need_to_check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, database_name);
|
||||
|
||||
if (!tables_it || !tables_it->isValid())
|
||||
tables_it = database->getTablesIterator(context);
|
||||
|
||||
const bool need_table = needTable(database, getPort().getHeader());
|
||||
|
||||
for (; rows_count < max_block_size && tables_it->isValid(); tables_it->next())
|
||||
{
|
||||
auto table_name = tables_it->name();
|
||||
if (!tables.contains(table_name))
|
||||
continue;
|
||||
|
||||
if (need_to_check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, database_name, table_name))
|
||||
continue;
|
||||
|
||||
StoragePtr table = nullptr;
|
||||
TableLockHolder lock;
|
||||
if (need_table)
|
||||
{
|
||||
table = tables_it->table();
|
||||
if (!table)
|
||||
// Table might have just been removed or detached for Lazy engine (see DatabaseLazy::tryGetTable())
|
||||
continue;
|
||||
|
||||
/// The only column that requires us to hold a shared lock is data_paths as rename might alter them (on ordinary tables)
|
||||
/// and it's not protected internally by other mutexes
|
||||
static const size_t DATA_PATHS_INDEX = 5;
|
||||
if (columns_mask[DATA_PATHS_INDEX])
|
||||
{
|
||||
lock = table->tryLockForShare(context->getCurrentQueryId(),
|
||||
context->getSettingsRef().lock_acquire_timeout);
|
||||
if (!lock)
|
||||
// Table was dropped while acquiring the lock, skipping table
|
||||
continue;
|
||||
}
|
||||
}
|
||||
++rows_count;
|
||||
|
||||
size_t src_index = 0;
|
||||
size_t res_index = 0;
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(database_name);
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(table_name);
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(tables_it->uuid());
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
chassert(table != nullptr);
|
||||
res_columns[res_index++]->insert(table->getName());
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(0u); // is_temporary
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
chassert(lock != nullptr);
|
||||
Array table_paths_array;
|
||||
auto paths = table->getDataPaths();
|
||||
table_paths_array.reserve(paths.size());
|
||||
for (const String & path : paths)
|
||||
table_paths_array.push_back(path);
|
||||
res_columns[res_index++]->insert(table_paths_array);
|
||||
/// We don't need the lock anymore
|
||||
lock = nullptr;
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(database->getObjectMetadataPath(table_name));
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(static_cast<UInt64>(database->getObjectMetadataModificationTime(table_name)));
|
||||
|
||||
StorageMetadataPtr metadata_snapshot;
|
||||
if (table)
|
||||
metadata_snapshot = table->getInMemoryMetadataPtr();
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
if (metadata_snapshot && table->supportsReplication())
|
||||
res_columns[res_index++]->insert(metadata_snapshot->metadata_version);
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
{
|
||||
Array views_table_name_array;
|
||||
Array views_database_name_array;
|
||||
if (columns_mask[src_index] || columns_mask[src_index + 1])
|
||||
{
|
||||
const auto view_ids = DatabaseCatalog::instance().getDependentViews(StorageID(database_name, table_name));
|
||||
|
||||
views_table_name_array.reserve(view_ids.size());
|
||||
views_database_name_array.reserve(view_ids.size());
|
||||
for (const auto & view_id : view_ids)
|
||||
{
|
||||
views_table_name_array.push_back(view_id.table_name);
|
||||
views_database_name_array.push_back(view_id.database_name);
|
||||
}
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(views_database_name_array);
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(views_table_name_array);
|
||||
}
|
||||
|
||||
if (columns_mask[src_index] || columns_mask[src_index + 1] || columns_mask[src_index + 2])
|
||||
{
|
||||
ASTPtr ast = database->tryGetCreateTableQuery(table_name, context);
|
||||
auto * ast_create = ast ? ast->as<ASTCreateQuery>() : nullptr;
|
||||
|
||||
if (ast_create && !context->getSettingsRef().show_table_uuid_in_table_create_query_if_not_nil)
|
||||
{
|
||||
ast_create->uuid = UUIDHelpers::Nil;
|
||||
ast_create->to_inner_uuid = UUIDHelpers::Nil;
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(ast ? format({context, *ast}) : "");
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
String engine_full;
|
||||
|
||||
if (ast_create && ast_create->storage)
|
||||
{
|
||||
engine_full = format({context, *ast_create->storage});
|
||||
|
||||
static const char * const extra_head = " ENGINE = ";
|
||||
if (startsWith(engine_full, extra_head))
|
||||
engine_full = engine_full.substr(strlen(extra_head));
|
||||
}
|
||||
|
||||
res_columns[res_index++]->insert(engine_full);
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
String as_select;
|
||||
if (ast_create && ast_create->select)
|
||||
as_select = format({context, *ast_create->select});
|
||||
res_columns[res_index++]->insert(as_select);
|
||||
}
|
||||
}
|
||||
else
|
||||
src_index += 3;
|
||||
|
||||
ASTPtr expression_ptr;
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getPartitionKeyAST()))
|
||||
res_columns[res_index++]->insert(format({context, *expression_ptr}));
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getSortingKey().expression_list_ast))
|
||||
res_columns[res_index++]->insert(format({context, *expression_ptr}));
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getPrimaryKey().expression_list_ast))
|
||||
res_columns[res_index++]->insert(format({context, *expression_ptr}));
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getSamplingKeyAST()))
|
||||
res_columns[res_index++]->insert(format({context, *expression_ptr}));
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
auto policy = table ? table->getStoragePolicy() : nullptr;
|
||||
if (policy)
|
||||
res_columns[res_index++]->insert(policy->getName());
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
auto settings = context->getSettingsRef();
|
||||
settings.select_sequential_consistency = 0;
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
auto total_rows = table ? table->totalRows(settings) : std::nullopt;
|
||||
if (total_rows)
|
||||
res_columns[res_index++]->insert(*total_rows);
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
auto total_bytes = table->totalBytes(settings);
|
||||
if (total_bytes)
|
||||
res_columns[res_index++]->insert(*total_bytes);
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
auto total_bytes_uncompressed = table->totalBytesUncompressed(settings);
|
||||
if (total_bytes_uncompressed)
|
||||
res_columns[res_index++]->insert(*total_bytes_uncompressed);
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
auto table_merge_tree = std::dynamic_pointer_cast<MergeTreeData>(table);
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
if (table_merge_tree)
|
||||
res_columns[res_index++]->insert(table_merge_tree->getAllPartsCount());
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
if (table_merge_tree)
|
||||
res_columns[res_index++]->insert(table_merge_tree->getActivePartsCount());
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
if (table_merge_tree)
|
||||
{
|
||||
res_columns[res_index++]->insert(table_merge_tree->getTotalMarksCount());
|
||||
}
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
auto lifetime_rows = table ? table->lifetimeRows() : std::nullopt;
|
||||
if (lifetime_rows)
|
||||
res_columns[res_index++]->insert(*lifetime_rows);
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
auto lifetime_bytes = table ? table->lifetimeBytes() : std::nullopt;
|
||||
if (lifetime_bytes)
|
||||
res_columns[res_index++]->insert(*lifetime_bytes);
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
if (metadata_snapshot)
|
||||
res_columns[res_index++]->insert(metadata_snapshot->comment);
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
if (table)
|
||||
res_columns[res_index++]->insert(table->storesDataOnDisk());
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index] || columns_mask[src_index + 1] || columns_mask[src_index + 2] || columns_mask[src_index + 3])
|
||||
{
|
||||
auto dependencies = DatabaseCatalog::instance().getLoadingDependencies(StorageID{database_name, table_name});
|
||||
auto dependents = DatabaseCatalog::instance().getLoadingDependents(StorageID{database_name, table_name});
|
||||
|
||||
Array dependencies_databases;
|
||||
Array dependencies_tables;
|
||||
dependencies_databases.reserve(dependencies.size());
|
||||
dependencies_tables.reserve(dependencies.size());
|
||||
for (const auto & dependency : dependencies)
|
||||
{
|
||||
dependencies_databases.push_back(dependency.database_name);
|
||||
dependencies_tables.push_back(dependency.table_name);
|
||||
}
|
||||
|
||||
Array dependents_databases;
|
||||
Array dependents_tables;
|
||||
dependents_databases.reserve(dependents.size());
|
||||
dependents_tables.reserve(dependents.size());
|
||||
for (const auto & dependent : dependents)
|
||||
{
|
||||
dependents_databases.push_back(dependent.database_name);
|
||||
dependents_tables.push_back(dependent.table_name);
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(dependencies_databases);
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(dependencies_tables);
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(dependents_databases);
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(dependents_tables);
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
UInt64 num_rows = res_columns.at(0)->size();
|
||||
return Chunk(std::move(res_columns), num_rows);
|
||||
}
|
||||
private:
|
||||
std::vector<UInt8> columns_mask;
|
||||
UInt64 max_block_size;
|
||||
ColumnPtr databases;
|
||||
NameSet tables;
|
||||
size_t database_idx = 0;
|
||||
DatabaseTablesIteratorPtr tables_it;
|
||||
ContextPtr context;
|
||||
bool done = false;
|
||||
DatabasePtr database;
|
||||
std::string database_name;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
class ReadFromSystemTables : public SourceStepWithFilter
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "ReadFromSystemTables"; }
|
||||
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
|
||||
|
||||
ReadFromSystemTables(
|
||||
const Names & column_names_,
|
||||
const SelectQueryInfo & query_info_,
|
||||
const StorageSnapshotPtr & storage_snapshot_,
|
||||
const ContextPtr & context_,
|
||||
Block sample_block,
|
||||
std::vector<UInt8> columns_mask_,
|
||||
size_t max_block_size_)
|
||||
: SourceStepWithFilter(
|
||||
DataStream{.header = std::move(sample_block)},
|
||||
column_names_,
|
||||
query_info_,
|
||||
storage_snapshot_,
|
||||
context_)
|
||||
, columns_mask(std::move(columns_mask_))
|
||||
, max_block_size(max_block_size_)
|
||||
{
|
||||
}
|
||||
|
||||
void applyFilters(ActionDAGNodes added_filter_nodes) override;
|
||||
|
||||
private:
|
||||
std::vector<UInt8> columns_mask;
|
||||
size_t max_block_size;
|
||||
|
||||
ColumnPtr filtered_databases_column;
|
||||
ColumnPtr filtered_tables_column;
|
||||
};
|
||||
|
||||
void StorageSystemTables::read(
|
||||
QueryPlan & query_plan,
|
||||
const Names & column_names,
|
||||
@ -743,28 +126,8 @@ void StorageSystemTables::read(
|
||||
auto [columns_mask, res_block] = getQueriedColumnsMaskAndHeader(sample_block, column_names);
|
||||
|
||||
auto reading = std::make_unique<ReadFromSystemTables>(
|
||||
column_names, query_info, storage_snapshot, context, std::move(res_block), std::move(columns_mask), max_block_size);
|
||||
column_names, query_info, storage_snapshot, context, std::move(res_block), std::move(columns_mask), max_block_size, false);
|
||||
|
||||
query_plan.addStep(std::move(reading));
|
||||
}
|
||||
|
||||
void ReadFromSystemTables::applyFilters(ActionDAGNodes added_filter_nodes)
|
||||
{
|
||||
SourceStepWithFilter::applyFilters(std::move(added_filter_nodes));
|
||||
|
||||
const ActionsDAG::Node * predicate = nullptr;
|
||||
if (filter_actions_dag)
|
||||
predicate = filter_actions_dag->getOutputs().at(0);
|
||||
|
||||
filtered_databases_column = getFilteredDatabases(predicate, context);
|
||||
filtered_tables_column = getFilteredTables(predicate, filtered_databases_column, context);
|
||||
}
|
||||
|
||||
void ReadFromSystemTables::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
|
||||
{
|
||||
Pipe pipe(std::make_shared<TablesBlockSource>(
|
||||
std::move(columns_mask), getOutputStream().header, max_block_size, std::move(filtered_databases_column), std::move(filtered_tables_column), context));
|
||||
pipeline.init(std::move(pipe));
|
||||
}
|
||||
|
||||
}
|
||||
|
569
src/Storages/System/TablesBlockSource.cpp
Normal file
@ -0,0 +1,569 @@
#include "TablesBlockSource.h"

#include <memory>

#include <Access/ContextAccess.h>
#include <Disks/IStoragePolicy.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/formatWithPossiblyHidingSecrets.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Common/logger_useful.h>

namespace DB
{

namespace
{

/// Avoid heavy operation on tables if we only queried columns that we can get without table object.
/// Otherwise it will require table initialization for Lazy database.
bool needTable(const DatabasePtr & database, const Block & header)
{
    if (database->getEngineName() != "Lazy")
        return true;

    static const std::set<std::string> columns_without_table = { "database", "name", "uuid", "metadata_modification_time" };
    for (const auto & column : header.getColumnsWithTypeAndName())
    {
        if (columns_without_table.find(column.name) == columns_without_table.end())
            return true;
    }
    return false;
}

}
|
||||
|
||||
Chunk TablesBlockSource::generate()
|
||||
{
|
||||
if (done)
|
||||
return {};
|
||||
|
||||
MutableColumns res_columns = getPort().getHeader().cloneEmptyColumns();
|
||||
|
||||
const auto access = context->getAccess();
|
||||
const bool need_to_check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES);
|
||||
|
||||
size_t rows_count = 0;
|
||||
while (rows_count < max_block_size)
|
||||
{
|
||||
if (tables_it && !tables_it->isValid())
|
||||
++database_idx;
|
||||
|
||||
while (database_idx < databases->size() && (!tables_it || !tables_it->isValid()))
|
||||
{
|
||||
database_name = databases->getDataAt(database_idx).toString();
|
||||
database = DatabaseCatalog::instance().tryGetDatabase(database_name);
|
||||
|
||||
if (!database)
|
||||
{
|
||||
/// Database was deleted just now or the user has no access.
|
||||
++database_idx;
|
||||
continue;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
/// This is for temporary tables. They are output in single block regardless to max_block_size.
|
||||
if (database_idx >= databases->size())
|
||||
{
|
||||
if (context->hasSessionContext())
|
||||
{
|
||||
Tables external_tables = context->getSessionContext()->getExternalTables();
|
||||
|
||||
for (auto & table : external_tables)
|
||||
{
|
||||
size_t src_index = 0;
|
||||
size_t res_index = 0;
|
||||
|
||||
// database
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
// name
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(table.first);
|
||||
|
||||
// uuid
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(table.second->getStorageID().uuid);
|
||||
|
||||
// engine
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(table.second->getName());
|
||||
|
||||
// is_temporary
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(1u);
|
||||
|
||||
// data_paths
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
// metadata_path
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
// metadata_modification_time
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
// metadata_version
|
||||
// Temporary tables does not support replication
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
// dependencies_database
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
// dependencies_table
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
|
||||
// create_table_query
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
auto temp_db = DatabaseCatalog::instance().getDatabaseForTemporaryTables();
|
||||
ASTPtr ast
|
||||
= temp_db ? temp_db->tryGetCreateTableQuery(table.second->getStorageID().getTableName(), context) : nullptr;
|
||||
res_columns[res_index++]->insert(ast ? format({context, *ast}) : "");
|
||||
}
|
||||
|
||||
// engine_full
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(table.second->getName());
|
||||
|
||||
const auto & settings = context->getSettingsRef();
|
||||
while (src_index < columns_mask.size())
|
||||
{
|
||||
// total_rows
|
||||
if (src_index == 19 && columns_mask[src_index])
|
||||
{
|
||||
if (auto total_rows = table.second->totalRows(settings))
|
||||
res_columns[res_index++]->insert(*total_rows);
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
// total_bytes
|
||||
else if (src_index == 20 && columns_mask[src_index])
|
||||
{
|
||||
if (auto total_bytes = table.second->totalBytes(settings))
|
||||
res_columns[res_index++]->insert(*total_bytes);
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
/// Fill the rest columns with defaults
|
||||
else if (columns_mask[src_index])
|
||||
res_columns[res_index++]->insertDefault();
|
||||
src_index++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
UInt64 num_rows = res_columns.at(0)->size();
|
||||
done = true;
|
||||
return Chunk(std::move(res_columns), num_rows);
|
||||
}
|
||||
|
||||
const bool need_to_check_access_for_tables
|
||||
= need_to_check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, database_name);
|
||||
|
||||
if (!tables_it || !tables_it->isValid())
|
||||
tables_it = database->getTablesIterator(context);
|
||||
|
||||
const bool need_table = needTable(database, getPort().getHeader());
|
||||
|
||||
for (; rows_count < max_block_size && tables_it->isValid(); tables_it->next())
|
||||
{
|
||||
auto table_name = tables_it->name();
|
||||
if (!tables.contains(table_name))
|
||||
continue;
|
||||
|
||||
if (need_to_check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, database_name, table_name))
|
||||
continue;
|
||||
|
||||
StoragePtr table = nullptr;
|
||||
TableLockHolder lock;
|
||||
if (need_table)
|
||||
{
|
||||
table = tables_it->table();
|
||||
if (!table)
|
||||
// Table might have just been removed or detached for Lazy engine (see DatabaseLazy::tryGetTable())
|
||||
continue;
|
||||
|
||||
/// The only column that requires us to hold a shared lock is data_paths as rename might alter them (on ordinary tables)
|
||||
/// and it's not protected internally by other mutexes
|
||||
static const size_t DATA_PATHS_INDEX = 5;
|
||||
if (columns_mask[DATA_PATHS_INDEX])
|
||||
{
|
||||
lock = table->tryLockForShare(context->getCurrentQueryId(), context->getSettingsRef().lock_acquire_timeout);
|
||||
if (!lock)
|
||||
// Table was dropped while acquiring the lock, skipping table
|
||||
continue;
|
||||
}
|
||||
}
|
||||
++rows_count;
|
||||
|
||||
size_t src_index = 0;
|
||||
size_t res_index = 0;
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(database_name);
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(table_name);
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(tables_it->uuid());
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
chassert(table != nullptr);
|
||||
res_columns[res_index++]->insert(table->getName());
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(0u); // is_temporary
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
chassert(lock != nullptr);
|
||||
Array table_paths_array;
|
||||
auto paths = table->getDataPaths();
|
||||
table_paths_array.reserve(paths.size());
|
||||
for (const String & path : paths)
|
||||
table_paths_array.push_back(path);
|
||||
res_columns[res_index++]->insert(table_paths_array);
|
||||
/// We don't need the lock anymore
|
||||
lock = nullptr;
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(database->getObjectMetadataPath(table_name));
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(static_cast<UInt64>(database->getObjectMetadataModificationTime(table_name)));
|
||||
|
||||
StorageMetadataPtr metadata_snapshot;
|
||||
if (table)
|
||||
metadata_snapshot = table->getInMemoryMetadataPtr();
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
if (metadata_snapshot && table->supportsReplication())
|
||||
res_columns[res_index++]->insert(metadata_snapshot->metadata_version);
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
{
|
||||
Array views_table_name_array;
|
||||
Array views_database_name_array;
|
||||
if (columns_mask[src_index] || columns_mask[src_index + 1])
|
||||
{
|
||||
const auto view_ids = DatabaseCatalog::instance().getDependentViews(StorageID(database_name, table_name));
|
||||
|
||||
views_table_name_array.reserve(view_ids.size());
|
||||
views_database_name_array.reserve(view_ids.size());
|
||||
for (const auto & view_id : view_ids)
|
||||
{
|
||||
views_table_name_array.push_back(view_id.table_name);
|
||||
views_database_name_array.push_back(view_id.database_name);
|
||||
}
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(views_database_name_array);
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(views_table_name_array);
|
||||
}
|
||||
|
||||
if (columns_mask[src_index] || columns_mask[src_index + 1] || columns_mask[src_index + 2])
|
||||
{
|
||||
ASTPtr ast = database->tryGetCreateTableQuery(table_name, context);
|
||||
auto * ast_create = ast ? ast->as<ASTCreateQuery>() : nullptr;
|
||||
|
||||
if (ast_create && !context->getSettingsRef().show_table_uuid_in_table_create_query_if_not_nil)
|
||||
{
|
||||
ast_create->uuid = UUIDHelpers::Nil;
|
||||
ast_create->to_inner_uuid = UUIDHelpers::Nil;
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(ast ? format({context, *ast}) : "");
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
String engine_full;
|
||||
|
||||
if (ast_create && ast_create->storage)
|
||||
{
|
||||
engine_full = format({context, *ast_create->storage});
|
||||
|
||||
static const char * const extra_head = " ENGINE = ";
|
||||
if (startsWith(engine_full, extra_head))
|
||||
engine_full = engine_full.substr(strlen(extra_head));
|
||||
}
|
||||
|
||||
res_columns[res_index++]->insert(engine_full);
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
String as_select;
|
||||
if (ast_create && ast_create->select)
|
||||
as_select = format({context, *ast_create->select});
|
||||
res_columns[res_index++]->insert(as_select);
|
||||
}
|
||||
}
|
||||
else
|
||||
src_index += 3;
|
||||
|
||||
ASTPtr expression_ptr;
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getPartitionKeyAST()))
|
||||
res_columns[res_index++]->insert(format({context, *expression_ptr}));
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getSortingKey().expression_list_ast))
|
||||
res_columns[res_index++]->insert(format({context, *expression_ptr}));
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getPrimaryKey().expression_list_ast))
|
||||
res_columns[res_index++]->insert(format({context, *expression_ptr}));
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getSamplingKeyAST()))
|
||||
res_columns[res_index++]->insert(format({context, *expression_ptr}));
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
auto policy = table ? table->getStoragePolicy() : nullptr;
|
||||
if (policy)
|
||||
res_columns[res_index++]->insert(policy->getName());
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
auto settings = context->getSettingsRef();
|
||||
settings.select_sequential_consistency = 0;
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
auto total_rows = table ? table->totalRows(settings) : std::nullopt;
|
||||
if (total_rows)
|
||||
res_columns[res_index++]->insert(*total_rows);
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
auto total_bytes = table->totalBytes(settings);
|
||||
if (total_bytes)
|
||||
res_columns[res_index++]->insert(*total_bytes);
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
auto total_bytes_uncompressed = table->totalBytesUncompressed(settings);
|
||||
if (total_bytes_uncompressed)
|
||||
res_columns[res_index++]->insert(*total_bytes_uncompressed);
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
auto table_merge_tree = std::dynamic_pointer_cast<MergeTreeData>(table);
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
if (table_merge_tree)
|
||||
res_columns[res_index++]->insert(table_merge_tree->getAllPartsCount());
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
if (table_merge_tree)
|
||||
res_columns[res_index++]->insert(table_merge_tree->getActivePartsCount());
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
if (table_merge_tree)
|
||||
{
|
||||
res_columns[res_index++]->insert(table_merge_tree->getTotalMarksCount());
|
||||
}
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
auto lifetime_rows = table ? table->lifetimeRows() : std::nullopt;
|
||||
if (lifetime_rows)
|
||||
res_columns[res_index++]->insert(*lifetime_rows);
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
auto lifetime_bytes = table ? table->lifetimeBytes() : std::nullopt;
|
||||
if (lifetime_bytes)
|
||||
res_columns[res_index++]->insert(*lifetime_bytes);
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
if (metadata_snapshot)
|
||||
res_columns[res_index++]->insert(metadata_snapshot->comment);
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
{
|
||||
if (table)
|
||||
res_columns[res_index++]->insert(table->storesDataOnDisk());
|
||||
else
|
||||
res_columns[res_index++]->insertDefault();
|
||||
}
|
||||
|
||||
if (columns_mask[src_index] || columns_mask[src_index + 1] || columns_mask[src_index + 2] || columns_mask[src_index + 3])
|
||||
{
|
||||
auto dependencies = DatabaseCatalog::instance().getLoadingDependencies(StorageID{database_name, table_name});
|
||||
auto dependents = DatabaseCatalog::instance().getLoadingDependents(StorageID{database_name, table_name});
|
||||
|
||||
Array dependencies_databases;
|
||||
Array dependencies_tables;
|
||||
dependencies_databases.reserve(dependencies.size());
|
||||
dependencies_tables.reserve(dependencies.size());
|
||||
for (const auto & dependency : dependencies)
|
||||
{
|
||||
dependencies_databases.push_back(dependency.database_name);
|
||||
dependencies_tables.push_back(dependency.table_name);
|
||||
}
|
||||
|
||||
Array dependents_databases;
|
||||
Array dependents_tables;
|
||||
dependents_databases.reserve(dependents.size());
|
||||
dependents_tables.reserve(dependents.size());
|
||||
for (const auto & dependent : dependents)
|
||||
{
|
||||
dependents_databases.push_back(dependent.database_name);
|
||||
dependents_tables.push_back(dependent.table_name);
|
||||
}
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(dependencies_databases);
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(dependencies_tables);
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(dependents_databases);
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(dependents_tables);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
UInt64 num_rows = res_columns.at(0)->size();
|
||||
return Chunk(std::move(res_columns), num_rows);
|
||||
}

Chunk DetachedTablesBlockSource::generate()
{
    if (done)
        return {};

    MutableColumns result_columns = getPort().getHeader().cloneEmptyColumns();

    const auto access = context->getAccess();
    const bool need_to_check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES);

    for (size_t database_idx = 0, rows_count = 0; database_idx < databases->size() && rows_count < max_block_size; ++database_idx)
    {
        database_name = databases->getDataAt(database_idx).toString();
        database = DatabaseCatalog::instance().tryGetDatabase(database_name);

        if (!database)
        {
            LOG_DEBUG(lg, "Database was deleted just now or the user has no access");
            continue;
        }

        const bool need_to_check_access_for_tables
            = need_to_check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, database_name);

        if (!detached_tables_it || !detached_tables_it->isValid())
            detached_tables_it = database->getDetachedTablesIterator(context, {}, false);

        for (; rows_count < max_block_size && detached_tables_it->isValid(); detached_tables_it->next())
        {
            const auto detached_table_name = detached_tables_it->table();
            LOG_DEBUG(lg, "detached_table_name={}", detached_table_name);

            if (!detached_tables.contains(detached_table_name))
                continue;

            if (need_to_check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, database_name, detached_table_name))
                continue;

            fillResultColumnsByDetachedTableIterator(result_columns);
            ++rows_count;
        }
    }

    const UInt64 num_rows = result_columns.at(0)->size();
    done = true;
    return Chunk(std::move(result_columns), num_rows);
}

void DetachedTablesBlockSource::fillResultColumnsByDetachedTableIterator(MutableColumns & result_columns) const
{
    size_t src_index = 0;
    size_t res_index = 0;

    if (columns_mask[src_index++])
        result_columns[res_index++]->insert(detached_tables_it->database());

    if (columns_mask[src_index++])
        result_columns[res_index++]->insert(detached_tables_it->table());

    if (columns_mask[src_index++])
        result_columns[res_index++]->insert(detached_tables_it->uuid());

    if (columns_mask[src_index++])
        result_columns[res_index++]->insert(detached_tables_it->metadataPath());

    if (columns_mask[src_index++])
        result_columns[res_index++]->insert(detached_tables_it->isPermanently());
}
}
|
96
src/Storages/System/TablesBlockSource.h
Normal file
@ -0,0 +1,96 @@
#pragma once

#include <Databases/IDatabase.h>
#include <Interpreters/Context.h>
#include <Processors/ISource.h>
#include <Common/logger_useful.h>

namespace DB
{

class TablesBlockSource : public ISource
{
public:
    TablesBlockSource(
        std::vector<UInt8> columns_mask_,
        Block header,
        UInt64 max_block_size_,
        ColumnPtr databases_,
        ColumnPtr tables_,
        ContextPtr context_)
        : ISource(std::move(header))
        , columns_mask(std::move(columns_mask_))
        , max_block_size(max_block_size_)
        , databases(std::move(databases_))
        , context(Context::createCopy(context_))
    {
        size_t size = tables_->size();
        tables.reserve(size);
        for (size_t idx = 0; idx < size; ++idx)
            tables.insert(tables_->getDataAt(idx).toString());
    }

    String getName() const override { return "Tables"; }

protected:
    Chunk generate() override;

private:
    std::vector<UInt8> columns_mask;
    UInt64 max_block_size;
    ColumnPtr databases;
    NameSet tables;
    size_t database_idx = 0;
    DatabaseTablesIteratorPtr tables_it;
    ContextPtr context;
    bool done = false;
    DatabasePtr database;
    std::string database_name;
};

class DetachedTablesBlockSource : public ISource
{
public:
    DetachedTablesBlockSource(
        std::vector<UInt8> columns_mask_,
        Block header,
        UInt64 max_block_size_,
        ColumnPtr databases_,
        ColumnPtr detached_tables_,
        ContextPtr context_)
        : ISource(std::move(header))
        , columns_mask(std::move(columns_mask_))
        , max_block_size(max_block_size_)
        , databases(std::move(databases_))
        , context(Context::createCopy(context_))
    {
        size_t size = detached_tables_->size();
        detached_tables.reserve(size);
        for (size_t idx = 0; idx < size; ++idx)
        {
            detached_tables.insert(detached_tables_->getDataAt(idx).toString());
        }
    }

    String getName() const override { return "DetachedTables"; }

protected:
    Chunk generate() override;

private:
    const std::vector<UInt8> columns_mask;
    const UInt64 max_block_size;
    const ColumnPtr databases;
    NameSet detached_tables;
    DatabaseDetachedTablesSnapshotIteratorPtr detached_tables_it;
    ContextPtr context;
    bool done = false;
    DatabasePtr database;
    std::string database_name;

    // temp log for debug
    LoggerPtr lg = getLogger("DetachedTablesBlockSource");

    void fillResultColumnsByDetachedTableIterator(MutableColumns & result_columns) const;
};
}
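The header above only declares the two sources; the read step for `system.detached_tables` is expected to plug `DetachedTablesBlockSource` into a pipeline the same way `ReadFromSystemTables::initializePipeline` does earlier in this commit. The sketch below is illustrative only: the step class name `ReadFromSystemDetachedTables` and its members (`columns_mask`, `max_block_size`, `filtered_databases_column`, `filtered_tables_column`, `context`) are assumptions made by analogy and are not part of this hunk.

```cpp
// Illustrative sketch, not part of this commit's hunks: wiring DetachedTablesBlockSource
// into a query pipeline, following the ReadFromSystemTables::initializePipeline pattern above.
// The step class and its members are assumed for the example.
void ReadFromSystemDetachedTables::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
    Pipe pipe(std::make_shared<DetachedTablesBlockSource>(
        std::move(columns_mask),
        getOutputStream().header,
        max_block_size,
        std::move(filtered_databases_column),
        std::move(filtered_tables_column),
        context));
    pipeline.init(std::move(pipe));
}
```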
@ -18,6 +18,7 @@
#include <Storages/System/StorageSystemDataSkippingIndices.h>
#include <Storages/System/StorageSystemDataTypeFamilies.h>
#include <Storages/System/StorageSystemDetachedParts.h>
#include <Storages/System/StorageSystemDetachedTables.h>
#include <Storages/System/StorageSystemDictionaries.h>
#include <Storages/System/StorageSystemEvents.h>
#include <Storages/System/StorageSystemFormats.h>
@ -129,6 +130,7 @@ void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, b
    attachNoDescription<StorageSystemZeros>(context, system_database, "zeros_mt", "Multithreaded version of system.zeros.", true);
    attach<StorageSystemDatabases>(context, system_database, "databases", "Lists all databases of the current server.");
    attachNoDescription<StorageSystemTables>(context, system_database, "tables", "Lists all tables of the current server.");
    attachNoDescription<StorageSystemDetachedTables>(context, system_database, "detached_tables", "Lists all detached tables of the current server.");
    attachNoDescription<StorageSystemColumns>(context, system_database, "columns", "Lists all columns from all tables of the current server.");
    attach<StorageSystemFunctions>(context, system_database, "functions", "Contains a list of all available ordinary and aggregate functions with their descriptions.");
    attach<StorageSystemEvents>(context, system_database, "events", "Contains profiling events and their current value.");
38
tests/integration/test_system_detached_tables/test.py
Normal file
@ -0,0 +1,38 @@
import pytest
from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node_default", stay_alive=True)


@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


def test_system_detached_tables():
    node.query("CREATE TABLE test_table (n Int64) ENGINE=MergeTree ORDER BY n;")
    node.query("CREATE TABLE test_table_perm (n Int64) ENGINE=MergeTree ORDER BY n;")

    result = node.query("SELECT * FROM system.detached_tables")
    assert result == ""

    node.query("DETACH TABLE test_table")
    node.query("DETACH TABLE test_table_perm PERMANENTLY")

    result = node.query("SELECT name FROM system.detached_tables")
    assert result == "test_table\ntest_table_perm\n"

    node.restart_clickhouse()

    result = node.query("SELECT name FROM system.detached_tables")
    assert result == "test_table_perm\n"

    node.restart_clickhouse()

    result = node.query("SELECT name FROM system.detached_tables")
    assert result == "test_table_perm\n"
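The integration test above covers DETACH, DETACH PERMANENTLY, and behaviour across server restarts. A natural follow-up check, sketched below and not part of this commit, is that ATTACH removes the corresponding row from `system.detached_tables` again (consistent with the reference output of the stateless test further down); the test name and table name are hypothetical.

```python
# Hypothetical extension of the test above (not part of this commit):
# attaching a detached table should make it disappear from system.detached_tables.
def test_attach_removes_entry():
    node.query("CREATE TABLE test_attach_table (n Int64) ENGINE=MergeTree ORDER BY n;")
    node.query("DETACH TABLE test_attach_table")
    assert "test_attach_table" in node.query("SELECT name FROM system.detached_tables")

    node.query("ATTACH TABLE test_attach_table")
    assert "test_attach_table" not in node.query("SELECT name FROM system.detached_tables")

    node.query("DROP TABLE test_attach_table")
```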
@ -0,0 +1,5 @@
test_system_detached_tables test_table 0
test_system_detached_tables test_table_perm 1
test_system_detached_tables test_table 0
test_system_detached_tables test_table_perm 1
test_system_detached_tables test_table 0
28
tests/queries/0_stateless/03172_system_detached_tables.sql
Normal file
@ -0,0 +1,28 @@
-- Tags: no-parallel

DROP DATABASE IF EXISTS test_system_detached_tables;
CREATE DATABASE IF NOT EXISTS test_system_detached_tables;

CREATE TABLE test_system_detached_tables.test_table (n Int64) ENGINE=MergeTree ORDER BY n;
SELECT * FROM system.detached_tables;

DETACH TABLE test_system_detached_tables.test_table;
SELECT database, name, is_permanently FROM system.detached_tables;

ATTACH TABLE test_system_detached_tables.test_table;

CREATE TABLE test_system_detached_tables.test_table_perm (n Int64) ENGINE=MergeTree ORDER BY n;
SELECT * FROM system.detached_tables;

DETACH TABLE test_system_detached_tables.test_table_perm PERMANENTLY;
SELECT database, name, is_permanently FROM system.detached_tables;

DETACH TABLE test_system_detached_tables.test_table SYNC;
SELECT database, name, is_permanently FROM system.detached_tables;

SELECT database, name, is_permanently FROM system.detached_tables WHERE name='test_table';

DROP DATABASE test_system_detached_tables;