Merge pull request #9919 from azat/system.tables

system.tables improvements (total_rows/total_bytes/storage_policy)
This commit is contained in:
alexey-milovidov 2020-03-30 14:21:49 +03:00 committed by GitHub
commit 9436a136b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 226 additions and 15 deletions

View File

@ -1064,7 +1064,7 @@ void InterpreterSelectQuery::executeFetchColumns(
auto check_trivial_count_query = [&]() -> std::optional<AggregateDescription>
{
if (!settings.optimize_trivial_count_query || !syntax_analyzer_result->maybe_optimize_trivial_count || !storage
|| query.sampleSize() || query.sampleOffset() || query.final() || query.prewhere() || query.where()
|| query.sampleSize() || query.sampleOffset() || query.final() || query.prewhere() || query.where() || query.groupBy()
|| !query_analyzer->hasAggregation() || processing_stage != QueryProcessingStage::FetchColumns)
return {};

View File

@ -459,13 +459,31 @@ public:
/// Returns storage policy if storage supports it
virtual StoragePolicyPtr getStoragePolicy() const { return {}; }
/** If it is possible to quickly determine exact number of rows in the table at this moment of time, then return it.
*/
/// If it is possible to quickly determine exact number of rows in the table at this moment of time, then return it.
/// Used for:
/// - Simple count() opimization
/// - For total_rows column in system.tables
///
/// Does takes underlying Storage (if any) into account.
virtual std::optional<UInt64> totalRows() const
{
return {};
}
/// If it is possible to quickly determine exact number of bytes for the table on storage:
/// - memory (approximated)
/// - disk (compressed)
///
/// Used for:
/// - For total_bytes column in system.tables
//
/// Does not takes underlying Storage (if any) into account
/// (since for Buffer we still need to know how much bytes it uses).
virtual std::optional<UInt64> totalBytes() const
{
return {};
}
private:
/// You always need to take the next three locks in this order.

View File

@ -725,6 +725,35 @@ void StorageBuffer::checkAlterIsPossible(const AlterCommands & commands, const S
}
}
std::optional<UInt64> StorageBuffer::totalRows() const
{
std::optional<UInt64> underlying_rows;
auto underlying = DatabaseCatalog::instance().tryGetTable(destination_id);
if (underlying)
underlying_rows = underlying->totalRows();
if (!underlying_rows)
return underlying_rows;
UInt64 rows = 0;
for (auto & buffer : buffers)
{
std::lock_guard lock(buffer.mutex);
rows += buffer.data.rows();
}
return rows + *underlying_rows;
}
std::optional<UInt64> StorageBuffer::totalBytes() const
{
UInt64 bytes = 0;
for (auto & buffer : buffers)
{
std::lock_guard lock(buffer.mutex);
bytes += buffer.data.bytes();
}
return bytes;
}
void StorageBuffer::alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
{

View File

@ -88,8 +88,11 @@ public:
void checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */) override;
/// The structure of the subordinate table is not checked and does not change.
void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
/// The structure of the subordinate table is not checked and does not change.
void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
std::optional<UInt64> totalRows() const override;
std::optional<UInt64> totalBytes() const override;
~StorageBuffer() override;
@ -100,7 +103,7 @@ private:
{
time_t first_write_time = 0;
Block data;
std::mutex mutex;
mutable std::mutex mutex;
};
/// There are `num_shards` of independent buffers.

View File

@ -530,6 +530,13 @@ namespace
}
StoragePolicyPtr StorageDistributed::getStoragePolicy() const
{
if (storage_policy.empty())
return {};
return global_context.getStoragePolicySelector()->get(storage_policy);
}
NameAndTypePair StorageDistributed::getColumn(const String & column_name) const
{
if (getColumns().hasPhysical(column_name))

View File

@ -60,6 +60,7 @@ public:
bool supportsSampling() const override { return true; }
bool supportsFinal() const override { return true; }
bool supportsPrewhere() const override { return true; }
StoragePolicyPtr getStoragePolicy() const override;
NameAndTypePair getColumn(const String & column_name) const override;
bool hasColumn(const String & column_name) const override;

View File

@ -136,6 +136,24 @@ void StorageMemory::truncate(const ASTPtr &, const Context &, TableStructureWrit
data.clear();
}
std::optional<UInt64> StorageMemory::totalRows() const
{
UInt64 rows = 0;
std::lock_guard lock(mutex);
for (auto & buffer : data)
rows += buffer.rows();
return rows;
}
std::optional<UInt64> StorageMemory::totalBytes() const
{
UInt64 bytes = 0;
std::lock_guard lock(mutex);
for (auto & buffer : data)
bytes += buffer.bytes();
return bytes;
}
void registerStorageMemory(StorageFactory & factory)
{

View File

@ -42,11 +42,14 @@ public:
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
std::optional<UInt64> totalRows() const override;
std::optional<UInt64> totalBytes() const override;
private:
/// The data itself. `list` - so that when inserted to the end, the existing iterators are not invalidated.
BlocksList data;
std::mutex mutex;
mutable std::mutex mutex;
protected:
StorageMemory(const StorageID & table_id_, ColumnsDescription columns_description_, ConstraintsDescription constraints_);

View File

@ -151,6 +151,11 @@ std::optional<UInt64> StorageMergeTree::totalRows() const
return getTotalActiveSizeInRows();
}
std::optional<UInt64> StorageMergeTree::totalBytes() const
{
return getTotalActiveSizeInBytes();
}
BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Context & context)
{
return std::make_shared<MergeTreeBlockOutputStream>(*this, context.getSettingsRef().max_partitions_per_insert_block);

View File

@ -46,6 +46,7 @@ public:
unsigned num_streams) override;
std::optional<UInt64> totalRows() const override;
std::optional<UInt64> totalBytes() const override;
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;

