mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-29 11:02:08 +00:00
added system.parts_columns as a copy of system.parts [#CLICKHOUSE-3328]
This commit is contained in:
parent
4c3dfb97df
commit
de62ae6b1b
@ -274,19 +274,27 @@ MergeTreeDataPartChecksums MergeTreeDataPartChecksums::parse(const String & s)
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
const MergeTreeDataPartChecksums::Checksum * MergeTreeDataPart::tryGetBinChecksum(const String & name) const
|
||||
const MergeTreeDataPartChecksums::Checksum * MergeTreeDataPart::tryGetChecksum(const String & name, const String & ext) const
|
||||
{
|
||||
if (checksums.empty())
|
||||
return nullptr;
|
||||
|
||||
const auto & files = checksums.files;
|
||||
const auto bin_file_name = escapeForFileName(name) + ".bin";
|
||||
auto it = files.find(bin_file_name);
|
||||
const auto file_name = escapeForFileName(name) + ext;
|
||||
auto it = files.find(file_name);
|
||||
|
||||
return (it == files.end()) ? nullptr : &it->second;
|
||||
}
|
||||
|
||||
const MergeTreeDataPartChecksums::Checksum * MergeTreeDataPart::tryGetBinChecksum(const String & name) const
|
||||
{
|
||||
return tryGetChecksum(name, ".bin");
|
||||
}
|
||||
|
||||
const MergeTreeDataPartChecksums::Checksum * MergeTreeDataPart::tryGetMrkChecksum(const String & name) const
|
||||
{
|
||||
return tryGetChecksum(name, ".mrk");
|
||||
}
|
||||
|
||||
static ReadBufferFromFile openForReading(const String & path)
|
||||
{
|
||||
@ -399,6 +407,13 @@ size_t MergeTreeDataPart::getColumnUncompressedSize(const String & name) const
|
||||
}
|
||||
|
||||
|
||||
size_t MergeTreeDataPart::getColumnMrkSize(const String & name) const
|
||||
{
|
||||
const Checksum * checksum = tryGetMrkChecksum(name);
|
||||
return checksum ? checksum->file_size : 0;
|
||||
}
|
||||
|
||||
|
||||
/** Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
|
||||
* If no checksums are present returns the name of the first physically existing column.
|
||||
*/
|
||||
@ -925,6 +940,18 @@ size_t MergeTreeDataPart::getIndexSizeInAllocatedBytes() const
|
||||
return res;
|
||||
}
|
||||
|
||||
size_t MergeTreeDataPart::getTotalMrkSizeInBytes() const
|
||||
{
|
||||
size_t res = 0;
|
||||
for (const NameAndTypePair & it : columns)
|
||||
{
|
||||
const Checksum * checksum = tryGetMrkChecksum(it.name);
|
||||
if (checksum)
|
||||
res += checksum->file_size;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
String MergeTreeDataPart::stateToString(MergeTreeDataPart::State state)
|
||||
{
|
||||
switch (state)
|
||||
|
@ -97,12 +97,17 @@ struct MergeTreeDataPart
|
||||
|
||||
MergeTreeDataPart(MergeTreeData & storage_, const String & name_);
|
||||
|
||||
const Checksum * tryGetChecksum(const String & name, const String & ext) const;
|
||||
/// Returns checksum of column's binary file.
|
||||
const Checksum * tryGetBinChecksum(const String & name) const;
|
||||
/// Returns checksum of column's mrk file.
|
||||
const Checksum * tryGetMrkChecksum(const String & name) const;
|
||||
|
||||
/// Returns the size of .bin file for column `name` if found, zero otherwise
|
||||
size_t getColumnCompressedSize(const String & name) const;
|
||||
size_t getColumnUncompressedSize(const String & name) const;
|
||||
/// Returns the size of .mrk file for column `name` if found, zero otherwise
|
||||
size_t getColumnMrkSize(const String & name) const;
|
||||
|
||||
/// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
|
||||
/// If no checksums are present returns the name of the first physically existing column.
|
||||
@ -294,6 +299,8 @@ struct MergeTreeDataPart
|
||||
/// For data in RAM ('index')
|
||||
size_t getIndexSizeInBytes() const;
|
||||
size_t getIndexSizeInAllocatedBytes() const;
|
||||
/// Total size of *.mrk files
|
||||
size_t getTotalMrkSizeInBytes() const;
|
||||
|
||||
private:
|
||||
/// Reads columns names and types from columns.txt
|
||||
|
282
dbms/src/Storages/System/StorageSystemPartsColumns.cpp
Normal file
282
dbms/src/Storages/System/StorageSystemPartsColumns.cpp
Normal file
@ -0,0 +1,282 @@
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/DataTypeDateTime.h>
|
||||
#include <DataTypes/DataTypeDate.h>
|
||||
#include <DataStreams/OneBlockInputStream.h>
|
||||
#include <Storages/System/StorageSystemPartsColumns.h>
|
||||
#include <Storages/StorageMergeTree.h>
|
||||
#include <Storages/StorageReplicatedMergeTree.h>
|
||||
#include <Storages/VirtualColumnUtils.h>
|
||||
#include <Databases/IDatabase.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
||||
StorageSystemPartsColumns::StorageSystemPartsColumns(const std::string & name_)
|
||||
: name(name_),
|
||||
columns
|
||||
{
|
||||
{"partition", std::make_shared<DataTypeString>()},
|
||||
{"name", std::make_shared<DataTypeString>()},
|
||||
{"active", std::make_shared<DataTypeUInt8>()},
|
||||
{"marks", std::make_shared<DataTypeUInt64>()},
|
||||
{"marks_size", std::make_shared<DataTypeUInt64>()},
|
||||
{"rows", std::make_shared<DataTypeUInt64>()},
|
||||
{"bytes", std::make_shared<DataTypeUInt64>()},
|
||||
{"modification_time", std::make_shared<DataTypeDateTime>()},
|
||||
{"remove_time", std::make_shared<DataTypeDateTime>()},
|
||||
{"refcount", std::make_shared<DataTypeUInt32>()},
|
||||
{"min_date", std::make_shared<DataTypeDate>()},
|
||||
{"max_date", std::make_shared<DataTypeDate>()},
|
||||
{"min_block_number", std::make_shared<DataTypeInt64>()},
|
||||
{"max_block_number", std::make_shared<DataTypeInt64>()},
|
||||
{"level", std::make_shared<DataTypeUInt32>()},
|
||||
{"primary_key_bytes_in_memory", std::make_shared<DataTypeUInt64>()},
|
||||
{"primary_key_bytes_in_memory_allocated", std::make_shared<DataTypeUInt64>()},
|
||||
|
||||
{"database", std::make_shared<DataTypeString>()},
|
||||
{"table", std::make_shared<DataTypeString>()},
|
||||
{"engine", std::make_shared<DataTypeString>()}
|
||||
}
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
BlockInputStreams StorageSystemPartsColumns::read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum & processed_stage,
|
||||
const size_t /*max_block_size*/,
|
||||
const unsigned /*num_streams*/)
|
||||
{
|
||||
bool has_state_column = false;
|
||||
Names real_column_names;
|
||||
|
||||
for (const String & column_name : column_names)
|
||||
{
|
||||
if (column_name == "_state")
|
||||
has_state_column = true;
|
||||
else
|
||||
real_column_names.emplace_back(column_name);
|
||||
}
|
||||
|
||||
/// Do not check if only _state column is requested
|
||||
if (!(has_state_column && real_column_names.empty()))
|
||||
check(real_column_names);
|
||||
|
||||
processed_stage = QueryProcessingStage::FetchColumns;
|
||||
|
||||
/// Will apply WHERE to subset of columns and then add more columns.
|
||||
/// This is kind of complicated, but we use WHERE to do less work.
|
||||
|
||||
Block block_to_filter;
|
||||
|
||||
std::map<std::pair<String, String>, StoragePtr> storages;
|
||||
|
||||
{
|
||||
Databases databases = context.getDatabases();
|
||||
|
||||
/// Add column 'database'.
|
||||
ColumnPtr database_column = std::make_shared<ColumnString>();
|
||||
for (const auto & database : databases)
|
||||
database_column->insert(database.first);
|
||||
block_to_filter.insert(ColumnWithTypeAndName(database_column, std::make_shared<DataTypeString>(), "database"));
|
||||
|
||||
/// Filter block_to_filter with column 'database'.
|
||||
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
|
||||
|
||||
if (!block_to_filter.rows())
|
||||
return BlockInputStreams();
|
||||
|
||||
/// Add columns 'table', 'engine', 'active'
|
||||
database_column = block_to_filter.getByName("database").column;
|
||||
size_t rows = database_column->size();
|
||||
|
||||
ColumnPtr table_column = std::make_shared<ColumnString>();
|
||||
ColumnPtr engine_column = std::make_shared<ColumnString>();
|
||||
ColumnPtr active_column = std::make_shared<ColumnUInt8>();
|
||||
|
||||
for (size_t i = 0; i < rows; ++i)
|
||||
{
|
||||
String database_name = (*database_column)[i].get<String>();
|
||||
const DatabasePtr database = databases.at(database_name);
|
||||
|
||||
for (auto iterator = database->getIterator(context); iterator->isValid(); iterator->next())
|
||||
{
|
||||
String table_name = iterator->name();
|
||||
StoragePtr storage = iterator->table();
|
||||
String engine_name = storage->getName();
|
||||
|
||||
if (!dynamic_cast<StorageMergeTree *>(&*storage) &&
|
||||
!dynamic_cast<StorageReplicatedMergeTree *>(&*storage))
|
||||
continue;
|
||||
|
||||
storages[std::make_pair(database_name, iterator->name())] = storage;
|
||||
|
||||
/// Add all combinations of flag 'active'.
|
||||
for (auto active : {0, 1})
|
||||
{
|
||||
table_column->insert(table_name);
|
||||
engine_column->insert(engine_name);
|
||||
active_column->insert(active);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
block_to_filter.insert(ColumnWithTypeAndName(table_column, std::make_shared<DataTypeString>(), "table"));
|
||||
block_to_filter.insert(ColumnWithTypeAndName(engine_column, std::make_shared<DataTypeString>(), "engine"));
|
||||
block_to_filter.insert(ColumnWithTypeAndName(active_column, std::make_shared<DataTypeUInt8>(), "active"));
|
||||
}
|
||||
|
||||
/// Filter block_to_filter with columns 'database', 'table', 'engine', 'active'.
|
||||
VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
|
||||
|
||||
/// If all was filtered out.
|
||||
if (!block_to_filter.rows())
|
||||
return {};
|
||||
|
||||
ColumnPtr filtered_database_column = block_to_filter.getByName("database").column;
|
||||
ColumnPtr filtered_table_column = block_to_filter.getByName("table").column;
|
||||
ColumnPtr filtered_active_column = block_to_filter.getByName("active").column;
|
||||
|
||||
/// Finally, create the result.
|
||||
|
||||
Block block = getSampleBlock();
|
||||
if (has_state_column)
|
||||
block.insert(ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "_state"));
|
||||
|
||||
for (size_t i = 0; i < filtered_database_column->size();)
|
||||
{
|
||||
String database = (*filtered_database_column)[i].get<String>();
|
||||
String table = (*filtered_table_column)[i].get<String>();
|
||||
|
||||
/// What 'active' value we need.
|
||||
bool need[2]{}; /// [active]
|
||||
for (; i < filtered_database_column->size() &&
|
||||
(*filtered_database_column)[i].get<String>() == database &&
|
||||
(*filtered_table_column)[i].get<String>() == table; ++i)
|
||||
{
|
||||
bool active = (*filtered_active_column)[i].get<UInt64>() != 0;
|
||||
need[active] = true;
|
||||
}
|
||||
|
||||
StoragePtr storage = storages.at(std::make_pair(database, table));
|
||||
TableStructureReadLockPtr table_lock;
|
||||
|
||||
try
|
||||
{
|
||||
table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__); /// For table not to be dropped.
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
/** There are case when IStorage::drop was called,
|
||||
* but we still own the object.
|
||||
* Then table will throw exception at attempt to lock it.
|
||||
* Just skip the table.
|
||||
*/
|
||||
if (e.code() == ErrorCodes::TABLE_IS_DROPPED)
|
||||
continue;
|
||||
|
||||
throw;
|
||||
}
|
||||
|
||||
String engine = storage->getName();
|
||||
|
||||
MergeTreeData * data = nullptr;
|
||||
|
||||
if (auto merge_tree = dynamic_cast<StorageMergeTree *>(&*storage))
|
||||
{
|
||||
data = &merge_tree->getData();
|
||||
}
|
||||
else if (auto replicated_merge_tree = dynamic_cast<StorageReplicatedMergeTree *>(&*storage))
|
||||
{
|
||||
data = &replicated_merge_tree->getData();
|
||||
}
|
||||
else
|
||||
{
|
||||
throw Exception("Unknown engine " + engine, ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
using State = MergeTreeDataPart::State;
|
||||
MergeTreeData::DataPartStateVector all_parts_state;
|
||||
MergeTreeData::DataPartsVector all_parts;
|
||||
|
||||
if (need[0])
|
||||
{
|
||||
/// If has_state_column is requested, return all states
|
||||
if (!has_state_column)
|
||||
all_parts = data->getDataPartsVector({State::Committed, State::Outdated}, &all_parts_state);
|
||||
else
|
||||
all_parts = data->getAllDataPartsVector(&all_parts_state);
|
||||
}
|
||||
else
|
||||
all_parts = data->getDataPartsVector({State::Committed}, &all_parts_state);
|
||||
|
||||
|
||||
/// Finally, we'll go through the list of parts.
|
||||
for (size_t part_number = 0; part_number < all_parts.size(); ++part_number)
|
||||
{
|
||||
const auto & part = all_parts[part_number];
|
||||
auto part_state = all_parts_state[part_number];
|
||||
|
||||
size_t j = 0;
|
||||
{
|
||||
WriteBufferFromOwnString out;
|
||||
part->partition.serializeTextQuoted(*data, out);
|
||||
block.getByPosition(j++).column->insert(out.str());
|
||||
}
|
||||
block.getByPosition(j++).column->insert(part->name);
|
||||
block.getByPosition(j++).column->insert(static_cast<UInt64>(part_state == State::Committed));
|
||||
block.getByPosition(j++).column->insert(static_cast<UInt64>(part->marks_count));
|
||||
block.getByPosition(j++).column->insert(static_cast<UInt64>(part->getTotalMrkSizeInBytes()));
|
||||
|
||||
block.getByPosition(j++).column->insert(static_cast<UInt64>(part->rows_count));
|
||||
block.getByPosition(j++).column->insert(static_cast<UInt64>(part->size_in_bytes));
|
||||
block.getByPosition(j++).column->insert(static_cast<UInt64>(part->modification_time));
|
||||
block.getByPosition(j++).column->insert(static_cast<UInt64>(part->remove_time));
|
||||
|
||||
/// For convenience, in returned refcount, don't add references that was due to local variables in this method: all_parts, active_parts.
|
||||
block.getByPosition(j++).column->insert(static_cast<UInt64>(part.use_count() - 1));
|
||||
|
||||
block.getByPosition(j++).column->insert(static_cast<UInt64>(part->getMinDate()));
|
||||
block.getByPosition(j++).column->insert(static_cast<UInt64>(part->getMaxDate()));
|
||||
block.getByPosition(j++).column->insert(part->info.min_block);
|
||||
block.getByPosition(j++).column->insert(part->info.max_block);
|
||||
block.getByPosition(j++).column->insert(static_cast<UInt64>(part->info.level));
|
||||
block.getByPosition(j++).column->insert(static_cast<UInt64>(part->getIndexSizeInBytes()));
|
||||
block.getByPosition(j++).column->insert(static_cast<UInt64>(part->getIndexSizeInAllocatedBytes()));
|
||||
|
||||
block.getByPosition(j++).column->insert(database);
|
||||
block.getByPosition(j++).column->insert(table);
|
||||
block.getByPosition(j++).column->insert(engine);
|
||||
|
||||
if (has_state_column)
|
||||
block.getByPosition(j++).column->insert(part->stateString());
|
||||
}
|
||||
}
|
||||
|
||||
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(block));
|
||||
}
|
||||
|
||||
NameAndTypePair StorageSystemPartsColumns::getColumn(const String & column_name) const
|
||||
{
|
||||
if (column_name == "_state")
|
||||
return NameAndTypePair("_state", std::make_shared<DataTypeString>());
|
||||
|
||||
return ITableDeclaration::getColumn(column_name);
|
||||
}
|
||||
|
||||
bool StorageSystemPartsColumns::hasColumn(const String & column_name) const
|
||||
{
|
||||
if (column_name == "_state")
|
||||
return true;
|
||||
|
||||
return ITableDeclaration::hasColumn(column_name);
|
||||
}
|
||||
|
||||
|
||||
}
|
43
dbms/src/Storages/System/StorageSystemPartsColumns.h
Normal file
43
dbms/src/Storages/System/StorageSystemPartsColumns.h
Normal file
@ -0,0 +1,43 @@
|
||||
#pragma once
|
||||
|
||||
#include <ext/shared_ptr_helper.h>
|
||||
#include <Storages/IStorage.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class Context;
|
||||
|
||||
|
||||
/** Implements system table 'parts' which allows to get information about data parts for tables of MergeTree family.
|
||||
*/
|
||||
class StorageSystemPartsColumns : public ext::shared_ptr_helper<StorageSystemPartsColumns>, public IStorage
|
||||
{
|
||||
public:
|
||||
std::string getName() const override { return "SystemPartsColumns"; }
|
||||
std::string getTableName() const override { return name; }
|
||||
|
||||
const NamesAndTypesList & getColumnsListImpl() const override { return columns; }
|
||||
|
||||
NameAndTypePair getColumn(const String & column_name) const override;
|
||||
|
||||
bool hasColumn(const String & column_name) const override;
|
||||
|
||||
BlockInputStreams read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & query_info,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum & processed_stage,
|
||||
size_t max_block_size,
|
||||
unsigned num_streams) override;
|
||||
|
||||
private:
|
||||
const std::string name;
|
||||
NamesAndTypesList columns;
|
||||
|
||||
protected:
|
||||
StorageSystemPartsColumns(const std::string & name_);
|
||||
};
|
||||
|
||||
}
|
@ -15,6 +15,7 @@
|
||||
#include <Storages/System/StorageSystemNumbers.h>
|
||||
#include <Storages/System/StorageSystemOne.h>
|
||||
#include <Storages/System/StorageSystemParts.h>
|
||||
#include <Storages/System/StorageSystemPartsColumns.h>
|
||||
#include <Storages/System/StorageSystemProcesses.h>
|
||||
#include <Storages/System/StorageSystemReplicas.h>
|
||||
#include <Storages/System/StorageSystemReplicationQueue.h>
|
||||
@ -44,6 +45,7 @@ void attachSystemTablesServer(IDatabase & system_database, bool has_zookeeper)
|
||||
{
|
||||
attachSystemTablesLocal(system_database);
|
||||
system_database.attachTable("parts", StorageSystemParts::create("parts"));
|
||||
system_database.attachTable("parts_columns", StorageSystemPartsColumns::create("parts_columns"));
|
||||
system_database.attachTable("processes", StorageSystemProcesses::create("processes"));
|
||||
system_database.attachTable("metrics", StorageSystemMetrics::create("metrics"));
|
||||
system_database.attachTable("merges", StorageSystemMerges::create("merges"));
|
||||
|
Loading…
Reference in New Issue
Block a user