View File

@ -44,6 +44,15 @@ public:
void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
std::optional<UInt64> totalRows() const override
{
return {0};
}
std::optional<UInt64> totalBytes() const override
{
return {0};
}
private:
protected:

View File

@ -2995,9 +2995,9 @@ Pipes StorageReplicatedMergeTree::read(
}
std::optional<UInt64> StorageReplicatedMergeTree::totalRows() const
template <class Func>
void StorageReplicatedMergeTree::foreachCommittedParts(const Func & func) const
{
size_t res = 0;
auto max_added_blocks = getMaxAddedBlocks();
auto lock = lockParts();
for (auto & part : getDataPartsStateRange(DataPartState::Committed))
@ -3009,8 +3009,21 @@ std::optional<UInt64> StorageReplicatedMergeTree::totalRows() const
if (blocks_iterator == max_added_blocks.end() || part->info.max_block > blocks_iterator->second)
continue;
res += part->rows_count;
func(part);
}
}
std::optional<UInt64> StorageReplicatedMergeTree::totalRows() const
{
UInt64 res = 0;
foreachCommittedParts([&res](auto & part) { res += part->rows_count; });
return res;
}
std::optional<UInt64> StorageReplicatedMergeTree::totalBytes() const
{
UInt64 res = 0;
foreachCommittedParts([&res](auto & part) { res += part->getBytesOnDisk(); });
return res;
}

View File

@ -96,6 +96,7 @@ public:
unsigned num_streams) override;
std::optional<UInt64> totalRows() const override;
std::optional<UInt64> totalBytes() const override;
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
@ -287,6 +288,9 @@ private:
/// True if replica was created for existing table with fixed granularity
bool other_replicas_fixed_granularity = false;
template <class Func>
void foreachCommittedParts(const Func & func) const;
/** Creates the minimum set of nodes in ZooKeeper.
*/
void createTableIfNotExists();

View File

@ -2,6 +2,7 @@
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/System/StorageSystemTables.h>
#include <Storages/VirtualColumnUtils.h>
@ -49,6 +50,8 @@ StorageSystemTables::StorageSystemTables(const std::string & name_)
{"primary_key", std::make_shared<DataTypeString>()},
{"sampling_key", std::make_shared<DataTypeString>()},
{"storage_policy", std::make_shared<DataTypeString>()},
{"total_rows", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>())},
{"total_bytes", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>())},
}));
}
@ -141,51 +144,75 @@ protected:
size_t src_index = 0;
size_t res_index = 0;
// database
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
// name
if (columns_mask[src_index++])
res_columns[res_index++]->insert(table.first);
// engine
if (columns_mask[src_index++])
res_columns[res_index++]->insert(table.second->getName());
// is_temporary
if (columns_mask[src_index++])
res_columns[res_index++]->insert(1u);
// data_paths
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
// metadata_path
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
// metadata_modification_time
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
// dependencies_database
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
// dependencies_table
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
// create_table_query
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
// engine_full
if (columns_mask[src_index++])
res_columns[res_index++]->insert(table.second->getName());
// partition_key
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
// sorting_key
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
// primary_key
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
// sampling_key
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
// storage_policy
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
// total_rows
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
// total_bytes
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
}
@ -363,6 +390,26 @@ protected:
else
res_columns[res_index++]->insertDefault();
}
if (columns_mask[src_index++])
{
assert(table != nullptr);
auto total_rows = table->totalRows();
if (total_rows)
res_columns[res_index++]->insert(*total_rows);
else
res_columns[res_index++]->insertDefault();
}
if (columns_mask[src_index++])
{
assert(table != nullptr);
auto total_bytes = table->totalBytes();
if (total_bytes)
res_columns[res_index++]->insert(*total_bytes);
else
res_columns[res_index++]->insertDefault();
}
}
}

View File

@ -2,7 +2,7 @@
1
1
1
t_00693 Memory 1 [] 0000-00-00 00:00:00 [] [] Memory
t_00693 Memory 1 [] 0000-00-00 00:00:00 [] [] Memory \N \N
1
1
1

View File

@ -1,11 +1,12 @@
┌─name────────────────┬─partition_key─┬─sorting_key─┬─primary_key─┬─sampling_key─┐
│ check_system_tables │ name2 │ name1 │ name1 │ name1 │
└─────────────────────┴───────────────┴─────────────┴─────────────┴──────────────┘
┌─name────────────────┬─partition_key─┬─sorting_key─┬─primary_key─┬─sampling_key─┬─storage_policy─┬─total_rows─
│ check_system_tables │ name2 │ name1 │ name1 │ name1 │ default │ 0 │
└─────────────────────┴───────────────┴─────────────┴─────────────┴──────────────┴────────────────┴────────────
┌─name──┬─is_in_partition_key─┬─is_in_sorting_key─┬─is_in_primary_key─┬─is_in_sampling_key─┐
│ name1 │ 0 │ 1 │ 1 │ 1 │
│ name2 │ 1 │ 0 │ 0 │ 0 │
│ name3 │ 0 │ 0 │ 0 │ 0 │
└───────┴─────────────────────┴───────────────────┴───────────────────┴────────────────────┘
231 1
┌─name────────────────┬─partition_key─┬─sorting_key───┬─primary_key─┬─sampling_key─┐
│ check_system_tables │ date │ date, version │ date │ │
└─────────────────────┴───────────────┴───────────────┴─────────────┴──────────────┘
@ -23,3 +24,12 @@
│ UserId │ 0 │ 1 │ 1 │ 1 │
│ Counter │ 0 │ 1 │ 1 │ 0 │
└─────────┴─────────────────────┴───────────────────┴───────────────────┴────────────────────┘
Check total_bytes/total_rows for TinyLog
\N \N
\N \N
Check total_bytes/total_rows for Memory
0 0
2 1
Check total_bytes/total_rows for Buffer
0 0
100 50

View File

@ -11,7 +11,7 @@ CREATE TABLE check_system_tables
PARTITION BY name2
SAMPLE BY name1;
SELECT name, partition_key, sorting_key, primary_key, sampling_key
SELECT name, partition_key, sorting_key, primary_key, sampling_key, storage_policy, total_rows
FROM system.tables
WHERE name = 'check_system_tables'
FORMAT PrettyCompactNoEscapes;
@ -21,6 +21,9 @@ FROM system.columns
WHERE table = 'check_system_tables'
FORMAT PrettyCompactNoEscapes;
INSERT INTO check_system_tables VALUES (1, 1, 1);
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'check_system_tables';
DROP TABLE IF EXISTS check_system_tables;
-- Check VersionedCollapsingMergeTree
@ -65,3 +68,33 @@ WHERE table = 'check_system_tables'
FORMAT PrettyCompactNoEscapes;
DROP TABLE IF EXISTS check_system_tables;
SELECT 'Check total_bytes/total_rows for TinyLog';
CREATE TABLE check_system_tables (key UInt8) ENGINE = TinyLog();
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'check_system_tables';
INSERT INTO check_system_tables VALUES (1);
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'check_system_tables';
DROP TABLE check_system_tables;
SELECT 'Check total_bytes/total_rows for Memory';
CREATE TABLE check_system_tables (key UInt16) ENGINE = Memory();
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'check_system_tables';
INSERT INTO check_system_tables VALUES (1);
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'check_system_tables';
DROP TABLE check_system_tables;
SELECT 'Check total_bytes/total_rows for Buffer';
CREATE TABLE check_system_tables_null (key UInt16) ENGINE = Null();
CREATE TABLE check_system_tables (key UInt16) ENGINE = Buffer(
currentDatabase(),
check_system_tables_null,
2,
0, 100, /* min_time /max_time */
100, 100, /* min_rows /max_rows */
0, 1e6 /* min_bytes/max_bytes */
);
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'check_system_tables';
INSERT INTO check_system_tables SELECT * FROM numbers_mt(50);
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'check_system_tables';
DROP TABLE check_system_tables;
DROP TABLE check_system_tables_null;

View File

@ -934,6 +934,16 @@ This table contains the following columns (the column type is shown in brackets)
- `sorting_key` (String) - The sorting key expression specified in the table.
- `primary_key` (String) - The primary key expression specified in the table.
- `sampling_key` (String) - The sampling key expression specified in the table.
- `storage_policy` (String) - The storage policy:
- [MergeTree](table_engines/mergetree.md#table_engine-mergetree-multiple-volumes)
- [Distributed](table_engines/distributed.md#distributed)
- `total_rows` (Nullable(UInt64)) - Total number of rows, if it is possible to quickly determine exact number of rows in the table, otherwise `Null` (including underying `Buffer` table).
- `total_bytes` (Nullable(UInt64)) - Total number of bytes, if it is possible to quickly determine exact number of bytes for the table on storage, otherwise `Null` (**does not** includes any underlying storage).
- If the table stores data on disk, returns used space on disk (i.e. compressed).
- If the table stores data in memory, returns approximated number of used bytes in memory.
The `system.tables` table is used in `SHOW TABLES` query implementation.